diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 64 |
1 files changed, 56 insertions, 8 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 56c25e3a6da..fa9413fa2a0 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2264,20 +2264,45 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage *msgs) { ReorderBufferTXN *txn; + MemoryContext oldcontext; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - if (txn->ninvalidations != 0) - elog(ERROR, "only ever add one set of invalidations"); + oldcontext = MemoryContextSwitchTo(rb->context); + + /* + * Collect all the invalidations under the top transaction, if available, + * so that we can execute them all together. + */ + if (txn->toplevel_xid) + { + txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, true, NULL, lsn, + true); + } Assert(nmsgs > 0); - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - MemoryContextAlloc(rb->context, - sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); + /* Accumulate invalidations. */ + if (txn->ninvalidations == 0) + { + txn->ninvalidations = nmsgs; + txn->invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(txn->invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + } + else + { + txn->invalidations = (SharedInvalidationMessage *) + repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * + (txn->ninvalidations + nmsgs)); + + memcpy(txn->invalidations + txn->ninvalidations, msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + txn->ninvalidations += nmsgs; + } + + MemoryContextSwitchTo(oldcontext); } /* @@ -3895,3 +3920,26 @@ restart: *cmax = ent->cmax; return true; } + +/* + * Count invalidation messages of specified transaction. + * + * Returns number of messages, and msgs is set to the pointer of the linked + * list for the messages. + */ +uint32 +ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, + SharedInvalidationMessage **msgs) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + if (txn == NULL) + return 0; + + *msgs = txn->invalidations; + + return txn->ninvalidations; +} |