summaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c49
1 files changed, 42 insertions, 7 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index c5fbebf55ab..d9e45bab4ad 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -57,6 +57,10 @@ static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *c
/*
* Entry in the map used to remember which relation schemas we sent.
*
+ * The schema_sent flag determines if the current schema record for the
+ * relation (and for its ancestor if publish_as_relid is set) was already sent
+ * to the subscriber (in which case we don't need to send it again).
+ *
* For partitions, 'pubactions' considers not only the table's own
* publications, but also those of all of its ancestors.
*/
@@ -64,10 +68,6 @@ typedef struct RelationSyncEntry
{
Oid relid; /* relation oid */
- /*
- * Did we send the schema? If ancestor relid is set, its schema must also
- * have been sent for this to be true.
- */
bool schema_sent;
bool replicate_valid;
@@ -292,10 +292,17 @@ static void
maybe_send_schema(LogicalDecodingContext *ctx,
Relation relation, RelationSyncEntry *relentry)
{
+ /* Nothing to do if we already sent the schema. */
if (relentry->schema_sent)
return;
- /* If needed, send the ancestor's schema first. */
+ /*
+ * Nope, so send the schema. If the changes will be published using an
+ * ancestor's schema, not the relation's own, send that ancestor's schema
+ * before sending relation's own (XXX - maybe sending only the former
+ * suffices?). This is also a good place to set the map that will be used
+ * to convert the relation's tuples into the ancestor's format, if needed.
+ */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
@@ -305,8 +312,21 @@ maybe_send_schema(LogicalDecodingContext *ctx,
/* Map must live as long as the session does. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
- CreateTupleDescCopy(outdesc));
+
+ /*
+ * Make copies of the TupleDescs that will live as long as the map
+ * does before putting into the map.
+ */
+ indesc = CreateTupleDescCopy(indesc);
+ outdesc = CreateTupleDescCopy(outdesc);
+ relentry->map = convert_tuples_by_name(indesc, outdesc);
+ if (relentry->map == NULL)
+ {
+ /* Map not necessary, so free the TupleDescs too. */
+ FreeTupleDesc(indesc);
+ FreeTupleDesc(outdesc);
+ }
+
MemoryContextSwitchTo(oldctx);
send_relation_and_attrs(ancestor, ctx);
RelationClose(ancestor);
@@ -759,6 +779,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
list_free(pubids);
entry->publish_as_relid = publish_as_relid;
+ entry->map = NULL; /* will be set by maybe_send_schema() if needed */
entry->replicate_valid = true;
}
@@ -801,9 +822,23 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
/*
* Reset schema sent status as the relation definition may have changed.
+ * Also, free any objects that depended on the earlier definition.
*/
if (entry != NULL)
+ {
entry->schema_sent = false;
+ if (entry->map)
+ {
+ /*
+ * Must free the TupleDescs contained in the map explicitly,
+ * because free_conversion_map() doesn't.
+ */
+ FreeTupleDesc(entry->map->indesc);
+ FreeTupleDesc(entry->map->outdesc);
+ free_conversion_map(entry->map);
+ }
+ entry->map = NULL;
+ }
}
/*