summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/slotsync.c93
1 files changed, 70 insertions, 23 deletions
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 8b4afd87dc9..1f4f06d467b 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -187,6 +187,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
TransactionIdPrecedes(remote_slot->catalog_xmin,
slot->data.catalog_xmin))
{
+ /* Update slot sync skip stats */
+ pgstat_report_replslotsync(slot);
+
/*
* This can happen in following situations:
*
@@ -277,6 +280,13 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
+
+ /*
+ * If we can't reach a consistent snapshot, the slot won't be
+ * persisted. See update_and_persist_local_synced_slot().
+ */
+ if (found_consistent_snapshot && !(*found_consistent_snapshot))
+ pgstat_report_replslotsync(slot);
}
updated_xmin_or_lsn = true;
@@ -563,6 +573,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
bool found_consistent_snapshot = false;
bool remote_slot_precedes = false;
+ /* Slotsync skip stats are handled in function update_local_synced_slot() */
(void) update_local_synced_slot(remote_slot, remote_dbid,
&found_consistent_snapshot,
&remote_slot_precedes);
@@ -624,31 +635,9 @@ static bool
synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot;
- XLogRecPtr latestFlushPtr;
+ XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL);
bool slot_updated = false;
- /*
- * Make sure that concerned WAL is received and flushed before syncing
- * slot to target lsn received from the primary server.
- */
- latestFlushPtr = GetStandbyFlushRecPtr(NULL);
- if (remote_slot->confirmed_lsn > latestFlushPtr)
- {
- /*
- * Can get here only if GUC 'synchronized_standby_slots' on the
- * primary server was not configured correctly.
- */
- ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("skipping slot synchronization because the received slot sync"
- " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
- LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
- remote_slot->name,
- LSN_FORMAT_ARGS(latestFlushPtr)));
-
- return false;
- }
-
/* Search for the named slot */
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
{
@@ -707,10 +696,40 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Skip the sync of an invalidated slot */
if (slot->data.invalidated != RS_INVAL_NONE)
{
+ pgstat_report_replslotsync(slot);
+
ReplicationSlotRelease();
return slot_updated;
}
+ /*
+ * Make sure that concerned WAL is received and flushed before syncing
+ * slot to target lsn received from the primary server.
+ *
+ * Report statistics only after the slot has been acquired, ensuring
+ * it cannot be dropped during the reporting process.
+ */
+ if (remote_slot->confirmed_lsn > latestFlushPtr)
+ {
+ pgstat_report_replslotsync(slot);
+
+ /*
+ * Can get here only if GUC 'synchronized_standby_slots' on the
+ * primary server was not configured correctly.
+ */
+ ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipping slot synchronization because the received slot sync"
+ " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ remote_slot->name,
+ LSN_FORMAT_ARGS(latestFlushPtr)));
+
+ ReplicationSlotRelease();
+
+ return slot_updated;
+ }
+
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if (slot->data.persistency == RS_TEMPORARY)
{
@@ -784,6 +803,34 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
+ /*
+ * Make sure that concerned WAL is received and flushed before syncing
+ * slot to target lsn received from the primary server.
+ *
+ * Report statistics only after the slot has been acquired, ensuring
+ * it cannot be dropped during the reporting process.
+ */
+ if (remote_slot->confirmed_lsn > latestFlushPtr)
+ {
+ pgstat_report_replslotsync(slot);
+
+ /*
+ * Can get here only if GUC 'synchronized_standby_slots' on the
+ * primary server was not configured correctly.
+ */
+ ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipping slot synchronization because the received slot sync"
+ " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ remote_slot->name,
+ LSN_FORMAT_ARGS(latestFlushPtr)));
+
+ ReplicationSlotRelease();
+
+ return false;
+ }
+
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
slot_updated = true;