diff options
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/catalog/system_views.sql | 1 | ||||
| -rw-r--r-- | src/backend/replication/logical/sequencesync.c | 3 | ||||
| -rw-r--r-- | src/backend/replication/logical/tablesync.c | 3 | ||||
| -rw-r--r-- | src/backend/replication/logical/worker.c | 18 | ||||
| -rw-r--r-- | src/backend/utils/activity/pgstat_subscription.c | 27 | ||||
| -rw-r--r-- | src/backend/utils/adt/pgstatfuncs.c | 27 |
6 files changed, 52 insertions, 27 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index dec8df4f8ee..059e8778ca7 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1415,6 +1415,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.subid, s.subname, ss.apply_error_count, + ss.seq_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index a8a39bec508..e093e65e540 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -732,6 +732,9 @@ start_sequence_sync() * idle state. */ AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_SEQUENCESYNC); + PG_RE_THROW(); } } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e5a2856fd17..dcc6124cc73 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1530,7 +1530,8 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_TABLESYNC); PG_RE_THROW(); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 28f61f96a1a..93970c6af29 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -5606,7 +5606,8 @@ start_apply(XLogRecPtr origin_startpos) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + pgstat_report_subscription_error(MySubscription->oid, + MyLogicalRepWorker->type); PG_RE_THROW(); } @@ -5953,15 +5954,12 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); - if (am_leader_apply_worker() || am_tablesync_worker()) - { - /* - * Report the worker failed during either table synchronization or - * apply. - */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); - } + /* + * Report the worker failed during sequence synchronization, table + * synchronization, or apply. + */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->type); /* Disable the subscription */ StartTransactionCommand(); diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c index f9a1c831a07..35916772b9d 100644 --- a/src/backend/utils/activity/pgstat_subscription.c +++ b/src/backend/utils/activity/pgstat_subscription.c @@ -17,6 +17,7 @@ #include "postgres.h" +#include "replication/worker_internal.h" #include "utils/pgstat_internal.h" @@ -24,7 +25,7 @@ * Report a subscription error. */ void -pgstat_report_subscription_error(Oid subid, bool is_apply_error) +pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype) { PgStat_EntryRef *entry_ref; PgStat_BackendSubEntry *pending; @@ -33,10 +34,25 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error) InvalidOid, subid, NULL); pending = entry_ref->pending; - if (is_apply_error) - pending->apply_error_count++; - else - pending->sync_error_count++; + switch (wtype) + { + case WORKERTYPE_APPLY: + pending->apply_error_count++; + break; + + case WORKERTYPE_SEQUENCESYNC: + pending->seq_sync_error_count++; + break; + + case WORKERTYPE_TABLESYNC: + pending->sync_error_count++; + break; + + default: + /* Should never happen. */ + Assert(0); + break; + } } /* @@ -115,6 +131,7 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) #define SUB_ACC(fld) shsubent->stats.fld += localent->fld SUB_ACC(apply_error_count); + SUB_ACC(seq_sync_error_count); SUB_ACC(sync_error_count); for (int i = 0; i < CONFLICT_NUM_TYPES; i++) SUB_ACC(conflict_count[i]); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index a710508979e..1521d6e2ab4 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2203,7 +2203,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 13 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2221,25 +2221,27 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) OIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count", + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "seq_sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_insert_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_update_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_delete_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "confl_multiple_unique_conflicts", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2256,6 +2258,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) /* apply_error_count */ values[i++] = Int64GetDatum(subentry->apply_error_count); + /* seq_sync_error_count */ + values[i++] = Int64GetDatum(subentry->seq_sync_error_count); + /* sync_error_count */ values[i++] = Int64GetDatum(subentry->sync_error_count); |
