summaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- src/backend/replication/pgoutput/pgoutput.c 40
1 files changed, 39 insertions, 1 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 79735da21a9..799242128a2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -53,6 +53,7 @@ static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
+static void update_replication_progress(LogicalDecodingContext *ctx);
/*
* Entry in the map used to remember which relation schemas we sent.
@@ -277,7 +278,7 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- OutputPluginUpdateProgress(ctx);
+ update_replication_progress(ctx);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -385,6 +386,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
RelationSyncEntry *relentry;
Relation ancestor = NULL;
+ update_replication_progress(ctx);
+
if (!is_publishable_relation(relation))
return;
@@ -514,6 +517,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelids;
Oid *relids;
+ update_replication_progress(ctx);
+
old = MemoryContextSwitchTo(data->context);
relids = palloc0(nrelations * sizeof(Oid));
@@ -912,3 +917,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
entry->replicate_valid = false;
}
+
+/*
+ * Update replication progress, but only once enough changes have piled up
+ * (or the end of a transaction was reached), possibly sending a keepalive.
+ *
+ * While decoding a large transaction in which all or most of the changes are
+ * not published, nothing might be sent downstream for a long time.  If that
+ * exceeds the wal_receiver_timeout of the receiving side, it can time out;
+ * periodic progress updates give a keepalive the chance to go out.
+ */
+static void
+update_replication_progress(LogicalDecodingContext *ctx)
+{
+	/* Changes processed since the last progress update. */
+	static int	changes_count = 0;
+
+	/*
+	 * Number of consecutive changes processed between progress updates.
+	 * Tests revealed no noticeable overhead at 100 or so.
+	 */
+#define CHANGES_THRESHOLD 100
+
+	/*
+	 * Mid-transaction, stay quiet until the threshold is reached.  At the
+	 * end of a transaction the update is unconditional and the counter is
+	 * left untouched here (it is reset below).
+	 */
+	if (!ctx->end_xact && ++changes_count < CHANGES_THRESHOLD)
+		return;
+
+	OutputPluginUpdateProgress(ctx);
+	changes_count = 0;
+}