diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 115 |
1 files changed, 93 insertions, 22 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 245e9be6f27..cdea6295d8a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -167,6 +167,7 @@ #include "postmaster/bgworker.h" #include "postmaster/interrupt.h" #include "postmaster/walwriter.h" +#include "replication/conflict.h" #include "replication/logicallauncher.h" #include "replication/logicalproto.h" #include "replication/logicalrelation.h" @@ -2481,7 +2482,8 @@ apply_handle_insert_internal(ApplyExecutionData *edata, EState *estate = edata->estate; /* We must open indexes here. */ - ExecOpenIndices(relinfo, false); + ExecOpenIndices(relinfo, true); + InitConflictIndexes(relinfo); /* Do the insert. */ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT); @@ -2669,13 +2671,12 @@ apply_handle_update_internal(ApplyExecutionData *edata, MemoryContext oldctx; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); - ExecOpenIndices(relinfo, false); + ExecOpenIndices(relinfo, true); found = FindReplTupleInLocalRel(edata, localrel, &relmapentry->remoterel, localindexoid, remoteslot, &localslot); - ExecClearTuple(remoteslot); /* * Tuple found. @@ -2684,6 +2685,28 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; + + /* + * Report the conflict if the tuple was modified by a different + * origin. + */ + if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + { + TupleTableSlot *newslot; + + /* Store the new tuple for conflict reporting */ + newslot = table_slot_create(localrel, &estate->es_tupleTable); + slot_store_data(newslot, relmapentry, newtup); + + ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_DIFFER, + remoteslot, localslot, newslot, + InvalidOid, localxmin, localorigin, localts); + } + /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_modify_data(remoteslot, localslot, relmapentry, newtup); @@ -2691,6 +2714,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, EvalPlanQualSetSlot(&epqstate, remoteslot); + InitConflictIndexes(relinfo); + /* Do the actual update. */ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, @@ -2698,16 +2723,19 @@ apply_handle_update_internal(ApplyExecutionData *edata, } else { + TupleTableSlot *newslot = localslot; + + /* Store the new tuple for conflict reporting */ + slot_store_data(newslot, relmapentry, newtup); + /* * The tuple to be updated could not be found. Do nothing except for * emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be updated " - "in replication target relation \"%s\"", - RelationGetRelationName(localrel)); + ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, + remoteslot, NULL, newslot, + InvalidOid, InvalidTransactionId, + InvalidRepOriginId, 0); } /* Cleanup. */ @@ -2830,6 +2858,20 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* If found delete it. */ if (found) { + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; + + /* + * Report the conflict if the tuple was modified by a different + * origin. + */ + if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_DIFFER, + remoteslot, localslot, NULL, + InvalidOid, localxmin, localorigin, localts); + EvalPlanQualSetSlot(&epqstate, localslot); /* Do the actual delete. */ @@ -2841,13 +2883,11 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* * The tuple to be deleted could not be found. Do nothing except for * emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be deleted " - "in replication target relation \"%s\"", - RelationGetRelationName(localrel)); + ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING, + remoteslot, NULL, NULL, + InvalidOid, InvalidTransactionId, + InvalidRepOriginId, 0); } /* Cleanup. */ @@ -3015,6 +3055,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, Relation partrel_new; bool found; EPQState epqstate; + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3023,20 +3066,44 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, remoteslot_part, &localslot); if (!found) { + TupleTableSlot *newslot = localslot; + + /* Store the new tuple for conflict reporting */ + slot_store_data(newslot, part_entry, newtup); + /* * The tuple to be updated could not be found. Do nothing * except for emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be updated " - "in replication target relation's partition \"%s\"", - RelationGetRelationName(partrel)); + ReportApplyConflict(estate, partrelinfo, + LOG, CT_UPDATE_MISSING, + remoteslot_part, NULL, newslot, + InvalidOid, InvalidTransactionId, + InvalidRepOriginId, 0); + return; } /* + * Report the conflict if the tuple was modified by a + * different origin. + */ + if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + { + TupleTableSlot *newslot; + + /* Store the new tuple for conflict reporting */ + newslot = table_slot_create(partrel, &estate->es_tupleTable); + slot_store_data(newslot, part_entry, newtup); + + ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_DIFFER, + remoteslot_part, localslot, newslot, + InvalidOid, localxmin, localorigin, + localts); + } + + /* * Apply the update to the local tuple, putting the result in * remoteslot_part. */ @@ -3046,7 +3113,6 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, MemoryContextSwitchTo(oldctx); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); - ExecOpenIndices(partrelinfo, false); /* * Does the updated tuple still satisfy the current @@ -3063,6 +3129,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * work already done above to find the local tuple in the * partition. */ + ExecOpenIndices(partrelinfo, true); + InitConflictIndexes(partrelinfo); + EvalPlanQualSetSlot(&epqstate, remoteslot_part); TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_UPDATE); @@ -3110,6 +3179,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, get_namespace_name(RelationGetNamespace(partrel_new)), RelationGetRelationName(partrel_new)); + ExecOpenIndices(partrelinfo, false); + /* DELETE old tuple found in the old partition. */ EvalPlanQualSetSlot(&epqstate, localslot); TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE); |