diff options
author | Amit Kapila <akapila@postgresql.org> | 2025-09-19 05:38:40 +0000 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2025-09-19 05:38:40 +0000 |
commit | 5b148706c5c8ffffe5662fe569a0f0bcef2351d9 (patch) | |
tree | 957ed0d501fbbff5c70382f1f98819b9be6608f9 /contrib/test_decoding/specs | |
parent | 8aac5923a3611aa89998368a09c54892b93ebdd9 (diff) |
Add optional pid parameter to pg_replication_origin_session_setup().
Commit 216a784829c introduced parallel apply workers, allowing multiple
processes to share a replication origin. To support this,
replorigin_session_setup() was extended to accept a pid argument
identifying the process using the origin.
This commit exposes that capability through the SQL interface function
pg_replication_origin_session_setup() by adding an optional pid parameter.
This enables multiple processes to coordinate replication using the same
origin when using SQL-level replication functions.
This change allows the non-builtin logical replication solutions to
implement parallel apply for large transactions.
Additionally, an existing internal error was made user-facing, as it can
now be triggered via the exposed SQL API.
Author: Doruk Yilmaz <doruk@mixrank.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Discussion: https://postgr.es/m/CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com
Discussion: https://postgr.es/m/CAE2gYzyTSNvHY1+iWUwykaLETSuAZsCWyryokjP6rG46ZvRgQA@mail.gmail.com
Diffstat (limited to 'contrib/test_decoding/specs')
-rw-r--r-- | contrib/test_decoding/specs/parallel_session_origin.spec | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec new file mode 100644 index 00000000000..c0e5fda0723 --- /dev/null +++ b/contrib/test_decoding/specs/parallel_session_origin.spec @@ -0,0 +1,56 @@ +# Test parallel replication origin manipulations; ensure local_lsn can be +# updated by all attached sessions. + +setup +{ + SELECT pg_replication_origin_create('origin'); + CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn); +} + +teardown +{ + SELECT pg_replication_origin_drop('origin'); + DROP TABLE local_lsn_store; +} + +session "s0" +setup { SET synchronous_commit = on; } +step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); } +step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); } +step "s0_add_message" { + SELECT 1 + FROM pg_logical_emit_message(true, 'prefix', 'message on s0'); +} +step "s0_store_lsn" { + INSERT INTO local_lsn_store + SELECT 0, local_lsn FROM pg_replication_origin_status; +} +step "s0_compare" { + SELECT s0.lsn < s1.lsn + FROM local_lsn_store as s0, local_lsn_store as s1 + WHERE s0.session = 0 AND s1.session = 1; +} +step "s0_reset" { SELECT pg_replication_origin_session_reset(); } + +session "s1" +setup { SET synchronous_commit = on; } +step "s1_setup" { + SELECT pg_replication_origin_session_setup('origin', pid) + FROM pg_stat_activity + WHERE application_name = 'isolation/parallel_session_origin/s0'; +} +step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); } +step "s1_add_message" { + SELECT 1 + FROM pg_logical_emit_message(true, 'prefix', 'message on s1'); +} +step "s1_store_lsn" { + INSERT INTO local_lsn_store + SELECT 1, local_lsn FROM pg_replication_origin_status; +} +step "s1_reset" { SELECT pg_replication_origin_session_reset(); } + +# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions +# commits a transaction and store the local_lsn of the replication origin. +# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn. +permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset" |