summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c52
1 files changed, 43 insertions, 9 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 449327a147f..ce6e62152f0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -856,6 +856,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
subtxn->toplevel_xid = xid;
Assert(subtxn->nsubtxns == 0);
+ /* set the reference to top-level transaction */
+ subtxn->toptxn = txn;
+
/* add to subtransaction list */
dlist_push_tail(&txn->subtxns, &subtxn->node);
txn->nsubtxns++;
@@ -2201,7 +2204,11 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
/*
* Setup the invalidation of the toplevel transaction.
*
- * This needs to be done before ReorderBufferCommit is called!
+ * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
+ * accumulates all the invalidation messages in the toplevel transaction.
+ * This is required because in some cases where we skip processing the
+ * transaction (see ReorderBufferForget), we need to execute all the
+ * invalidations together.
*/
void
ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
@@ -2212,17 +2219,35 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
- if (txn->ninvalidations != 0)
- elog(ERROR, "only ever add one set of invalidations");
+ /*
+ * We collect all the invalidations under the top transaction so that we
+ * can execute them all together.
+ */
+ if (txn->toptxn)
+ txn = txn->toptxn;
Assert(nmsgs > 0);
- txn->ninvalidations = nmsgs;
- txn->invalidations = (SharedInvalidationMessage *)
- MemoryContextAlloc(rb->context,
- sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(txn->invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
+ /* Accumulate invalidations. */
+ if (txn->ninvalidations == 0)
+ {
+ txn->ninvalidations = nmsgs;
+ txn->invalidations = (SharedInvalidationMessage *)
+ MemoryContextAlloc(rb->context,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+ memcpy(txn->invalidations, msgs,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+ }
+ else
+ {
+ txn->invalidations = (SharedInvalidationMessage *)
+ repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
+ (txn->ninvalidations + nmsgs));
+
+ memcpy(txn->invalidations + txn->ninvalidations, msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ txn->ninvalidations += nmsgs;
+ }
}
/*
@@ -2250,6 +2275,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+
+ /*
+ * Mark top-level transaction as having catalog changes too if one of its
+ * children has so that the ReorderBufferBuildTupleCidHash can
+ * conveniently check just top-level transaction and decide whether to
+ * build the hash table or not.
+ */
+ if (txn->toptxn != NULL)
+ txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
}
/*