summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c115
1 files changed, 93 insertions, 22 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 245e9be6f27..cdea6295d8a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -167,6 +167,7 @@
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
+#include "replication/conflict.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
@@ -2481,7 +2482,8 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
EState *estate = edata->estate;
/* We must open indexes here. */
- ExecOpenIndices(relinfo, false);
+ ExecOpenIndices(relinfo, true);
+ InitConflictIndexes(relinfo);
/* Do the insert. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
@@ -2669,13 +2671,12 @@ apply_handle_update_internal(ApplyExecutionData *edata,
MemoryContext oldctx;
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
- ExecOpenIndices(relinfo, false);
+ ExecOpenIndices(relinfo, true);
found = FindReplTupleInLocalRel(edata, localrel,
&relmapentry->remoterel,
localindexoid,
remoteslot, &localslot);
- ExecClearTuple(remoteslot);
/*
* Tuple found.
@@ -2684,6 +2685,28 @@ apply_handle_update_internal(ApplyExecutionData *edata,
*/
if (found)
{
+ RepOriginId localorigin;
+ TransactionId localxmin;
+ TimestampTz localts;
+
+ /*
+ * Report the conflict if the tuple was modified by a different
+ * origin.
+ */
+ if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+ localorigin != replorigin_session_origin)
+ {
+ TupleTableSlot *newslot;
+
+ /* Store the new tuple for conflict reporting */
+ newslot = table_slot_create(localrel, &estate->es_tupleTable);
+ slot_store_data(newslot, relmapentry, newtup);
+
+ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_DIFFER,
+ remoteslot, localslot, newslot,
+ InvalidOid, localxmin, localorigin, localts);
+ }
+
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_modify_data(remoteslot, localslot, relmapentry, newtup);
@@ -2691,6 +2714,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
EvalPlanQualSetSlot(&epqstate, remoteslot);
+ InitConflictIndexes(relinfo);
+
/* Do the actual update. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
@@ -2698,16 +2723,19 @@ apply_handle_update_internal(ApplyExecutionData *edata,
}
else
{
+ TupleTableSlot *newslot = localslot;
+
+ /* Store the new tuple for conflict reporting */
+ slot_store_data(newslot, relmapentry, newtup);
+
/*
* The tuple to be updated could not be found. Do nothing except for
* emitting a log message.
- *
- * XXX should this be promoted to ereport(LOG) perhaps?
*/
- elog(DEBUG1,
- "logical replication did not find row to be updated "
- "in replication target relation \"%s\"",
- RelationGetRelationName(localrel));
+ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
+ remoteslot, NULL, newslot,
+ InvalidOid, InvalidTransactionId,
+ InvalidRepOriginId, 0);
}
/* Cleanup. */
@@ -2830,6 +2858,20 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/* If found delete it. */
if (found)
{
+ RepOriginId localorigin;
+ TransactionId localxmin;
+ TimestampTz localts;
+
+ /*
+ * Report the conflict if the tuple was modified by a different
+ * origin.
+ */
+ if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+ localorigin != replorigin_session_origin)
+ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_DIFFER,
+ remoteslot, localslot, NULL,
+ InvalidOid, localxmin, localorigin, localts);
+
EvalPlanQualSetSlot(&epqstate, localslot);
/* Do the actual delete. */
@@ -2841,13 +2883,11 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/*
* The tuple to be deleted could not be found. Do nothing except for
* emitting a log message.
- *
- * XXX should this be promoted to ereport(LOG) perhaps?
*/
- elog(DEBUG1,
- "logical replication did not find row to be deleted "
- "in replication target relation \"%s\"",
- RelationGetRelationName(localrel));
+ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
+ remoteslot, NULL, NULL,
+ InvalidOid, InvalidTransactionId,
+ InvalidRepOriginId, 0);
}
/* Cleanup. */
@@ -3015,6 +3055,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
Relation partrel_new;
bool found;
EPQState epqstate;
+ RepOriginId localorigin;
+ TransactionId localxmin;
+ TimestampTz localts;
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(edata, partrel,
@@ -3023,20 +3066,44 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
remoteslot_part, &localslot);
if (!found)
{
+ TupleTableSlot *newslot = localslot;
+
+ /* Store the new tuple for conflict reporting */
+ slot_store_data(newslot, part_entry, newtup);
+
/*
* The tuple to be updated could not be found. Do nothing
* except for emitting a log message.
- *
- * XXX should this be promoted to ereport(LOG) perhaps?
*/
- elog(DEBUG1,
- "logical replication did not find row to be updated "
- "in replication target relation's partition \"%s\"",
- RelationGetRelationName(partrel));
+ ReportApplyConflict(estate, partrelinfo,
+ LOG, CT_UPDATE_MISSING,
+ remoteslot_part, NULL, newslot,
+ InvalidOid, InvalidTransactionId,
+ InvalidRepOriginId, 0);
+
return;
}
/*
+ * Report the conflict if the tuple was modified by a
+ * different origin.
+ */
+ if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+ localorigin != replorigin_session_origin)
+ {
+ TupleTableSlot *newslot;
+
+ /* Store the new tuple for conflict reporting */
+ newslot = table_slot_create(partrel, &estate->es_tupleTable);
+ slot_store_data(newslot, part_entry, newtup);
+
+ ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_DIFFER,
+ remoteslot_part, localslot, newslot,
+ InvalidOid, localxmin, localorigin,
+ localts);
+ }
+
+ /*
* Apply the update to the local tuple, putting the result in
* remoteslot_part.
*/
@@ -3046,7 +3113,6 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
MemoryContextSwitchTo(oldctx);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
- ExecOpenIndices(partrelinfo, false);
/*
* Does the updated tuple still satisfy the current
@@ -3063,6 +3129,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* work already done above to find the local tuple in the
* partition.
*/
+ ExecOpenIndices(partrelinfo, true);
+ InitConflictIndexes(partrelinfo);
+
EvalPlanQualSetSlot(&epqstate, remoteslot_part);
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
ACL_UPDATE);
@@ -3110,6 +3179,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
get_namespace_name(RelationGetNamespace(partrel_new)),
RelationGetRelationName(partrel_new));
+ ExecOpenIndices(partrelinfo, false);
+
/* DELETE old tuple found in the old partition. */
EvalPlanQualSetSlot(&epqstate, localslot);
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);