diff options
| author | Amit Kapila <akapila@postgresql.org> | 2025-11-25 06:47:49 +0000 |
|---|---|---|
| committer | Amit Kapila <akapila@postgresql.org> | 2025-11-25 07:06:02 +0000 |
| commit | 76b78721ca49c16dfcbfbd2f4f87fcdb2db2bbb6 (patch) | |
| tree | bf239b845d97ce9385c6d7b03b8ba52e035895c3 /src/backend | |
| parent | c581c9a7ac2af2c75567013f25125bd294d49ff2 (diff) | |
Add slotsync skip statistics.
This patch adds two new columns to the pg_stat_replication_slots view:
slotsync_skip_count - the total number of times a slotsync operation was
skipped.
slotsync_skip_at - the timestamp of the most recent skip.
These additions provide better visibility into replication slot
synchronization behavior.
A future patch will introduce the slotsync_skip_reason column in
pg_replication_slots to capture the reason for skip.
Author: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Ashutosh Sharma <ashu.coek88@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAE9k0PkhfKrTEAsGz4DjOhEj1nQ+hbQVfvWUxNacD38ibW3a1g@mail.gmail.com
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/catalog/system_views.sql | 2 | ||||
| -rw-r--r-- | src/backend/replication/logical/slotsync.c | 89 | ||||
| -rw-r--r-- | src/backend/utils/activity/pgstat_replslot.c | 32 | ||||
| -rw-r--r-- | src/backend/utils/adt/pgstatfuncs.c | 18 |
4 files changed, 113 insertions, 28 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 95ad29a64b9..6fffdb9398e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1076,6 +1076,8 @@ CREATE VIEW pg_stat_replication_slots AS s.mem_exceeded_count, s.total_txns, s.total_bytes, + s.slotsync_skip_count, + s.slotsync_skip_at, s.stats_reset FROM pg_replication_slots as r, LATERAL pg_stat_get_replication_slot(slot_name) as s diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 8b4afd87dc9..7e9dc7f18bd 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -187,6 +187,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, TransactionIdPrecedes(remote_slot->catalog_xmin, slot->data.catalog_xmin)) { + /* Update slot sync skip stats */ + pgstat_report_replslotsync(slot); + /* * This can happen in following situations: * @@ -277,6 +280,13 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.", LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), LSN_FORMAT_ARGS(slot->data.confirmed_flush))); + + /* + * If we can't reach a consistent snapshot, the slot won't be + * persisted. See update_and_persist_local_synced_slot(). + */ + if (found_consistent_snapshot && !(*found_consistent_snapshot)) + pgstat_report_replslotsync(slot); } updated_xmin_or_lsn = true; @@ -563,6 +573,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) bool found_consistent_snapshot = false; bool remote_slot_precedes = false; + /* Slotsync skip stats are handled in function update_local_synced_slot() */ (void) update_local_synced_slot(remote_slot, remote_dbid, &found_consistent_snapshot, &remote_slot_precedes); @@ -624,31 +635,9 @@ static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot; - XLogRecPtr latestFlushPtr; + XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL); bool slot_updated = false; - /* - * Make sure that concerned WAL is received and flushed before syncing - * slot to target lsn received from the primary server. - */ - latestFlushPtr = GetStandbyFlushRecPtr(NULL); - if (remote_slot->confirmed_lsn > latestFlushPtr) - { - /* - * Can get here only if GUC 'synchronized_standby_slots' on the - * primary server was not configured correctly. - */ - ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("skipping slot synchronization because the received slot sync" - " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X", - LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), - remote_slot->name, - LSN_FORMAT_ARGS(latestFlushPtr))); - - return false; - } - /* Search for the named slot */ if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) { @@ -707,10 +696,38 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* Skip the sync of an invalidated slot */ if (slot->data.invalidated != RS_INVAL_NONE) { + pgstat_report_replslotsync(slot); + ReplicationSlotRelease(); return slot_updated; } + /* + * Make sure that concerned WAL is received and flushed before syncing + * slot to target lsn received from the primary server. + * + * Report statistics only after the slot has been acquired, ensuring + * it cannot be dropped during the reporting process. + */ + if (remote_slot->confirmed_lsn > latestFlushPtr) + { + pgstat_report_replslotsync(slot); + + /* + * Can get here only if GUC 'synchronized_standby_slots' on the + * primary server was not configured correctly. + */ + ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipping slot synchronization because the received slot sync" + " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + remote_slot->name, + LSN_FORMAT_ARGS(latestFlushPtr))); + + return slot_updated; + } + /* Slot not ready yet, let's attempt to make it sync-ready now. */ if (slot->data.persistency == RS_TEMPORARY) { @@ -784,6 +801,32 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); + /* + * Make sure that concerned WAL is received and flushed before syncing + * slot to target lsn received from the primary server. + * + * Report statistics only after the slot has been acquired, ensuring + * it cannot be dropped during the reporting process. + */ + if (remote_slot->confirmed_lsn > latestFlushPtr) + { + pgstat_report_replslotsync(slot); + + /* + * Can get here only if GUC 'synchronized_standby_slots' on the + * primary server was not configured correctly. + */ + ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipping slot synchronization because the received slot sync" + " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + remote_slot->name, + LSN_FORMAT_ARGS(latestFlushPtr))); + + return false; + } + update_and_persist_local_synced_slot(remote_slot, remote_dbid); slot_updated = true; diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index d210c261ac6..f93179146c2 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -103,6 +103,36 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re } /* + * Report replication slot sync skip statistics. + * + * Similar to pgstat_report_replslot(), we can rely on the stats for the + * slot to exist and to belong to this slot. + */ +void +pgstat_report_replslotsync(ReplicationSlot *slot) +{ + PgStat_EntryRef *entry_ref; + PgStatShared_ReplSlot *shstatent; + PgStat_StatReplSlotEntry *statent; + + /* Slot sync stats are valid only for logical slots on standby. */ + Assert(SlotIsLogical(slot)); + Assert(RecoveryInProgress()); + + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid, + ReplicationSlotIndex(slot), false); + Assert(entry_ref != NULL); + + shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats; + statent = &shstatent->stats; + + statent->slotsync_skip_count += 1; + statent->slotsync_skip_at = GetCurrentTimestamp(); + + pgstat_unlock_entry(entry_ref); +} + +/* * Report replication slot creation. * * NB: This gets called with ReplicationSlotAllocationLock already held, be @@ -133,7 +163,7 @@ pgstat_create_replslot(ReplicationSlot *slot) * Report replication slot has been acquired. * * This guarantees that a stats entry exists during later - * pgstat_report_replslot() calls. + * pgstat_report_replslot() or pgstat_report_replslotsync() calls. * * If we previously crashed, no stats data exists. But if we did not crash, * the stats do belong to this slot: diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 3d98d064a94..7e2ed69138a 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2129,7 +2129,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 11 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 13 text *slotname_text = PG_GETARG_TEXT_P(0); NameData slotname; TupleDesc tupdesc; @@ -2160,7 +2160,11 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "slotsync_skip_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_skip_at", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2186,11 +2190,17 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) values[7] = Int64GetDatum(slotent->mem_exceeded_count); values[8] = Int64GetDatum(slotent->total_txns); values[9] = Int64GetDatum(slotent->total_bytes); + values[10] = Int64GetDatum(slotent->slotsync_skip_count); + + if (slotent->slotsync_skip_at == 0) + nulls[11] = true; + else + values[11] = TimestampTzGetDatum(slotent->slotsync_skip_at); if (slotent->stat_reset_timestamp == 0) - nulls[10] = true; + nulls[12] = true; else - values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp); + values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); |
