summaryrefslogtreecommitdiff
path: root/contrib/test_decoding
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r--contrib/test_decoding/Makefile2
-rw-r--r--contrib/test_decoding/expected/parallel_session_origin.out79
-rw-r--r--contrib/test_decoding/expected/replorigin.out3
-rw-r--r--contrib/test_decoding/meson.build1
-rw-r--r--contrib/test_decoding/specs/parallel_session_origin.spec56
-rw-r--r--contrib/test_decoding/sql/replorigin.sql3
6 files changed, 143 insertions, 1 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 02e961f4d31..acbcaed2feb 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error catalog_change_snapshot \
- skip_snapshot_restore invalidation_distribution
+ skip_snapshot_restore invalidation_distribution parallel_session_origin
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
new file mode 100644
index 00000000000..e515b39f7ce
--- /dev/null
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -0,0 +1,79 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s1_setup:
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s0_add_message:
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+
+?column?
+--------
+ 1
+(1 row)
+
+step s0_store_lsn:
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+
+step s1_add_message:
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+
+?column?
+--------
+ 1
+(1 row)
+
+step s1_store_lsn:
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+
+step s0_compare:
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+
+?column?
+--------
+t
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..29a9630c900 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure inactive origin cannot be set as session one if pid is specified
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR: cannot use PID -1 for inactive replication origin with ID 1
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
pg_replication_origin_create
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 25f6b8a9082..99310555e6c 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -64,6 +64,7 @@ tests += {
'slot_creation_error',
'skip_snapshot_restore',
'invalidation_distribution',
+ 'parallel_session_origin',
],
'regress_args': [
'--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
new file mode 100644
index 00000000000..c0e5fda0723
--- /dev/null
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -0,0 +1,56 @@
+# Test parallel replication origin manipulations; ensure local_lsn can be
+# updated by all attached sessions.
+
+setup
+{
+ SELECT pg_replication_origin_create('origin');
+ CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
+}
+
+teardown
+{
+ SELECT pg_replication_origin_drop('origin');
+ DROP TABLE local_lsn_store;
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
+step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s0_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+}
+step "s0_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+}
+step "s0_compare" {
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+}
+step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_setup" {
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/parallel_session_origin/s0';
+}
+step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s1_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+}
+step "s1_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+}
+step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+
+# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
+# commits a transaction and store the local_lsn of the replication origin.
+# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..17f2b888238 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
-- ensure duplicate creations fail
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+-- ensure inactive origin cannot be set as session one if pid is specified
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');