diff options
Diffstat (limited to 'src/backend/replication/syncrep.c')
-rw-r--r-- | src/backend/replication/syncrep.c | 289 |
1 files changed, 243 insertions, 46 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ce2009882d9..9143c47f92d 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -30,23 +30,34 @@ * searching the through all waiters each time we receive a reply. * * In 9.5 or before only a single standby could be considered as - * synchronous. In 9.6 we support multiple synchronous standbys. - * The number of synchronous standbys that transactions must wait for - * replies from is specified in synchronous_standby_names. - * This parameter also specifies a list of standby names, - * which determines the priority of each standby for being chosen as - * a synchronous standby. The standbys whose names appear earlier - * in the list are given higher priority and will be considered as - * synchronous. Other standby servers appearing later in this list - * represent potential synchronous standbys. If any of the current - * synchronous standbys disconnects for whatever reason, it will be - * replaced immediately with the next-highest-priority standby. + * synchronous. In 9.6 we support priority-based multiple synchronous + * standbys. In 10.0 quorum-based multiple synchronous standbys are also + * supported. The number of synchronous standbys that transactions + * must wait for replies from is specified in synchronous_standby_names. + * This parameter also specifies a list of standby names and the method + * (FIRST or ANY) to choose synchronous standbys from the listed ones. + * + * The method FIRST specifies a priority-based synchronous replication + * and makes transaction commits wait until their WAL records are + * replicated to the requested number of synchronous standbys chosen based + * on their priorities. The standbys whose names appear earlier in the list + * are given higher priority and will be considered as synchronous. + * Other standby servers appearing later in this list represent potential + * synchronous standbys. 
If any of the current synchronous standbys + * disconnects for whatever reason, it will be replaced immediately with + * the next-highest-priority standby. + * + * The method ANY specifies a quorum-based synchronous replication + * and makes transaction commits wait until their WAL records are + * replicated to at least the requested number of synchronous standbys + * in the list. All the standbys appearing in the list are considered as + * candidates for quorum synchronous standbys. * * Before the standbys chosen from synchronous_standby_names can * become the synchronous standbys they must have caught up with * the primary; that may take some time. Once caught up, - * the current higher priority standbys which are considered as - * synchronous at that moment will release waiters from the queue. + * the standbys which are considered as synchronous at that moment + * will release waiters from the queue. * * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group * @@ -79,18 +90,29 @@ char *SyncRepStandbyNames; static bool announce_next_takeover = true; -static SyncRepConfigData *SyncRepConfig = NULL; +SyncRepConfigData *SyncRepConfig = NULL; static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); static int SyncRepWakeQueue(bool all, int mode); -static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, - XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - bool *am_sync); +static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + bool *am_sync); +static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + List *sync_standbys); +static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + List *sync_standbys, uint8 nth); static int SyncRepGetStandbyPriority(void); +static List *SyncRepGetSyncStandbysPriority(bool *am_sync); +static List 
*SyncRepGetSyncStandbysQuorum(bool *am_sync); +static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode); @@ -386,7 +408,7 @@ SyncRepReleaseWaiters(void) XLogRecPtr writePtr; XLogRecPtr flushPtr; XLogRecPtr applyPtr; - bool got_oldest; + bool got_recptr; bool am_sync; int numwrite = 0; int numflush = 0; @@ -413,11 +435,10 @@ SyncRepReleaseWaiters(void) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* - * Check whether we are a sync standby or not, and calculate the oldest + * Check whether we are a sync standby or not, and calculate the synced * positions among all sync standbys. */ - got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr, - &applyPtr, &am_sync); + got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); /* * If we are managing a sync standby, though we weren't prior to this, @@ -426,16 +447,22 @@ SyncRepReleaseWaiters(void) if (announce_next_takeover && am_sync) { announce_next_takeover = false; - ereport(LOG, - (errmsg("standby \"%s\" is now a synchronous standby with priority %u", - application_name, MyWalSnd->sync_standby_priority))); + + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + ereport(LOG, + (errmsg("standby \"%s\" is now a synchronous standby with priority %u", + application_name, MyWalSnd->sync_standby_priority))); + else + ereport(LOG, + (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby", + application_name))); } /* * If the number of sync standbys is less than requested or we aren't * managing a sync standby then just leave. */ - if (!got_oldest || !am_sync) + if (!got_recptr || !am_sync) { LWLockRelease(SyncRepLock); announce_next_takeover = !am_sync; @@ -471,21 +498,20 @@ SyncRepReleaseWaiters(void) } /* - * Calculate the oldest Write, Flush and Apply positions among sync standbys. + * Calculate the synced Write, Flush and Apply positions among sync standbys. 
* * Return false if the number of sync standbys is less than * synchronous_standby_names specifies. Otherwise return true and - * store the oldest positions into *writePtr, *flushPtr and *applyPtr. + * store the positions into *writePtr, *flushPtr and *applyPtr. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ static bool -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, +SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync) { List *sync_standbys; - ListCell *cell; *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; @@ -508,12 +534,49 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, } /* - * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * In a priority-based sync replication, the synced positions are the + * oldest ones among sync standbys. In a quorum-based sync replication, + * they are the Nth latest ones. + * + * SyncRepGetNthLatestSyncRecPtr() can also calculate the oldest positions. + * But we use SyncRepGetOldestSyncRecPtr() for that calculation because + * it's a bit more efficient. + * + * XXX If the numbers of current and requested sync standbys are the same, + * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced + * positions even in a quorum-based sync replication. + */ + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + { + SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, + sync_standbys); + } + else + { + SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, + sync_standbys, SyncRepConfig->num_sync); + } + + list_free(sync_standbys); + return true; +} + +/* + * Calculate the oldest Write, Flush and Apply positions among sync standbys. 
+ */ +static void +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, List *sync_standbys) +{ + ListCell *cell; + + /* + * Scan through all sync standbys and calculate the oldest + * Write, Flush and Apply positions. */ - foreach(cell, sync_standbys) + foreach (cell, sync_standbys) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; @@ -531,23 +594,163 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply) *applyPtr = apply; } +} - list_free(sync_standbys); - return true; +/* + * Calculate the Nth latest Write, Flush and Apply positions among sync + * standbys. + */ +static void +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +{ + ListCell *cell; + XLogRecPtr *write_array; + XLogRecPtr *flush_array; + XLogRecPtr *apply_array; + int len; + int i = 0; + + len = list_length(sync_standbys); + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + + foreach (cell, sync_standbys) + { + WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + + SpinLockAcquire(&walsnd->mutex); + write_array[i] = walsnd->write; + flush_array[i] = walsnd->flush; + apply_array[i] = walsnd->apply; + SpinLockRelease(&walsnd->mutex); + + i++; + } + + qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + + /* Get Nth latest Write, Flush, Apply positions */ + *writePtr = write_array[nth - 1]; + *flushPtr = flush_array[nth - 1]; + *applyPtr = apply_array[nth - 1]; + + pfree(write_array); + pfree(flush_array); + pfree(apply_array); 
+} + +/* + * Compare lsn in order to sort array in descending order. + */ +static int +cmp_lsn(const void *a, const void *b) +{ + XLogRecPtr lsn1 = *((const XLogRecPtr *) a); + XLogRecPtr lsn2 = *((const XLogRecPtr *) b); + + if (lsn1 > lsn2) + return -1; + else if (lsn1 == lsn2) + return 0; + else + return 1; } /* * Return the list of sync standbys, or NIL if no sync standby is connected. * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. * The caller must hold SyncRepLock. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ List * -SyncRepGetSyncStandbys(bool *am_sync) +SyncRepGetSyncStandbys(bool *am_sync) +{ + /* Set default result */ + if (am_sync != NULL) + *am_sync = false; + + /* Quick exit if sync replication is not requested */ + if (SyncRepConfig == NULL) + return NIL; + + return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? + SyncRepGetSyncStandbysPriority(am_sync) : + SyncRepGetSyncStandbysQuorum(am_sync); +} + +/* + * Return the list of all the candidates for quorum sync standbys, + * or NIL if no such standby is connected. + * + * The caller must hold SyncRepLock. This function must be called only in + * a quorum-based sync replication. + * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. 
+ */ +static List * +SyncRepGetSyncStandbysQuorum(bool *am_sync) +{ + List *result = NIL; + int i; + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + + Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); + + for (i = 0; i < max_wal_senders; i++) + { + walsnd = &WalSndCtl->walsnds[i]; + + /* Must be active */ + if (walsnd->pid == 0) + continue; + + /* Must be streaming */ + if (walsnd->state != WALSNDSTATE_STREAMING) + continue; + + /* Must be synchronous */ + if (walsnd->sync_standby_priority == 0) + continue; + + /* Must have a valid flush position */ + if (XLogRecPtrIsInvalid(walsnd->flush)) + continue; + + /* + * Consider this standby as a candidate for quorum sync standbys + * and append it to the result. + */ + result = lappend_int(result, i); + if (am_sync != NULL && walsnd == MyWalSnd) + *am_sync = true; + } + + return result; +} + +/* + * Return the list of sync standbys chosen based on their priorities, + * or NIL if no sync standby is connected. + * + * If there are multiple standbys with the same priority, + * the first one found is selected preferentially. + * + * The caller must hold SyncRepLock. This function must be called only in + * a priority-based sync replication. + * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. + */ +static List * +SyncRepGetSyncStandbysPriority(bool *am_sync) { List *result = NIL; List *pending = NIL; @@ -560,13 +763,7 @@ SyncRepGetSyncStandbys(bool *am_sync) volatile WalSnd *walsnd; /* Use volatile pointer to prevent code * rearrangement */ - /* Set default result */ - if (am_sync != NULL) - *am_sync = false; - - /* Quick exit if sync replication is not requested */ - if (SyncRepConfig == NULL) - return NIL; + Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); lowest_priority = SyncRepConfig->nmembers; next_highest_priority = lowest_priority + 1; |