diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 56 |
1 files changed, 27 insertions, 29 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index e25ad672235..caaa59c7bc7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -199,6 +199,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) ResultRelInfo *resultRelInfo; RangeTblEntry *rte; + /* + * Input functions may need an active snapshot, as may AFTER triggers + * invoked during finish_estate. For safety, ensure an active snapshot + * exists throughout all our usage of the executor. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + estate = CreateExecutorState(); rte = makeNode(RangeTblEntry); @@ -224,6 +231,22 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) } /* + * Finish any operations related to the executor state created by + * create_estate_for_relation(). + */ +static void +finish_estate(EState *estate) +{ + /* Handle any queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + + /* Cleanup. */ + ExecResetTupleTable(estate->es_tupleTable, false); + FreeExecutorState(estate); + PopActiveSnapshot(); +} + +/* * Executes default values for columns for which we can't map to remote * relation columns. * @@ -634,9 +657,6 @@ apply_handle_insert(StringInfo s) RelationGetDescr(rel->localrel), &TTSOpsVirtual); - /* Input functions may need an active snapshot, so get one */ - PushActiveSnapshot(GetTransactionSnapshot()); - /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, newtup.values); @@ -651,13 +671,7 @@ apply_handle_insert(StringInfo s) apply_handle_insert_internal(estate->es_result_relation_info, estate, remoteslot); - PopActiveSnapshot(); - - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); + finish_estate(estate); logicalrep_rel_close(rel, NoLock); @@ -778,8 +792,6 @@ apply_handle_update(StringInfo s) /* Also populate extraUpdatedCols, in case we have generated columns */ fill_extraUpdatedCols(target_rte, rel->localrel); - PushActiveSnapshot(GetTransactionSnapshot()); - /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, @@ -794,13 +806,7 @@ apply_handle_update(StringInfo s) apply_handle_update_internal(estate->es_result_relation_info, estate, remoteslot, &newtup, rel); - PopActiveSnapshot(); - - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); + finish_estate(estate); logicalrep_rel_close(rel, NoLock); @@ -902,8 +908,6 @@ apply_handle_delete(StringInfo s) RelationGetDescr(rel->localrel), &TTSOpsVirtual); - PushActiveSnapshot(GetTransactionSnapshot()); - /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, oldtup.values); @@ -917,13 +921,7 @@ apply_handle_delete(StringInfo s) apply_handle_delete_internal(estate->es_result_relation_info, estate, remoteslot, &rel->remoterel); - PopActiveSnapshot(); - - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); + finish_estate(estate); logicalrep_rel_close(rel, NoLock); @@ -1248,7 +1246,7 @@ apply_handle_truncate(StringInfo s) List *relids = NIL; List *relids_logged = NIL; ListCell *lc; - LOCKMODE lockmode = AccessExclusiveLock; + LOCKMODE lockmode = AccessExclusiveLock; ensure_transaction(); |