diff options
Diffstat (limited to 'src/include/pgstat.h')
-rw-r--r-- | src/include/pgstat.h | 109 |
1 files changed, 104 insertions, 5 deletions
diff --git a/src/include/pgstat.h b/src/include/pgstat.h index bcd3588ea29..5b51b58e5a8 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -14,6 +14,7 @@ #include "datatype/timestamp.h" #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ +#include "replication/logicalproto.h" #include "utils/backend_progress.h" /* for backward compatibility */ #include "utils/backend_status.h" /* for backward compatibility */ #include "utils/hsearch.h" @@ -83,6 +84,8 @@ typedef enum StatMsgType PGSTAT_MTYPE_REPLSLOT, PGSTAT_MTYPE_CONNECT, PGSTAT_MTYPE_DISCONNECT, + PGSTAT_MTYPE_SUBSCRIPTIONPURGE, + PGSTAT_MTYPE_SUBWORKERERROR, } StatMsgType; /* ---------- @@ -145,7 +148,8 @@ typedef enum PgStat_Shared_Reset_Target typedef enum PgStat_Single_Reset_Type { RESET_TABLE, - RESET_FUNCTION + RESET_FUNCTION, + RESET_SUBWORKER } PgStat_Single_Reset_Type; /* ------------------------------------------------------------ @@ -364,6 +368,7 @@ typedef struct PgStat_MsgResetsinglecounter Oid m_databaseid; PgStat_Single_Reset_Type m_resettype; Oid m_objectid; + Oid m_subobjectid; } PgStat_MsgResetsinglecounter; /* ---------- @@ -536,6 +541,54 @@ typedef struct PgStat_MsgReplSlot PgStat_Counter m_total_bytes; } PgStat_MsgReplSlot; +/* ---------- + * PgStat_MsgSubscriptionPurge Sent by the backend and autovacuum to tell the + * collector about the dead subscriptions. + * ---------- + */ +#define PGSTAT_NUM_SUBSCRIPTIONPURGE \ + ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid)) + +typedef struct PgStat_MsgSubscriptionPurge +{ + PgStat_MsgHdr m_hdr; + Oid m_databaseid; + int m_nentries; + Oid m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE]; +} PgStat_MsgSubscriptionPurge; + +/* ---------- + * PgStat_MsgSubWorkerError Sent by the apply worker or the table sync + * worker to report the error occurred while + * processing changes. + * ---------- + */ +#define PGSTAT_SUBWORKERERROR_MSGLEN 256 +typedef struct PgStat_MsgSubWorkerError +{ + PgStat_MsgHdr m_hdr; + + /* + * m_subid and m_subrelid are used to determine the subscription and the + * reporter of the error. m_subrelid is InvalidOid if reported by an apply + * worker otherwise reported by a table sync worker. + */ + Oid m_databaseid; + Oid m_subid; + Oid m_subrelid; + + /* + * Oid of the table that the reporter was actually processing. m_relid can + * be InvalidOid if an error occurred during worker applying a + * non-data-modification message such as RELATION. + */ + Oid m_relid; + + LogicalRepMsgType m_command; + TransactionId m_xid; + TimestampTz m_timestamp; + char m_message[PGSTAT_SUBWORKERERROR_MSGLEN]; +} PgStat_MsgSubWorkerError; /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict @@ -714,6 +767,8 @@ typedef union PgStat_Msg PgStat_MsgReplSlot msg_replslot; PgStat_MsgConnect msg_connect; PgStat_MsgDisconnect msg_disconnect; + PgStat_MsgSubscriptionPurge msg_subscriptionpurge; + PgStat_MsgSubWorkerError msg_subworkererror; } PgStat_Msg; @@ -725,7 +780,7 @@ typedef union PgStat_Msg * ------------------------------------------------------------ */ -#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA4 +#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA5 /* ---------- * PgStat_StatDBEntry The collector's data per database @@ -768,11 +823,16 @@ typedef struct PgStat_StatDBEntry TimestampTz stats_timestamp; /* time of db stats file update */ /* - * tables and functions must be last in the struct, because we don't write - * the pointers out to the stats file. + * tables, functions, and subscription workers must be last in the struct, + * because we don't write the pointers out to the stats file. + * + * subworkers is the hash table of PgStat_StatSubWorkerEntry which stores + * statistics of logical replication workers: apply worker and table sync + * worker. */ HTAB *tables; HTAB *functions; + HTAB *subworkers; } PgStat_StatDBEntry; @@ -929,6 +989,38 @@ typedef struct PgStat_StatReplSlotEntry TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; +/* The lookup key for subscription worker hash table */ +typedef struct PgStat_StatSubWorkerKey +{ + Oid subid; + + /* + * Oid of the table for which tablesync worker will copy the initial data. + * An InvalidOid will be assigned for apply workers. + */ + Oid subrelid; +} PgStat_StatSubWorkerKey; + +/* + * Logical replication apply worker and table sync worker statistics kept in the + * stats collector. + */ +typedef struct PgStat_StatSubWorkerEntry +{ + PgStat_StatSubWorkerKey key; /* hash key (must be first) */ + + /* + * Subscription worker error statistics representing an error that + * occurred during application of changes or the initial table + * synchronization. + */ + Oid last_error_relid; + LogicalRepMsgType last_error_command; + TransactionId last_error_xid; + PgStat_Counter last_error_count; + TimestampTz last_error_time; + char last_error_message[PGSTAT_SUBWORKERERROR_MSGLEN]; +} PgStat_StatSubWorkerEntry; /* * Working state needed to accumulate per-function-call timing statistics. @@ -1019,7 +1111,8 @@ extern void pgstat_drop_database(Oid databaseid); extern void pgstat_clear_snapshot(void); extern void pgstat_reset_counters(void); extern void pgstat_reset_shared_counters(const char *); -extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type); +extern void pgstat_reset_single_counter(Oid objectid, Oid subobjectid, + PgStat_Single_Reset_Type type); extern void pgstat_reset_slru_counter(const char *); extern void pgstat_reset_replslot_counter(const char *name); @@ -1038,6 +1131,10 @@ extern void pgstat_report_checksum_failure(void); extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat); extern void pgstat_report_replslot_create(const char *slotname); extern void pgstat_report_replslot_drop(const char *slotname); +extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, + LogicalRepMsgType command, + TransactionId xid, const char *errmsg); +extern void pgstat_report_subscription_drop(Oid subid); extern void pgstat_initialize(void); @@ -1129,6 +1226,8 @@ extern void pgstat_send_wal(bool force); extern PgStat_StatDBEntry *pgstat_fetch_stat_dbentry(Oid dbid); extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid); extern PgStat_StatFuncEntry *pgstat_fetch_stat_funcentry(Oid funcid); +extern PgStat_StatSubWorkerEntry *pgstat_fetch_stat_subworker_entry(Oid subid, + Oid subrelid); extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_BgWriterStats *pgstat_fetch_stat_bgwriter(void); extern PgStat_CheckpointerStats *pgstat_fetch_stat_checkpointer(void); |