diff options
Diffstat (limited to 'src/backend/replication/logical/slotsync.c')
| -rw-r--r-- | src/backend/replication/logical/slotsync.c | 93 |
1 files changed, 70 insertions, 23 deletions
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 8b4afd87dc9..1f4f06d467b 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -187,6 +187,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, TransactionIdPrecedes(remote_slot->catalog_xmin, slot->data.catalog_xmin)) { + /* Update slot sync skip stats */ + pgstat_report_replslotsync(slot); + /* * This can happen in following situations: * @@ -277,6 +280,13 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.", LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), LSN_FORMAT_ARGS(slot->data.confirmed_flush))); + + /* + * If we can't reach a consistent snapshot, the slot won't be + * persisted. See update_and_persist_local_synced_slot(). + */ + if (found_consistent_snapshot && !(*found_consistent_snapshot)) + pgstat_report_replslotsync(slot); } updated_xmin_or_lsn = true; @@ -563,6 +573,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) bool found_consistent_snapshot = false; bool remote_slot_precedes = false; + /* Slotsync skip stats are handled in function update_local_synced_slot() */ (void) update_local_synced_slot(remote_slot, remote_dbid, &found_consistent_snapshot, &remote_slot_precedes); @@ -624,31 +635,9 @@ static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot; - XLogRecPtr latestFlushPtr; + XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL); bool slot_updated = false; - /* - * Make sure that concerned WAL is received and flushed before syncing - * slot to target lsn received from the primary server. - */ - latestFlushPtr = GetStandbyFlushRecPtr(NULL); - if (remote_slot->confirmed_lsn > latestFlushPtr) - { - /* - * Can get here only if GUC 'synchronized_standby_slots' on the - * primary server was not configured correctly. - */ - ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("skipping slot synchronization because the received slot sync" - " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X", - LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), - remote_slot->name, - LSN_FORMAT_ARGS(latestFlushPtr))); - - return false; - } - /* Search for the named slot */ if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) { @@ -707,10 +696,40 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* Skip the sync of an invalidated slot */ if (slot->data.invalidated != RS_INVAL_NONE) { + pgstat_report_replslotsync(slot); + ReplicationSlotRelease(); return slot_updated; } + /* + * Make sure that concerned WAL is received and flushed before syncing + * slot to target lsn received from the primary server. + * + * Report statistics only after the slot has been acquired, ensuring + * it cannot be dropped during the reporting process. + */ + if (remote_slot->confirmed_lsn > latestFlushPtr) + { + pgstat_report_replslotsync(slot); + + /* + * Can get here only if GUC 'synchronized_standby_slots' on the + * primary server was not configured correctly. + */ + ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipping slot synchronization because the received slot sync" + " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + remote_slot->name, + LSN_FORMAT_ARGS(latestFlushPtr))); + + ReplicationSlotRelease(); + + return slot_updated; + } + /* Slot not ready yet, let's attempt to make it sync-ready now. */ if (slot->data.persistency == RS_TEMPORARY) { @@ -784,6 +803,34 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); + /* + * Make sure that concerned WAL is received and flushed before syncing + * slot to target lsn received from the primary server. + * + * Report statistics only after the slot has been acquired, ensuring + * it cannot be dropped during the reporting process. + */ + if (remote_slot->confirmed_lsn > latestFlushPtr) + { + pgstat_report_replslotsync(slot); + + /* + * Can get here only if GUC 'synchronized_standby_slots' on the + * primary server was not configured correctly. + */ + ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipping slot synchronization because the received slot sync" + " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + remote_slot->name, + LSN_FORMAT_ARGS(latestFlushPtr))); + + ReplicationSlotRelease(); + + return false; + } + update_and_persist_local_synced_slot(remote_slot, remote_dbid); slot_updated = true; |
