summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c225
1 files changed, 126 insertions, 99 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f90a896fc3e..407eee3c0bc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -319,13 +319,13 @@ slot_store_error_callback(void *arg)
}
/*
- * Store data in C string form into slot.
- * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
- * use better.
+ * Store tuple data into slot.
+ *
+ * Incoming data can be either text or binary format.
*/
static void
-slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
- char **values)
+slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+ LogicalRepTupleData *tupleData)
{
int natts = slot->tts_tupleDescriptor->natts;
int i;
@@ -343,27 +343,65 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
- /* Call the "in" function for each non-dropped attribute */
+ /* Call the "in" function for each non-dropped, non-null attribute */
Assert(natts == rel->attrmap->maplen);
for (i = 0; i < natts; i++)
{
Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
int remoteattnum = rel->attrmap->attnums[i];
- if (!att->attisdropped && remoteattnum >= 0 &&
- values[remoteattnum] != NULL)
+ if (!att->attisdropped && remoteattnum >= 0)
{
- Oid typinput;
- Oid typioparam;
+ StringInfo colvalue = &tupleData->colvalues[remoteattnum];
errarg.local_attnum = i;
errarg.remote_attnum = remoteattnum;
- getTypeInputInfo(att->atttypid, &typinput, &typioparam);
- slot->tts_values[i] =
- OidInputFunctionCall(typinput, values[remoteattnum],
- typioparam, att->atttypmod);
- slot->tts_isnull[i] = false;
+ if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
+ {
+ Oid typinput;
+ Oid typioparam;
+
+ getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+ slot->tts_values[i] =
+ OidInputFunctionCall(typinput, colvalue->data,
+ typioparam, att->atttypmod);
+ slot->tts_isnull[i] = false;
+ }
+ else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
+ {
+ Oid typreceive;
+ Oid typioparam;
+
+ /*
+ * In some code paths we may be asked to re-parse the same
+ * tuple data. Reset the StringInfo's cursor so that works.
+ */
+ colvalue->cursor = 0;
+
+ getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+ slot->tts_values[i] =
+ OidReceiveFunctionCall(typreceive, colvalue,
+ typioparam, att->atttypmod);
+
+ /* Trouble if it didn't eat the whole buffer */
+ if (colvalue->cursor != colvalue->len)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("incorrect binary data format in logical replication column %d",
+ remoteattnum + 1)));
+ slot->tts_isnull[i] = false;
+ }
+ else
+ {
+ /*
+ * NULL value from remote. (We don't expect to see
+ * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
+ * NULL.)
+ */
+ slot->tts_values[i] = (Datum) 0;
+ slot->tts_isnull[i] = true;
+ }
errarg.local_attnum = -1;
errarg.remote_attnum = -1;
@@ -371,8 +409,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
else
{
/*
- * We assign NULL to dropped attributes, NULL values, and missing
- * values (missing values should be later filled using
+ * We assign NULL to dropped attributes and missing values
+ * (missing values should be later filled using
* slot_fill_defaults).
*/
slot->tts_values[i] = (Datum) 0;
@@ -387,20 +425,21 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
}
/*
- * Replace selected columns with user data provided as C strings.
+ * Replace updated columns with data from the LogicalRepTupleData struct.
* This is somewhat similar to heap_modify_tuple but also calls the type
* input functions on the user data.
- * "slot" is filled with a copy of the tuple in "srcslot", with
- * columns selected by the "replaces" array replaced with data values
- * from "values".
+ *
+ * "slot" is filled with a copy of the tuple in "srcslot", replacing
+ * columns provided in "tupleData" and leaving others as-is.
+ *
* Caution: unreplaced pass-by-ref columns in "slot" will point into the
* storage for "srcslot". This is OK for current usage, but someday we may
* need to materialize "slot" at the end to make it independent of "srcslot".
*/
static void
-slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
- LogicalRepRelMapEntry *rel,
- char **values, bool *replaces)
+slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
+ LogicalRepRelMapEntry *rel,
+ LogicalRepTupleData *tupleData)
{
int natts = slot->tts_tupleDescriptor->natts;
int i;
@@ -438,31 +477,58 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
if (remoteattnum < 0)
continue;
- if (!replaces[remoteattnum])
- continue;
-
- if (values[remoteattnum] != NULL)
+ if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
{
- Oid typinput;
- Oid typioparam;
+ StringInfo colvalue = &tupleData->colvalues[remoteattnum];
errarg.local_attnum = i;
errarg.remote_attnum = remoteattnum;
- getTypeInputInfo(att->atttypid, &typinput, &typioparam);
- slot->tts_values[i] =
- OidInputFunctionCall(typinput, values[remoteattnum],
- typioparam, att->atttypmod);
- slot->tts_isnull[i] = false;
+ if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
+ {
+ Oid typinput;
+ Oid typioparam;
+
+ getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+ slot->tts_values[i] =
+ OidInputFunctionCall(typinput, colvalue->data,
+ typioparam, att->atttypmod);
+ slot->tts_isnull[i] = false;
+ }
+ else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
+ {
+ Oid typreceive;
+ Oid typioparam;
+
+ /*
+ * In some code paths we may be asked to re-parse the same
+ * tuple data. Reset the StringInfo's cursor so that works.
+ */
+ colvalue->cursor = 0;
+
+ getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+ slot->tts_values[i] =
+ OidReceiveFunctionCall(typreceive, colvalue,
+ typioparam, att->atttypmod);
+
+ /* Trouble if it didn't eat the whole buffer */
+ if (colvalue->cursor != colvalue->len)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("incorrect binary data format in logical replication column %d",
+ remoteattnum + 1)));
+ slot->tts_isnull[i] = false;
+ }
+ else
+ {
+ /* must be LOGICALREP_COLUMN_NULL */
+ slot->tts_values[i] = (Datum) 0;
+ slot->tts_isnull[i] = true;
+ }
errarg.local_attnum = -1;
errarg.remote_attnum = -1;
}
- else
- {
- slot->tts_values[i] = (Datum) 0;
- slot->tts_isnull[i] = true;
- }
}
/* Pop the error context stack */
@@ -641,7 +707,7 @@ apply_handle_insert(StringInfo s)
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- slot_store_cstrings(remoteslot, rel, newtup.values);
+ slot_store_data(remoteslot, rel, &newtup);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
@@ -765,7 +831,7 @@ apply_handle_update(StringInfo s)
target_rte = list_nth(estate->es_range_table, 0);
for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
{
- if (newtup.changed[i])
+ if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED)
target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
i + 1 - FirstLowInvalidHeapAttributeNumber);
}
@@ -776,8 +842,8 @@ apply_handle_update(StringInfo s)
/* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- slot_store_cstrings(remoteslot, rel,
- has_oldtup ? oldtup.values : newtup.values);
+ slot_store_data(remoteslot, rel,
+ has_oldtup ? &oldtup : &newtup);
MemoryContextSwitchTo(oldctx);
/* For a partitioned table, apply update to correct partition. */
@@ -831,8 +897,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
{
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- slot_modify_cstrings(remoteslot, localslot, relmapentry,
- newtup->values, newtup->changed);
+ slot_modify_data(remoteslot, localslot, relmapentry, newtup);
MemoryContextSwitchTo(oldctx);
EvalPlanQualSetSlot(&epqstate, remoteslot);
@@ -900,7 +965,7 @@ apply_handle_delete(StringInfo s)
/* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- slot_store_cstrings(remoteslot, rel, oldtup.values);
+ slot_store_data(remoteslot, rel, &oldtup);
MemoryContextSwitchTo(oldctx);
/* For a partitioned table, apply delete to correct partition. */
@@ -1096,9 +1161,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
if (found)
{
/* Apply the update. */
- slot_modify_cstrings(remoteslot_part, localslot,
- part_entry,
- newtup->values, newtup->changed);
+ slot_modify_data(remoteslot_part, localslot,
+ part_entry,
+ newtup);
MemoryContextSwitchTo(oldctx);
}
else
@@ -1312,8 +1377,8 @@ apply_handle_truncate(StringInfo s)
}
/*
- * Even if we used CASCADE on the upstream primary we explicitly default to
- * replaying changes without further cascading. This might be later
+ * Even if we used CASCADE on the upstream primary we explicitly default
+ * to replaying changes without further cascading. This might be later
* changeable with a user specified option.
*/
ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
@@ -1850,60 +1915,21 @@ maybe_reread_subscription(void)
proc_exit(0);
}
- /*
- * Exit if connection string was changed. The launcher will start new
- * worker.
- */
- if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
- {
- ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "restart because the connection information was changed",
- MySubscription->name)));
-
- proc_exit(0);
- }
-
- /*
- * Exit if subscription name was changed (it's used for
- * fallback_application_name). The launcher will start new worker.
- */
- if (strcmp(newsub->name, MySubscription->name) != 0)
- {
- ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "restart because subscription was renamed",
- MySubscription->name)));
-
- proc_exit(0);
- }
-
/* !slotname should never happen when enabled is true. */
Assert(newsub->slotname);
/*
- * We need to make new connection to new slot if slot name has changed so
- * exit here as well if that's the case.
- */
- if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
- {
- ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "restart because the replication slot name was changed",
- MySubscription->name)));
-
- proc_exit(0);
- }
-
- /*
- * Exit if publication list was changed. The launcher will start new
- * worker.
+ * Exit if any parameter that affects the remote connection was changed.
+ * The launcher will start a new worker.
*/
- if (!equal(newsub->publications, MySubscription->publications))
+ if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
+ strcmp(newsub->name, MySubscription->name) != 0 ||
+ strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
+ newsub->binary != MySubscription->binary ||
+ !equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "restart because subscription's publications were changed",
+ (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
MySubscription->name)));
proc_exit(0);
@@ -2106,6 +2132,7 @@ ApplyWorkerMain(Datum main_arg)
options.slotname = myslotname;
options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
+ options.proto.logical.binary = MySubscription->binary;
/* Start normal logical streaming replication. */
walrcv_startstreaming(wrconn, &options);