summaryrefslogtreecommitdiff
path: root/contrib/test_decoding/specs
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2025-09-19 05:38:40 +0000
committerAmit Kapila <akapila@postgresql.org>2025-09-19 05:38:40 +0000
commit5b148706c5c8ffffe5662fe569a0f0bcef2351d9 (patch)
tree957ed0d501fbbff5c70382f1f98819b9be6608f9 /contrib/test_decoding/specs
parent8aac5923a3611aa89998368a09c54892b93ebdd9 (diff)
Add optional pid parameter to pg_replication_origin_session_setup().
Commit 216a784829c introduced parallel apply workers, allowing multiple processes to share a replication origin. To support this, replorigin_session_setup() was extended to accept a pid argument identifying the process using the origin. This commit exposes that capability through the SQL interface function pg_replication_origin_session_setup() by adding an optional pid parameter. This enables multiple processes to coordinate replication using the same origin when using SQL-level replication functions. This change allows the non-builtin logical replication solutions to implement parallel apply for large transactions. Additionally, an existing internal error was made user-facing, as it can now be triggered via the exposed SQL API. Author: Doruk Yilmaz <doruk@mixrank.com> Author: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Euler Taveira <euler@eulerto.com> Discussion: https://postgr.es/m/CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com Discussion: https://postgr.es/m/CAE2gYzyTSNvHY1+iWUwykaLETSuAZsCWyryokjP6rG46ZvRgQA@mail.gmail.com
Diffstat (limited to 'contrib/test_decoding/specs')
-rw-r--r--contrib/test_decoding/specs/parallel_session_origin.spec56
1 files changed, 56 insertions, 0 deletions
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
new file mode 100644
index 00000000000..c0e5fda0723
--- /dev/null
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -0,0 +1,56 @@
+# Test parallel replication origin manipulations; ensure local_lsn can be
+# updated by all attached sessions.
+
+setup
+{
+ SELECT pg_replication_origin_create('origin');
+ CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
+}
+
+teardown
+{
+ SELECT pg_replication_origin_drop('origin');
+ DROP TABLE local_lsn_store;
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
+step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s0_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+}
+step "s0_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+}
+step "s0_compare" {
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+}
+step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_setup" {
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/parallel_session_origin/s0';
+}
+step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s1_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+}
+step "s1_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+}
+step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+
+# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
+# commits a transaction and store the local_lsn of the replication origin.
+# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"