diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 225 |
1 files changed, 126 insertions, 99 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f90a896fc3e..407eee3c0bc 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -319,13 +319,13 @@ slot_store_error_callback(void *arg) } /* - * Store data in C string form into slot. - * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our - * use better. + * Store tuple data into slot. + * + * Incoming data can be either text or binary format. */ static void -slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, - char **values) +slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -343,27 +343,65 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, errcallback.previous = error_context_stack; error_context_stack = &errcallback; - /* Call the "in" function for each non-dropped attribute */ + /* Call the "in" function for each non-dropped, non-null attribute */ Assert(natts == rel->attrmap->maplen); for (i = 0; i < natts; i++) { Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i); int remoteattnum = rel->attrmap->attnums[i]; - if (!att->attisdropped && remoteattnum >= 0 && - values[remoteattnum] != NULL) + if (!att->attisdropped && remoteattnum >= 0) { - Oid typinput; - Oid typioparam; + StringInfo colvalue = &tupleData->colvalues[remoteattnum]; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); - slot->tts_isnull[i] = false; + if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, colvalue->data, + typioparam, att->atttypmod); + slot->tts_isnull[i] = false; + } + else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) + { + Oid typreceive; + Oid typioparam; + + /* + * In some code paths we may be asked to re-parse the same + * tuple data. Reset the StringInfo's cursor so that works. + */ + colvalue->cursor = 0; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, colvalue, + typioparam, att->atttypmod); + + /* Trouble if it didn't eat the whole buffer */ + if (colvalue->cursor != colvalue->len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format in logical replication column %d", + remoteattnum + 1))); + slot->tts_isnull[i] = false; + } + else + { + /* + * NULL value from remote. (We don't expect to see + * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as + * NULL.) + */ + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } errarg.local_attnum = -1; errarg.remote_attnum = -1; @@ -371,8 +409,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, else { /* - * We assign NULL to dropped attributes, NULL values, and missing - * values (missing values should be later filled using + * We assign NULL to dropped attributes and missing values + * (missing values should be later filled using * slot_fill_defaults). */ slot->tts_values[i] = (Datum) 0; @@ -387,20 +425,21 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, } /* - * Replace selected columns with user data provided as C strings. + * Replace updated columns with data from the LogicalRepTupleData struct. * This is somewhat similar to heap_modify_tuple but also calls the type * input functions on the user data. - * "slot" is filled with a copy of the tuple in "srcslot", with - * columns selected by the "replaces" array replaced with data values - * from "values". + * + * "slot" is filled with a copy of the tuple in "srcslot", replacing + * columns provided in "tupleData" and leaving others as-is. + * * Caution: unreplaced pass-by-ref columns in "slot" will point into the * storage for "srcslot". This is OK for current usage, but someday we may * need to materialize "slot" at the end to make it independent of "srcslot". */ static void -slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, - LogicalRepRelMapEntry *rel, - char **values, bool *replaces) +slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, + LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -438,31 +477,58 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, if (remoteattnum < 0) continue; - if (!replaces[remoteattnum]) - continue; - - if (values[remoteattnum] != NULL) + if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) { - Oid typinput; - Oid typioparam; + StringInfo colvalue = &tupleData->colvalues[remoteattnum]; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); - slot->tts_isnull[i] = false; + if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, colvalue->data, + typioparam, att->atttypmod); + slot->tts_isnull[i] = false; + } + else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) + { + Oid typreceive; + Oid typioparam; + + /* + * In some code paths we may be asked to re-parse the same + * tuple data. Reset the StringInfo's cursor so that works. + */ + colvalue->cursor = 0; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, colvalue, + typioparam, att->atttypmod); + + /* Trouble if it didn't eat the whole buffer */ + if (colvalue->cursor != colvalue->len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format in logical replication column %d", + remoteattnum + 1))); + slot->tts_isnull[i] = false; + } + else + { + /* must be LOGICALREP_COLUMN_NULL */ + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } errarg.local_attnum = -1; errarg.remote_attnum = -1; } - else - { - slot->tts_values[i] = (Datum) 0; - slot->tts_isnull[i] = true; - } } /* Pop the error context stack */ @@ -641,7 +707,7 @@ apply_handle_insert(StringInfo s) /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, newtup.values); + slot_store_data(remoteslot, rel, &newtup); slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); @@ -765,7 +831,7 @@ apply_handle_update(StringInfo s) target_rte = list_nth(estate->es_range_table, 0); for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) { - if (newtup.changed[i]) + if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED) target_rte->updatedCols = bms_add_member(target_rte->updatedCols, i + 1 - FirstLowInvalidHeapAttributeNumber); } @@ -776,8 +842,8 @@ apply_handle_update(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, - has_oldtup ? oldtup.values : newtup.values); + slot_store_data(remoteslot, rel, + has_oldtup ? &oldtup : &newtup); MemoryContextSwitchTo(oldctx); /* For a partitioned table, apply update to correct partition. */ @@ -831,8 +897,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo, { /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_cstrings(remoteslot, localslot, relmapentry, - newtup->values, newtup->changed); + slot_modify_data(remoteslot, localslot, relmapentry, newtup); MemoryContextSwitchTo(oldctx); EvalPlanQualSetSlot(&epqstate, remoteslot); @@ -900,7 +965,7 @@ apply_handle_delete(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, oldtup.values); + slot_store_data(remoteslot, rel, &oldtup); MemoryContextSwitchTo(oldctx); /* For a partitioned table, apply delete to correct partition. */ @@ -1096,9 +1161,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, if (found) { /* Apply the update. */ - slot_modify_cstrings(remoteslot_part, localslot, - part_entry, - newtup->values, newtup->changed); + slot_modify_data(remoteslot_part, localslot, + part_entry, + newtup); MemoryContextSwitchTo(oldctx); } else @@ -1312,8 +1377,8 @@ apply_handle_truncate(StringInfo s) } /* - * Even if we used CASCADE on the upstream primary we explicitly default to - * replaying changes without further cascading. This might be later + * Even if we used CASCADE on the upstream primary we explicitly default + * to replaying changes without further cascading. This might be later * changeable with a user specified option. */ ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs); @@ -1850,60 +1915,21 @@ maybe_reread_subscription(void) proc_exit(0); } - /* - * Exit if connection string was changed. The launcher will start new - * worker. - */ - if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because the connection information was changed", - MySubscription->name))); - - proc_exit(0); - } - - /* - * Exit if subscription name was changed (it's used for - * fallback_application_name). The launcher will start new worker. - */ - if (strcmp(newsub->name, MySubscription->name) != 0) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because subscription was renamed", - MySubscription->name))); - - proc_exit(0); - } - /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); /* - * We need to make new connection to new slot if slot name has changed so - * exit here as well if that's the case. - */ - if (strcmp(newsub->slotname, MySubscription->slotname) != 0) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because the replication slot name was changed", - MySubscription->name))); - - proc_exit(0); - } - - /* - * Exit if publication list was changed. The launcher will start new - * worker. + * Exit if any parameter that affects the remote connection was changed. + * The launcher will start a new worker. */ - if (!equal(newsub->publications, MySubscription->publications)) + if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 || + strcmp(newsub->name, MySubscription->name) != 0 || + strcmp(newsub->slotname, MySubscription->slotname) != 0 || + newsub->binary != MySubscription->binary || + !equal(newsub->publications, MySubscription->publications)) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because subscription's publications were changed", + (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change", MySubscription->name))); proc_exit(0); @@ -2106,6 +2132,7 @@ ApplyWorkerMain(Datum main_arg) options.slotname = myslotname; options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; + options.proto.logical.binary = MySubscription->binary; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); |