summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2024-08-20 08:35:11 +0530
committerAmit Kapila <akapila@postgresql.org>2024-08-20 08:35:11 +0530
commit9758174e2e5cd278cf37e0980da76b51890e0011 (patch)
tree9ca019972be8f6b4b20acd98cdeb12a9475851e9 /src/backend/replication/logical/worker.c
parentadf97c1562380e02acd60dc859c289ed3a8352ee (diff)
Log the conflicts while applying changes in logical replication.
This patch provides the additional logging information in the following conflict scenarios while applying changes: insert_exists: Inserting a row that violates a NOT DEFERRABLE unique constraint. update_differ: Updating a row that was previously modified by another origin. update_exists: The updated row value violates a NOT DEFERRABLE unique constraint. update_missing: The tuple to be updated is missing. delete_differ: Deleting a row that was previously modified by another origin. delete_missing: The tuple to be deleted is missing. For insert_exists and update_exists conflicts, the log can include the origin and commit timestamp details of the conflicting key with track_commit_timestamp enabled. update_differ and delete_differ conflicts can only be detected when track_commit_timestamp is enabled on the subscriber. We do not offer additional logging for exclusion constraint violations because these constraints can specify rules that are more complex than simple equality checks. Resolving such conflicts won't be straightforward. This area can be further enhanced if required. Author: Hou Zhijie Reviewed-by: Shveta Malik, Amit Kapila, Nisha Moond, Hayato Kuroda, Dilip Kumar Discussion: https://postgr.es/m/OS0PR01MB5716352552DFADB8E9AD1D8994C92@OS0PR01MB5716.jpnprd01.prod.outlook.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c115
1 files changed, 93 insertions, 22 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 245e9be6f27..cdea6295d8a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -167,6 +167,7 @@
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
+#include "replication/conflict.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
@@ -2481,7 +2482,8 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
EState *estate = edata->estate;
/* We must open indexes here. */
- ExecOpenIndices(relinfo, false);
+ ExecOpenIndices(relinfo, true);
+ InitConflictIndexes(relinfo);
/* Do the insert. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
@@ -2669,13 +2671,12 @@ apply_handle_update_internal(ApplyExecutionData *edata,
MemoryContext oldctx;
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
- ExecOpenIndices(relinfo, false);
+ ExecOpenIndices(relinfo, true);
found = FindReplTupleInLocalRel(edata, localrel,
&relmapentry->remoterel,
localindexoid,
remoteslot, &localslot);
- ExecClearTuple(remoteslot);
/*
* Tuple found.
@@ -2684,6 +2685,28 @@ apply_handle_update_internal(ApplyExecutionData *edata,
*/
if (found)
{
+ RepOriginId localorigin;
+ TransactionId localxmin;
+ TimestampTz localts;
+
+ /*
+ * Report the conflict if the tuple was modified by a different
+ * origin.
+ */
+ if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+ localorigin != replorigin_session_origin)
+ {
+ TupleTableSlot *newslot;
+
+ /* Store the new tuple for conflict reporting */
+ newslot = table_slot_create(localrel, &estate->es_tupleTable);
+ slot_store_data(newslot, relmapentry, newtup);
+
+ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_DIFFER,
+ remoteslot, localslot, newslot,
+ InvalidOid, localxmin, localorigin, localts);
+ }
+
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_modify_data(remoteslot, localslot, relmapentry, newtup);
@@ -2691,6 +2714,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
EvalPlanQualSetSlot(&epqstate, remoteslot);
+ InitConflictIndexes(relinfo);
+
/* Do the actual update. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
@@ -2698,16 +2723,19 @@ apply_handle_update_internal(ApplyExecutionData *edata,
}
else
{
+ TupleTableSlot *newslot = localslot;
+
+ /* Store the new tuple for conflict reporting */
+ slot_store_data(newslot, relmapentry, newtup);
+
/*
* The tuple to be updated could not be found. Do nothing except for
* emitting a log message.
- *
- * XXX should this be promoted to ereport(LOG) perhaps?
*/
- elog(DEBUG1,
- "logical replication did not find row to be updated "
- "in replication target relation \"%s\"",
- RelationGetRelationName(localrel));
+ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
+ remoteslot, NULL, newslot,
+ InvalidOid, InvalidTransactionId,
+ InvalidRepOriginId, 0);
}
/* Cleanup. */
@@ -2830,6 +2858,20 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/* If found delete it. */
if (found)
{
+ RepOriginId localorigin;
+ TransactionId localxmin;
+ TimestampTz localts;
+
+ /*
+ * Report the conflict if the tuple was modified by a different
+ * origin.
+ */
+ if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+ localorigin != replorigin_session_origin)
+ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_DIFFER,
+ remoteslot, localslot, NULL,
+ InvalidOid, localxmin, localorigin, localts);
+
EvalPlanQualSetSlot(&epqstate, localslot);
/* Do the actual delete. */
@@ -2841,13 +2883,11 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/*
* The tuple to be deleted could not be found. Do nothing except for
* emitting a log message.
- *
- * XXX should this be promoted to ereport(LOG) perhaps?
*/
- elog(DEBUG1,
- "logical replication did not find row to be deleted "
- "in replication target relation \"%s\"",
- RelationGetRelationName(localrel));
+ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
+ remoteslot, NULL, NULL,
+ InvalidOid, InvalidTransactionId,
+ InvalidRepOriginId, 0);
}
/* Cleanup. */
@@ -3015,6 +3055,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
Relation partrel_new;
bool found;
EPQState epqstate;
+ RepOriginId localorigin;
+ TransactionId localxmin;
+ TimestampTz localts;
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(edata, partrel,
@@ -3023,20 +3066,44 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
remoteslot_part, &localslot);
if (!found)
{
+ TupleTableSlot *newslot = localslot;
+
+ /* Store the new tuple for conflict reporting */
+ slot_store_data(newslot, part_entry, newtup);
+
/*
* The tuple to be updated could not be found. Do nothing
* except for emitting a log message.
- *
- * XXX should this be promoted to ereport(LOG) perhaps?
*/
- elog(DEBUG1,
- "logical replication did not find row to be updated "
- "in replication target relation's partition \"%s\"",
- RelationGetRelationName(partrel));
+ ReportApplyConflict(estate, partrelinfo,
+ LOG, CT_UPDATE_MISSING,
+ remoteslot_part, NULL, newslot,
+ InvalidOid, InvalidTransactionId,
+ InvalidRepOriginId, 0);
+
return;
}
/*
+ * Report the conflict if the tuple was modified by a
+ * different origin.
+ */
+ if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+ localorigin != replorigin_session_origin)
+ {
+ TupleTableSlot *newslot;
+
+ /* Store the new tuple for conflict reporting */
+ newslot = table_slot_create(partrel, &estate->es_tupleTable);
+ slot_store_data(newslot, part_entry, newtup);
+
+ ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_DIFFER,
+ remoteslot_part, localslot, newslot,
+ InvalidOid, localxmin, localorigin,
+ localts);
+ }
+
+ /*
* Apply the update to the local tuple, putting the result in
* remoteslot_part.
*/
@@ -3046,7 +3113,6 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
MemoryContextSwitchTo(oldctx);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
- ExecOpenIndices(partrelinfo, false);
/*
* Does the updated tuple still satisfy the current
@@ -3063,6 +3129,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* work already done above to find the local tuple in the
* partition.
*/
+ ExecOpenIndices(partrelinfo, true);
+ InitConflictIndexes(partrelinfo);
+
EvalPlanQualSetSlot(&epqstate, remoteslot_part);
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
ACL_UPDATE);
@@ -3110,6 +3179,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
get_namespace_name(RelationGetNamespace(partrel_new)),
RelationGetRelationName(partrel_new));
+ ExecOpenIndices(partrelinfo, false);
+
/* DELETE old tuple found in the old partition. */
EvalPlanQualSetSlot(&epqstate, localslot);
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);