author     Amit Kapila <akapila@postgresql.org>  2024-08-20 08:35:11 +0530
committer  Amit Kapila <akapila@postgresql.org>  2024-08-20 08:35:11 +0530
commit     9758174e2e5cd278cf37e0980da76b51890e0011
tree       9ca019972be8f6b4b20acd98cdeb12a9475851e9
parent     adf97c1562380e02acd60dc859c289ed3a8352ee
Log the conflicts while applying changes in logical replication.
This patch provides additional logging information in the following conflict
scenarios while applying changes:

insert_exists: Inserting a row that violates a NOT DEFERRABLE unique constraint.
update_differ: Updating a row that was previously modified by another origin.
update_exists: The updated row value violates a NOT DEFERRABLE unique constraint.
update_missing: The tuple to be updated is missing.
delete_differ: Deleting a row that was previously modified by another origin.
delete_missing: The tuple to be deleted is missing.

For insert_exists and update_exists conflicts, the log can include the origin
and commit timestamp details of the conflicting key when
track_commit_timestamp is enabled. update_differ and delete_differ conflicts
can only be detected when track_commit_timestamp is enabled on the subscriber.

We do not offer additional logging for exclusion constraint violations
because these constraints can specify rules that are more complex than simple
equality checks, so resolving such conflicts won't be straightforward. This
area can be further enhanced if required.

Author: Hou Zhijie
Reviewed-by: Shveta Malik, Amit Kapila, Nisha Moond, Hayato Kuroda, Dilip Kumar
Discussion: https://postgr.es/m/OS0PR01MB5716352552DFADB8E9AD1D8994C92@OS0PR01MB5716.jpnprd01.prod.outlook.com
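The six conflict scenarios above correspond to a new ConflictType enum that
this patch adds in src/include/replication/conflict.h. That header is outside
this diff, so the following is only a sketch inferred from the list above and
from the CT_INSERT_EXISTS and CT_UPDATE_EXISTS constants used in
execReplication.c below; the remaining enumerator names are assumptions:

    /* Sketch of ConflictType; names other than CT_INSERT_EXISTS and
     * CT_UPDATE_EXISTS are inferred, not copied from the header. */
    typedef enum ConflictType
    {
        CT_INSERT_EXISTS,   /* inserted row violates a unique constraint */
        CT_UPDATE_DIFFER,   /* row to update was modified by another origin */
        CT_UPDATE_EXISTS,   /* updated row violates a unique constraint */
        CT_UPDATE_MISSING,  /* row to update is missing */
        CT_DELETE_DIFFER,   /* row to delete was modified by another origin */
        CT_DELETE_MISSING,  /* row to delete is missing */
    } ConflictType;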
Diffstat (limited to 'src/backend/executor')
-rw-r--r--  src/backend/executor/execIndexing.c     17
-rw-r--r--  src/backend/executor/execMain.c           7
-rw-r--r--  src/backend/executor/execReplication.c  236
-rw-r--r--  src/backend/executor/nodeModifyTable.c    5
4 files changed, 192 insertions(+), 73 deletions(-)
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 9f05b3654c1..403a3f40551 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -207,8 +207,9 @@ ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
ii = BuildIndexInfo(indexDesc);
/*
- * If the indexes are to be used for speculative insertion, add extra
- * information required by unique index entries.
+ * If the indexes are to be used for speculative insertion or conflict
+ * detection in logical replication, add extra information required by
+ * unique index entries.
*/
if (speculative && ii->ii_Unique)
BuildSpeculativeIndexInfo(indexDesc, ii);
@@ -519,14 +520,18 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
*
* Note that this doesn't lock the values in any way, so it's
* possible that a conflicting tuple is inserted immediately
- * after this returns. But this can be used for a pre-check
- * before insertion.
+ * after this returns. This can be used for either a pre-check
+ * before insertion or a re-check after finding a conflict.
+ *
+ * 'tupleid' should be the TID of the recently inserted tuple (or invalid
+ * if no new tuple has been inserted yet); that tuple is excluded from
+ * conflict checking.
* ----------------------------------------------------------------
*/
bool
ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
EState *estate, ItemPointer conflictTid,
- List *arbiterIndexes)
+ ItemPointer tupleid, List *arbiterIndexes)
{
int i;
int numIndices;
@@ -629,7 +634,7 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
satisfiesConstraint =
check_exclusion_or_unique_constraint(heapRelation, indexRelation,
- indexInfo, &invalidItemPtr,
+ indexInfo, tupleid,
values, isnull, estate, false,
CEOUC_WAIT, true,
conflictTid);
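The new 'tupleid' argument gives ExecCheckIndexConstraints() two calling
modes, both of which appear later in this patch; a condensed sketch of the
two call sites (variables as in the surrounding functions):

    /* Pre-check before insertion (nodeModifyTable.c): nothing has been
     * inserted yet, so pass an invalid TID and exclude nothing. */
    ItemPointerData invalidItemPtr;

    ItemPointerSetInvalid(&invalidItemPtr);
    ExecCheckIndexConstraints(resultRelInfo, slot, estate,
                              &conflictTid, &invalidItemPtr,
                              arbiterIndexes);

    /* Re-check after insertion (execReplication.c): exclude the tuple we
     * just inserted by passing its own TID. */
    ExecCheckIndexConstraints(resultRelInfo, slot, estate,
                              &conflictTid, &slot->tts_tid,
                              list_make1_oid(conflictindex));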
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4d7c92d63c1..29e186fa73d 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -88,11 +88,6 @@ static bool ExecCheckPermissionsModified(Oid relOid, Oid userid,
Bitmapset *modifiedCols,
AclMode requiredPerms);
static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt);
-static char *ExecBuildSlotValueDescription(Oid reloid,
- TupleTableSlot *slot,
- TupleDesc tupdesc,
- Bitmapset *modifiedCols,
- int maxfieldlen);
static void EvalPlanQualStart(EPQState *epqstate, Plan *planTree);
/* end of local decls */
@@ -2210,7 +2205,7 @@ ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
* column involved, that subset will be returned with a key identifying which
* columns they are.
*/
-static char *
+char *
ExecBuildSlotValueDescription(Oid reloid,
TupleTableSlot *slot,
TupleDesc tupdesc,
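ExecBuildSlotValueDescription() is exported here (its static declaration
above is removed) so that the new conflict-reporting code can reuse it to
print the conflicting tuple's values. A hypothetical call, using the
signature from the removed declaration; the NULL column bitmap and the
64-character field limit are illustrative assumptions, not taken from this
patch:

    /* Illustrative only: build a "(col1, col2)=(val1, val2)" style string
     * describing the values in 'conflictslot'. */
    char       *val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
                                                         conflictslot,
                                                         RelationGetDescr(rel),
                                                         NULL,    /* modifiedCols */
                                                         64);     /* maxfieldlen */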
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd5778..1086cbc9624 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -23,6 +23,7 @@
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
+#include "replication/conflict.h"
#include "replication/logicalrelation.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
@@ -166,6 +167,51 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
return skey_attoff;
}
+
+/*
+ * Helper function to check if it is necessary to re-fetch and lock the tuple
+ * due to concurrent modifications. This function should be called after
+ * invoking table_tuple_lock.
+ */
+static bool
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+{
+ bool refetch = false;
+
+ switch (res)
+ {
+ case TM_Ok:
+ break;
+ case TM_Updated:
+ /* XXX: Improve handling here */
+ if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ refetch = true;
+ break;
+ case TM_Deleted:
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ refetch = true;
+ break;
+ case TM_Invisible:
+ elog(ERROR, "attempted to lock invisible tuple");
+ break;
+ default:
+ elog(ERROR, "unexpected table_tuple_lock status: %u", res);
+ break;
+ }
+
+ return refetch;
+}
+
/*
* Search the relation 'rel' for tuple using the index.
*
@@ -260,34 +306,8 @@ retry:
PopActiveSnapshot();
- switch (res)
- {
- case TM_Ok:
- break;
- case TM_Updated:
- /* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- goto retry;
- case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- goto retry;
- case TM_Invisible:
- elog(ERROR, "attempted to lock invisible tuple");
- break;
- default:
- elog(ERROR, "unexpected table_tuple_lock status: %u", res);
- break;
- }
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
}
index_endscan(scan);
@@ -444,34 +464,8 @@ retry:
PopActiveSnapshot();
- switch (res)
- {
- case TM_Ok:
- break;
- case TM_Updated:
- /* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- goto retry;
- case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- goto retry;
- case TM_Invisible:
- elog(ERROR, "attempted to lock invisible tuple");
- break;
- default:
- elog(ERROR, "unexpected table_tuple_lock status: %u", res);
- break;
- }
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
}
table_endscan(scan);
@@ -481,6 +475,89 @@ retry:
}
/*
+ * Find the tuple that violates the passed unique index (conflictindex).
+ *
+ * Return true if a conflicting tuple is found, otherwise return false.
+ *
+ * We lock the tuple so that it cannot be deleted before the caller fetches
+ * the required information. Note that if the tuple is deleted before the
+ * lock is acquired, we will retry the search for a conflicting tuple.
+ */
+static bool
+FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
+ Oid conflictindex, TupleTableSlot *slot,
+ TupleTableSlot **conflictslot)
+{
+ Relation rel = resultRelInfo->ri_RelationDesc;
+ ItemPointerData conflictTid;
+ TM_FailureData tmfd;
+ TM_Result res;
+
+ *conflictslot = NULL;
+
+retry:
+ if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
+ &conflictTid, &slot->tts_tid,
+ list_make1_oid(conflictindex)))
+ {
+ if (*conflictslot)
+ ExecDropSingleTupleTableSlot(*conflictslot);
+
+ *conflictslot = NULL;
+ return false;
+ }
+
+ *conflictslot = table_slot_create(rel, NULL);
+
+ PushActiveSnapshot(GetLatestSnapshot());
+
+ res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
+ *conflictslot,
+ GetCurrentCommandId(false),
+ LockTupleShare,
+ LockWaitBlock,
+ 0 /* don't follow updates */ ,
+ &tmfd);
+
+ PopActiveSnapshot();
+
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
+
+ return true;
+}
+
+/*
+ * Check all the unique indexes in 'recheckIndexes' for a conflict with the
+ * tuple in 'remoteslot', and report the conflict if one is found.
+ */
+static void
+CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
+ ConflictType type, List *recheckIndexes,
+ TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
+{
+ /* Check all the unique indexes for a conflict */
+ foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
+ {
+ TupleTableSlot *conflictslot;
+
+ if (list_member_oid(recheckIndexes, uniqueidx) &&
+ FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
+ &conflictslot))
+ {
+ RepOriginId origin;
+ TimestampTz committs;
+ TransactionId xmin;
+
+ GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+ searchslot, conflictslot, remoteslot,
+ uniqueidx, xmin, origin, committs);
+ }
+ }
+}
+
+/*
* Insert tuple represented in the slot to the relation, update the indexes,
* and execute any constraints and per-row triggers.
*
@@ -509,6 +586,8 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
if (!skip_tuple)
{
List *recheckIndexes = NIL;
+ List *conflictindexes;
+ bool conflict = false;
/* Compute stored generated columns */
if (rel->rd_att->constr &&
@@ -525,10 +604,33 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
/* OK, store the tuple and create index entries for it */
simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
+ conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
- slot, estate, false, false,
- NULL, NIL, false);
+ slot, estate, false,
+ conflictindexes ? true : false,
+ &conflict,
+ conflictindexes, false);
+
+ /*
+ * Check the conflict indexes to fetch the conflicting local tuple and
+ * report the conflict. We perform this check here, rather than running
+ * an additional index scan before the actual insertion and reporting
+ * the conflict if any conflicting tuple is found, to avoid the cost of
+ * an extra scan for each INSERT operation even when no conflict arises.
+ * Such a scan could add significant overhead to replication,
+ * particularly in cases where conflicts are rare.
+ *
+ * XXX OTOH, in case of a conflict this approach leaves dead tuples in
+ * the heap and indexes that must be cleaned up later. But since
+ * conflicts should be infrequent, we prefer to save the performance
+ * overhead of an extra scan before each insertion.
+ */
+ if (conflict)
+ CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
+ recheckIndexes, NULL, slot);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, slot,
@@ -577,6 +679,8 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
{
List *recheckIndexes = NIL;
TU_UpdateIndexes update_indexes;
+ List *conflictindexes;
+ bool conflict = false;
/* Compute stored generated columns */
if (rel->rd_att->constr &&
@@ -593,12 +697,24 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
&update_indexes);
+ conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
- slot, estate, true, false,
- NULL, NIL,
+ slot, estate, true,
+ conflictindexes ? true : false,
+ &conflict, conflictindexes,
(update_indexes == TU_Summarizing));
+ /*
+ * Refer to the comments above the call to CheckAndReportConflict() in
+ * ExecSimpleRelationInsert to understand why this check is done at
+ * this point.
+ */
+ if (conflict)
+ CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
+ recheckIndexes, searchslot, slot);
+
/* AFTER ROW UPDATE Triggers */
ExecARUpdateTriggers(estate, resultRelInfo,
NULL, NULL,
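GetTupleTransactionInfo() and ReportApplyConflict() come from the new
replication/conflict.h header, which is not part of this diff. The prototypes
below are inferred purely from the call sites in CheckAndReportConflict()
above and should be read as a sketch, not as the committed declarations:

    /* Inferred: fetch the conflicting tuple's xmin, plus its origin and
     * commit timestamp (the latter two require track_commit_timestamp). */
    extern void GetTupleTransactionInfo(TupleTableSlot *localslot,
                                        TransactionId *xmin,
                                        RepOriginId *localorigin,
                                        TimestampTz *localts);

    /* Inferred: log the conflict at 'elevel', describing the local and
     * remote tuples and the violated index. */
    extern void ReportApplyConflict(EState *estate,
                                    ResultRelInfo *relinfo,
                                    int elevel, ConflictType type,
                                    TupleTableSlot *searchslot,
                                    TupleTableSlot *localslot,
                                    TupleTableSlot *remoteslot,
                                    Oid indexoid,
                                    TransactionId localxmin,
                                    RepOriginId localorigin,
                                    TimestampTz localts);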
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 4913e493199..8bf4c80d4a0 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1019,9 +1019,11 @@ ExecInsert(ModifyTableContext *context,
/* Perform a speculative insertion. */
uint32 specToken;
ItemPointerData conflictTid;
+ ItemPointerData invalidItemPtr;
bool specConflict;
List *arbiterIndexes;
+ ItemPointerSetInvalid(&invalidItemPtr);
arbiterIndexes = resultRelInfo->ri_onConflictArbiterIndexes;
/*
@@ -1041,7 +1043,8 @@ ExecInsert(ModifyTableContext *context,
CHECK_FOR_INTERRUPTS();
specConflict = false;
if (!ExecCheckIndexConstraints(resultRelInfo, slot, estate,
- &conflictTid, arbiterIndexes))
+ &conflictTid, &invalidItemPtr,
+ arbiterIndexes))
{
/* committed conflict tuple found */
if (onconflict == ONCONFLICT_UPDATE)