summaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/system_views.sql1
-rw-r--r--src/backend/replication/logical/sequencesync.c3
-rw-r--r--src/backend/replication/logical/tablesync.c3
-rw-r--r--src/backend/replication/logical/worker.c18
-rw-r--r--src/backend/utils/activity/pgstat_subscription.c27
-rw-r--r--src/backend/utils/adt/pgstatfuncs.c27
6 files changed, 52 insertions, 27 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index dec8df4f8ee..059e8778ca7 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1415,6 +1415,7 @@ CREATE VIEW pg_stat_subscription_stats AS
ss.subid,
s.subname,
ss.apply_error_count,
+ ss.seq_sync_error_count,
ss.sync_error_count,
ss.confl_insert_exists,
ss.confl_update_origin_differs,
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index a8a39bec508..e093e65e540 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -732,6 +732,9 @@ start_sequence_sync()
* idle state.
*/
AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid,
+ WORKERTYPE_SEQUENCESYNC);
+
PG_RE_THROW();
}
}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e5a2856fd17..dcc6124cc73 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1530,7 +1530,8 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
* idle state.
*/
AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, false);
+ pgstat_report_subscription_error(MySubscription->oid,
+ WORKERTYPE_TABLESYNC);
PG_RE_THROW();
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 28f61f96a1a..93970c6af29 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5606,7 +5606,8 @@ start_apply(XLogRecPtr origin_startpos)
* idle state.
*/
AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ pgstat_report_subscription_error(MySubscription->oid,
+ MyLogicalRepWorker->type);
PG_RE_THROW();
}
@@ -5953,15 +5954,12 @@ DisableSubscriptionAndExit(void)
RESUME_INTERRUPTS();
- if (am_leader_apply_worker() || am_tablesync_worker())
- {
- /*
- * Report the worker failed during either table synchronization or
- * apply.
- */
- pgstat_report_subscription_error(MyLogicalRepWorker->subid,
- !am_tablesync_worker());
- }
+ /*
+ * Report the worker failed during sequence synchronization, table
+ * synchronization, or apply.
+ */
+ pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->type);
/* Disable the subscription */
StartTransactionCommand();
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index f9a1c831a07..35916772b9d 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -17,6 +17,7 @@
#include "postgres.h"
+#include "replication/worker_internal.h"
#include "utils/pgstat_internal.h"
@@ -24,7 +25,7 @@
* Report a subscription error.
*/
void
-pgstat_report_subscription_error(Oid subid, bool is_apply_error)
+pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
{
PgStat_EntryRef *entry_ref;
PgStat_BackendSubEntry *pending;
@@ -33,10 +34,25 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
InvalidOid, subid, NULL);
pending = entry_ref->pending;
- if (is_apply_error)
- pending->apply_error_count++;
- else
- pending->sync_error_count++;
+ switch (wtype)
+ {
+ case WORKERTYPE_APPLY:
+ pending->apply_error_count++;
+ break;
+
+ case WORKERTYPE_SEQUENCESYNC:
+ pending->seq_sync_error_count++;
+ break;
+
+ case WORKERTYPE_TABLESYNC:
+ pending->sync_error_count++;
+ break;
+
+ default:
+ /* Should never happen. */
+ Assert(0);
+ break;
+ }
}
/*
@@ -115,6 +131,7 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
#define SUB_ACC(fld) shsubent->stats.fld += localent->fld
SUB_ACC(apply_error_count);
+ SUB_ACC(seq_sync_error_count);
SUB_ACC(sync_error_count);
for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
SUB_ACC(conflict_count[i]);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index a710508979e..1521d6e2ab4 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2203,7 +2203,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 13
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2221,25 +2221,27 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
OIDOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "seq_sync_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "sync_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_insert_exists",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_deleted",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_update_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_origin_differs",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_delete_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 12, "confl_multiple_unique_conflicts",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
@@ -2256,6 +2258,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
/* apply_error_count */
values[i++] = Int64GetDatum(subentry->apply_error_count);
+ /* seq_sync_error_count */
+ values[i++] = Int64GetDatum(subentry->seq_sync_error_count);
+
/* sync_error_count */
values[i++] = Int64GetDatum(subentry->sync_error_count);