diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b0ab91cc71b..2d9e1279bb2 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -705,31 +705,35 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, toptxn = txn; /* - * Set the toast insert bit whenever we get toast insert to indicate a - * partial change and clear it when we get the insert or update on main - * table (Both update and insert will do the insert in the toast table). + * Indicate a partial change for toast inserts. The change will be + * considered as complete once we get the insert or update on the main + * table and we are sure that the pending toast chunks are not required + * anymore. + * + * If we allow streaming when there are pending toast chunks then such + * chunks won't be released till the insert (multi_insert) is complete and + * we expect the txn to have streamed all changes after streaming. This + * restriction is mainly to ensure the correctness of streamed + * transactions and it doesn't seem worth uplifting such a restriction + * just to allow this case because anyway we will stream the transaction + * once such an insert is complete. */ if (toast_insert) - toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT; - else if (rbtxn_has_toast_insert(toptxn) && - IsInsertOrUpdate(change->action)) - toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT; + toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE; + else if (rbtxn_has_partial_change(toptxn) && + IsInsertOrUpdate(change->action) && + change->data.tp.clear_toast_afterwards) + toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE; /* - * Set the spec insert bit whenever we get the speculative insert to - * indicate the partial change and clear the same on speculative confirm. + * Indicate a partial change for speculative inserts. The change will be + * considered as complete once we get the speculative confirm token. */ if (IsSpecInsert(change->action)) - toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT; - else if (IsSpecConfirm(change->action)) - { - /* - * Speculative confirm change must be preceded by speculative - * insertion. - */ - Assert(rbtxn_has_spec_insert(toptxn)); - toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT; - } + toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE; + else if (rbtxn_has_partial_change(toptxn) && + IsSpecConfirm(change->action)) + toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE; /* * Stream the transaction if it is serialized before and the changes are @@ -741,7 +745,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, * changes. Delaying such transactions would increase apply lag for them. */ if (ReorderBufferCanStartStreaming(rb) && - !(rbtxn_has_incomplete_tuple(toptxn)) && + !(rbtxn_has_partial_change(toptxn)) && rbtxn_is_serialized(txn)) ReorderBufferStreamTXN(rb, toptxn); } @@ -3399,7 +3403,7 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb) Assert(txn->base_snapshot != NULL); if ((largest == NULL || txn->total_size > largest_size) && - (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn))) + (txn->total_size > 0) && !(rbtxn_has_partial_change(txn))) { largest = txn; largest_size = txn->total_size; |