diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 52 |
1 files changed, 43 insertions, 9 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 449327a147f..ce6e62152f0 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -856,6 +856,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, subtxn->toplevel_xid = xid; Assert(subtxn->nsubtxns == 0); + /* set the reference to top-level transaction */ + subtxn->toptxn = txn; + /* add to subtransaction list */ dlist_push_tail(&txn->subtxns, &subtxn->node); txn->nsubtxns++; @@ -2201,7 +2204,11 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, /* * Setup the invalidation of the toplevel transaction. * - * This needs to be done before ReorderBufferCommit is called! + * This needs to be called for each XLOG_XACT_INVALIDATIONS message and + * accumulates all the invalidation messages in the toplevel transaction. + * This is required because in some cases where we skip processing the + * transaction (see ReorderBufferForget), we need to execute all the + * invalidations together. */ void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, @@ -2212,17 +2219,35 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - if (txn->ninvalidations != 0) - elog(ERROR, "only ever add one set of invalidations"); + /* + * We collect all the invalidations under the top transaction so that we + * can execute them all together. + */ + if (txn->toptxn) + txn = txn->toptxn; Assert(nmsgs > 0); - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - MemoryContextAlloc(rb->context, - sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); + /* Accumulate invalidations. */ + if (txn->ninvalidations == 0) + { + txn->ninvalidations = nmsgs; + txn->invalidations = (SharedInvalidationMessage *) + MemoryContextAlloc(rb->context, + sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(txn->invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + } + else + { + txn->invalidations = (SharedInvalidationMessage *) + repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * + (txn->ninvalidations + nmsgs)); + + memcpy(txn->invalidations + txn->ninvalidations, msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + txn->ninvalidations += nmsgs; + } } /* @@ -2250,6 +2275,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + + /* + * Mark top-level transaction as having catalog changes too if one of its + * children has so that the ReorderBufferBuildTupleCidHash can + * conveniently check just top-level transaction and decide whether to + * build the hash table or not. + */ + if (txn->toptxn != NULL) + txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; } /* |