summaryrefslogtreecommitdiff
path: root/src/backend/postmaster/pgstat.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-11-30 08:54:30 +0530
committerAmit Kapila <akapila@postgresql.org>2021-11-30 08:54:30 +0530
commit8d74fc96db5fd547e077bf9bf4c3b67f821d71cd (patch)
tree3037345a7edabd025edcc5d9b431fb14f780e817 /src/backend/postmaster/pgstat.c
parent98105e53e0ab472b7721a3e8d7b9f1750a635120 (diff)
Add a view to show the stats of subscription workers.
This commit adds a new system view pg_stat_subscription_workers, that shows information about any errors which occur during the application of logical replication changes as well as during performing initial table synchronization. The subscription statistics entries are removed when the corresponding subscription is removed. It also adds an SQL function pg_stat_reset_subscription_worker() to reset single subscription errors. The contents of this view can be used by an upcoming patch that skips the particular transaction that conflicts with the existing data on the subscriber. This view can be extended in the future to track other xact related statistics like the number of xacts committed/aborted for subscription workers. Author: Masahiko Sawada Reviewed-by: Greg Nancarrow, Hou Zhijie, Tang Haiying, Vignesh C, Dilip Kumar, Takamichi Osumi, Amit Kapila Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
Diffstat (limited to 'src/backend/postmaster/pgstat.c')
-rw-r--r--src/backend/postmaster/pgstat.c409
1 files changed, 394 insertions, 15 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 8c166e5e161..7264d2c7272 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -41,6 +41,7 @@
#include "catalog/catalog.h"
#include "catalog/pg_database.h"
#include "catalog/pg_proc.h"
+#include "catalog/pg_subscription.h"
#include "common/ip.h"
#include "executor/instrument.h"
#include "libpq/libpq.h"
@@ -105,6 +106,7 @@
#define PGSTAT_DB_HASH_SIZE 16
#define PGSTAT_TAB_HASH_SIZE 512
#define PGSTAT_FUNCTION_HASH_SIZE 512
+#define PGSTAT_SUBWORKER_HASH_SIZE 32
#define PGSTAT_REPLSLOT_HASH_SIZE 32
@@ -320,10 +322,14 @@ NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_no
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
Oid tableoid, bool create);
+static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry,
+ Oid subid, Oid subrelid,
+ bool create);
static void pgstat_write_statsfiles(bool permanent, bool allDbs);
static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
-static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
+static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
+ HTAB *subworkerhash, bool permanent);
static void backend_read_statsfile(void);
static bool pgstat_write_statsfile_needed(void);
@@ -335,6 +341,7 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now);
static void pgstat_send_funcstats(void);
static void pgstat_send_slru(void);
+static void pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg);
static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid);
static bool pgstat_should_report_connstat(void);
static void pgstat_report_disconnect(Oid dboid);
@@ -373,6 +380,8 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
+static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
/* ------------------------------------------------------------
* Public functions called from postmaster follow
@@ -1302,6 +1311,74 @@ pgstat_vacuum_stat(void)
hash_destroy(htab);
}
+
+ /*
+ * Repeat for subscription workers. Similarly, we needn't bother in the
+ * common case where no subscription workers' stats are being collected.
+ */
+ if (dbentry->subworkers != NULL &&
+ hash_get_num_entries(dbentry->subworkers) > 0)
+ {
+ PgStat_StatSubWorkerEntry *subwentry;
+ PgStat_MsgSubscriptionPurge spmsg;
+
+ /*
+ * Read pg_subscription and make a list of OIDs of all existing
+ * subscriptions
+ */
+ htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid);
+
+ spmsg.m_databaseid = MyDatabaseId;
+ spmsg.m_nentries = 0;
+
+ hash_seq_init(&hstat, dbentry->subworkers);
+ while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ bool exists = false;
+ Oid subid = subwentry->key.subid;
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (hash_search(htab, (void *) &subid, HASH_FIND, NULL) != NULL)
+ continue;
+
+ /*
+ * It is possible that we have multiple entries for the
+ * subscription corresponding to apply worker and tablesync
+ * workers. In such cases, we don't need to add the same subid
+ * again.
+ */
+ for (int i = 0; i < spmsg.m_nentries; i++)
+ {
+ if (spmsg.m_subids[i] == subid)
+ {
+ exists = true;
+ break;
+ }
+ }
+
+ if (exists)
+ continue;
+
+ /* This subscription is dead, add the subid to the message */
+ spmsg.m_subids[spmsg.m_nentries++] = subid;
+
+ /*
+ * If the message is full, send it out and reinitialize to empty
+ */
+ if (spmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE)
+ {
+ pgstat_send_subscription_purge(&spmsg);
+ spmsg.m_nentries = 0;
+ }
+ }
+
+ /* Send the rest of dead subscriptions */
+ if (spmsg.m_nentries > 0)
+ pgstat_send_subscription_purge(&spmsg);
+
+ hash_destroy(htab);
+ }
}
@@ -1474,7 +1551,8 @@ pgstat_reset_shared_counters(const char *target)
* ----------
*/
void
-pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
+pgstat_reset_single_counter(Oid objoid, Oid subobjoid,
+ PgStat_Single_Reset_Type type)
{
PgStat_MsgResetsinglecounter msg;
@@ -1485,6 +1563,7 @@ pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
msg.m_databaseid = MyDatabaseId;
msg.m_resettype = type;
msg.m_objectid = objoid;
+ msg.m_subobjectid = subobjoid;
pgstat_send(&msg, sizeof(msg));
}
@@ -1870,6 +1949,51 @@ pgstat_report_replslot_drop(const char *slotname)
}
/* ----------
+ * pgstat_report_subworker_error() -
+ *
+ * Tell the collector about the subscription worker error.
+ * ----------
+ */
+void
+pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
+ LogicalRepMsgType command, TransactionId xid,
+ const char *errmsg)
+{
+ PgStat_MsgSubWorkerError msg;
+ int len;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR);
+ msg.m_databaseid = MyDatabaseId;
+ msg.m_subid = subid;
+ msg.m_subrelid = subrelid;
+ msg.m_relid = relid;
+ msg.m_command = command;
+ msg.m_xid = xid;
+ msg.m_timestamp = GetCurrentTimestamp();
+ strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN);
+
+ len = offsetof(PgStat_MsgSubWorkerError, m_message) + strlen(msg.m_message) + 1;
+ pgstat_send(&msg, len);
+}
+
+/* ----------
+ * pgstat_report_subscription_drop() -
+ *
+ * Tell the collector about dropping the subscription.
+ * ----------
+ */
+void
+pgstat_report_subscription_drop(Oid subid)
+{
+ PgStat_MsgSubscriptionPurge msg;
+
+ msg.m_databaseid = MyDatabaseId;
+ msg.m_subids[0] = subid;
+ msg.m_nentries = 1;
+ pgstat_send_subscription_purge(&msg);
+}
+
+/* ----------
* pgstat_ping() -
*
* Send some junk data to the collector to increase traffic.
@@ -2874,6 +2998,35 @@ pgstat_fetch_stat_funcentry(Oid func_id)
return funcentry;
}
+/*
+ * ---------
+ * pgstat_fetch_stat_subworker_entry() -
+ *
+ * Support function for the SQL-callable pgstat* functions. Returns
+ * the collected statistics for subscription worker or NULL.
+ * ---------
+ */
+PgStat_StatSubWorkerEntry *
+pgstat_fetch_stat_subworker_entry(Oid subid, Oid subrelid)
+{
+ PgStat_StatDBEntry *dbentry;
+ PgStat_StatSubWorkerEntry *wentry = NULL;
+
+ /* Load the stats file if needed */
+ backend_read_statsfile();
+
+ /*
+ * Lookup our database, then find the requested subscription worker stats.
+ */
+ dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
+ if (dbentry != NULL && dbentry->subworkers != NULL)
+ {
+ wentry = pgstat_get_subworker_entry(dbentry, subid, subrelid,
+ false);
+ }
+
+ return wentry;
+}
/*
* ---------
@@ -3312,6 +3465,23 @@ pgstat_send_slru(void)
}
}
+/* --------
+ * pgstat_send_subscription_purge() -
+ *
+ * Send a subscription purge message to the collector
+ * --------
+ */
+static void
+pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg)
+{
+ int len;
+
+ len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0])
+ + msg->m_nentries * sizeof(Oid);
+
+ pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE);
+ pgstat_send(msg, len);
+}
/* ----------
* PgstatCollectorMain() -
@@ -3568,6 +3738,14 @@ PgstatCollectorMain(int argc, char *argv[])
pgstat_recv_disconnect(&msg.msg_disconnect, len);
break;
+ case PGSTAT_MTYPE_SUBSCRIPTIONPURGE:
+ pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len);
+ break;
+
+ case PGSTAT_MTYPE_SUBWORKERERROR:
+ pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
+ break;
+
default:
break;
}
@@ -3613,7 +3791,8 @@ PgstatCollectorMain(int argc, char *argv[])
/*
* Subroutine to clear stats in a database entry
*
- * Tables and functions hashes are initialized to empty.
+ * Tables, functions, and subscription workers hashes are initialized
+ * to empty.
*/
static void
reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
@@ -3666,6 +3845,13 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
PGSTAT_FUNCTION_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_BLOBS);
+
+ hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
+ hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
+ dbentry->subworkers = hash_create("Per-database subscription worker",
+ PGSTAT_SUBWORKER_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS);
}
/*
@@ -3690,7 +3876,7 @@ pgstat_get_db_entry(Oid databaseid, bool create)
/*
* If not found, initialize the new one. This creates empty hash tables
- * for tables and functions, too.
+ * for tables, functions, and subscription workers, too.
*/
if (!found)
reset_dbentry_counters(result);
@@ -3748,6 +3934,47 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
return result;
}
+/* ----------
+ * pgstat_get_subworker_entry
+ *
+ * Return subscription worker entry with the given subscription OID and
+ * relation OID. If subrelid is InvalidOid, it returns an entry of the
+ * apply worker otherwise returns an entry of the table sync worker
+ * associated with subrelid. If no subscription worker entry exists,
+ * initialize it, if the create parameter is true. Else, return NULL.
+ * ----------
+ */
+static PgStat_StatSubWorkerEntry *
+pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
+ bool create)
+{
+ PgStat_StatSubWorkerEntry *subwentry;
+ PgStat_StatSubWorkerKey key;
+ bool found;
+ HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
+
+ key.subid = subid;
+ key.subrelid = subrelid;
+ subwentry = (PgStat_StatSubWorkerEntry *) hash_search(dbentry->subworkers,
+ (void *) &key,
+ action, &found);
+
+ if (!create && !found)
+ return NULL;
+
+ /* If not found, initialize the new one */
+ if (!found)
+ {
+ subwentry->last_error_relid = InvalidOid;
+ subwentry->last_error_command = 0;
+ subwentry->last_error_xid = InvalidTransactionId;
+ subwentry->last_error_count = 0;
+ subwentry->last_error_time = 0;
+ subwentry->last_error_message[0] = '\0';
+ }
+
+ return subwentry;
+}
/* ----------
* pgstat_write_statsfiles() -
@@ -3832,8 +4059,8 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
{
/*
- * Write out the table and function stats for this DB into the
- * appropriate per-DB stat file, if required.
+ * Write out the table, function, and subscription-worker stats for
+ * this DB into the appropriate per-DB stat file, if required.
*/
if (allDbs || pgstat_db_requested(dbentry->databaseid))
{
@@ -3947,8 +4174,10 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
{
HASH_SEQ_STATUS tstat;
HASH_SEQ_STATUS fstat;
+ HASH_SEQ_STATUS sstat;
PgStat_StatTabEntry *tabentry;
PgStat_StatFuncEntry *funcentry;
+ PgStat_StatSubWorkerEntry *subwentry;
FILE *fpout;
int32 format_id;
Oid dbid = dbentry->databaseid;
@@ -4004,6 +4233,17 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
}
/*
+ * Walk through the database's subscription worker stats table.
+ */
+ hash_seq_init(&sstat, dbentry->subworkers);
+ while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL)
+ {
+ fputc('S', fpout);
+ rc = fwrite(subwentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
+
+ /*
* No more output to be done. Close the temp file and replace the old
* pgstat.stat with it. The ferror() check replaces testing for error
* after each individual fputc or fwrite above.
@@ -4061,8 +4301,9 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
* files after reading; the in-memory status is now authoritative, and the
* files would be out of date in case somebody else reads them.
*
- * If a 'deep' read is requested, table/function stats are read, otherwise
- * the table/function hash tables remain empty.
+ * If a 'deep' read is requested, table/function/subscription-worker stats are
+ * read, otherwise the table/function/subscription-worker hash tables remain
+ * empty.
* ----------
*/
static HTAB *
@@ -4241,6 +4482,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
dbentry->tables = NULL;
dbentry->functions = NULL;
+ dbentry->subworkers = NULL;
/*
* In the collector, disregard the timestamp we read from the
@@ -4252,8 +4494,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbentry->stats_timestamp = 0;
/*
- * Don't create tables/functions hashtables for uninteresting
- * databases.
+ * Don't create tables/functions/subworkers hashtables for
+ * uninteresting databases.
*/
if (onlydb != InvalidOid)
{
@@ -4278,6 +4520,14 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
&hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
+ hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
+ hash_ctl.hcxt = pgStatLocalContext;
+ dbentry->subworkers = hash_create("Per-database subscription worker",
+ PGSTAT_SUBWORKER_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
/*
* If requested, read the data from the database-specific
* file. Otherwise we just leave the hashtables empty.
@@ -4286,6 +4536,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
pgstat_read_db_statsfile(dbentry->databaseid,
dbentry->tables,
dbentry->functions,
+ dbentry->subworkers,
permanent);
break;
@@ -4363,19 +4614,21 @@ done:
* As in pgstat_read_statsfiles, if the permanent file is requested, it is
* removed after reading.
*
- * Note: this code has the ability to skip storing per-table or per-function
- * data, if NULL is passed for the corresponding hashtable. That's not used
- * at the moment though.
+ * Note: this code has the ability to skip storing per-table, per-function, or
+ * per-subscription-worker data, if NULL is passed for the corresponding hashtable.
+ * That's not used at the moment though.
* ----------
*/
static void
pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
- bool permanent)
+ HTAB *subworkerhash, bool permanent)
{
PgStat_StatTabEntry *tabentry;
PgStat_StatTabEntry tabbuf;
PgStat_StatFuncEntry funcbuf;
PgStat_StatFuncEntry *funcentry;
+ PgStat_StatSubWorkerEntry subwbuf;
+ PgStat_StatSubWorkerEntry *subwentry;
FILE *fpin;
int32 format_id;
bool found;
@@ -4490,6 +4743,41 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
break;
/*
+ * 'S' A PgStat_StatSubWorkerEntry struct describing
+ * subscription worker statistics.
+ */
+ case 'S':
+ if (fread(&subwbuf, 1, sizeof(PgStat_StatSubWorkerEntry),
+ fpin) != sizeof(PgStat_StatSubWorkerEntry))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+
+ /*
+ * Skip if subscription worker data not wanted.
+ */
+ if (subworkerhash == NULL)
+ break;
+
+ subwentry = (PgStat_StatSubWorkerEntry *) hash_search(subworkerhash,
+ (void *) &subwbuf.key,
+ HASH_ENTER, &found);
+
+ if (found)
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+
+ memcpy(subwentry, &subwbuf, sizeof(subwbuf));
+ break;
+
+ /*
* 'E' The EOF marker of a complete stats file.
*/
case 'E':
@@ -5162,6 +5450,8 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
hash_destroy(dbentry->tables);
if (dbentry->functions != NULL)
hash_destroy(dbentry->functions);
+ if (dbentry->subworkers != NULL)
+ hash_destroy(dbentry->subworkers);
if (hash_search(pgStatDBHash,
(void *) &dbid,
@@ -5199,13 +5489,16 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
hash_destroy(dbentry->tables);
if (dbentry->functions != NULL)
hash_destroy(dbentry->functions);
+ if (dbentry->subworkers != NULL)
+ hash_destroy(dbentry->subworkers);
dbentry->tables = NULL;
dbentry->functions = NULL;
+ dbentry->subworkers = NULL;
/*
* Reset database-level stats, too. This creates empty hash tables for
- * tables and functions.
+ * tables, functions, and subscription workers.
*/
reset_dbentry_counters(dbentry);
}
@@ -5274,6 +5567,14 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
else if (msg->m_resettype == RESET_FUNCTION)
(void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
HASH_REMOVE, NULL);
+ else if (msg->m_resettype == RESET_SUBWORKER)
+ {
+ PgStat_StatSubWorkerKey key;
+
+ key.subid = msg->m_objectid;
+ key.subrelid = msg->m_subobjectid;
+ (void) hash_search(dbentry->subworkers, (void *) &key, HASH_REMOVE, NULL);
+ }
}
/* ----------
@@ -5817,6 +6118,84 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
}
/* ----------
+ * pgstat_recv_subscription_purge() -
+ *
+ * Process a SUBSCRIPTIONPURGE message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
+{
+ HASH_SEQ_STATUS hstat;
+ PgStat_StatDBEntry *dbentry;
+ PgStat_StatSubWorkerEntry *subwentry;
+
+ dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
+
+ /* No need to purge if we don't even know the database */
+ if (!dbentry || !dbentry->subworkers)
+ return;
+
+ /* Remove all subscription worker statistics for the given subscriptions */
+ hash_seq_init(&hstat, dbentry->subworkers);
+ while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ for (int i = 0; i < msg->m_nentries; i++)
+ {
+ if (subwentry->key.subid == msg->m_subids[i])
+ {
+ (void) hash_search(dbentry->subworkers, (void *) &(subwentry->key),
+ HASH_REMOVE, NULL);
+ break;
+ }
+ }
+ }
+}
+
+/* ----------
+ * pgstat_recv_subworker_error() -
+ *
+ * Process a SUBWORKERERROR message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
+{
+ PgStat_StatDBEntry *dbentry;
+ PgStat_StatSubWorkerEntry *subwentry;
+
+ dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+
+ /* Get the subscription worker stats */
+ subwentry = pgstat_get_subworker_entry(dbentry, msg->m_subid,
+ msg->m_subrelid, true);
+ Assert(subwentry);
+
+ if (subwentry->last_error_relid == msg->m_relid &&
+ subwentry->last_error_command == msg->m_command &&
+ subwentry->last_error_xid == msg->m_xid &&
+ strcmp(subwentry->last_error_message, msg->m_message) == 0)
+ {
+ /*
+ * The same error occurred again in succession, just update its
+ * timestamp and count.
+ */
+ subwentry->last_error_count++;
+ subwentry->last_error_time = msg->m_timestamp;
+ return;
+ }
+
+ /* Otherwise, update the error information */
+ subwentry->last_error_relid = msg->m_relid;
+ subwentry->last_error_command = msg->m_command;
+ subwentry->last_error_xid = msg->m_xid;
+ subwentry->last_error_count = 1;
+ subwentry->last_error_time = msg->m_timestamp;
+ strlcpy(subwentry->last_error_message, msg->m_message,
+ PGSTAT_SUBWORKERERROR_MSGLEN);
+}
+
+/* ----------
* pgstat_write_statsfile_needed() -
*
* Do we need to write out any stats files?