diff options
Diffstat (limited to 'src/backend/postmaster/pgstat.c')
-rw-r--r-- | src/backend/postmaster/pgstat.c | 409 |
1 files changed, 394 insertions, 15 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 8c166e5e161..7264d2c7272 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -41,6 +41,7 @@ #include "catalog/catalog.h" #include "catalog/pg_database.h" #include "catalog/pg_proc.h" +#include "catalog/pg_subscription.h" #include "common/ip.h" #include "executor/instrument.h" #include "libpq/libpq.h" @@ -105,6 +106,7 @@ #define PGSTAT_DB_HASH_SIZE 16 #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 +#define PGSTAT_SUBWORKER_HASH_SIZE 32 #define PGSTAT_REPLSLOT_HASH_SIZE 32 @@ -320,10 +322,14 @@ NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_no static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create); static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create); +static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, + Oid subid, Oid subrelid, + bool create); static void pgstat_write_statsfiles(bool permanent, bool allDbs); static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent); static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep); -static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent); +static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, + HTAB *subworkerhash, bool permanent); static void backend_read_statsfile(void); static bool pgstat_write_statsfile_needed(void); @@ -335,6 +341,7 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now); static void pgstat_send_funcstats(void); static void pgstat_send_slru(void); +static void pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg); static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid); static bool pgstat_should_report_connstat(void); static void pgstat_report_disconnect(Oid dboid); @@ -373,6 +380,8 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len); static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len); static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); +static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len); +static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -1302,6 +1311,74 @@ pgstat_vacuum_stat(void) hash_destroy(htab); } + + /* + * Repeat for subscription workers. Similarly, we needn't bother in the + * common case where no subscription workers' stats are being collected. + */ + if (dbentry->subworkers != NULL && + hash_get_num_entries(dbentry->subworkers) > 0) + { + PgStat_StatSubWorkerEntry *subwentry; + PgStat_MsgSubscriptionPurge spmsg; + + /* + * Read pg_subscription and make a list of OIDs of all existing + * subscriptions + */ + htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid); + + spmsg.m_databaseid = MyDatabaseId; + spmsg.m_nentries = 0; + + hash_seq_init(&hstat, dbentry->subworkers); + while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) + { + bool exists = false; + Oid subid = subwentry->key.subid; + + CHECK_FOR_INTERRUPTS(); + + if (hash_search(htab, (void *) &subid, HASH_FIND, NULL) != NULL) + continue; + + /* + * It is possible that we have multiple entries for the + * subscription corresponding to apply worker and tablesync + * workers. In such cases, we don't need to add the same subid + * again. + */ + for (int i = 0; i < spmsg.m_nentries; i++) + { + if (spmsg.m_subids[i] == subid) + { + exists = true; + break; + } + } + + if (exists) + continue; + + /* This subscription is dead, add the subid to the message */ + spmsg.m_subids[spmsg.m_nentries++] = subid; + + /* + * If the message is full, send it out and reinitialize to empty + */ + if (spmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE) + { + pgstat_send_subscription_purge(&spmsg); + spmsg.m_nentries = 0; + } + } + + /* Send the rest of dead subscriptions */ + if (spmsg.m_nentries > 0) + pgstat_send_subscription_purge(&spmsg); + + hash_destroy(htab); + } } @@ -1474,7 +1551,8 @@ pgstat_reset_shared_counters(const char *target) * ---------- */ void -pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type) +pgstat_reset_single_counter(Oid objoid, Oid subobjoid, + PgStat_Single_Reset_Type type) { PgStat_MsgResetsinglecounter msg; @@ -1485,6 +1563,7 @@ pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type) msg.m_databaseid = MyDatabaseId; msg.m_resettype = type; msg.m_objectid = objoid; + msg.m_subobjectid = subobjoid; pgstat_send(&msg, sizeof(msg)); } @@ -1870,6 +1949,51 @@ pgstat_report_replslot_drop(const char *slotname) } /* ---------- + * pgstat_report_subworker_error() - + * + * Tell the collector about the subscription worker error. + * ---------- + */ +void +pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, + LogicalRepMsgType command, TransactionId xid, + const char *errmsg) +{ + PgStat_MsgSubWorkerError msg; + int len; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR); + msg.m_databaseid = MyDatabaseId; + msg.m_subid = subid; + msg.m_subrelid = subrelid; + msg.m_relid = relid; + msg.m_command = command; + msg.m_xid = xid; + msg.m_timestamp = GetCurrentTimestamp(); + strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN); + + len = offsetof(PgStat_MsgSubWorkerError, m_message) + strlen(msg.m_message) + 1; + pgstat_send(&msg, len); +} + +/* ---------- + * pgstat_report_subscription_drop() - + * + * Tell the collector about dropping the subscription. + * ---------- + */ +void +pgstat_report_subscription_drop(Oid subid) +{ + PgStat_MsgSubscriptionPurge msg; + + msg.m_databaseid = MyDatabaseId; + msg.m_subids[0] = subid; + msg.m_nentries = 1; + pgstat_send_subscription_purge(&msg); +} + +/* ---------- * pgstat_ping() - * * Send some junk data to the collector to increase traffic. @@ -2874,6 +2998,35 @@ pgstat_fetch_stat_funcentry(Oid func_id) return funcentry; } +/* + * --------- + * pgstat_fetch_stat_subworker_entry() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * the collected statistics for subscription worker or NULL. + * --------- + */ +PgStat_StatSubWorkerEntry * +pgstat_fetch_stat_subworker_entry(Oid subid, Oid subrelid) +{ + PgStat_StatDBEntry *dbentry; + PgStat_StatSubWorkerEntry *wentry = NULL; + + /* Load the stats file if needed */ + backend_read_statsfile(); + + /* + * Lookup our database, then find the requested subscription worker stats. + */ + dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId); + if (dbentry != NULL && dbentry->subworkers != NULL) + { + wentry = pgstat_get_subworker_entry(dbentry, subid, subrelid, + false); + } + + return wentry; +} /* * --------- @@ -3312,6 +3465,23 @@ pgstat_send_slru(void) } } +/* -------- + * pgstat_send_subscription_purge() - + * + * Send a subscription purge message to the collector + * -------- + */ +static void +pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg) +{ + int len; + + len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0]) + + msg->m_nentries * sizeof(Oid); + + pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE); + pgstat_send(msg, len); +} /* ---------- * PgstatCollectorMain() - @@ -3568,6 +3738,14 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_disconnect(&msg.msg_disconnect, len); break; + case PGSTAT_MTYPE_SUBSCRIPTIONPURGE: + pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len); + break; + + case PGSTAT_MTYPE_SUBWORKERERROR: + pgstat_recv_subworker_error(&msg.msg_subworkererror, len); + break; + default: break; } @@ -3613,7 +3791,8 @@ PgstatCollectorMain(int argc, char *argv[]) /* * Subroutine to clear stats in a database entry * - * Tables and functions hashes are initialized to empty. + * Tables, functions, and subscription workers hashes are initialized + * to empty. */ static void reset_dbentry_counters(PgStat_StatDBEntry *dbentry) @@ -3666,6 +3845,13 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) PGSTAT_FUNCTION_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS); + + hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); + hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); + dbentry->subworkers = hash_create("Per-database subscription worker", + PGSTAT_SUBWORKER_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); } /* @@ -3690,7 +3876,7 @@ pgstat_get_db_entry(Oid databaseid, bool create) /* * If not found, initialize the new one. This creates empty hash tables - * for tables and functions, too. + * for tables, functions, and subscription workers, too. */ if (!found) reset_dbentry_counters(result); @@ -3748,6 +3934,47 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create) return result; } +/* ---------- + * pgstat_get_subworker_entry + * + * Return subscription worker entry with the given subscription OID and + * relation OID. If subrelid is InvalidOid, it returns an entry of the + * apply worker otherwise returns an entry of the table sync worker + * associated with subrelid. If no subscription worker entry exists, + * initialize it, if the create parameter is true. Else, return NULL. + * ---------- + */ +static PgStat_StatSubWorkerEntry * +pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid, + bool create) +{ + PgStat_StatSubWorkerEntry *subwentry; + PgStat_StatSubWorkerKey key; + bool found; + HASHACTION action = (create ? HASH_ENTER : HASH_FIND); + + key.subid = subid; + key.subrelid = subrelid; + subwentry = (PgStat_StatSubWorkerEntry *) hash_search(dbentry->subworkers, + (void *) &key, + action, &found); + + if (!create && !found) + return NULL; + + /* If not found, initialize the new one */ + if (!found) + { + subwentry->last_error_relid = InvalidOid; + subwentry->last_error_command = 0; + subwentry->last_error_xid = InvalidTransactionId; + subwentry->last_error_count = 0; + subwentry->last_error_time = 0; + subwentry->last_error_message[0] = '\0'; + } + + return subwentry; +} /* ---------- * pgstat_write_statsfiles() - @@ -3832,8 +4059,8 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL) { /* - * Write out the table and function stats for this DB into the - * appropriate per-DB stat file, if required. + * Write out the table, function, and subscription-worker stats for + * this DB into the appropriate per-DB stat file, if required. */ if (allDbs || pgstat_db_requested(dbentry->databaseid)) { @@ -3947,8 +4174,10 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) { HASH_SEQ_STATUS tstat; HASH_SEQ_STATUS fstat; + HASH_SEQ_STATUS sstat; PgStat_StatTabEntry *tabentry; PgStat_StatFuncEntry *funcentry; + PgStat_StatSubWorkerEntry *subwentry; FILE *fpout; int32 format_id; Oid dbid = dbentry->databaseid; @@ -4004,6 +4233,17 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) } /* + * Walk through the database's subscription worker stats table. + */ + hash_seq_init(&sstat, dbentry->subworkers); + while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL) + { + fputc('S', fpout); + rc = fwrite(subwentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + + /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error * after each individual fputc or fwrite above. @@ -4061,8 +4301,9 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) * files after reading; the in-memory status is now authoritative, and the * files would be out of date in case somebody else reads them. * - * If a 'deep' read is requested, table/function stats are read, otherwise - * the table/function hash tables remain empty. + * If a 'deep' read is requested, table/function/subscription-worker stats are + * read, otherwise the table/function/subscription-worker hash tables remain + * empty. * ---------- */ static HTAB * @@ -4241,6 +4482,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry)); dbentry->tables = NULL; dbentry->functions = NULL; + dbentry->subworkers = NULL; /* * In the collector, disregard the timestamp we read from the @@ -4252,8 +4494,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbentry->stats_timestamp = 0; /* - * Don't create tables/functions hashtables for uninteresting - * databases. + * Don't create tables/functions/subworkers hashtables for + * uninteresting databases. */ if (onlydb != InvalidOid) { @@ -4278,6 +4520,14 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); + hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); + hash_ctl.hcxt = pgStatLocalContext; + dbentry->subworkers = hash_create("Per-database subscription worker", + PGSTAT_SUBWORKER_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + /* * If requested, read the data from the database-specific * file. Otherwise we just leave the hashtables empty. @@ -4286,6 +4536,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) pgstat_read_db_statsfile(dbentry->databaseid, dbentry->tables, dbentry->functions, + dbentry->subworkers, permanent); break; @@ -4363,19 +4614,21 @@ done: * As in pgstat_read_statsfiles, if the permanent file is requested, it is * removed after reading. * - * Note: this code has the ability to skip storing per-table or per-function - * data, if NULL is passed for the corresponding hashtable. That's not used - * at the moment though. + * Note: this code has the ability to skip storing per-table, per-function, or + * per-subscription-worker data, if NULL is passed for the corresponding hashtable. + * That's not used at the moment though. * ---------- */ static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, - bool permanent) + HTAB *subworkerhash, bool permanent) { PgStat_StatTabEntry *tabentry; PgStat_StatTabEntry tabbuf; PgStat_StatFuncEntry funcbuf; PgStat_StatFuncEntry *funcentry; + PgStat_StatSubWorkerEntry subwbuf; + PgStat_StatSubWorkerEntry *subwentry; FILE *fpin; int32 format_id; bool found; @@ -4490,6 +4743,41 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, break; /* + * 'S' A PgStat_StatSubWorkerEntry struct describing + * subscription worker statistics. + */ + case 'S': + if (fread(&subwbuf, 1, sizeof(PgStat_StatSubWorkerEntry), + fpin) != sizeof(PgStat_StatSubWorkerEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + /* + * Skip if subscription worker data not wanted. + */ + if (subworkerhash == NULL) + break; + + subwentry = (PgStat_StatSubWorkerEntry *) hash_search(subworkerhash, + (void *) &subwbuf.key, + HASH_ENTER, &found); + + if (found) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + memcpy(subwentry, &subwbuf, sizeof(subwbuf)); + break; + + /* * 'E' The EOF marker of a complete stats file. */ case 'E': @@ -5162,6 +5450,8 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len) hash_destroy(dbentry->tables); if (dbentry->functions != NULL) hash_destroy(dbentry->functions); + if (dbentry->subworkers != NULL) + hash_destroy(dbentry->subworkers); if (hash_search(pgStatDBHash, (void *) &dbid, @@ -5199,13 +5489,16 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len) hash_destroy(dbentry->tables); if (dbentry->functions != NULL) hash_destroy(dbentry->functions); + if (dbentry->subworkers != NULL) + hash_destroy(dbentry->subworkers); dbentry->tables = NULL; dbentry->functions = NULL; + dbentry->subworkers = NULL; /* * Reset database-level stats, too. This creates empty hash tables for - * tables and functions. + * tables, functions, and subscription workers. */ reset_dbentry_counters(dbentry); } @@ -5274,6 +5567,14 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len) else if (msg->m_resettype == RESET_FUNCTION) (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid), HASH_REMOVE, NULL); + else if (msg->m_resettype == RESET_SUBWORKER) + { + PgStat_StatSubWorkerKey key; + + key.subid = msg->m_objectid; + key.subrelid = msg->m_subobjectid; + (void) hash_search(dbentry->subworkers, (void *) &key, HASH_REMOVE, NULL); + } } /* ---------- @@ -5817,6 +6118,84 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len) } /* ---------- + * pgstat_recv_subscription_purge() - + * + * Process a SUBSCRIPTIONPURGE message. + * ---------- + */ +static void +pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len) +{ + HASH_SEQ_STATUS hstat; + PgStat_StatDBEntry *dbentry; + PgStat_StatSubWorkerEntry *subwentry; + + dbentry = pgstat_get_db_entry(msg->m_databaseid, false); + + /* No need to purge if we don't even know the database */ + if (!dbentry || !dbentry->subworkers) + return; + + /* Remove all subscription worker statistics for the given subscriptions */ + hash_seq_init(&hstat, dbentry->subworkers); + while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) + { + for (int i = 0; i < msg->m_nentries; i++) + { + if (subwentry->key.subid == msg->m_subids[i]) + { + (void) hash_search(dbentry->subworkers, (void *) &(subwentry->key), + HASH_REMOVE, NULL); + break; + } + } + } +} + +/* ---------- + * pgstat_recv_subworker_error() - + * + * Process a SUBWORKERERROR message. + * ---------- + */ +static void +pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len) +{ + PgStat_StatDBEntry *dbentry; + PgStat_StatSubWorkerEntry *subwentry; + + dbentry = pgstat_get_db_entry(msg->m_databaseid, true); + + /* Get the subscription worker stats */ + subwentry = pgstat_get_subworker_entry(dbentry, msg->m_subid, + msg->m_subrelid, true); + Assert(subwentry); + + if (subwentry->last_error_relid == msg->m_relid && + subwentry->last_error_command == msg->m_command && + subwentry->last_error_xid == msg->m_xid && + strcmp(subwentry->last_error_message, msg->m_message) == 0) + { + /* + * The same error occurred again in succession, just update its + * timestamp and count. + */ + subwentry->last_error_count++; + subwentry->last_error_time = msg->m_timestamp; + return; + } + + /* Otherwise, update the error information */ + subwentry->last_error_relid = msg->m_relid; + subwentry->last_error_command = msg->m_command; + subwentry->last_error_xid = msg->m_xid; + subwentry->last_error_count = 1; + subwentry->last_error_time = msg->m_timestamp; + strlcpy(subwentry->last_error_message, msg->m_message, + PGSTAT_SUBWORKERERROR_MSGLEN); +} + +/* ---------- * pgstat_write_statsfile_needed() - * * Do we need to write out any stats files? |