summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c64
1 files changed, 56 insertions, 8 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 56c25e3a6da..fa9413fa2a0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2264,20 +2264,45 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
SharedInvalidationMessage *msgs)
{
ReorderBufferTXN *txn;
+ MemoryContext oldcontext;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
- if (txn->ninvalidations != 0)
- elog(ERROR, "only ever add one set of invalidations");
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ /*
+ * Collect all the invalidations under the top transaction, if available,
+ * so that we can execute them all together.
+ */
+ if (txn->toplevel_xid)
+ {
+ txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, true, NULL, lsn,
+ true);
+ }
Assert(nmsgs > 0);
- txn->ninvalidations = nmsgs;
- txn->invalidations = (SharedInvalidationMessage *)
- MemoryContextAlloc(rb->context,
- sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(txn->invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
+ /* Accumulate invalidations. */
+ if (txn->ninvalidations == 0)
+ {
+ txn->ninvalidations = nmsgs;
+ txn->invalidations = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+ memcpy(txn->invalidations, msgs,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+ }
+ else
+ {
+ txn->invalidations = (SharedInvalidationMessage *)
+ repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
+ (txn->ninvalidations + nmsgs));
+
+ memcpy(txn->invalidations + txn->ninvalidations, msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ txn->ninvalidations += nmsgs;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
}
/*
@@ -3895,3 +3920,26 @@ restart:
*cmax = ent->cmax;
return true;
}
+
+/*
+ * Count invalidation messages of specified transaction.
+ *
+ * Returns number of messages, and msgs is set to the pointer of the linked
+ * list for the messages.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+ SharedInvalidationMessage **msgs)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ if (txn == NULL)
+ return 0;
+
+ *msgs = txn->invalidations;
+
+ return txn->ninvalidations;
+}