diff options
Diffstat (limited to 'src')
5 files changed, 506 insertions, 21 deletions
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index c6d4040e240..b3960bb7151 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -15,6 +15,7 @@ #include "access/table.h" #include "access/tableam.h" +#include "catalog/index.h" #include "catalog/partition.h" #include "executor/execPartition.h" #include "executor/executor.h" @@ -491,6 +492,65 @@ ExecFindPartition(ModifyTableState *mtstate, } /* + * IsIndexCompatibleAsArbiter + * Return true if two indexes are identical for INSERT ON CONFLICT + * purposes. + * + * Only indexes of the same relation are supported. + */ +static bool +IsIndexCompatibleAsArbiter(Relation arbiterIndexRelation, + IndexInfo *arbiterIndexInfo, + Relation indexRelation, + IndexInfo *indexInfo) +{ + Assert(arbiterIndexRelation->rd_index->indrelid == indexRelation->rd_index->indrelid); + + /* must match whether they're unique */ + if (arbiterIndexInfo->ii_Unique != indexInfo->ii_Unique) + return false; + + /* No support currently for comparing exclusion indexes. */ + if (arbiterIndexInfo->ii_ExclusionOps != NULL || + indexInfo->ii_ExclusionOps != NULL) + return false; + + /* the "nulls not distinct" criterion must match */ + if (arbiterIndexInfo->ii_NullsNotDistinct != + indexInfo->ii_NullsNotDistinct) + return false; + + /* number of key attributes must match */ + if (arbiterIndexInfo->ii_NumIndexKeyAttrs != + indexInfo->ii_NumIndexKeyAttrs) + return false; + + for (int i = 0; i < arbiterIndexInfo->ii_NumIndexKeyAttrs; i++) + { + if (arbiterIndexRelation->rd_indcollation[i] != + indexRelation->rd_indcollation[i]) + return false; + + if (arbiterIndexRelation->rd_opfamily[i] != + indexRelation->rd_opfamily[i]) + return false; + + if (arbiterIndexRelation->rd_index->indkey.values[i] != + indexRelation->rd_index->indkey.values[i]) + return false; + } + + if (list_difference(RelationGetIndexExpressions(arbiterIndexRelation), + RelationGetIndexExpressions(indexRelation)) != NIL) + return false; + + if (list_difference(RelationGetIndexPredicate(arbiterIndexRelation), + RelationGetIndexPredicate(indexRelation)) != NIL) + return false; + return true; +} + +/* * ExecInitPartitionInfo * Lock the partition and initialize ResultRelInfo. Also setup other * information for the partition and store it in the next empty slot in @@ -689,45 +749,117 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate, { TupleDesc partrelDesc = RelationGetDescr(partrel); ExprContext *econtext = mtstate->ps.ps_ExprContext; - ListCell *lc; List *arbiterIndexes = NIL; + int additional_arbiters = 0; /* * If there is a list of arbiter indexes, map it to a list of indexes - * in the partition. We do that by scanning the partition's index - * list and searching for ancestry relationships to each index in the - * ancestor table. + * in the partition. We also add any "identical indexes" to any of + * those, to cover the case where one of them is concurrently being + * reindexed. */ if (rootResultRelInfo->ri_onConflictArbiterIndexes != NIL) { - List *childIdxs; + List *unparented_idxs = NIL, + *arbiters_listidxs = NIL; - childIdxs = RelationGetIndexList(leaf_part_rri->ri_RelationDesc); - - foreach(lc, childIdxs) + for (int listidx = 0; listidx < leaf_part_rri->ri_NumIndices; listidx++) { - Oid childIdx = lfirst_oid(lc); + Oid indexoid; List *ancestors; - ListCell *lc2; - ancestors = get_partition_ancestors(childIdx); - foreach(lc2, rootResultRelInfo->ri_onConflictArbiterIndexes) + /* + * If one of this index's ancestors is in the root's arbiter + * list, then use this index as arbiter for this partition. + * Otherwise, if this index has no parent, track it for later, + * in case REINDEX CONCURRENTLY is working on one of the + * arbiters. + * + * XXX get_partition_ancestors is slow: it scans pg_inherits + * each time. Consider a syscache or some other way to cache? + */ + indexoid = RelationGetRelid(leaf_part_rri->ri_IndexRelationDescs[listidx]); + ancestors = get_partition_ancestors(indexoid); + if (ancestors != NIL) { - if (list_member_oid(ancestors, lfirst_oid(lc2))) - arbiterIndexes = lappend_oid(arbiterIndexes, childIdx); + foreach_oid(parent_idx, rootResultRelInfo->ri_onConflictArbiterIndexes) + { + if (list_member_oid(ancestors, parent_idx)) + { + arbiterIndexes = lappend_oid(arbiterIndexes, indexoid); + arbiters_listidxs = lappend_int(arbiters_listidxs, listidx); + break; + } + } } + else + unparented_idxs = lappend_int(unparented_idxs, listidx); list_free(ancestors); } + + /* + * If we found any indexes with no ancestors, it's possible that + * some arbiter index is undergoing concurrent reindex. Match all + * unparented indexes against arbiters; add unparented matching + * ones as "additional arbiters". + * + * This is critical so that all concurrent transactions use the + * same set as arbiters during REINDEX CONCURRENTLY, to avoid + * spurious "duplicate key" errors. + */ + if (unparented_idxs && arbiterIndexes) + { + foreach_int(unparented_i, unparented_idxs) + { + Relation unparented_rel; + IndexInfo *unparenred_ii; + + unparented_rel = leaf_part_rri->ri_IndexRelationDescs[unparented_i]; + unparenred_ii = leaf_part_rri->ri_IndexRelationInfo[unparented_i]; + + Assert(!list_member_oid(arbiterIndexes, + unparented_rel->rd_index->indexrelid)); + + /* Ignore indexes not ready */ + if (!unparenred_ii->ii_ReadyForInserts) + continue; + + foreach_int(arbiter_i, arbiters_listidxs) + { + Relation arbiter_rel; + IndexInfo *arbiter_ii; + + arbiter_rel = leaf_part_rri->ri_IndexRelationDescs[arbiter_i]; + arbiter_ii = leaf_part_rri->ri_IndexRelationInfo[arbiter_i]; + + /* + * If the non-ancestor index is compatible with the + * arbiter, use the non-ancestor as arbiter too. + */ + if (IsIndexCompatibleAsArbiter(arbiter_rel, + arbiter_ii, + unparented_rel, + unparenred_ii)) + { + arbiterIndexes = lappend_oid(arbiterIndexes, + unparented_rel->rd_index->indexrelid); + additional_arbiters++; + break; + } + } + } + } + list_free(unparented_idxs); + list_free(arbiters_listidxs); } /* - * If the resulting lists are of inequal length, something is wrong. - * XXX This may happen because we don't match the lists correctly when - * a partitioned index is being processed by REINDEX CONCURRENTLY. - * FIXME later. + * We expect to find as many arbiter indexes on this partition as the + * root has, plus however many "additional arbiters" (to wit: those + * being concurrently rebuilt) we found. */ if (list_length(rootResultRelInfo->ri_onConflictArbiterIndexes) != - list_length(arbiterIndexes)) + list_length(arbiterIndexes) - additional_arbiters) elog(ERROR, "invalid arbiter index list"); leaf_part_rri->ri_onConflictArbiterIndexes = arbiterIndexes; diff --git a/src/test/modules/injection_points/Makefile b/src/test/modules/injection_points/Makefile index 0a9716db27c..a618e6a9899 100644 --- a/src/test/modules/injection_points/Makefile +++ b/src/test/modules/injection_points/Makefile @@ -18,9 +18,10 @@ ISOLATION = basic \ inplace \ syscache-update-pruned \ index-concurrently-upsert \ + index-concurrently-upsert-predicate \ reindex-concurrently-upsert \ reindex-concurrently-upsert-on-constraint \ - index-concurrently-upsert-predicate + reindex-concurrently-upsert-partitioned TAP_TESTS = 1 diff --git a/src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out b/src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out new file mode 100644 index 00000000000..4c79a43d986 --- /dev/null +++ b/src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out @@ -0,0 +1,238 @@ +Parsed test spec with 4 sessions + +starting permutation: s3_setup_wait_before_set_dead s3_start_reindex s1_start_upsert s4_wakeup_to_set_dead s2_start_upsert s4_wakeup_s1 s4_wakeup_s2 +injection_points_attach +----------------------- + +(1 row) + +injection_points_attach +----------------------- + +(1 row) + +injection_points_set_local +-------------------------- + +(1 row) + +step s3_setup_wait_before_set_dead: + SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait'); + +injection_points_attach +----------------------- + +(1 row) + +step s3_start_reindex: + REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; + <waiting ...> +step s1_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s4_wakeup_to_set_dead: + SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s4_wakeup_s1: + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s1_start_upsert: <... completed> +step s4_wakeup_s2: + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: <... completed> +step s3_start_reindex: <... completed> + +starting permutation: s3_setup_wait_before_swap s3_start_reindex s1_start_upsert s4_wakeup_to_swap s2_start_upsert s4_wakeup_s2 s4_wakeup_s1 +injection_points_attach +----------------------- + +(1 row) + +injection_points_attach +----------------------- + +(1 row) + +injection_points_set_local +-------------------------- + +(1 row) + +step s3_setup_wait_before_swap: + SELECT injection_points_attach('reindex-relation-concurrently-before-swap', 'wait'); + +injection_points_attach +----------------------- + +(1 row) + +step s3_start_reindex: + REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; + <waiting ...> +step s1_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s4_wakeup_to_swap: + SELECT injection_points_detach('reindex-relation-concurrently-before-swap'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-swap'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s4_wakeup_s2: + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s4_wakeup_s1: + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s1_start_upsert: <... completed> +step s2_start_upsert: <... completed> +step s3_start_reindex: <... completed> + +starting permutation: s3_setup_wait_before_set_dead s3_start_reindex s1_start_upsert s2_start_upsert s4_wakeup_s1 s4_wakeup_to_set_dead s4_wakeup_s2 +injection_points_attach +----------------------- + +(1 row) + +injection_points_attach +----------------------- + +(1 row) + +injection_points_set_local +-------------------------- + +(1 row) + +step s3_setup_wait_before_set_dead: + SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait'); + +injection_points_attach +----------------------- + +(1 row) + +step s3_start_reindex: + REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; + <waiting ...> +step s1_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s2_start_upsert: + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); + <waiting ...> +step s4_wakeup_s1: + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s1_start_upsert: <... completed> +step s4_wakeup_to_set_dead: + SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s4_wakeup_s2: + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: <... completed> +step s3_start_reindex: <... completed> diff --git a/src/test/modules/injection_points/meson.build b/src/test/modules/injection_points/meson.build index 3bbbc0bf7b3..1a2af8a26c4 100644 --- a/src/test/modules/injection_points/meson.build +++ b/src/test/modules/injection_points/meson.build @@ -49,9 +49,10 @@ tests += { 'inplace', 'syscache-update-pruned', 'index-concurrently-upsert', + 'index-concurrently-upsert-predicate', 'reindex-concurrently-upsert', 'reindex-concurrently-upsert-on-constraint', - 'index-concurrently-upsert-predicate', + 'reindex-concurrently-upsert-partitioned', ], 'runningcheck': false, # see syscache-update-pruned # Some tests wait for all snapshots, so avoid parallel execution diff --git a/src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec b/src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec new file mode 100644 index 00000000000..b57e11f3947 --- /dev/null +++ b/src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec @@ -0,0 +1,113 @@ +# This test verifies INSERT ON CONFLICT DO UPDATE behavior on partitioned +# tables concurrent with REINDEX CONCURRENTLY. +# +# - s1: UPSERT a tuple +# - s2: UPSERT the same tuple +# - s3: concurrently REINDEX the primary key index +# +# - s4: controls concurrency via injection points + +setup +{ + CREATE EXTENSION injection_points; + CREATE SCHEMA test; + CREATE TABLE test.tbl(i int primary key, updated_at timestamp) PARTITION BY RANGE (i); + CREATE TABLE test.tbl_partition PARTITION OF test.tbl + FOR VALUES FROM (0) TO (10000) + WITH (parallel_workers = 0); +} + +teardown +{ + DROP SCHEMA test CASCADE; + DROP EXTENSION injection_points; +} + +session s1 +setup +{ + SELECT injection_points_set_local(); + SELECT injection_points_attach('check-exclusion-or-unique-constraint-no-conflict', 'wait'); +} +step s1_start_upsert +{ + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); +} + +session s2 +setup +{ + SELECT injection_points_set_local(); + SELECT injection_points_attach('exec-insert-before-insert-speculative', 'wait'); +} +step s2_start_upsert +{ + INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now(); +} + +session s3 +setup +{ + SELECT injection_points_set_local(); +} +step s3_setup_wait_before_set_dead +{ + SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait'); +} +step s3_setup_wait_before_swap +{ + SELECT injection_points_attach('reindex-relation-concurrently-before-swap', 'wait'); +} +step s3_start_reindex +{ + REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; +} + +session s4 +step s4_wakeup_to_swap +{ + SELECT injection_points_detach('reindex-relation-concurrently-before-swap'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-swap'); +} +step s4_wakeup_s1 +{ + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); +} +step s4_wakeup_s2 +{ + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); +} +step s4_wakeup_to_set_dead +{ + SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead'); +} + +permutation + s3_setup_wait_before_set_dead + s3_start_reindex(s1_start_upsert, s2_start_upsert) + s1_start_upsert + s4_wakeup_to_set_dead + s2_start_upsert(s1_start_upsert) + s4_wakeup_s1 + s4_wakeup_s2 + +permutation + s3_setup_wait_before_swap + s3_start_reindex(s1_start_upsert, s2_start_upsert) + s1_start_upsert + s4_wakeup_to_swap + s2_start_upsert(s1_start_upsert) + s4_wakeup_s2 + s4_wakeup_s1 + +permutation + s3_setup_wait_before_set_dead + s3_start_reindex(s1_start_upsert, s2_start_upsert) + s1_start_upsert + s2_start_upsert(s1_start_upsert) + s4_wakeup_s1 + s4_wakeup_to_set_dead + s4_wakeup_s2 |
