diff options
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 47 |
1 files changed, 24 insertions, 23 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 3876c0188df..e95398db05a 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -122,8 +122,8 @@ typedef enum WalRcvWakeupReason WALRCV_WAKEUP_TERMINATE, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, - WALRCV_WAKEUP_HSFEEDBACK, - NUM_WALRCV_WAKEUPS + WALRCV_WAKEUP_HSFEEDBACK +#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1) } WalRcvWakeupReason; /* @@ -206,8 +206,6 @@ WalReceiverMain(void) */ Assert(walrcv != NULL); - now = GetCurrentTimestamp(); - /* * Mark walreceiver as running in shared memory. * @@ -261,6 +259,7 @@ WalReceiverMain(void) Assert(!is_temp_slot || (slotname[0] == '\0')); /* Initialise to a sanish value */ + now = GetCurrentTimestamp(); walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now; @@ -464,6 +463,7 @@ WalReceiverMain(void) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + /* recompute wakeup times */ now = GetCurrentTimestamp(); for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) WalRcvComputeNextWakeup(i, now); @@ -472,7 +472,6 @@ WalReceiverMain(void) /* See if we can read data immediately */ len = walrcv_receive(wrconn, &buf, &wait_fd); - now = GetCurrentTimestamp(); if (len != 0) { /* @@ -487,6 +486,7 @@ WalReceiverMain(void) * Something was received from primary, so adjust * the ping and terminate wakeup times. */ + now = GetCurrentTimestamp(); WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE, now); WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now); @@ -506,7 +506,6 @@ WalReceiverMain(void) break; } len = walrcv_receive(wrconn, &buf, &wait_fd); - now = GetCurrentTimestamp(); } /* Let the primary know that we received some data. */ @@ -525,7 +524,7 @@ WalReceiverMain(void) break; /* Find the soonest wakeup time, to limit our nap. */ - nextWakeup = PG_INT64_MAX; + nextWakeup = TIMESTAMP_INFINITY; for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) nextWakeup = Min(wakeup[i], nextWakeup); @@ -536,6 +535,7 @@ WalReceiverMain(void) * millisecond to avoid waking up too early and spinning until * one of the wakeup times. */ + now = GetCurrentTimestamp(); nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000)); /* @@ -556,7 +556,6 @@ WalReceiverMain(void) wait_fd, nap, WAIT_EVENT_WAL_RECEIVER_MAIN); - now = GetCurrentTimestamp(); if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); @@ -592,19 +591,20 @@ WalReceiverMain(void) * Check if time since last receive from primary has * reached the configured limit. */ + now = GetCurrentTimestamp(); if (now >= wakeup[WALRCV_WAKEUP_TERMINATE]) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("terminating walreceiver due to timeout"))); /* - * We didn't receive anything new, for half of receiver - * replication timeout. Ping the server. + * If we didn't receive anything new for half of receiver + * replication timeout, then ping the server. */ if (now >= wakeup[WALRCV_WAKEUP_PING]) { requestReply = true; - wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX; + wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY; } XLogWalRcvSendReply(requestReply, requestReply); @@ -1266,7 +1266,6 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) { WalRcvData *walrcv = WalRcv; - TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); /* Update shared-memory status */ @@ -1310,7 +1309,10 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) /* * Compute the next wakeup time for a given wakeup reason. Can be called to * initialize a wakeup time, to adjust it for the next wakeup, or to - * reinitialize it when GUCs have changed. + * reinitialize it when GUCs have changed. We ask the caller to pass in the + * value of "now" because this frequently avoids multiple calls of + * GetCurrentTimestamp(). It had better be a reasonably up-to-date value + * though. */ static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now) @@ -1319,30 +1321,29 @@ WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now) { case WALRCV_WAKEUP_TERMINATE: if (wal_receiver_timeout <= 0) - wakeup[reason] = PG_INT64_MAX; + wakeup[reason] = TIMESTAMP_INFINITY; else - wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000); + wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout); break; case WALRCV_WAKEUP_PING: if (wal_receiver_timeout <= 0) - wakeup[reason] = PG_INT64_MAX; + wakeup[reason] = TIMESTAMP_INFINITY; else - wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000); + wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2); break; case WALRCV_WAKEUP_HSFEEDBACK: if (!hot_standby_feedback || wal_receiver_status_interval <= 0) - wakeup[reason] = PG_INT64_MAX; + wakeup[reason] = TIMESTAMP_INFINITY; else - wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000); + wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval); break; case WALRCV_WAKEUP_REPLY: if (wal_receiver_status_interval <= 0) - wakeup[reason] = PG_INT64_MAX; + wakeup[reason] = TIMESTAMP_INFINITY; else - wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000); - break; - default: + wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval); break; + /* there's intentionally no default: here */ } } |