diff options
| | | |
|---|---|---|
| author | Álvaro Herrera <alvherre@kurilemu.de> | 2025-12-02 13:51:19 +0100 |
| committer | Álvaro Herrera <alvherre@kurilemu.de> | 2025-12-02 13:51:53 +0100 |
| commit | 90eae926abbbcedbbea2ad5302722185e8652dca (patch) | |
| tree | 74943ac23b6ed6dfff725106047adb1b26e18654 | |
| parent | 4f941d432b42eccd99ba0d22e3a59c073ac2406a (diff) | |
Fix ON CONFLICT with REINDEX CONCURRENTLY and partitions

When planning queries with ON CONFLICT on partitioned tables, the
indexes to consider as arbiters for each partition are determined based
on those found in the parent table. However, it's possible for an index
on a partition to be reindexed, and in that case, the auxiliary indexes
created on the partition must be considered as arbiters as well; failing
to do that may result in spurious "duplicate key" errors given
sufficient bad luck.

We fix that in this commit by matching every index that doesn't have a
parent to each initially-determined arbiter index. Every unparented
matching index is considered an additional arbiter index.

Closely related to the fixes in bc32a12e0db2 and 2bc7e886fc1b, and for
identical reasons, not backpatched (for now) even though it's a
longstanding issue.

Author: Mihail Nikalayeu <mihailnikalayeu@gmail.com>
Reviewed-by: Álvaro Herrera <alvherre@kurilemu.de>
Discussion: https://postgr.es/m/CANtu0ojXmqjmEzp-=aJSxjsdE76iAsRgHBoK0QtYHimb_mEfsg@mail.gmail.com
5 files changed, 506 insertions, 21 deletions
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index c6d4040e240..b3960bb7151 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -15,6 +15,7 @@ #include "access/table.h" #include "access/tableam.h" +#include "catalog/index.h" #include "catalog/partition.h" #include "executor/execPartition.h" #include "executor/executor.h" @@ -491,6 +492,65 @@ ExecFindPartition(ModifyTableState *mtstate, } /* + * IsIndexCompatibleAsArbiter + * Return true if two indexes are identical for INSERT ON CONFLICT + * purposes. + * + * Only indexes of the same relation are supported. + */ +static bool +IsIndexCompatibleAsArbiter(Relation arbiterIndexRelation, + IndexInfo *arbiterIndexInfo, + Relation indexRelation, + IndexInfo *indexInfo) +{ + Assert(arbiterIndexRelation->rd_index->indrelid == indexRelation->rd_index->indrelid); + + /* must match whether they're unique */ + if (arbiterIndexInfo->ii_Unique != indexInfo->ii_Unique) + return false; + + /* No support currently for comparing exclusion indexes. 
*/ + if (arbiterIndexInfo->ii_ExclusionOps != NULL || + indexInfo->ii_ExclusionOps != NULL) + return false; + + /* the "nulls not distinct" criterion must match */ + if (arbiterIndexInfo->ii_NullsNotDistinct != + indexInfo->ii_NullsNotDistinct) + return false; + + /* number of key attributes must match */ + if (arbiterIndexInfo->ii_NumIndexKeyAttrs != + indexInfo->ii_NumIndexKeyAttrs) + return false; + + for (int i = 0; i < arbiterIndexInfo->ii_NumIndexKeyAttrs; i++) + { + if (arbiterIndexRelation->rd_indcollation[i] != + indexRelation->rd_indcollation[i]) + return false; + + if (arbiterIndexRelation->rd_opfamily[i] != + indexRelation->rd_opfamily[i]) + return false; + + if (arbiterIndexRelation->rd_index->indkey.values[i] != + indexRelation->rd_index->indkey.values[i]) + return false; + } + + if (list_difference(RelationGetIndexExpressions(arbiterIndexRelation), + RelationGetIndexExpressions(indexRelation)) != NIL) + return false; + + if (list_difference(RelationGetIndexPredicate(arbiterIndexRelation), + RelationGetIndexPredicate(indexRelation)) != NIL) + return false; + return true; +} + +/* * ExecInitPartitionInfo * Lock the partition and initialize ResultRelInfo. Also setup other * information for the partition and store it in the next empty slot in @@ -689,45 +749,117 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate, { TupleDesc partrelDesc = RelationGetDescr(partrel); ExprContext *econtext = mtstate->ps.ps_ExprContext; - ListCell *lc; List *arbiterIndexes = NIL; + int additional_arbiters = 0; /* * If there is a list of arbiter indexes, map it to a list of indexes - * in the partition. We do that by scanning the partition's index - * list and searching for ancestry relationships to each index in the - * ancestor table. + * in the partition. We also add any "identical indexes" to any of + * those, to cover the case where one of them is concurrently being + * reindexed. 
*/ if (rootResultRelInfo->ri_onConflictArbiterIndexes != NIL) { - List *childIdxs; + List *unparented_idxs = NIL, + *arbiters_listidxs = NIL; - childIdxs = RelationGetIndexList(leaf_part_rri->ri_RelationDesc); - - foreach(lc, childIdxs) + for (int listidx = 0; listidx < leaf_part_rri->ri_NumIndices; listidx++) { - Oid childIdx = lfirst_oid(lc); + Oid indexoid; List *ancestors; - ListCell *lc2; - ancestors = get_partition_ancestors(childIdx); - foreach(lc2, rootResultRelInfo->ri_onConflictArbiterIndexes) + /* + * If one of this index's ancestors is in the root's arbiter + * list, then use this index as arbiter for this partition. + * Otherwise, if this index has no parent, track it for later, + * in case REINDEX CONCURRENTLY is working on one of the + * arbiters. + * + * XXX get_partition_ancestors is slow: it scans pg_inherits + * each time. Consider a syscache or some other way to cache? + */ + indexoid = RelationGetRelid(leaf_part_rri->ri_IndexRelationDescs[listidx]); + ancestors = get_partition_ancestors(indexoid); + if (ancestors != NIL) { - if (list_member_oid(ancestors, lfirst_oid(lc2))) - arbiterIndexes = lappend_oid(arbiterIndexes, childIdx); + foreach_oid(parent_idx, rootResultRelInfo->ri_onConflictArbiterIndexes) + { + if (list_member_oid(ancestors, parent_idx)) + { + arbiterIndexes = lappend_oid(arbiterIndexes, indexoid); + arbiters_listidxs = lappend_int(arbiters_listidxs, listidx); + break; + } + } } + else + unparented_idxs = lappend_int(unparented_idxs, listidx); list_free(ancestors); } + + /* + * If we found any indexes with no ancestors, it's possible that + * some arbiter index is undergoing concurrent reindex. Match all + * unparented indexes against arbiters; add unparented matching + * ones as "additional arbiters". + * + * This is critical so that all concurrent transactions use the + * same set as arbiters during REINDEX CONCURRENTLY, to avoid + * spurious "duplicate key" errors. 
+ */ + if (unparented_idxs && arbiterIndexes) + { + foreach_int(unparented_i, unparented_idxs) + { + Relation unparented_rel; + IndexInfo *unparenred_ii; + + unparented_rel = leaf_part_rri->ri_IndexRelationDescs[unparented_i]; + unparenred_ii = leaf_part_rri->ri_IndexRelationInfo[unparented_i]; + + Assert(!list_member_oid(arbiterIndexes, + unparented_rel->rd_index->indexrelid)); + + /* Ignore indexes not ready */ + if (!unparenred_ii->ii_ReadyForInserts) + continue; + + foreach_int(arbiter_i, arbiters_listidxs) + { + Relation arbiter_rel; + IndexInfo *arbiter_ii; + + arbiter_rel = leaf_part_rri->ri_IndexRelationDescs[arbiter_i]; + arbiter_ii = leaf_part_rri->ri_IndexRelationInfo[arbiter_i]; + + /* + * If the non-ancestor index is compatible with the + * arbiter, use the non-ancestor as arbiter too. + */ + if (IsIndexCompatibleAsArbiter(arbiter_rel, + arbiter_ii, + unparented_rel, + unparenred_ii)) + { + arbiterIndexes = lappend_oid(arbiterIndexes, + unparented_rel->rd_index->indexrelid); + additional_arbiters++; + break; + } + } + } + } + list_free(unparented_idxs); + list_free(arbiters_listidxs); } /* - * If the resulting lists are of inequal length, something is wrong. - * XXX This may happen because we don't match the lists correctly when - * a partitioned index is being processed by REINDEX CONCURRENTLY. - * FIXME later. + * We expect to find as many arbiter indexes on this partition as the + * root has, plus however many "additional arbiters" (to wit: those + * being concurrently rebuilt) we found. 
*/ if (list_length(rootResultRelInfo->ri_onConflictArbiterIndexes) != - list_length(arbiterIndexes)) + list_length(arbiterIndexes) - additional_arbiters) elog(ERROR, "invalid arbiter index list"); leaf_part_rri->ri_onConflictArbiterIndexes = arbiterIndexes; diff --git a/src/test/modules/injection_points/Makefile b/src/test/modules/injection_points/Makefile index 0a9716db27c..a618e6a9899 100644 --- a/src/test/modules/injection_points/Makefile +++ b/src/test/modules/injection_points/Makefile @@ -18,9 +18,10 @@ ISOLATION = basic \ inplace \ syscache-update-pruned \ index-concurrently-upsert \ + index-concurrently-upsert-predicate \ reindex-concurrently-upsert \ reindex-concurrently-upsert-on-constraint \ - index-concurrently-upsert-predicate + reindex-concurrently-upsert-partitioned TAP_TESTS = 1 diff --git a/src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out b/src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out new file mode 100644 index 00000000000..4c79a43d986 --- /dev/null +++ b/src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out @@ -0,0 +1,238 @@ +Parsed test spec with 4 sessions + +starting permutation: s3_setup_wait_before_set_dead s3_start_reindex s1_start_upsert s4_wakeup_to_set_dead s2_start_upsert s4_wakeup_s1 s4_wakeup_s2 +injection_points_attach +----------------------- + +(1 row) + +injection_points_attach +----------------------- + +(1 row) + +injection_points_set_local +-------------------------- + +(1 row) + +step s3_setup_wait_before_set_dead: + SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait'); + +injection_points_attach +----------------------- + +(1 row) + +step s3_start_reindex: + REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; + <waiting ...> +step s1_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s4_wakeup_to_set_dead: 
+ SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s4_wakeup_s1: + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s1_start_upsert: <... completed> +step s4_wakeup_s2: + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: <... completed> +step s3_start_reindex: <... 
completed> + +starting permutation: s3_setup_wait_before_swap s3_start_reindex s1_start_upsert s4_wakeup_to_swap s2_start_upsert s4_wakeup_s2 s4_wakeup_s1 +injection_points_attach +----------------------- + +(1 row) + +injection_points_attach +----------------------- + +(1 row) + +injection_points_set_local +-------------------------- + +(1 row) + +step s3_setup_wait_before_swap: + SELECT injection_points_attach('reindex-relation-concurrently-before-swap', 'wait'); + +injection_points_attach +----------------------- + +(1 row) + +step s3_start_reindex: + REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; + <waiting ...> +step s1_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s4_wakeup_to_swap: + SELECT injection_points_detach('reindex-relation-concurrently-before-swap'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-swap'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s4_wakeup_s2: + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s4_wakeup_s1: + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s1_start_upsert: <... completed> +step s2_start_upsert: <... completed> +step s3_start_reindex: <... 
completed> + +starting permutation: s3_setup_wait_before_set_dead s3_start_reindex s1_start_upsert s2_start_upsert s4_wakeup_s1 s4_wakeup_to_set_dead s4_wakeup_s2 +injection_points_attach +----------------------- + +(1 row) + +injection_points_attach +----------------------- + +(1 row) + +injection_points_set_local +-------------------------- + +(1 row) + +step s3_setup_wait_before_set_dead: + SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait'); + +injection_points_attach +----------------------- + +(1 row) + +step s3_start_reindex: + REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; + <waiting ...> +step s1_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s2_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s4_wakeup_s1: + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s1_start_upsert: <... completed> +step s4_wakeup_to_set_dead: + SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s4_wakeup_s2: + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: <... completed> +step s3_start_reindex: <... 
completed> diff --git a/src/test/modules/injection_points/meson.build b/src/test/modules/injection_points/meson.build index 3bbbc0bf7b3..1a2af8a26c4 100644 --- a/src/test/modules/injection_points/meson.build +++ b/src/test/modules/injection_points/meson.build @@ -49,9 +49,10 @@ tests += { 'inplace', 'syscache-update-pruned', 'index-concurrently-upsert', + 'index-concurrently-upsert-predicate', 'reindex-concurrently-upsert', 'reindex-concurrently-upsert-on-constraint', - 'index-concurrently-upsert-predicate', + 'reindex-concurrently-upsert-partitioned', ], 'runningcheck': false, # see syscache-update-pruned # Some tests wait for all snapshots, so avoid parallel execution diff --git a/src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec b/src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec new file mode 100644 index 00000000000..b57e11f3947 --- /dev/null +++ b/src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec @@ -0,0 +1,113 @@ +# This test verifies INSERT ON CONFLICT DO UPDATE behavior on partitioned +# tables concurrent with REINDEX CONCURRENTLY. 
+# +# - s1: UPSERT a tuple +# - s2: UPSERT the same tuple +# - s3: concurrently REINDEX the primary key index +# +# - s4: controls concurrency via injection points + +setup +{ + CREATE EXTENSION injection_points; + CREATE SCHEMA test; + CREATE TABLE test.tbl(i int primary key, updated_at timestamp) PARTITION BY RANGE (i); + CREATE TABLE test.tbl_partition PARTITION OF test.tbl + FOR VALUES FROM (0) TO (10000) + WITH (parallel_workers = 0); +} + +teardown +{ + DROP SCHEMA test CASCADE; + DROP EXTENSION injection_points; +} + +session s1 +setup +{ + SELECT injection_points_set_local(); + SELECT injection_points_attach('check-exclusion-or-unique-constraint-no-conflict', 'wait'); +} +step s1_start_upsert +{ + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); +} + +session s2 +setup +{ + SELECT injection_points_set_local(); + SELECT injection_points_attach('exec-insert-before-insert-speculative', 'wait'); +} +step s2_start_upsert +{ + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); +} + +session s3 +setup +{ + SELECT injection_points_set_local(); +} +step s3_setup_wait_before_set_dead +{ + SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait'); +} +step s3_setup_wait_before_swap +{ + SELECT injection_points_attach('reindex-relation-concurrently-before-swap', 'wait'); +} +step s3_start_reindex +{ + REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; +} + +session s4 +step s4_wakeup_to_swap +{ + SELECT injection_points_detach('reindex-relation-concurrently-before-swap'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-swap'); +} +step s4_wakeup_s1 +{ + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); +} +step s4_wakeup_s2 +{ + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT 
injection_points_wakeup('exec-insert-before-insert-speculative'); +} +step s4_wakeup_to_set_dead +{ + SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead'); +} + +permutation + s3_setup_wait_before_set_dead + s3_start_reindex(s1_start_upsert, s2_start_upsert) + s1_start_upsert + s4_wakeup_to_set_dead + s2_start_upsert(s1_start_upsert) + s4_wakeup_s1 + s4_wakeup_s2 + +permutation + s3_setup_wait_before_swap + s3_start_reindex(s1_start_upsert, s2_start_upsert) + s1_start_upsert + s4_wakeup_to_swap + s2_start_upsert(s1_start_upsert) + s4_wakeup_s2 + s4_wakeup_s1 + +permutation + s3_setup_wait_before_set_dead + s3_start_reindex(s1_start_upsert, s2_start_upsert) + s1_start_upsert + s2_start_upsert(s1_start_upsert) + s4_wakeup_s1 + s4_wakeup_to_set_dead + s4_wakeup_s2 |
