From bca8b7f16a3e720794cb0afbdb3733be4f8d9c2c Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Wed, 16 Feb 2011 19:29:37 +0000 Subject: Hot Standby feedback for avoidance of cleanup conflicts on standby. Standby optionally sends back information about oldestXmin of queries which is then checked and applied to the WALSender's proc->xmin. GetOldestXmin() is modified slightly to agree with GetSnapshotData(), so that all backends on primary include WALSender within their snapshots. Note this does nothing to change the snapshot xmin on either master or standby. Feedback piggybacks on the standby reply message. vacuum_defer_cleanup_age is no longer used on standby, though parameter still exists on primary, since some use cases still exist. Simon Riggs, review comments from Fujii Masao, Heikki Linnakangas, Robert Haas --- src/backend/replication/walreceiver.c | 36 ++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) (limited to 'src/backend/replication/walreceiver.c') diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b1e5247f12a..ee09468db17 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -38,6 +38,7 @@ #include #include +#include "access/transam.h" #include "access/xlog_internal.h" #include "libpq/pqsignal.h" #include "miscadmin.h" @@ -45,6 +46,7 @@ #include "replication/walreceiver.h" #include "storage/ipc.h" #include "storage/pmsignal.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -56,6 +58,7 @@ bool am_walreceiver; /* GUC variable */ int wal_receiver_status_interval; +bool hot_standby_feedback; /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; @@ -610,16 +613,43 @@ XLogWalRcvSendReply(void) wal_receiver_status_interval * 1000)) return; - /* Construct a new message. */ + /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; reply_message.apply = GetXLogReplayRecPtr(); reply_message.sendTime = now; - elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", + /* + * Get the OldestXmin and its associated epoch + */ + if (hot_standby_feedback && HotStandbyActive()) + { + TransactionId nextXid; + uint32 nextEpoch; + + reply_message.xmin = GetOldestXmin(true, false); + + /* + * Get epoch and adjust if nextXid and oldestXmin are different + * sides of the epoch boundary. + */ + GetNextXidAndEpoch(&nextXid, &nextEpoch); + if (nextXid < reply_message.xmin) + nextEpoch--; + reply_message.epoch = nextEpoch; + } + else + { + reply_message.xmin = InvalidTransactionId; + reply_message.epoch = 0; + } + + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u", reply_message.write.xlogid, reply_message.write.xrecoff, reply_message.flush.xlogid, reply_message.flush.xrecoff, - reply_message.apply.xlogid, reply_message.apply.xrecoff); + reply_message.apply.xlogid, reply_message.apply.xrecoff, + reply_message.xmin, + reply_message.epoch); /* Prepend with the message type and send it. */ buf[0] = 'r'; -- cgit v1.2.3