summaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c29
1 files changed, 24 insertions, 5 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 92eb17049c3..847806b0a2e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -235,6 +235,7 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
static void init_tuple_slot(PGOutputData *data, Relation relation,
RelationSyncEntry *entry);
+static void pgoutput_memory_context_reset(void *arg);
/* row filter routines */
static EState *create_estate_for_relation(Relation rel);
@@ -427,6 +428,19 @@ parse_output_parameters(List *options, PGOutputData *data)
}
/*
+ * Memory context reset callback of PGOutputData->context.
+ */
+static void
+pgoutput_memory_context_reset(void *arg)
+{
+ if (RelationSyncCache)
+ {
+ hash_destroy(RelationSyncCache);
+ RelationSyncCache = NULL;
+ }
+}
+
+/*
* Initialize this plugin
*/
static void
@@ -435,6 +449,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
{
PGOutputData *data = palloc0(sizeof(PGOutputData));
static bool publication_callback_registered = false;
+ MemoryContextCallback *mcallback;
/* Create our memory context for private allocations. */
data->context = AllocSetContextCreate(ctx->context,
@@ -449,6 +464,14 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
"logical replication publication list context",
ALLOCSET_SMALL_SIZES);
+ /*
+ * Ensure to cleanup RelationSyncCache even when logical decoding invoked
+ * via SQL interface ends up with an error.
+ */
+ mcallback = palloc0(sizeof(MemoryContextCallback));
+ mcallback->func = pgoutput_memory_context_reset;
+ MemoryContextRegisterResetCallback(ctx->context, mcallback);
+
ctx->output_plugin_private = data;
/* This plugin uses binary protocol. */
@@ -1760,11 +1783,7 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
static void
pgoutput_shutdown(LogicalDecodingContext *ctx)
{
- if (RelationSyncCache)
- {
- hash_destroy(RelationSyncCache);
- RelationSyncCache = NULL;
- }
+ pgoutput_memory_context_reset(NULL);
}
/*