diff options
author | Michael Paquier <michael@paquier.xyz> | 2018-07-12 10:20:27 +0900 |
---|---|---|
committer | Michael Paquier <michael@paquier.xyz> | 2018-07-12 10:20:27 +0900 |
commit | 98e2c298c2f5fef158a6bf0d543a84098671ae7d (patch) | |
tree | 96e20b332f1a91d5dde6df9bac38776b80c3bdec | |
parent | d80ec868fa3aa61a8f2566ec0244c45a87a62e20 (diff) |
Make logical WAL sender report streaming state appropriately
WAL senders sending logically-decoded data fail to properly report in
"streaming" state when starting up, hence as long as one extra record is
not replayed, such WAL senders would remain in a "catchup" state, which
is inconsistent with the physical cousin.
This can be easily reproduced by for example using pg_recvlogical and
restarting the upstream server. The TAP tests have been slightly
modified to detect the failure and strengthened so as future tests also
make sure that a node is in streaming state when waiting for its
catchup.
Backpatch down to 9.4 where this code has been introduced.
Reported-by: Sawada Masahiko
Author: Simon Riggs, Sawada Masahiko
Reviewed-by: Petr Jelinek, Michael Paquier, Vaishnavi Prabakaran
Discussion: https://postgr.es/m/CAD21AoB2ZbCCqOx=bgKMcLrAvs1V0ZMqzs7wBTuDySezTGtMZA@mail.gmail.com
-rw-r--r-- | src/backend/replication/walsender.c | 22 |
1 files changed, 16 insertions, 6 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3a06a4a307a..8f720521abf 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1914,8 +1914,8 @@ WalSndLoop(WalSndSendDataCallback send_data) if (MyWalSnd->state == WALSNDSTATE_CATCHUP) { ereport(DEBUG1, - (errmsg("standby \"%s\" has now caught up with primary", - application_name))); + (errmsg("\"%s\" has now caught up with upstream server", + application_name))); WalSndSetState(WALSNDSTATE_STREAMING); } @@ -2482,10 +2482,10 @@ XLogSendLogical(void) char *errm; /* - * Don't know whether we've caught up yet. We'll set it to true in - * WalSndWaitForWal, if we're actually waiting. We also set to true if - * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait - - * i.e. when we're shutting down. + * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to + * true in WalSndWaitForWal, if we're actually waiting. We also set to + * true if XLogReadRecord() had to stop reading but WalSndWaitForWal + * didn't wait - i.e. when we're shutting down. */ WalSndCaughtUp = false; @@ -2498,9 +2498,19 @@ XLogSendLogical(void) if (record != NULL) { + /* XXX: Note that logical decoding cannot be used while in recovery */ + XLogRecPtr flushPtr = GetFlushRecPtr(); + LogicalDecodingProcessRecord(logical_decoding_ctx, record); sentPtr = logical_decoding_ctx->reader->EndRecPtr; + + /* + * If we have sent a record that is at or beyond the flushed point, we + * have caught up. + */ + if (sentPtr >= flushPtr) + WalSndCaughtUp = true; } else { |