summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
authorÁlvaro Herrera <alvherre@kurilemu.de>2025-11-06 20:33:57 +0100
committerÁlvaro Herrera <alvherre@kurilemu.de>2025-11-06 20:33:57 +0100
commita2b02293bc65dbb2401cb19c724f52c6ee0f2faf (patch)
treef982dafa5b106905027b0e11fe9cdee9fc0bab6f /src/backend/replication
parent06edbed478625829b19c35d0c17d805be588afa6 (diff)
Use XLogRecPtrIsValid() in various places
Now that commit 06edbed47862 has introduced XLogRecPtrIsValid(), we can use that instead of: - XLogRecPtrIsInvalid() - direct comparisons with InvalidXLogRecPtr - direct comparisons with literal 0 This makes the code more consistent. Author: Bertrand Drouvot <bertranddrouvot.pg@gmail.com> Discussion: https://postgr.es/m/aQB7EvGqrbZXrMlg@ip-10-97-1-34.eu-west-3.compute.internal
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/applyparallelworker.c4
-rw-r--r--src/backend/replication/logical/launcher.c4
-rw-r--r--src/backend/replication/logical/logical.c32
-rw-r--r--src/backend/replication/logical/logicalfuncs.c6
-rw-r--r--src/backend/replication/logical/origin.c18
-rw-r--r--src/backend/replication/logical/proto.c18
-rw-r--r--src/backend/replication/logical/reorderbuffer.c38
-rw-r--r--src/backend/replication/logical/slotsync.c6
-rw-r--r--src/backend/replication/logical/snapbuild.c14
-rw-r--r--src/backend/replication/logical/worker.c16
-rw-r--r--src/backend/replication/slot.c28
-rw-r--r--src/backend/replication/slotfuncs.c22
-rw-r--r--src/backend/replication/syncrep.c10
-rw-r--r--src/backend/replication/walreceiver.c8
-rw-r--r--src/backend/replication/walreceiverfuncs.c2
-rw-r--r--src/backend/replication/walsender.c30
16 files changed, 128 insertions, 128 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 14325581afc..baa68c1ab6c 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -299,7 +299,7 @@ pa_can_start(void)
* STREAM START message, and it doesn't seem worth sending the extra eight
* bytes with the STREAM START to enable parallelism for this case.
*/
- if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
+ if (XLogRecPtrIsValid(MySubscription->skiplsn))
return false;
/*
@@ -1640,7 +1640,7 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
*/
pa_wait_for_xact_finish(winfo);
- if (!XLogRecPtrIsInvalid(remote_lsn))
+ if (XLogRecPtrIsValid(remote_lsn))
store_flush_position(remote_lsn, winfo->shared->last_commit_end);
pa_free_worker(winfo);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6c6d4015ba7..6214028eda9 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1661,7 +1661,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
else
nulls[3] = true;
- if (XLogRecPtrIsInvalid(worker.last_lsn))
+ if (!XLogRecPtrIsValid(worker.last_lsn))
nulls[4] = true;
else
values[4] = LSNGetDatum(worker.last_lsn);
@@ -1673,7 +1673,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
nulls[6] = true;
else
values[6] = TimestampTzGetDatum(worker.last_recv_time);
- if (XLogRecPtrIsInvalid(worker.reply_lsn))
+ if (!XLogRecPtrIsValid(worker.reply_lsn))
nulls[7] = true;
else
values[7] = LSNGetDatum(worker.reply_lsn);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 93ed2eb368e..866f92cf799 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -388,7 +388,7 @@ CreateInitDecodingContext(const char *plugin,
slot->data.plugin = plugin_name;
SpinLockRelease(&slot->mutex);
- if (XLogRecPtrIsInvalid(restart_lsn))
+ if (!XLogRecPtrIsValid(restart_lsn))
ReplicationSlotReserveWal();
else
{
@@ -546,9 +546,9 @@ CreateDecodingContext(XLogRecPtr start_lsn,
/* slot must be valid to allow decoding */
Assert(slot->data.invalidated == RS_INVAL_NONE);
- Assert(slot->data.restart_lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(slot->data.restart_lsn));
- if (start_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(start_lsn))
{
/* continue from last position */
start_lsn = slot->data.confirmed_flush;
@@ -757,7 +757,7 @@ output_plugin_error_callback(void *arg)
LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
/* not all callbacks have an associated LSN */
- if (state->report_location != InvalidXLogRecPtr)
+ if (XLogRecPtrIsValid(state->report_location))
errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%08X",
NameStr(state->ctx->slot->data.name),
NameStr(state->ctx->slot->data.plugin),
@@ -1711,7 +1711,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
* Only increase if the previous values have been applied, otherwise we
* might never end up updating if the receiver acks too slowly.
*/
- else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
+ else if (!XLogRecPtrIsValid(slot->candidate_xmin_lsn))
{
slot->candidate_catalog_xmin = xmin;
slot->candidate_xmin_lsn = current_lsn;
@@ -1749,8 +1749,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
slot = MyReplicationSlot;
Assert(slot != NULL);
- Assert(restart_lsn != InvalidXLogRecPtr);
- Assert(current_lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(restart_lsn));
+ Assert(XLogRecPtrIsValid(current_lsn));
SpinLockAcquire(&slot->mutex);
@@ -1779,7 +1779,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
* might never end up updating if the receiver acks too slowly. A missed
* value here will just cause some extra effort after reconnecting.
*/
- else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
+ else if (!XLogRecPtrIsValid(slot->candidate_restart_valid))
{
slot->candidate_restart_valid = current_lsn;
slot->candidate_restart_lsn = restart_lsn;
@@ -1819,11 +1819,11 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
void
LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
- Assert(lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(lsn));
/* Do an unlocked check for candidate_lsn first. */
- if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
- MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
+ if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) ||
+ XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid))
{
bool updated_xmin = false;
bool updated_restart = false;
@@ -1849,7 +1849,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
MyReplicationSlot->data.confirmed_flush = lsn;
/* if we're past the location required for bumping xmin, do so */
- if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
+ if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) &&
MyReplicationSlot->candidate_xmin_lsn <= lsn)
{
/*
@@ -1871,10 +1871,10 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
}
}
- if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
+ if (XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid) &&
MyReplicationSlot->candidate_restart_valid <= lsn)
{
- Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_lsn));
MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
@@ -2089,7 +2089,7 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner;
XLogRecPtr retlsn;
- Assert(moveto != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(moveto));
if (found_consistent_snapshot)
*found_consistent_snapshot = false;
@@ -2163,7 +2163,7 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
if (found_consistent_snapshot && DecodingContextReady(ctx))
*found_consistent_snapshot = true;
- if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+ if (XLogRecPtrIsValid(ctx->reader->EndRecPtr))
{
LogicalConfirmReceivedLocation(moveto);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 25f890ddeed..49b2aef3c74 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -229,7 +229,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* Wait for specified streaming replication standby servers (if any)
* to confirm receipt of WAL up to wait_for_wal_lsn.
*/
- if (XLogRecPtrIsInvalid(upto_lsn))
+ if (!XLogRecPtrIsValid(upto_lsn))
wait_for_wal_lsn = end_of_wal;
else
wait_for_wal_lsn = Min(upto_lsn, end_of_wal);
@@ -276,7 +276,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
}
/* check limits */
- if (upto_lsn != InvalidXLogRecPtr &&
+ if (XLogRecPtrIsValid(upto_lsn) &&
upto_lsn <= ctx->reader->EndRecPtr)
break;
if (upto_nchanges != 0 &&
@@ -289,7 +289,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* Next time, start where we left off. (Hunting things, the family
* business..)
*/
- if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
+ if (XLogRecPtrIsValid(ctx->reader->EndRecPtr) && confirm)
{
LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index bcd5d9aad62..4632aa8115d 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -984,8 +984,8 @@ replorigin_advance(RepOriginId node,
/* initialize new slot */
LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
replication_state = free_state;
- Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
- Assert(replication_state->local_lsn == InvalidXLogRecPtr);
+ Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
+ Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
replication_state->roident = node;
}
@@ -1020,7 +1020,7 @@ replorigin_advance(RepOriginId node,
*/
if (go_backward || replication_state->remote_lsn < remote_commit)
replication_state->remote_lsn = remote_commit;
- if (local_commit != InvalidXLogRecPtr &&
+ if (XLogRecPtrIsValid(local_commit) &&
(go_backward || replication_state->local_lsn < local_commit))
replication_state->local_lsn = local_commit;
LWLockRelease(&replication_state->lock);
@@ -1064,7 +1064,7 @@ replorigin_get_progress(RepOriginId node, bool flush)
LWLockRelease(ReplicationOriginLock);
- if (flush && local_lsn != InvalidXLogRecPtr)
+ if (flush && XLogRecPtrIsValid(local_lsn))
XLogFlush(local_lsn);
return remote_lsn;
@@ -1197,8 +1197,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
/* initialize new slot */
session_replication_state = &replication_states[free_slot];
- Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
- Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
+ Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
+ Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
session_replication_state->roident = node;
}
@@ -1282,7 +1282,7 @@ replorigin_session_get_progress(bool flush)
local_lsn = session_replication_state->local_lsn;
LWLockRelease(&session_replication_state->lock);
- if (flush && local_lsn != InvalidXLogRecPtr)
+ if (flush && XLogRecPtrIsValid(local_lsn))
XLogFlush(local_lsn);
return remote_lsn;
@@ -1454,7 +1454,7 @@ pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
remote_lsn = replorigin_session_get_progress(flush);
- if (remote_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(remote_lsn))
PG_RETURN_NULL();
PG_RETURN_LSN(remote_lsn);
@@ -1543,7 +1543,7 @@ pg_replication_origin_progress(PG_FUNCTION_ARGS)
remote_lsn = replorigin_get_progress(roident, flush);
- if (remote_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(remote_lsn))
PG_RETURN_NULL();
PG_RETURN_LSN(remote_lsn);
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index ed62888764c..f0a913892b9 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -64,7 +64,7 @@ logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
{
/* read fields */
begin_data->final_lsn = pq_getmsgint64(in);
- if (begin_data->final_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(begin_data->final_lsn))
elog(ERROR, "final_lsn not set in begin message");
begin_data->committime = pq_getmsgint64(in);
begin_data->xid = pq_getmsgint(in, 4);
@@ -135,10 +135,10 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
{
/* read fields */
begin_data->prepare_lsn = pq_getmsgint64(in);
- if (begin_data->prepare_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(begin_data->prepare_lsn))
elog(ERROR, "prepare_lsn not set in begin prepare message");
begin_data->end_lsn = pq_getmsgint64(in);
- if (begin_data->end_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(begin_data->end_lsn))
elog(ERROR, "end_lsn not set in begin prepare message");
begin_data->prepare_time = pq_getmsgint64(in);
begin_data->xid = pq_getmsgint(in, 4);
@@ -207,10 +207,10 @@ logicalrep_read_prepare_common(StringInfo in, char *msgtype,
/* read fields */
prepare_data->prepare_lsn = pq_getmsgint64(in);
- if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(prepare_data->prepare_lsn))
elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
prepare_data->end_lsn = pq_getmsgint64(in);
- if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(prepare_data->end_lsn))
elog(ERROR, "end_lsn is not set in %s message", msgtype);
prepare_data->prepare_time = pq_getmsgint64(in);
prepare_data->xid = pq_getmsgint(in, 4);
@@ -274,10 +274,10 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *
/* read fields */
prepare_data->commit_lsn = pq_getmsgint64(in);
- if (prepare_data->commit_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(prepare_data->commit_lsn))
elog(ERROR, "commit_lsn is not set in commit prepared message");
prepare_data->end_lsn = pq_getmsgint64(in);
- if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(prepare_data->end_lsn))
elog(ERROR, "end_lsn is not set in commit prepared message");
prepare_data->commit_time = pq_getmsgint64(in);
prepare_data->xid = pq_getmsgint(in, 4);
@@ -333,10 +333,10 @@ logicalrep_read_rollback_prepared(StringInfo in,
/* read fields */
rollback_data->prepare_end_lsn = pq_getmsgint64(in);
- if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(rollback_data->prepare_end_lsn))
elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
rollback_data->rollback_end_lsn = pq_getmsgint64(in);
- if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(rollback_data->rollback_end_lsn))
elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
rollback_data->prepare_time = pq_getmsgint64(in);
rollback_data->rollback_time = pq_getmsgint64(in);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b57aef9916d..eb6a84554b7 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -701,7 +701,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
{
/* initialize the new entry, if creation was requested */
Assert(ent != NULL);
- Assert(lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(lsn));
ent->txn = ReorderBufferAllocTXN(rb);
ent->txn->xid = xid;
@@ -849,7 +849,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
change->lsn = lsn;
change->txn = txn;
- Assert(InvalidXLogRecPtr != lsn);
+ Assert(XLogRecPtrIsValid(lsn));
dlist_push_tail(&txn->changes, &change->node);
txn->nentries++;
txn->nentries_mem++;
@@ -966,14 +966,14 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
iter.cur);
/* start LSN must be set */
- Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(cur_txn->first_lsn));
/* If there is an end LSN, it must be higher than start LSN */
- if (cur_txn->end_lsn != InvalidXLogRecPtr)
+ if (XLogRecPtrIsValid(cur_txn->end_lsn))
Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
/* Current initial LSN must be strictly higher than previous */
- if (prev_first_lsn != InvalidXLogRecPtr)
+ if (XLogRecPtrIsValid(prev_first_lsn))
Assert(prev_first_lsn < cur_txn->first_lsn);
/* known-as-subtxn txns must not be listed */
@@ -990,10 +990,10 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
/* base snapshot (and its LSN) must be set */
Assert(cur_txn->base_snapshot != NULL);
- Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(cur_txn->base_snapshot_lsn));
/* current LSN must be strictly higher than previous */
- if (prev_base_snap_lsn != InvalidXLogRecPtr)
+ if (XLogRecPtrIsValid(prev_base_snap_lsn))
Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
/* known-as-subtxn txns must not be listed */
@@ -1022,11 +1022,11 @@ AssertChangeLsnOrder(ReorderBufferTXN *txn)
cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(cur_change->lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(txn->first_lsn));
+ Assert(XLogRecPtrIsValid(cur_change->lsn));
Assert(txn->first_lsn <= cur_change->lsn);
- if (txn->end_lsn != InvalidXLogRecPtr)
+ if (XLogRecPtrIsValid(txn->end_lsn))
Assert(cur_change->lsn <= txn->end_lsn);
Assert(prev_lsn <= cur_change->lsn);
@@ -1053,7 +1053,7 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb)
txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
Assert(!rbtxn_is_known_subxact(txn));
- Assert(txn->first_lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(txn->first_lsn));
return txn;
}
@@ -2276,7 +2276,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* We can't call start stream callback before processing first
* change.
*/
- if (prev_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(prev_lsn))
{
if (streaming)
{
@@ -2291,7 +2291,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* subtransactions. The changes may have the same LSN due to
* MULTI_INSERT xlog records.
*/
- Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
+ Assert(!XLogRecPtrIsValid(prev_lsn) || prev_lsn <= change->lsn);
prev_lsn = change->lsn;
@@ -2975,7 +2975,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
* have been updated in it by now.
*/
Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(txn->final_lsn));
txn->gid = pstrdup(gid);
@@ -3041,7 +3041,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
*/
Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) ==
(RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE));
- Assert(txn->final_lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(txn->final_lsn));
/*
* By this time the txn has the prepare record information and it is
@@ -4552,8 +4552,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
dlist_mutable_iter cleanup_iter;
File *fd = &file->vfd;
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(txn->first_lsn));
+ Assert(XLogRecPtrIsValid(txn->final_lsn));
/* free current entries, so we have memory for more */
dlist_foreach_modify(cleanup_iter, &txn->changes)
@@ -4860,8 +4860,8 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogSegNo cur;
XLogSegNo last;
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(txn->first_lsn));
+ Assert(XLogRecPtrIsValid(txn->final_lsn));
XLByteToSeg(txn->first_lsn, first, wal_segment_size);
XLByteToSeg(txn->final_lsn, last, wal_segment_size);
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index b122d99b009..8b4afd87dc9 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -493,7 +493,7 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
ReplicationSlot *slot = MyReplicationSlot;
Assert(slot != NULL);
- Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn));
+ Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
while (true)
{
@@ -899,8 +899,8 @@ synchronize_slots(WalReceiverConn *wrconn)
* pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
* slots in the first place.
*/
- if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
- XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
+ if ((!XLogRecPtrIsValid(remote_slot->restart_lsn) ||
+ !XLogRecPtrIsValid(remote_slot->confirmed_lsn) ||
!TransactionIdIsValid(remote_slot->catalog_xmin)) &&
remote_slot->invalidated == RS_INVAL_NONE)
pfree(remote_slot);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 98ddee20929..6e18baa33cb 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1210,7 +1210,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
* oldest ongoing txn might have started when we didn't yet serialize
* anything because we hadn't reached a consistent state yet.
*/
- if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
+ if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
/*
@@ -1218,8 +1218,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
* we have one.
*/
else if (txn == NULL &&
- builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
- builder->last_serialized_snapshot != InvalidXLogRecPtr)
+ XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
+ XLogRecPtrIsValid(builder->last_serialized_snapshot))
LogicalIncreaseRestartDecodingForSlot(lsn,
builder->last_serialized_snapshot);
}
@@ -1293,7 +1293,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
*/
if (running->oldestRunningXid == running->nextXid)
{
- if (builder->start_decoding_at == InvalidXLogRecPtr ||
+ if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
builder->start_decoding_at <= lsn)
/* can decode everything after this */
builder->start_decoding_at = lsn + 1;
@@ -1509,8 +1509,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
struct stat stat_buf;
Size sz;
- Assert(lsn != InvalidXLogRecPtr);
- Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
+ Assert(XLogRecPtrIsValid(lsn));
+ Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
builder->last_serialized_snapshot <= lsn);
/*
@@ -2029,7 +2029,7 @@ CheckPointSnapBuild(void)
lsn = ((uint64) hi) << 32 | lo;
/* check whether we still need it */
- if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
+ if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
{
elog(DEBUG1, "removing snapbuild snapshot %s", path);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e1c757c911e..28f61f96a1a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -514,7 +514,7 @@ bool InitializingApplyWorker = false;
* by the user.
*/
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
-#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
+#define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -4126,7 +4126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
* due to a bug, we don't want to proceed as it can
* incorrectly advance oldest_nonremovable_xid.
*/
- if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
+ if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
elog(ERROR, "cannot get the latest WAL position from the publisher");
maybe_advance_nonremovable_xid(&rdt_data, true);
@@ -4613,7 +4613,7 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
static void
wait_for_local_flush(RetainDeadTuplesData *rdt_data)
{
- Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
+ Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
TransactionIdIsValid(rdt_data->candidate_xid));
/*
@@ -6031,7 +6031,7 @@ maybe_start_skipping_changes(XLogRecPtr finish_lsn)
* function is called for every remote transaction and we assume that
* skipping the transaction is not used often.
*/
- if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
+ if (likely(!XLogRecPtrIsValid(MySubscription->skiplsn) ||
MySubscription->skiplsn != finish_lsn))
return;
@@ -6077,7 +6077,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
XLogRecPtr myskiplsn = MySubscription->skiplsn;
bool started_tx = false;
- if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
+ if (likely(!XLogRecPtrIsValid(myskiplsn)) || am_parallel_apply_worker())
return;
if (!IsTransactionState())
@@ -6173,7 +6173,7 @@ apply_error_callback(void *arg)
errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
errarg->origin_name,
logicalrep_message_type(errarg->command));
- else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ else if (!XLogRecPtrIsValid(errarg->finish_lsn))
errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
errarg->origin_name,
logicalrep_message_type(errarg->command),
@@ -6189,7 +6189,7 @@ apply_error_callback(void *arg)
{
if (errarg->remote_attnum < 0)
{
- if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ if (!XLogRecPtrIsValid(errarg->finish_lsn))
errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
errarg->origin_name,
logicalrep_message_type(errarg->command),
@@ -6207,7 +6207,7 @@ apply_error_callback(void *arg)
}
else
{
- if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ if (!XLogRecPtrIsValid(errarg->finish_lsn))
errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
errarg->origin_name,
logicalrep_message_type(errarg->command),
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6363030808f..1ec1e997b27 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1270,15 +1270,15 @@ ReplicationSlotsComputeRequiredLSN(void)
*/
if (persistency == RS_PERSISTENT)
{
- if (last_saved_restart_lsn != InvalidXLogRecPtr &&
+ if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
restart_lsn > last_saved_restart_lsn)
{
restart_lsn = last_saved_restart_lsn;
}
}
- if (restart_lsn != InvalidXLogRecPtr &&
- (min_required == InvalidXLogRecPtr ||
+ if (XLogRecPtrIsValid(restart_lsn) &&
+ (!XLogRecPtrIsValid(min_required) ||
restart_lsn < min_required))
min_required = restart_lsn;
}
@@ -1350,17 +1350,17 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
*/
if (persistency == RS_PERSISTENT)
{
- if (last_saved_restart_lsn != InvalidXLogRecPtr &&
+ if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
restart_lsn > last_saved_restart_lsn)
{
restart_lsn = last_saved_restart_lsn;
}
}
- if (restart_lsn == InvalidXLogRecPtr)
+ if (!XLogRecPtrIsValid(restart_lsn))
continue;
- if (result == InvalidXLogRecPtr ||
+ if (!XLogRecPtrIsValid(result) ||
restart_lsn < result)
result = restart_lsn;
}
@@ -1573,8 +1573,8 @@ ReplicationSlotReserveWal(void)
ReplicationSlot *slot = MyReplicationSlot;
Assert(slot != NULL);
- Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
- Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
+ Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
+ Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));
/*
* The replication slot mechanism is used to prevent removal of required
@@ -1730,7 +1730,7 @@ static inline bool
CanInvalidateIdleSlot(ReplicationSlot *s)
{
return (idle_replication_slot_timeout_secs != 0 &&
- !XLogRecPtrIsInvalid(s->data.restart_lsn) &&
+ XLogRecPtrIsValid(s->data.restart_lsn) &&
s->inactive_since > 0 &&
!(RecoveryInProgress() && s->data.synced));
}
@@ -1754,7 +1754,7 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
{
XLogRecPtr restart_lsn = s->data.restart_lsn;
- if (restart_lsn != InvalidXLogRecPtr &&
+ if (XLogRecPtrIsValid(restart_lsn) &&
restart_lsn < oldestLSN)
return RS_INVAL_WAL_REMOVED;
}
@@ -2922,7 +2922,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
* Don't need to wait for the standbys to catch up if they are already
* beyond the specified WAL location.
*/
- if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
+ if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
ss_oldest_flush_lsn >= wait_for_lsn)
return true;
@@ -2993,7 +2993,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
break;
}
- if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+ if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
{
/* Log a message if no active_pid for this physical slot */
if (inactive)
@@ -3012,7 +3012,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Assert(restart_lsn >= wait_for_lsn);
- if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+ if (!XLogRecPtrIsValid(min_restart_lsn) ||
min_restart_lsn > restart_lsn)
min_restart_lsn = restart_lsn;
@@ -3031,7 +3031,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
return false;
/* The ss_oldest_flush_lsn must not retreat. */
- Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
+ Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
min_restart_lsn >= ss_oldest_flush_lsn);
ss_oldest_flush_lsn = min_restart_lsn;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index b8f21153e7b..0478fc9c977 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -46,7 +46,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
if (immediately_reserve)
{
/* Reserve WAL as the user asked for it */
- if (XLogRecPtrIsInvalid(restart_lsn))
+ if (!XLogRecPtrIsValid(restart_lsn))
ReplicationSlotReserveWal();
else
MyReplicationSlot->data.restart_lsn = restart_lsn;
@@ -308,12 +308,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
else
nulls[i++] = true;
- if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
+ if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
else
nulls[i++] = true;
- if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
+ if (XLogRecPtrIsValid(slot_contents.data.confirmed_flush))
values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
else
nulls[i++] = true;
@@ -357,7 +357,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
*
* If we do change it, save the state for safe_wal_size below.
*/
- if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+ if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
{
int pid;
@@ -407,7 +407,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
if (slot_contents.data.two_phase &&
- !XLogRecPtrIsInvalid(slot_contents.data.two_phase_at))
+ XLogRecPtrIsValid(slot_contents.data.two_phase_at))
values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
else
nulls[i++] = true;
@@ -467,7 +467,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
XLogRecPtr retlsn = startlsn;
- Assert(moveto != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(moveto));
if (startlsn < moveto)
{
@@ -523,7 +523,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
CheckSlotPermissions();
- if (XLogRecPtrIsInvalid(moveto))
+ if (!XLogRecPtrIsValid(moveto))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid target WAL LSN")));
@@ -545,7 +545,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
ReplicationSlotAcquire(NameStr(*slotname), true, true);
/* A slot whose restart_lsn has never been reserved cannot be advanced */
- if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
+ if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("replication slot \"%s\" cannot be advanced",
@@ -679,7 +679,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
NameStr(*src_name))));
/* Copying non-reserved slot doesn't make sense */
- if (XLogRecPtrIsInvalid(src_restart_lsn))
+ if (!XLogRecPtrIsValid(src_restart_lsn))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot copy a replication slot that doesn't reserve WAL")));
@@ -785,7 +785,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
errdetail("The source replication slot was modified incompatibly during the copy operation.")));
/* The source slot must have a consistent snapshot */
- if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+ if (src_islogical && !XLogRecPtrIsValid(copy_confirmed_flush))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot copy unfinished logical replication slot \"%s\"",
@@ -840,7 +840,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
/* All done. Set up the return values */
values[0] = NameGetDatum(dst_name);
nulls[0] = false;
- if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
+ if (XLogRecPtrIsValid(MyReplicationSlot->data.confirmed_flush))
{
values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
nulls[1] = false;
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 32cf3a48b89..a0c79958fd5 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -493,7 +493,7 @@ SyncRepReleaseWaiters(void)
if (MyWalSnd->sync_standby_priority == 0 ||
(MyWalSnd->state != WALSNDSTATE_STREAMING &&
MyWalSnd->state != WALSNDSTATE_STOPPING) ||
- XLogRecPtrIsInvalid(MyWalSnd->flush))
+ !XLogRecPtrIsValid(MyWalSnd->flush))
{
announce_next_takeover = true;
return;
@@ -676,11 +676,11 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr flush = sync_standbys[i].flush;
XLogRecPtr apply = sync_standbys[i].apply;
- if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
+ if (!XLogRecPtrIsValid(*writePtr) || *writePtr > write)
*writePtr = write;
- if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
+ if (!XLogRecPtrIsValid(*flushPtr) || *flushPtr > flush)
*flushPtr = flush;
- if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
+ if (!XLogRecPtrIsValid(*applyPtr) || *applyPtr > apply)
*applyPtr = apply;
}
}
@@ -799,7 +799,7 @@ SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
continue;
/* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(stby->flush))
+ if (!XLogRecPtrIsValid(stby->flush))
continue;
/* OK, it's a candidate */
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7361ffc9dcf..2ee8fecee26 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1469,16 +1469,16 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
{
values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
- if (XLogRecPtrIsInvalid(receive_start_lsn))
+ if (!XLogRecPtrIsValid(receive_start_lsn))
nulls[2] = true;
else
values[2] = LSNGetDatum(receive_start_lsn);
values[3] = Int32GetDatum(receive_start_tli);
- if (XLogRecPtrIsInvalid(written_lsn))
+ if (!XLogRecPtrIsValid(written_lsn))
nulls[4] = true;
else
values[4] = LSNGetDatum(written_lsn);
- if (XLogRecPtrIsInvalid(flushed_lsn))
+ if (!XLogRecPtrIsValid(flushed_lsn))
nulls[5] = true;
else
values[5] = LSNGetDatum(flushed_lsn);
@@ -1491,7 +1491,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
nulls[8] = true;
else
values[8] = TimestampTzGetDatum(last_receipt_time);
- if (XLogRecPtrIsInvalid(latest_end_lsn))
+ if (!XLogRecPtrIsValid(latest_end_lsn))
nulls[9] = true;
else
values[9] = LSNGetDatum(latest_end_lsn);
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 6a693d854c4..822645748a7 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -315,7 +315,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
* If this is the first startup of walreceiver (on this timeline),
* initialize flushedUpto and latestChunkStart to the starting point.
*/
- if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
+ if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
{
walrcv->flushedUpto = recptr;
walrcv->receivedTLI = tli;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 548eafa7a73..fc8f8559073 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -529,7 +529,7 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
i++;
/* start LSN */
- if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+ if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
{
char xloc[64];
@@ -541,7 +541,7 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
i++;
/* timeline this WAL was produced on */
- if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+ if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
{
TimeLineID slots_position_timeline;
TimeLineID current_timeline;
@@ -906,7 +906,7 @@ StartReplication(StartReplicationCmd *cmd)
* that's older than the switchpoint, if it's still in the same
* WAL segment.
*/
- if (!XLogRecPtrIsInvalid(switchpoint) &&
+ if (XLogRecPtrIsValid(switchpoint) &&
switchpoint < cmd->startpoint)
{
ereport(ERROR,
@@ -1827,7 +1827,7 @@ WalSndWaitForWal(XLogRecPtr loc)
* receipt of WAL up to RecentFlushPtr. This is particularly interesting
* if we're far behind.
*/
- if (!XLogRecPtrIsInvalid(RecentFlushPtr) &&
+ if (XLogRecPtrIsValid(RecentFlushPtr) &&
!NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
return RecentFlushPtr;
@@ -2397,7 +2397,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
bool changed = false;
ReplicationSlot *slot = MyReplicationSlot;
- Assert(lsn != InvalidXLogRecPtr);
+ Assert(XLogRecPtrIsValid(lsn));
SpinLockAcquire(&slot->mutex);
if (slot->data.restart_lsn != lsn)
{
@@ -2519,7 +2519,7 @@ ProcessStandbyReplyMessage(void)
/*
* Advance our local xmin horizon when the client confirmed a flush.
*/
- if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
+ if (MyReplicationSlot && XLogRecPtrIsValid(flushPtr))
{
if (SlotIsLogical(MyReplicationSlot))
LogicalConfirmReceivedLocation(flushPtr);
@@ -3536,7 +3536,7 @@ XLogSendLogical(void)
* If first time through in this session, initialize flushPtr. Otherwise,
* we only need to update flushPtr if EndRecPtr is past it.
*/
- if (flushPtr == InvalidXLogRecPtr ||
+ if (!XLogRecPtrIsValid(flushPtr) ||
logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
{
/*
@@ -3597,8 +3597,8 @@ WalSndDone(WalSndSendDataCallback send_data)
* flush location if valid, write otherwise. Tools like pg_receivewal will
* usually (unless in synchronous mode) return an invalid flush location.
*/
- replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
- MyWalSnd->write : MyWalSnd->flush;
+ replicatedPtr = XLogRecPtrIsValid(MyWalSnd->flush) ?
+ MyWalSnd->flush : MyWalSnd->write;
if (WalSndCaughtUp && sentPtr == replicatedPtr &&
!pq_is_send_pending())
@@ -4073,19 +4073,19 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
values[1] = CStringGetTextDatum(WalSndGetStateString(state));
- if (XLogRecPtrIsInvalid(sent_ptr))
+ if (!XLogRecPtrIsValid(sent_ptr))
nulls[2] = true;
values[2] = LSNGetDatum(sent_ptr);
- if (XLogRecPtrIsInvalid(write))
+ if (!XLogRecPtrIsValid(write))
nulls[3] = true;
values[3] = LSNGetDatum(write);
- if (XLogRecPtrIsInvalid(flush))
+ if (!XLogRecPtrIsValid(flush))
nulls[4] = true;
values[4] = LSNGetDatum(flush);
- if (XLogRecPtrIsInvalid(apply))
+ if (!XLogRecPtrIsValid(apply))
nulls[5] = true;
values[5] = LSNGetDatum(apply);
@@ -4094,7 +4094,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
* which always returns an invalid flush location, as an
* asynchronous standby.
*/
- priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
+ priority = XLogRecPtrIsValid(flush) ? priority : 0;
if (writeLag < 0)
nulls[6] = true;
@@ -4165,7 +4165,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
/* construct the message... */
resetStringInfo(&output_message);
pq_sendbyte(&output_message, PqReplMsg_Keepalive);
- pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
+ pq_sendint64(&output_message, XLogRecPtrIsValid(writePtr) ? writePtr : sentPtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);