diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 63 |
1 files changed, 41 insertions, 22 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index e46a62e1990..39bfb6d39c9 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); typedef struct SlotErrCallbackArg { - LogicalRepRelation *rel; - int attnum; + LogicalRepRelMapEntry *rel; + int local_attnum; + int remote_attnum; } SlotErrCallbackArg; static MemoryContext ApplyMessageContext = NULL; @@ -282,19 +283,29 @@ static void slot_store_error_callback(void *arg) { SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg; + LogicalRepRelMapEntry *rel; + char *remotetypname; Oid remotetypoid, localtypoid; - if (errarg->attnum < 0) + /* Nothing to do if remote attribute number is not set */ + if (errarg->remote_attnum < 0) return; - remotetypoid = errarg->rel->atttyps[errarg->attnum]; - localtypoid = logicalrep_typmap_getid(remotetypoid); + rel = errarg->rel; + remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum]; + + /* Fetch remote type name from the LogicalRepTypMap cache */ + remotetypname = logicalrep_typmap_gettypname(remotetypoid); + + /* Fetch local type OID from the local sys cache */ + localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1); + errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", " "remote type %s, local type %s", - errarg->rel->nspname, errarg->rel->relname, - errarg->rel->attnames[errarg->attnum], - format_type_be(remotetypoid), + rel->remoterel.nspname, rel->remoterel.relname, + rel->remoterel.attnames[errarg->remote_attnum], + remotetypname, format_type_be(localtypoid)); } @@ -315,8 +326,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, ExecClearTuple(slot); /* Push callback + info on the error context stack */ - errarg.rel = &rel->remoterel; - errarg.attnum = -1; + errarg.rel = rel; + errarg.local_attnum = -1; + errarg.remote_attnum = -1; errcallback.callback = slot_store_error_callback; errcallback.arg = (void *) &errarg; errcallback.previous = error_context_stack; @@ -334,14 +346,17 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, Oid typinput; Oid typioparam; - errarg.attnum = remoteattnum; + errarg.local_attnum = i; + errarg.remote_attnum = remoteattnum; getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = OidInputFunctionCall(typinput, - values[remoteattnum], - typioparam, - att->atttypmod); + slot->tts_values[i] = + OidInputFunctionCall(typinput, values[remoteattnum], + typioparam, att->atttypmod); slot->tts_isnull[i] = false; + + errarg.local_attnum = -1; + errarg.remote_attnum = -1; } else { @@ -380,8 +395,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, ExecClearTuple(slot); /* Push callback + info on the error context stack */ - errarg.rel = &rel->remoterel; - errarg.attnum = -1; + errarg.rel = rel; + errarg.local_attnum = -1; + errarg.remote_attnum = -1; errcallback.callback = slot_store_error_callback; errcallback.arg = (void *) &errarg; errcallback.previous = error_context_stack; @@ -404,14 +420,17 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, Oid typinput; Oid typioparam; - errarg.attnum = remoteattnum; + errarg.local_attnum = i; + errarg.remote_attnum = remoteattnum; getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = OidInputFunctionCall(typinput, - values[remoteattnum], - typioparam, - att->atttypmod); + slot->tts_values[i] = + OidInputFunctionCall(typinput, values[remoteattnum], + typioparam, att->atttypmod); slot->tts_isnull[i] = false; + + errarg.local_attnum = -1; + errarg.remote_attnum = -1; } else { |