summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c18
1 files changed, 5 insertions, 13 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index eb7db89cef7..cfc924cd893 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s)
/* We must be in a valid transaction state */
Assert(IsTransactionState());
- /* The synchronization worker runs in single transaction. */
- if (!am_tablesync_worker())
- {
- /* Commit the per-stream transaction */
- CommitTransactionCommand();
- }
+ /* Commit the per-stream transaction */
+ CommitTransactionCommand();
in_streamed_transaction = false;
@@ -889,9 +885,7 @@ apply_handle_stream_abort(StringInfo s)
/* Cleanup the subxact info */
cleanup_subxact_info();
- /* The synchronization worker runs in single transaction */
- if (!am_tablesync_worker())
- CommitTransactionCommand();
+ CommitTransactionCommand();
return;
}
@@ -918,8 +912,7 @@ apply_handle_stream_abort(StringInfo s)
/* write the updated subxact list */
subxact_info_write(MyLogicalRepWorker->subid, xid);
- if (!am_tablesync_worker())
- CommitTransactionCommand();
+ CommitTransactionCommand();
}
}
@@ -1062,8 +1055,7 @@ apply_handle_stream_commit(StringInfo s)
static void
apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data)
{
- /* The synchronization worker runs in single transaction. */
- if (IsTransactionState() && !am_tablesync_worker())
+ if (IsTransactionState())
{
/*
* Update origin state so we can restart streaming from correct