diff options
Diffstat (limited to 'src/backend/replication/logical')
| -rw-r--r-- | src/backend/replication/logical/applyparallelworker.c | 4 | ||||
| -rw-r--r-- | src/backend/replication/logical/launcher.c | 4 | ||||
| -rw-r--r-- | src/backend/replication/logical/logical.c | 32 | ||||
| -rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 6 | ||||
| -rw-r--r-- | src/backend/replication/logical/origin.c | 18 | ||||
| -rw-r--r-- | src/backend/replication/logical/proto.c | 18 | ||||
| -rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 38 | ||||
| -rw-r--r-- | src/backend/replication/logical/slotsync.c | 6 | ||||
| -rw-r--r-- | src/backend/replication/logical/snapbuild.c | 14 | ||||
| -rw-r--r-- | src/backend/replication/logical/worker.c | 16 |
10 files changed, 78 insertions, 78 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 14325581afc..baa68c1ab6c 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -299,7 +299,7 @@ pa_can_start(void) * STREAM START message, and it doesn't seem worth sending the extra eight * bytes with the STREAM START to enable parallelism for this case. */ - if (!XLogRecPtrIsInvalid(MySubscription->skiplsn)) + if (XLogRecPtrIsValid(MySubscription->skiplsn)) return false; /* @@ -1640,7 +1640,7 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) */ pa_wait_for_xact_finish(winfo); - if (!XLogRecPtrIsInvalid(remote_lsn)) + if (XLogRecPtrIsValid(remote_lsn)) store_flush_position(remote_lsn, winfo->shared->last_commit_end); pa_free_worker(winfo); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 6c6d4015ba7..6214028eda9 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1661,7 +1661,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) else nulls[3] = true; - if (XLogRecPtrIsInvalid(worker.last_lsn)) + if (!XLogRecPtrIsValid(worker.last_lsn)) nulls[4] = true; else values[4] = LSNGetDatum(worker.last_lsn); @@ -1673,7 +1673,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) nulls[6] = true; else values[6] = TimestampTzGetDatum(worker.last_recv_time); - if (XLogRecPtrIsInvalid(worker.reply_lsn)) + if (!XLogRecPtrIsValid(worker.reply_lsn)) nulls[7] = true; else values[7] = LSNGetDatum(worker.reply_lsn); diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 93ed2eb368e..866f92cf799 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -388,7 +388,7 @@ CreateInitDecodingContext(const char *plugin, slot->data.plugin = plugin_name; SpinLockRelease(&slot->mutex); - if (XLogRecPtrIsInvalid(restart_lsn)) + if (!XLogRecPtrIsValid(restart_lsn)) ReplicationSlotReserveWal(); else { @@ -546,9 +546,9 @@ CreateDecodingContext(XLogRecPtr start_lsn, /* slot must be valid to allow decoding */ Assert(slot->data.invalidated == RS_INVAL_NONE); - Assert(slot->data.restart_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(slot->data.restart_lsn)); - if (start_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(start_lsn)) { /* continue from last position */ start_lsn = slot->data.confirmed_flush; @@ -757,7 +757,7 @@ output_plugin_error_callback(void *arg) LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg; /* not all callbacks have an associated LSN */ - if (state->report_location != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(state->report_location)) errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%08X", NameStr(state->ctx->slot->data.name), NameStr(state->ctx->slot->data.plugin), @@ -1711,7 +1711,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) * Only increase if the previous values have been applied, otherwise we * might never end up updating if the receiver acks too slowly. */ - else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr) + else if (!XLogRecPtrIsValid(slot->candidate_xmin_lsn)) { slot->candidate_catalog_xmin = xmin; slot->candidate_xmin_lsn = current_lsn; @@ -1749,8 +1749,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart slot = MyReplicationSlot; Assert(slot != NULL); - Assert(restart_lsn != InvalidXLogRecPtr); - Assert(current_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(restart_lsn)); + Assert(XLogRecPtrIsValid(current_lsn)); SpinLockAcquire(&slot->mutex); @@ -1779,7 +1779,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart * might never end up updating if the receiver acks too slowly. A missed * value here will just cause some extra effort after reconnecting. */ - else if (slot->candidate_restart_valid == InvalidXLogRecPtr) + else if (!XLogRecPtrIsValid(slot->candidate_restart_valid)) { slot->candidate_restart_valid = current_lsn; slot->candidate_restart_lsn = restart_lsn; @@ -1819,11 +1819,11 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart void LogicalConfirmReceivedLocation(XLogRecPtr lsn) { - Assert(lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(lsn)); /* Do an unlocked check for candidate_lsn first. */ - if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr || - MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) || + XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid)) { bool updated_xmin = false; bool updated_restart = false; @@ -1849,7 +1849,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) MyReplicationSlot->data.confirmed_flush = lsn; /* if we're past the location required for bumping xmin, do so */ - if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) && MyReplicationSlot->candidate_xmin_lsn <= lsn) { /* @@ -1871,10 +1871,10 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) } } - if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid) && MyReplicationSlot->candidate_restart_valid <= lsn) { - Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_lsn)); MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn; MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr; @@ -2089,7 +2089,7 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner; XLogRecPtr retlsn; - Assert(moveto != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(moveto)); if (found_consistent_snapshot) *found_consistent_snapshot = false; @@ -2163,7 +2163,7 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, if (found_consistent_snapshot && DecodingContextReady(ctx)) *found_consistent_snapshot = true; - if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(ctx->reader->EndRecPtr)) { LogicalConfirmReceivedLocation(moveto); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 25f890ddeed..49b2aef3c74 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -229,7 +229,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin * Wait for specified streaming replication standby servers (if any) * to confirm receipt of WAL up to wait_for_wal_lsn. */ - if (XLogRecPtrIsInvalid(upto_lsn)) + if (!XLogRecPtrIsValid(upto_lsn)) wait_for_wal_lsn = end_of_wal; else wait_for_wal_lsn = Min(upto_lsn, end_of_wal); @@ -276,7 +276,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin } /* check limits */ - if (upto_lsn != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(upto_lsn) && upto_lsn <= ctx->reader->EndRecPtr) break; if (upto_nchanges != 0 && @@ -289,7 +289,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin * Next time, start where we left off. (Hunting things, the family * business..) */ - if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) + if (XLogRecPtrIsValid(ctx->reader->EndRecPtr) && confirm) { LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr); diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index bcd5d9aad62..4632aa8115d 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -984,8 +984,8 @@ replorigin_advance(RepOriginId node, /* initialize new slot */ LWLockAcquire(&free_state->lock, LW_EXCLUSIVE); replication_state = free_state; - Assert(replication_state->remote_lsn == InvalidXLogRecPtr); - Assert(replication_state->local_lsn == InvalidXLogRecPtr); + Assert(!XLogRecPtrIsValid(replication_state->remote_lsn)); + Assert(!XLogRecPtrIsValid(replication_state->local_lsn)); replication_state->roident = node; } @@ -1020,7 +1020,7 @@ replorigin_advance(RepOriginId node, */ if (go_backward || replication_state->remote_lsn < remote_commit) replication_state->remote_lsn = remote_commit; - if (local_commit != InvalidXLogRecPtr && + if (XLogRecPtrIsValid(local_commit) && (go_backward || replication_state->local_lsn < local_commit)) replication_state->local_lsn = local_commit; LWLockRelease(&replication_state->lock); @@ -1064,7 +1064,7 @@ replorigin_get_progress(RepOriginId node, bool flush) LWLockRelease(ReplicationOriginLock); - if (flush && local_lsn != InvalidXLogRecPtr) + if (flush && XLogRecPtrIsValid(local_lsn)) XLogFlush(local_lsn); return remote_lsn; @@ -1197,8 +1197,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by) /* initialize new slot */ session_replication_state = &replication_states[free_slot]; - Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr); - Assert(session_replication_state->local_lsn == InvalidXLogRecPtr); + Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn)); + Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn)); session_replication_state->roident = node; } @@ -1282,7 +1282,7 @@ replorigin_session_get_progress(bool flush) local_lsn = session_replication_state->local_lsn; LWLockRelease(&session_replication_state->lock); - if (flush && local_lsn != InvalidXLogRecPtr) + if (flush && XLogRecPtrIsValid(local_lsn)) XLogFlush(local_lsn); return remote_lsn; @@ -1454,7 +1454,7 @@ pg_replication_origin_session_progress(PG_FUNCTION_ARGS) remote_lsn = replorigin_session_get_progress(flush); - if (remote_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(remote_lsn)) PG_RETURN_NULL(); PG_RETURN_LSN(remote_lsn); @@ -1543,7 +1543,7 @@ pg_replication_origin_progress(PG_FUNCTION_ARGS) remote_lsn = replorigin_get_progress(roident, flush); - if (remote_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(remote_lsn)) PG_RETURN_NULL(); PG_RETURN_LSN(remote_lsn); diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index ed62888764c..f0a913892b9 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -64,7 +64,7 @@ logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data) { /* read fields */ begin_data->final_lsn = pq_getmsgint64(in); - if (begin_data->final_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(begin_data->final_lsn)) elog(ERROR, "final_lsn not set in begin message"); begin_data->committime = pq_getmsgint64(in); begin_data->xid = pq_getmsgint(in, 4); @@ -135,10 +135,10 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da { /* read fields */ begin_data->prepare_lsn = pq_getmsgint64(in); - if (begin_data->prepare_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(begin_data->prepare_lsn)) elog(ERROR, "prepare_lsn not set in begin prepare message"); begin_data->end_lsn = pq_getmsgint64(in); - if (begin_data->end_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(begin_data->end_lsn)) elog(ERROR, "end_lsn not set in begin prepare message"); begin_data->prepare_time = pq_getmsgint64(in); begin_data->xid = pq_getmsgint(in, 4); @@ -207,10 +207,10 @@ logicalrep_read_prepare_common(StringInfo in, char *msgtype, /* read fields */ prepare_data->prepare_lsn = pq_getmsgint64(in); - if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(prepare_data->prepare_lsn)) elog(ERROR, "prepare_lsn is not set in %s message", msgtype); prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(prepare_data->end_lsn)) elog(ERROR, "end_lsn is not set in %s message", msgtype); prepare_data->prepare_time = pq_getmsgint64(in); prepare_data->xid = pq_getmsgint(in, 4); @@ -274,10 +274,10 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData * /* read fields */ prepare_data->commit_lsn = pq_getmsgint64(in); - if (prepare_data->commit_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(prepare_data->commit_lsn)) elog(ERROR, "commit_lsn is not set in commit prepared message"); prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(prepare_data->end_lsn)) elog(ERROR, "end_lsn is not set in commit prepared message"); prepare_data->commit_time = pq_getmsgint64(in); prepare_data->xid = pq_getmsgint(in, 4); @@ -333,10 +333,10 @@ logicalrep_read_rollback_prepared(StringInfo in, /* read fields */ rollback_data->prepare_end_lsn = pq_getmsgint64(in); - if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(rollback_data->prepare_end_lsn)) elog(ERROR, "prepare_end_lsn is not set in rollback prepared message"); rollback_data->rollback_end_lsn = pq_getmsgint64(in); - if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(rollback_data->rollback_end_lsn)) elog(ERROR, "rollback_end_lsn is not set in rollback prepared message"); rollback_data->prepare_time = pq_getmsgint64(in); rollback_data->rollback_time = pq_getmsgint64(in); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b57aef9916d..eb6a84554b7 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -701,7 +701,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, { /* initialize the new entry, if creation was requested */ Assert(ent != NULL); - Assert(lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(lsn)); ent->txn = ReorderBufferAllocTXN(rb); ent->txn->xid = xid; @@ -849,7 +849,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, change->lsn = lsn; change->txn = txn; - Assert(InvalidXLogRecPtr != lsn); + Assert(XLogRecPtrIsValid(lsn)); dlist_push_tail(&txn->changes, &change->node); txn->nentries++; txn->nentries_mem++; @@ -966,14 +966,14 @@ AssertTXNLsnOrder(ReorderBuffer *rb) iter.cur); /* start LSN must be set */ - Assert(cur_txn->first_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(cur_txn->first_lsn)); /* If there is an end LSN, it must be higher than start LSN */ - if (cur_txn->end_lsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(cur_txn->end_lsn)) Assert(cur_txn->first_lsn <= cur_txn->end_lsn); /* Current initial LSN must be strictly higher than previous */ - if (prev_first_lsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(prev_first_lsn)) Assert(prev_first_lsn < cur_txn->first_lsn); /* known-as-subtxn txns must not be listed */ @@ -990,10 +990,10 @@ AssertTXNLsnOrder(ReorderBuffer *rb) /* base snapshot (and its LSN) must be set */ Assert(cur_txn->base_snapshot != NULL); - Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(cur_txn->base_snapshot_lsn)); /* current LSN must be strictly higher than previous */ - if (prev_base_snap_lsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(prev_base_snap_lsn)) Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn); /* known-as-subtxn txns must not be listed */ @@ -1022,11 +1022,11 @@ AssertChangeLsnOrder(ReorderBufferTXN *txn) cur_change = dlist_container(ReorderBufferChange, node, iter.cur); - Assert(txn->first_lsn != InvalidXLogRecPtr); - Assert(cur_change->lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->first_lsn)); + Assert(XLogRecPtrIsValid(cur_change->lsn)); Assert(txn->first_lsn <= cur_change->lsn); - if (txn->end_lsn != InvalidXLogRecPtr) + if (XLogRecPtrIsValid(txn->end_lsn)) Assert(cur_change->lsn <= txn->end_lsn); Assert(prev_lsn <= cur_change->lsn); @@ -1053,7 +1053,7 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb) txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn); Assert(!rbtxn_is_known_subxact(txn)); - Assert(txn->first_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->first_lsn)); return txn; } @@ -2276,7 +2276,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * We can't call start stream callback before processing first * change. */ - if (prev_lsn == InvalidXLogRecPtr) + if (!XLogRecPtrIsValid(prev_lsn)) { if (streaming) { @@ -2291,7 +2291,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * subtransactions. The changes may have the same LSN due to * MULTI_INSERT xlog records. */ - Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn); + Assert(!XLogRecPtrIsValid(prev_lsn) || prev_lsn <= change->lsn); prev_lsn = change->lsn; @@ -2975,7 +2975,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, * have been updated in it by now. */ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED); - Assert(txn->final_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->final_lsn)); txn->gid = pstrdup(gid); @@ -3041,7 +3041,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, */ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE)); - Assert(txn->final_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->final_lsn)); /* * By this time the txn has the prepare record information and it is @@ -4552,8 +4552,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, dlist_mutable_iter cleanup_iter; File *fd = &file->vfd; - Assert(txn->first_lsn != InvalidXLogRecPtr); - Assert(txn->final_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->first_lsn)); + Assert(XLogRecPtrIsValid(txn->final_lsn)); /* free current entries, so we have memory for more */ dlist_foreach_modify(cleanup_iter, &txn->changes) @@ -4860,8 +4860,8 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) XLogSegNo cur; XLogSegNo last; - Assert(txn->first_lsn != InvalidXLogRecPtr); - Assert(txn->final_lsn != InvalidXLogRecPtr); + Assert(XLogRecPtrIsValid(txn->first_lsn)); + Assert(XLogRecPtrIsValid(txn->final_lsn)); XLByteToSeg(txn->first_lsn, first, wal_segment_size); XLByteToSeg(txn->final_lsn, last, wal_segment_size); diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index b122d99b009..8b4afd87dc9 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -493,7 +493,7 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) ReplicationSlot *slot = MyReplicationSlot; Assert(slot != NULL); - Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn)); + Assert(!XLogRecPtrIsValid(slot->data.restart_lsn)); while (true) { @@ -899,8 +899,8 @@ synchronize_slots(WalReceiverConn *wrconn) * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL * slots in the first place. */ - if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) || - XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) || + if ((!XLogRecPtrIsValid(remote_slot->restart_lsn) || + !XLogRecPtrIsValid(remote_slot->confirmed_lsn) || !TransactionIdIsValid(remote_slot->catalog_xmin)) && remote_slot->invalidated == RS_INVAL_NONE) pfree(remote_slot); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 98ddee20929..6e18baa33cb 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1210,7 +1210,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact * oldest ongoing txn might have started when we didn't yet serialize * anything because we hadn't reached a consistent state yet. */ - if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr) + if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn)) LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn); /* @@ -1218,8 +1218,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact * we have one. */ else if (txn == NULL && - builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr && - builder->last_serialized_snapshot != InvalidXLogRecPtr) + XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) && + XLogRecPtrIsValid(builder->last_serialized_snapshot)) LogicalIncreaseRestartDecodingForSlot(lsn, builder->last_serialized_snapshot); } @@ -1293,7 +1293,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn */ if (running->oldestRunningXid == running->nextXid) { - if (builder->start_decoding_at == InvalidXLogRecPtr || + if (!XLogRecPtrIsValid(builder->start_decoding_at) || builder->start_decoding_at <= lsn) /* can decode everything after this */ builder->start_decoding_at = lsn + 1; @@ -1509,8 +1509,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) struct stat stat_buf; Size sz; - Assert(lsn != InvalidXLogRecPtr); - Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr || + Assert(XLogRecPtrIsValid(lsn)); + Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) || builder->last_serialized_snapshot <= lsn); /* @@ -2029,7 +2029,7 @@ CheckPointSnapBuild(void) lsn = ((uint64) hi) << 32 | lo; /* check whether we still need it */ - if (lsn < cutoff || cutoff == InvalidXLogRecPtr) + if (lsn < cutoff || !XLogRecPtrIsValid(cutoff)) { elog(DEBUG1, "removing snapbuild snapshot %s", path); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index e1c757c911e..28f61f96a1a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -514,7 +514,7 @@ bool InitializingApplyWorker = false; * by the user. */ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; -#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) +#define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn))) /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -4126,7 +4126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * due to a bug, we don't want to proceed as it can * incorrectly advance oldest_nonremovable_xid. */ - if (XLogRecPtrIsInvalid(rdt_data.remote_lsn)) + if (!XLogRecPtrIsValid(rdt_data.remote_lsn)) elog(ERROR, "cannot get the latest WAL position from the publisher"); maybe_advance_nonremovable_xid(&rdt_data, true); @@ -4613,7 +4613,7 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data, static void wait_for_local_flush(RetainDeadTuplesData *rdt_data) { - Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) && + Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) && TransactionIdIsValid(rdt_data->candidate_xid)); /* @@ -6031,7 +6031,7 @@ maybe_start_skipping_changes(XLogRecPtr finish_lsn) * function is called for every remote transaction and we assume that * skipping the transaction is not used often. */ - if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) || + if (likely(!XLogRecPtrIsValid(MySubscription->skiplsn) || MySubscription->skiplsn != finish_lsn)) return; @@ -6077,7 +6077,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn) XLogRecPtr myskiplsn = MySubscription->skiplsn; bool started_tx = false; - if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker()) + if (likely(!XLogRecPtrIsValid(myskiplsn)) || am_parallel_apply_worker()) return; if (!IsTransactionState()) @@ -6173,7 +6173,7 @@ apply_error_callback(void *arg) errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"", errarg->origin_name, logicalrep_message_type(errarg->command)); - else if (XLogRecPtrIsInvalid(errarg->finish_lsn)) + else if (!XLogRecPtrIsValid(errarg->finish_lsn)) errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u", errarg->origin_name, logicalrep_message_type(errarg->command), @@ -6189,7 +6189,7 @@ apply_error_callback(void *arg) { if (errarg->remote_attnum < 0) { - if (XLogRecPtrIsInvalid(errarg->finish_lsn)) + if (!XLogRecPtrIsValid(errarg->finish_lsn)) errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u", errarg->origin_name, logicalrep_message_type(errarg->command), @@ -6207,7 +6207,7 @@ apply_error_callback(void *arg) } else { - if (XLogRecPtrIsInvalid(errarg->finish_lsn)) + if (!XLogRecPtrIsValid(errarg->finish_lsn)) errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u", errarg->origin_name, logicalrep_message_type(errarg->command), |
