summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c46
1 files changed, 25 insertions, 21 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b0ab91cc71b..2d9e1279bb2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -705,31 +705,35 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
toptxn = txn;
/*
- * Set the toast insert bit whenever we get toast insert to indicate a
- * partial change and clear it when we get the insert or update on main
- * table (Both update and insert will do the insert in the toast table).
+ * Indicate a partial change for toast inserts. The change will be
+ * considered as complete once we get the insert or update on the main
+ * table and we are sure that the pending toast chunks are not required
+ * anymore.
+ *
+ * If we allow streaming when there are pending toast chunks then such
+ * chunks won't be released till the insert (multi_insert) is complete and
+ * we expect the txn to have streamed all changes after streaming. This
+ * restriction is mainly to ensure the correctness of streamed
+ * transactions and it doesn't seem worth uplifting such a restriction
+ * just to allow this case because anyway we will stream the transaction
+ * once such an insert is complete.
*/
if (toast_insert)
- toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT;
- else if (rbtxn_has_toast_insert(toptxn) &&
- IsInsertOrUpdate(change->action))
- toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
+ toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
+ else if (rbtxn_has_partial_change(toptxn) &&
+ IsInsertOrUpdate(change->action) &&
+ change->data.tp.clear_toast_afterwards)
+ toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
/*
- * Set the spec insert bit whenever we get the speculative insert to
- * indicate the partial change and clear the same on speculative confirm.
+ * Indicate a partial change for speculative inserts. The change will be
+ * considered as complete once we get the speculative confirm token.
*/
if (IsSpecInsert(change->action))
- toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
- else if (IsSpecConfirm(change->action))
- {
- /*
- * Speculative confirm change must be preceded by speculative
- * insertion.
- */
- Assert(rbtxn_has_spec_insert(toptxn));
- toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
- }
+ toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
+ else if (rbtxn_has_partial_change(toptxn) &&
+ IsSpecConfirm(change->action))
+ toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
/*
* Stream the transaction if it is serialized before and the changes are
@@ -741,7 +745,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
* changes. Delaying such transactions would increase apply lag for them.
*/
if (ReorderBufferCanStartStreaming(rb) &&
- !(rbtxn_has_incomplete_tuple(toptxn)) &&
+ !(rbtxn_has_partial_change(toptxn)) &&
rbtxn_is_serialized(txn))
ReorderBufferStreamTXN(rb, toptxn);
}
@@ -3399,7 +3403,7 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
Assert(txn->base_snapshot != NULL);
if ((largest == NULL || txn->total_size > largest_size) &&
- (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
+ (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
{
largest = txn;
largest_size = txn->total_size;