diff options
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 40 |
1 files changed, 39 insertions, 1 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 79735da21a9..799242128a2 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -53,6 +53,7 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx); +static void update_replication_progress(LogicalDecodingContext *ctx); /* * Entry in the map used to remember which relation schemas we sent. @@ -277,7 +278,7 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + update_replication_progress(ctx); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); @@ -385,6 +386,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, RelationSyncEntry *relentry; Relation ancestor = NULL; + update_replication_progress(ctx); + if (!is_publishable_relation(relation)) return; @@ -514,6 +517,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelids; Oid *relids; + update_replication_progress(ctx); + old = MemoryContextSwitchTo(data->context); relids = palloc0(nrelations * sizeof(Oid)); @@ -912,3 +917,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) entry->replicate_valid = false; } + +/* + * Try to update progress and send a keepalive message if too many changes were + * processed. + * + * For a large transaction, if we don't send any change to the downstream for a + * long time (exceeds the wal_receiver_timeout of standby) then it can timeout. + * This can happen when all or most of the changes are not published. + */ +static void +update_replication_progress(LogicalDecodingContext *ctx) +{ + static int changes_count = 0; + + /* + * We don't want to try sending a keepalive message after processing each + * change as that can have overhead. Tests revealed that there is no + * noticeable overhead in doing it after continuously processing 100 or so + * changes. + */ +#define CHANGES_THRESHOLD 100 + + /* + * If we are at the end of transaction LSN, update progress tracking. + * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we + * try to send a keepalive message if required. + */ + if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) + { + OutputPluginUpdateProgress(ctx); + changes_count = 0; + } +} |