summaryrefslogtreecommitdiff
path: root/src/include/pgstat.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/include/pgstat.h')
-rw-r--r--src/include/pgstat.h109
1 files changed, 104 insertions, 5 deletions
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index bcd3588ea29..5b51b58e5a8 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -14,6 +14,7 @@
#include "datatype/timestamp.h"
#include "portability/instr_time.h"
#include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */
+#include "replication/logicalproto.h"
#include "utils/backend_progress.h" /* for backward compatibility */
#include "utils/backend_status.h" /* for backward compatibility */
#include "utils/hsearch.h"
@@ -83,6 +84,8 @@ typedef enum StatMsgType
PGSTAT_MTYPE_REPLSLOT,
PGSTAT_MTYPE_CONNECT,
PGSTAT_MTYPE_DISCONNECT,
+ PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
+ PGSTAT_MTYPE_SUBWORKERERROR,
} StatMsgType;
/* ----------
@@ -145,7 +148,8 @@ typedef enum PgStat_Shared_Reset_Target
typedef enum PgStat_Single_Reset_Type
{
RESET_TABLE,
- RESET_FUNCTION
+ RESET_FUNCTION,
+ RESET_SUBWORKER
} PgStat_Single_Reset_Type;
/* ------------------------------------------------------------
@@ -364,6 +368,7 @@ typedef struct PgStat_MsgResetsinglecounter
Oid m_databaseid;
PgStat_Single_Reset_Type m_resettype;
Oid m_objectid;
+ Oid m_subobjectid;
} PgStat_MsgResetsinglecounter;
/* ----------
@@ -536,6 +541,54 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_total_bytes;
} PgStat_MsgReplSlot;
+/* ----------
+ * PgStat_MsgSubscriptionPurge Sent by the backend and autovacuum to tell the
+ * collector about the dead subscriptions.
+ * ----------
+ */
+#define PGSTAT_NUM_SUBSCRIPTIONPURGE \
+ ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid))
+
+typedef struct PgStat_MsgSubscriptionPurge
+{
+ PgStat_MsgHdr m_hdr;
+ Oid m_databaseid;
+ int m_nentries;
+ Oid m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE];
+} PgStat_MsgSubscriptionPurge;
+
+/* ----------
+ * PgStat_MsgSubWorkerError Sent by the apply worker or the table sync
+ * worker to report the error occurred while
+ * processing changes.
+ * ----------
+ */
+#define PGSTAT_SUBWORKERERROR_MSGLEN 256
+typedef struct PgStat_MsgSubWorkerError
+{
+ PgStat_MsgHdr m_hdr;
+
+ /*
+ * m_subid and m_subrelid are used to determine the subscription and the
+ * reporter of the error. m_subrelid is InvalidOid if reported by an apply
+ * worker otherwise reported by a table sync worker.
+ */
+ Oid m_databaseid;
+ Oid m_subid;
+ Oid m_subrelid;
+
+ /*
+ * Oid of the table that the reporter was actually processing. m_relid can
+ * be InvalidOid if an error occurred during worker applying a
+ * non-data-modification message such as RELATION.
+ */
+ Oid m_relid;
+
+ LogicalRepMsgType m_command;
+ TransactionId m_xid;
+ TimestampTz m_timestamp;
+ char m_message[PGSTAT_SUBWORKERERROR_MSGLEN];
+} PgStat_MsgSubWorkerError;
/* ----------
* PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict
@@ -714,6 +767,8 @@ typedef union PgStat_Msg
PgStat_MsgReplSlot msg_replslot;
PgStat_MsgConnect msg_connect;
PgStat_MsgDisconnect msg_disconnect;
+ PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
+ PgStat_MsgSubWorkerError msg_subworkererror;
} PgStat_Msg;
@@ -725,7 +780,7 @@ typedef union PgStat_Msg
* ------------------------------------------------------------
*/
-#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA4
+#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA5
/* ----------
* PgStat_StatDBEntry The collector's data per database
@@ -768,11 +823,16 @@ typedef struct PgStat_StatDBEntry
TimestampTz stats_timestamp; /* time of db stats file update */
/*
- * tables and functions must be last in the struct, because we don't write
- * the pointers out to the stats file.
+ * tables, functions, and subscription workers must be last in the struct,
+ * because we don't write the pointers out to the stats file.
+ *
+ * subworkers is the hash table of PgStat_StatSubWorkerEntry which stores
+ * statistics of logical replication workers: apply worker and table sync
+ * worker.
*/
HTAB *tables;
HTAB *functions;
+ HTAB *subworkers;
} PgStat_StatDBEntry;
@@ -929,6 +989,38 @@ typedef struct PgStat_StatReplSlotEntry
TimestampTz stat_reset_timestamp;
} PgStat_StatReplSlotEntry;
+/* The lookup key for subscription worker hash table */
+typedef struct PgStat_StatSubWorkerKey
+{
+ Oid subid;
+
+ /*
+ * Oid of the table for which tablesync worker will copy the initial data.
+ * An InvalidOid will be assigned for apply workers.
+ */
+ Oid subrelid;
+} PgStat_StatSubWorkerKey;
+
+/*
+ * Logical replication apply worker and table sync worker statistics kept in the
+ * stats collector.
+ */
+typedef struct PgStat_StatSubWorkerEntry
+{
+ PgStat_StatSubWorkerKey key; /* hash key (must be first) */
+
+ /*
+ * Subscription worker error statistics representing an error that
+ * occurred during application of changes or the initial table
+ * synchronization.
+ */
+ Oid last_error_relid;
+ LogicalRepMsgType last_error_command;
+ TransactionId last_error_xid;
+ PgStat_Counter last_error_count;
+ TimestampTz last_error_time;
+ char last_error_message[PGSTAT_SUBWORKERERROR_MSGLEN];
+} PgStat_StatSubWorkerEntry;
/*
* Working state needed to accumulate per-function-call timing statistics.
@@ -1019,7 +1111,8 @@ extern void pgstat_drop_database(Oid databaseid);
extern void pgstat_clear_snapshot(void);
extern void pgstat_reset_counters(void);
extern void pgstat_reset_shared_counters(const char *);
-extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
+extern void pgstat_reset_single_counter(Oid objectid, Oid subobjectid,
+ PgStat_Single_Reset_Type type);
extern void pgstat_reset_slru_counter(const char *);
extern void pgstat_reset_replslot_counter(const char *name);
@@ -1038,6 +1131,10 @@ extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
extern void pgstat_report_replslot_create(const char *slotname);
extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
+ LogicalRepMsgType command,
+ TransactionId xid, const char *errmsg);
+extern void pgstat_report_subscription_drop(Oid subid);
extern void pgstat_initialize(void);
@@ -1129,6 +1226,8 @@ extern void pgstat_send_wal(bool force);
extern PgStat_StatDBEntry *pgstat_fetch_stat_dbentry(Oid dbid);
extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid);
extern PgStat_StatFuncEntry *pgstat_fetch_stat_funcentry(Oid funcid);
+extern PgStat_StatSubWorkerEntry *pgstat_fetch_stat_subworker_entry(Oid subid,
+ Oid subrelid);
extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
extern PgStat_BgWriterStats *pgstat_fetch_stat_bgwriter(void);
extern PgStat_CheckpointerStats *pgstat_fetch_stat_checkpointer(void);