diff options
Diffstat (limited to 'src/backend/replication/syncrep.c')
-rw-r--r-- | src/backend/replication/syncrep.c | 245 |
1 files changed, 191 insertions, 54 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 864c6cba055..c53b3d61c1f 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys); + SyncRepStandbyData *sync_standbys, + int num_standbys); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth); static int SyncRepGetStandbyPriority(void); static List *SyncRepGetSyncStandbysPriority(bool *am_sync); static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); +static int standby_priority_comparator(const void *a, const void *b); static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING @@ -399,9 +403,10 @@ SyncRepInitConfig(void) priority = SyncRepGetStandbyPriority(); if (MyWalSnd->sync_standby_priority != priority) { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sync_standby_priority = priority; - LWLockRelease(SyncRepLock); + SpinLockRelease(&MyWalSnd->mutex); + ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %u", application_name, priority))); @@ -452,7 +457,11 @@ SyncRepReleaseWaiters(void) /* * Check whether we are a sync standby or not, and calculate the synced - * positions among all sync standbys. + * positions among all sync standbys. (Note: although this step does not + * of itself require holding SyncRepLock, it seems like a good idea to do + * it after acquiring the lock. This ensures that the WAL pointers we use + * to release waiters are newer than any previous execution of this + * routine used.) */ got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); @@ -527,25 +536,41 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync) { - List *sync_standbys; + SyncRepStandbyData *sync_standbys; + int num_standbys; + int i; + /* Initialize default results */ *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; *am_sync = false; + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + num_standbys = SyncRepGetCandidateStandbys(&sync_standbys); + + /* Am I among the candidate sync standbys? */ + for (i = 0; i < num_standbys; i++) + { + if (sync_standbys[i].is_me) + { + *am_sync = true; + break; + } + } /* - * Quick exit if we are not managing a sync standby or there are not - * enough synchronous standbys. + * Nothing more to do if we are not managing a sync standby or there are + * not enough synchronous standbys. */ if (!(*am_sync) || - SyncRepConfig == NULL || - list_length(sync_standbys) < SyncRepConfig->num_sync) + num_standbys < SyncRepConfig->num_sync) { - list_free(sync_standbys); + pfree(sync_standbys); return false; } @@ -565,15 +590,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) { SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys); + sync_standbys, num_standbys); } else { SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, SyncRepConfig->num_sync); + sync_standbys, num_standbys, + SyncRepConfig->num_sync); } - list_free(sync_standbys); + pfree(sync_standbys); return true; } @@ -581,27 +607,24 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * Calculate the oldest Write, Flush and Apply positions among sync standbys. */ static void -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys) +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys) { - ListCell *cell; + int i; /* * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * and Apply positions. We assume *writePtr et al were initialized to + * InvalidXLogRecPtr. */ - foreach(cell, sync_standbys) + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; - - SpinLockAcquire(&walsnd->mutex); - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + XLogRecPtr write = sync_standbys[i].write; + XLogRecPtr flush = sync_standbys[i].flush; + XLogRecPtr apply = sync_standbys[i].apply; if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) *writePtr = write; @@ -617,38 +640,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * standbys. */ static void -SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth) { - ListCell *cell; XLogRecPtr *write_array; XLogRecPtr *flush_array; XLogRecPtr *apply_array; - int len; - int i = 0; - - len = list_length(sync_standbys); - write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + int i; - foreach(cell, sync_standbys) - { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + /* Should have enough candidates, or somebody messed up */ + Assert(nth > 0 && nth <= num_standbys); - SpinLockAcquire(&walsnd->mutex); - write_array[i] = walsnd->write; - flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); - i++; + for (i = 0; i < num_standbys; i++) + { + write_array[i] = sync_standbys[i].write; + flush_array[i] = sync_standbys[i].flush; + apply_array[i] = sync_standbys[i].apply; } /* Sort each array in descending order */ - qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn); /* Get Nth latest Write, Flush, Apply positions */ *writePtr = write_array[nth - 1]; @@ -678,12 +699,121 @@ cmp_lsn(const void *a, const void *b) } /* + * Return data about walsenders that are candidates to be sync standbys. + * + * *standbys is set to a palloc'd array of structs of per-walsender data, + * and the number of valid entries (candidate sync senders) is returned. + * (This might be more or fewer than num_sync; caller must check.) + */ +int +SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys) +{ + int i; + int n; + + /* Create result array */ + *standbys = (SyncRepStandbyData *) + palloc(max_wal_senders * sizeof(SyncRepStandbyData)); + + /* Quick exit if sync replication is not requested */ + if (SyncRepConfig == NULL) + return 0; + + /* Collect raw data from shared memory */ + n = 0; + for (i = 0; i < max_wal_senders; i++) + { + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + SyncRepStandbyData *stby; + WalSndState state; /* not included in SyncRepStandbyData */ + + walsnd = &WalSndCtl->walsnds[i]; + stby = *standbys + n; + + SpinLockAcquire(&walsnd->mutex); + stby->pid = walsnd->pid; + state = walsnd->state; + stby->write = walsnd->write; + stby->flush = walsnd->flush; + stby->apply = walsnd->apply; + stby->sync_standby_priority = walsnd->sync_standby_priority; + SpinLockRelease(&walsnd->mutex); + + /* Must be active */ + if (stby->pid == 0) + continue; + + /* Must be streaming or stopping */ + if (state != WALSNDSTATE_STREAMING && + state != WALSNDSTATE_STOPPING) + continue; + + /* Must be synchronous */ + if (stby->sync_standby_priority == 0) + continue; + + /* Must have a valid flush position */ + if (XLogRecPtrIsInvalid(stby->flush)) + continue; + + /* OK, it's a candidate */ + stby->walsnd_index = i; + stby->is_me = (walsnd == MyWalSnd); + n++; + } + + /* + * In quorum mode, we return all the candidates. In priority mode, if we + * have too many candidates then return only the num_sync ones of highest + * priority. + */ + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY && + n > SyncRepConfig->num_sync) + { + /* Sort by priority ... */ + qsort(*standbys, n, sizeof(SyncRepStandbyData), + standby_priority_comparator); + /* ... then report just the first num_sync ones */ + n = SyncRepConfig->num_sync; + } + + return n; +} + +/* + * qsort comparator to sort SyncRepStandbyData entries by priority + */ +static int +standby_priority_comparator(const void *a, const void *b) +{ + const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a; + const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b; + + /* First, sort by increasing priority value */ + if (sa->sync_standby_priority != sb->sync_standby_priority) + return sa->sync_standby_priority - sb->sync_standby_priority; + + /* + * We might have equal priority values; arbitrarily break ties by position + * in the WALSnd array. (This is utterly bogus, since that is arrival + * order dependent, but there are regression tests that rely on it.) + */ + return sa->walsnd_index - sb->walsnd_index; +} + + +/* * Return the list of sync standbys, or NIL if no sync standby is connected. * * The caller must hold SyncRepLock. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. + * + * XXX This function is BROKEN and should not be used in new code. It has + * an inherent race condition, since the returned list of integer indexes + * might no longer correspond to reality. */ List * SyncRepGetSyncStandbys(bool *am_sync) @@ -943,8 +1073,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) priority = next_highest_priority; } - /* never reached, but keep compiler quiet */ - Assert(false); + /* + * We might get here if the set of sync_standby_priority values in shared + * memory is inconsistent, as can happen transiently after a change in the + * synchronous_standby_names setting. In that case, just return the + * incomplete list we have so far. That will cause the caller to decide + * there aren't enough synchronous candidates, which should be a safe + * choice until the priority values become consistent again. + */ + list_free(pending); return result; } |