diff options
Diffstat (limited to 'contrib')
23 files changed, 3149 insertions, 0 deletions
diff --git a/contrib/Makefile b/contrib/Makefile index c90fe29222a..8dc40f7de00 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -50,6 +50,7 @@ SUBDIRS = \ spi \ tablefunc \ tcn \ + test_decoding \ test_parser \ test_shm_mq \ tsearch2 \ diff --git a/contrib/test_decoding/.gitignore b/contrib/test_decoding/.gitignore new file mode 100644 index 00000000000..5dcb3ff9723 --- /dev/null +++ b/contrib/test_decoding/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile new file mode 100644 index 00000000000..c193f737864 --- /dev/null +++ b/contrib/test_decoding/Makefile @@ -0,0 +1,69 @@ +# contrib/test_decoding/Makefile + +MODULES = test_decoding +OBJS = test_decoding.o + +# Note: because we don't tell the Makefile there are any regression tests, +# we have to clean those result files explicitly +EXTRA_CLEAN = -r $(pg_regress_clean_files) + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/test_decoding +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif + +# Disabled because these tests require "wal_level=logical", which +# typical installcheck users do not have (e.g. buildfarm clients). +installcheck:; + +# But it can nonetheless be very helpful to run tests on preexisting +# installation, allow to do so, but only if requested explicitly. +installcheck-force: regresscheck-install-force isolationcheck-install-force + +check: regresscheck isolationcheck + +submake-regress: + $(MAKE) -C $(top_builddir)/src/test/regress all + +submake-isolation: + $(MAKE) -C $(top_builddir)/src/test/isolation all + +submake-test_decoding: + $(MAKE) -C $(top_builddir)/contrib/test_decoding + +REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact binary + +regresscheck: all | submake-regress submake-test_decoding + $(pg_regress_check) \ + --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf \ + --temp-install=./tmp_check \ + --extra-install=contrib/test_decoding \ + $(REGRESSCHECKS) + +regresscheck-install-force: | submake-regress submake-test_decoding + $(pg_regress_installcheck) \ + --extra-install=contrib/test_decoding \ + $(REGRESSCHECKS) + +ISOLATIONCHECKS=mxact delayed_startup concurrent_ddl_dml + +isolationcheck: all | submake-isolation submake-test_decoding + $(pg_isolation_regress_check) \ + --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf \ + --extra-install=contrib/test_decoding \ + $(ISOLATIONCHECKS) + +isolationcheck-install-force: all | submake-isolation submake-test_decoding + $(pg_isolation_regress_installcheck) \ + --extra-install=contrib/test_decoding \ + $(ISOLATIONCHECKS) + +PHONY: submake-test_decoding submake-regress check \ + regresscheck regresscheck-install-force \ + isolationcheck isolationcheck-install-force diff --git a/contrib/test_decoding/expected/binary.out b/contrib/test_decoding/expected/binary.out new file mode 100644 index 00000000000..3409d9d4265 --- /dev/null +++ b/contrib/test_decoding/expected/binary.out @@ -0,0 +1,35 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +-- succeeds, textual plugin, textual consumer +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0'); + data +------ +(0 rows) + +-- fails, binary plugin, textual consumer +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '1'); +ERROR: output plugin cannot produce text output +-- succeeds, textual plugin, binary consumer +SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot', NULL, NULL, 'force-binary', '0'); + data +------ +(0 rows) + +-- succeeds, binary plugin, binary consumer +SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot', NULL, NULL, 'force-binary', '1'); + data +------ +(0 rows) + +SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + init +(1 row) + diff --git a/contrib/test_decoding/expected/concurrent_ddl_dml.out b/contrib/test_decoding/expected/concurrent_ddl_dml.out new file mode 100644 index 00000000000..cc9165655fb --- /dev/null +++ b/contrib/test_decoding/expected/concurrent_ddl_dml.out @@ -0,0 +1,733 @@ +Parsed test spec with 2 sessions + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl2_float s1_insert_tbl2 s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_float: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE float; +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[double precision]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl1_float s1_insert_tbl2 s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl1_float: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; <waiting ...> +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s2_alter_tbl1_float: <... completed> +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +BEGIN +table public.pg_temp: INSERT: val1[integer]:1 val2[double precision]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl2_char s1_insert_tbl2 s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_char: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE character varying; +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[character varying]:'1' +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl1_char s1_insert_tbl2 s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; <waiting ...> +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s2_alter_tbl1_char: <... completed> +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +BEGIN +table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1' +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s1_insert_tbl2 s2_alter_tbl1_float s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s2_alter_tbl1_float: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; <waiting ...> +step s1_commit: COMMIT; +step s2_alter_tbl1_float: <... completed> +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +BEGIN +table public.pg_temp: INSERT: val1[integer]:1 val2[double precision]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s1_insert_tbl2 s2_alter_tbl1_char s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; <waiting ...> +step s1_commit: COMMIT; +step s2_alter_tbl1_char: <... completed> +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +BEGIN +table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1' +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl2_float s1_insert_tbl2 s2_alter_tbl1_float s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_float: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE float; +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s2_alter_tbl1_float: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; <waiting ...> +step s1_commit: COMMIT; +step s2_alter_tbl1_float: <... completed> +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[double precision]:1 +COMMIT +BEGIN +table public.pg_temp: INSERT: val1[integer]:1 val2[double precision]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl2_char s1_insert_tbl2 s2_alter_tbl1_char s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_char: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE character varying; +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; <waiting ...> +step s1_commit: COMMIT; +step s2_alter_tbl1_char: <... completed> +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[character varying]:'1' +COMMIT +BEGIN +table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1' +COMMIT +?column? + +stop + +starting permutation: s1_init s2_alter_tbl2_char s1_begin s1_insert_tbl1 s2_alter_tbl2_text s1_insert_tbl2 s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s2_alter_tbl2_char: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE character varying; +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_text: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE text; +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[text]:'1' +COMMIT +?column? + +stop + +starting permutation: s1_init s2_alter_tbl2_char s1_begin s1_insert_tbl1 s2_alter_tbl2_text s1_insert_tbl2 s2_alter_tbl1_char s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s2_alter_tbl2_char: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE character varying; +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_text: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE text; +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; <waiting ...> +step s1_commit: COMMIT; +step s2_alter_tbl1_char: <... completed> +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[text]:'1' +COMMIT +BEGIN +table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1' +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl2_boolean s1_insert_tbl2 s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_boolean: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE boolean; +ERROR: column "val2" cannot be cast automatically to type boolean +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl2_boolean s1_insert_tbl2 s2_alter_tbl1_boolean s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_boolean: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE boolean; +ERROR: column "val2" cannot be cast automatically to type boolean +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s2_alter_tbl1_boolean: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE boolean; <waiting ...> +step s1_commit: COMMIT; +step s2_alter_tbl1_boolean: <... completed> +error in steps s1_commit s2_alter_tbl1_boolean: ERROR: column "val2" cannot be cast automatically to type boolean +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl2_add_int s1_insert_tbl2_3col s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_add_int: ALTER TABLE tbl2 ADD COLUMN val3 INTEGER; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s1_insert_tbl2 s1_commit s1_begin s2_alter_tbl2_add_int s1_insert_tbl2_3col s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s1_begin: BEGIN; +step s2_alter_tbl2_add_int: ALTER TABLE tbl2 ADD COLUMN val3 INTEGER; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl2_add_float s1_insert_tbl2_3col s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_add_float: ALTER TABLE tbl2 ADD COLUMN val3 FLOAT; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[double precision]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s1_insert_tbl2 s1_commit s1_begin s2_alter_tbl2_add_float s1_insert_tbl2_3col s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s1_begin: BEGIN; +step s2_alter_tbl2_add_float: ALTER TABLE tbl2 ADD COLUMN val3 FLOAT; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[double precision]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s2_alter_tbl2_add_char s1_insert_tbl2_3col s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_add_char: ALTER TABLE tbl2 ADD COLUMN val3 character varying; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[character varying]:'1' +COMMIT +?column? + +stop + +starting permutation: s1_init s1_begin s1_insert_tbl1 s1_insert_tbl2 s1_commit s1_begin s2_alter_tbl2_add_char s1_insert_tbl2_3col s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s1_begin: BEGIN; +step s2_alter_tbl2_add_char: ALTER TABLE tbl2 ADD COLUMN val3 character varying; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[character varying]:'1' +COMMIT +?column? + +stop + +starting permutation: s1_init s2_alter_tbl2_add_int s1_begin s1_insert_tbl2_3col s2_alter_tbl2_drop_3rd_col s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s2_alter_tbl2_add_int: ALTER TABLE tbl2 ADD COLUMN val3 INTEGER; +step s1_begin: BEGIN; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; <waiting ...> +step s1_commit: COMMIT; +step s2_alter_tbl2_drop_3rd_col: <... completed> +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 +COMMIT +BEGIN +COMMIT +?column? + +stop + +starting permutation: s1_init s2_alter_tbl2_add_int s1_begin s1_insert_tbl2_3col s2_alter_tbl2_drop_3rd_col s1_insert_tbl2 s1_commit s1_insert_tbl2 s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s2_alter_tbl2_add_int: ALTER TABLE tbl2 ADD COLUMN val3 INTEGER; +step s1_begin: BEGIN; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; <waiting ...> +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s2_alter_tbl2_drop_3rd_col: <... completed> +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:null +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s2_alter_tbl2_add_int s1_begin s1_insert_tbl2_3col s2_alter_tbl2_drop_3rd_col s1_commit s2_get_changes s2_alter_tbl2_add_text s1_begin s1_insert_tbl2_3col s2_alter_tbl2_3rd_char s1_insert_tbl2_3col s1_commit s2_get_changes s2_alter_tbl2_3rd_int s1_insert_tbl2_3col s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s2_alter_tbl2_add_int: ALTER TABLE tbl2 ADD COLUMN val3 INTEGER; +step s1_begin: BEGIN; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; <waiting ...> +step s1_commit: COMMIT; +step s2_alter_tbl2_drop_3rd_col: <... completed> +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 +COMMIT +BEGIN +COMMIT +step s2_alter_tbl2_add_text: ALTER TABLE tbl2 ADD COLUMN val3 TEXT; +step s1_begin: BEGIN; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s2_alter_tbl2_3rd_char: ALTER TABLE tbl2 ALTER COLUMN val3 TYPE character varying; <waiting ...> +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_alter_tbl2_3rd_char: <... completed> +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[text]:'1' +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[text]:'1' +COMMIT +BEGIN +COMMIT +step s2_alter_tbl2_3rd_int: ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +table public.pg_temp: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:null +table public.pg_temp: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 +table public.pg_temp: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s2_alter_tbl2_add_char s1_begin s1_insert_tbl1 s1_insert_tbl2_3col s2_alter_tbl2_3rd_text s1_insert_tbl2_3col s1_commit s1_insert_tbl2_3col s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s2_alter_tbl2_add_char: ALTER TABLE tbl2 ADD COLUMN val3 character varying; +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s2_alter_tbl2_3rd_text: ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; <waiting ...> +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_alter_tbl2_3rd_text: <... completed> +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[character varying]:'1' +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[character varying]:'1' +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[text]:'1' +COMMIT +?column? + +stop + +starting permutation: s1_init s2_alter_tbl2_add_text s1_begin s1_insert_tbl1 s1_insert_tbl2_3col s2_alter_tbl2_3rd_char s1_insert_tbl2_3col s1_commit s1_insert_tbl2_3col s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s2_alter_tbl2_add_text: ALTER TABLE tbl2 ADD COLUMN val3 TEXT; +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s2_alter_tbl2_3rd_char: ALTER TABLE tbl2 ALTER COLUMN val3 TYPE character varying; <waiting ...> +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_alter_tbl2_3rd_char: <... completed> +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[text]:'1' +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[text]:'1' +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[character varying]:'1' +COMMIT +?column? + +stop + +starting permutation: s1_init s2_alter_tbl2_add_char s1_begin s1_insert_tbl1 s2_alter_tbl2_3rd_text s1_insert_tbl2_3col s1_commit s2_alter_tbl2_drop_3rd_col s1_insert_tbl2 s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s2_alter_tbl2_add_char: ALTER TABLE tbl2 ADD COLUMN val3 character varying; +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_3rd_text: ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[text]:'1' +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s2_alter_tbl2_add_text s1_begin s1_insert_tbl1 s2_alter_tbl2_3rd_char s1_insert_tbl2_3col s1_commit s2_alter_tbl2_drop_3rd_col s1_insert_tbl2 s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s2_alter_tbl2_add_text: ALTER TABLE tbl2 ADD COLUMN val3 TEXT; +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_3rd_char: ALTER TABLE tbl2 ALTER COLUMN val3 TYPE character varying; +step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); +step s1_commit: COMMIT; +step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; +step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[character varying]:'1' +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +?column? + +stop + +starting permutation: s1_init s2_alter_tbl2_add_char s1_begin s1_insert_tbl1 s2_alter_tbl2_drop_3rd_col s1_insert_tbl1 s1_commit s2_get_changes +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s2_alter_tbl2_add_char: ALTER TABLE tbl2 ADD COLUMN val3 character varying; +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); +data + +BEGIN +COMMIT +BEGIN +COMMIT +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out new file mode 100644 index 00000000000..986f504effb --- /dev/null +++ b/contrib/test_decoding/expected/ddl.out @@ -0,0 +1,651 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +-- fail because of an already existing slot +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +ERROR: replication slot "regression_slot" already exists +-- fail because of an invalid name +SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding'); +ERROR: replication slot name "Invalid Name" contains invalid character +HINT: Replication slot names may only contain letters, numbers and the underscore character. +-- fail twice because of an invalid parameter values +SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 'frakbar'); +ERROR: could not parse value "frakbar" for parameter "include-xids" +CONTEXT: slot "regression_slot", output plugin "test_decoding", in the startup callback +SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'nonexistant-option', 'frakbar'); +ERROR: option "nonexistant-option" = "frakbar" is unknown +CONTEXT: slot "regression_slot", output plugin "test_decoding", in the startup callback +SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 'frakbar'); +ERROR: could not parse value "frakbar" for parameter "include-xids" +CONTEXT: slot "regression_slot", output plugin "test_decoding", in the startup callback +-- succeed once +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- fail +SELECT pg_drop_replication_slot('regression_slot'); +ERROR: replication slot "regression_slot" does not exist +-- check that we're detecting a streaming rep slot used for logical decoding +SELECT 'init' FROM pg_create_physical_replication_slot('repl'); + ?column? +---------- + init +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('repl', NULL, NULL, 'include-xids', '0'); +ERROR: cannot use physical replication slot for logical decoding +SELECT pg_drop_replication_slot('repl'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +/* check whether status function reports us, only reproduceable columns */ +SELECT slot_name, plugin, slot_type, active, + NOT catalog_xmin IS NULL AS catalog_xmin_set, + xmin IS NULl AS data_xmin_not_set, + pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal +FROM pg_replication_slots; + slot_name | plugin | slot_type | active | catalog_xmin_set | data_xmin_not_set | some_wal +-----------------+---------------+-----------+--------+------------------+-------------------+---------- + regression_slot | test_decoding | logical | f | t | t | t +(1 row) + +/* + * Check that changes are handled correctly when interleaved with ddl + */ +CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120)); +BEGIN; +INSERT INTO replication_example(somedata, text) VALUES (1, 1); +INSERT INTO replication_example(somedata, text) VALUES (1, 2); +COMMIT; +ALTER TABLE replication_example ADD COLUMN bar int; +INSERT INTO replication_example(somedata, text, bar) VALUES (2, 1, 4); +BEGIN; +INSERT INTO replication_example(somedata, text, bar) VALUES (2, 2, 4); +INSERT INTO replication_example(somedata, text, bar) VALUES (2, 3, 4); +INSERT INTO replication_example(somedata, text, bar) VALUES (2, 4, NULL); +COMMIT; +ALTER TABLE replication_example DROP COLUMN bar; +INSERT INTO replication_example(somedata, text) VALUES (3, 1); +BEGIN; +INSERT INTO replication_example(somedata, text) VALUES (3, 2); +INSERT INTO replication_example(somedata, text) VALUES (3, 3); +COMMIT; +ALTER TABLE replication_example RENAME COLUMN text TO somenum; +INSERT INTO replication_example(somedata, somenum) VALUES (4, 1); +-- collect all changes +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +--------------------------------------------------------------------------------------------------------------------------- + BEGIN + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:1 somedata[integer]:1 text[character varying]:'1' + table public.replication_example: INSERT: id[integer]:2 somedata[integer]:1 text[character varying]:'2' + COMMIT + BEGIN + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:3 somedata[integer]:2 text[character varying]:'1' bar[integer]:4 + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:4 somedata[integer]:2 text[character varying]:'2' bar[integer]:4 + table public.replication_example: INSERT: id[integer]:5 somedata[integer]:2 text[character varying]:'3' bar[integer]:4 + table public.replication_example: INSERT: id[integer]:6 somedata[integer]:2 text[character varying]:'4' bar[integer]:null + COMMIT + BEGIN + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:7 somedata[integer]:3 text[character varying]:'1' + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:8 somedata[integer]:3 text[character varying]:'2' + table public.replication_example: INSERT: id[integer]:9 somedata[integer]:3 text[character varying]:'3' + COMMIT + BEGIN + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:10 somedata[integer]:4 somenum[character varying]:'1' + COMMIT +(30 rows) + +ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4); +-- throw away changes, they contain oids +SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + count +------- + 12 +(1 row) + +INSERT INTO replication_example(somedata, somenum) VALUES (5, 1); +BEGIN; +INSERT INTO replication_example(somedata, somenum) VALUES (6, 1); +ALTER TABLE replication_example ADD COLUMN zaphod1 int; +INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 2, 1); +ALTER TABLE replication_example ADD COLUMN zaphod2 int; +INSERT INTO replication_example(somedata, somenum, zaphod2) VALUES (6, 3, 1); +INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 4, 2); +COMMIT; +-- show changes +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +------------------------------------------------------------------------------------------------------------------------------------------ + BEGIN + table public.replication_example: INSERT: id[integer]:11 somedata[integer]:5 somenum[integer]:1 + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:12 somedata[integer]:6 somenum[integer]:1 + table public.replication_example: INSERT: id[integer]:13 somedata[integer]:6 somenum[integer]:2 zaphod1[integer]:1 + table public.replication_example: INSERT: id[integer]:14 somedata[integer]:6 somenum[integer]:3 zaphod1[integer]:null zaphod2[integer]:1 + table public.replication_example: INSERT: id[integer]:15 somedata[integer]:6 somenum[integer]:4 zaphod1[integer]:2 zaphod2[integer]:null + COMMIT +(9 rows) + +-- hide changes bc of oid visible in full table rewrites +CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int); +INSERT INTO tr_unique(data) VALUES(10); +ALTER TABLE tr_unique RENAME TO tr_pkey; +ALTER TABLE tr_pkey ADD COLUMN id serial primary key; +SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + count +------- + 10 +(1 row) + +INSERT INTO tr_pkey(data) VALUES(1); +--show deletion with primary key +DELETE FROM tr_pkey; +/* display results */ +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +---------------------------------------------------------------------------- + BEGIN + table public.tr_pkey: INSERT: id2[integer]:2 data[integer]:1 id[integer]:2 + COMMIT + BEGIN + table public.tr_pkey: DELETE: id[integer]:1 + table public.tr_pkey: DELETE: id[integer]:2 + COMMIT +(7 rows) + +/* + * check that disk spooling works + */ +BEGIN; +CREATE TABLE tr_etoomuch (id serial primary key, data int); +INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i); +DELETE FROM tr_etoomuch WHERE id < 5000; +UPDATE tr_etoomuch SET data = - data WHERE id > 5000; +COMMIT; +/* display results, but hide most of the output */ +SELECT count(*), min(data), max(data) +FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0') +GROUP BY substring(data, 1, 24) +ORDER BY 1; + count | min | max +-------+-------------------------------------------------+------------------------------------------------------------------------ + 1 | COMMIT | COMMIT + 1 | BEGIN | BEGIN + 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999 +(3 rows) + +/* + * check whether we decode subtransactions correctly in relation with each + * other + */ +CREATE TABLE tr_sub (id serial primary key, path text); +-- toplevel, subtxn, toplevel, subtxn, subtxn +BEGIN; +INSERT INTO tr_sub(path) VALUES ('1-top-#1'); +SAVEPOINT a; +INSERT INTO tr_sub(path) VALUES ('1-top-1-#1'); +INSERT INTO tr_sub(path) VALUES ('1-top-1-#2'); +RELEASE SAVEPOINT a; +SAVEPOINT b; +SAVEPOINT c; +INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#1'); +INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#2'); +RELEASE SAVEPOINT c; +INSERT INTO tr_sub(path) VALUES ('1-top-2-#1'); +RELEASE SAVEPOINT b; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +---------------------------------------------------------------------- + BEGIN + COMMIT + BEGIN + table public.tr_sub: INSERT: id[integer]:1 path[text]:'1-top-#1' + table public.tr_sub: INSERT: id[integer]:2 path[text]:'1-top-1-#1' + table public.tr_sub: INSERT: id[integer]:3 path[text]:'1-top-1-#2' + table public.tr_sub: INSERT: id[integer]:4 path[text]:'1-top-2-1-#1' + table public.tr_sub: INSERT: id[integer]:5 path[text]:'1-top-2-1-#2' + table public.tr_sub: INSERT: id[integer]:6 path[text]:'1-top-2-#1' + COMMIT +(10 rows) + +-- check that we handle xlog assignments correctly +BEGIN; +-- nest 80 subtxns +SAVEPOINT subtop;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +-- assign xid by inserting +INSERT INTO tr_sub(path) VALUES ('2-top-1...--#1'); +INSERT INTO tr_sub(path) VALUES ('2-top-1...--#2'); +INSERT INTO tr_sub(path) VALUES ('2-top-1...--#3'); +RELEASE SAVEPOINT subtop; +INSERT INTO tr_sub(path) VALUES ('2-top-#1'); +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +------------------------------------------------------------------------ + BEGIN + table public.tr_sub: INSERT: id[integer]:7 path[text]:'2-top-1...--#1' + table public.tr_sub: INSERT: id[integer]:8 path[text]:'2-top-1...--#2' + table public.tr_sub: INSERT: id[integer]:9 path[text]:'2-top-1...--#3' + table public.tr_sub: INSERT: id[integer]:10 path[text]:'2-top-#1' + COMMIT +(6 rows) + +-- make sure rollbacked subtransactions aren't decoded +BEGIN; +INSERT INTO tr_sub(path) VALUES ('3-top-2-#1'); +SAVEPOINT a; +INSERT INTO tr_sub(path) VALUES ('3-top-2-1-#1'); +SAVEPOINT b; +INSERT INTO tr_sub(path) VALUES ('3-top-2-2-#1'); +ROLLBACK TO SAVEPOINT b; +INSERT INTO tr_sub(path) VALUES ('3-top-2-#2'); +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +----------------------------------------------------------------------- + BEGIN + table public.tr_sub: INSERT: id[integer]:11 path[text]:'3-top-2-#1' + table public.tr_sub: INSERT: id[integer]:12 path[text]:'3-top-2-1-#1' + table public.tr_sub: INSERT: id[integer]:14 path[text]:'3-top-2-#2' + COMMIT +(5 rows) + +-- test whether a known, but not yet logged toplevel xact, followed by a +-- subxact commit is handled correctly +BEGIN; +SELECT txid_current() != 0; -- so no fixed xid apears in the outfile + ?column? +---------- + t +(1 row) + +SAVEPOINT a; +INSERT INTO tr_sub(path) VALUES ('4-top-1-#1'); +RELEASE SAVEPOINT a; +COMMIT; +-- test whether a change in a subtransaction, in an unknown toplevel +-- xact is handled correctly. +BEGIN; +SAVEPOINT a; +INSERT INTO tr_sub(path) VALUES ('5-top-1-#1'); +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +--------------------------------------------------------------------- + BEGIN + table public.tr_sub: INSERT: id[integer]:15 path[text]:'4-top-1-#1' + COMMIT + BEGIN + table public.tr_sub: INSERT: id[integer]:16 path[text]:'5-top-1-#1' + COMMIT +(6 rows) + +/* + * Check whether treating a table as a catalog table works somewhat + */ +CREATE TABLE replication_metadata ( + id serial primary key, + relation name NOT NULL, + options text[] +) +WITH (user_catalog_table = true) +; +\d+ replication_metadata + Table "public.replication_metadata" + Column | Type | Modifiers | Storage | Stats target | Description +----------+---------+-------------------------------------------------------------------+----------+--------------+------------- + id | integer | not null default nextval('replication_metadata_id_seq'::regclass) | plain | | + relation | name | not null | plain | | + options | text[] | | extended | | +Indexes: + "replication_metadata_pkey" PRIMARY KEY, btree (id) +Has OIDs: no +Options: user_catalog_table=true + +INSERT INTO replication_metadata(relation, options) +VALUES ('foo', ARRAY['a', 'b']); +ALTER TABLE replication_metadata RESET (user_catalog_table); +\d+ replication_metadata + Table "public.replication_metadata" + Column | Type | Modifiers | Storage | Stats target | Description +----------+---------+-------------------------------------------------------------------+----------+--------------+------------- + id | integer | not null default nextval('replication_metadata_id_seq'::regclass) | plain | | + relation | name | not null | plain | | + options | text[] | | extended | | +Indexes: + "replication_metadata_pkey" PRIMARY KEY, btree (id) +Has OIDs: no + +INSERT INTO replication_metadata(relation, options) +VALUES ('bar', ARRAY['a', 'b']); +ALTER TABLE replication_metadata SET (user_catalog_table = true); +\d+ replication_metadata + Table "public.replication_metadata" + Column | Type | Modifiers | Storage | Stats target | Description +----------+---------+-------------------------------------------------------------------+----------+--------------+------------- + id | integer | not null default nextval('replication_metadata_id_seq'::regclass) | plain | | + relation | name | not null | plain | | + options | text[] | | extended | | +Indexes: + "replication_metadata_pkey" PRIMARY KEY, btree (id) +Has OIDs: no +Options: user_catalog_table=true + +INSERT INTO replication_metadata(relation, options) +VALUES ('blub', NULL); +-- make sure rewrites don't work +ALTER TABLE replication_metadata ADD COLUMN rewritemeornot int; +ALTER TABLE replication_metadata ALTER COLUMN rewritemeornot TYPE text; +ERROR: cannot rewrite table "replication_metadata" used as a catalog table +ALTER TABLE replication_metadata SET (user_catalog_table = false); +\d+ replication_metadata + Table "public.replication_metadata" + Column | Type | Modifiers | Storage | Stats target | Description +----------------+---------+-------------------------------------------------------------------+----------+--------------+------------- + id | integer | not null default nextval('replication_metadata_id_seq'::regclass) | plain | | + relation | name | not null | plain | | + options | text[] | | extended | | + rewritemeornot | integer | | plain | | +Indexes: + "replication_metadata_pkey" PRIMARY KEY, btree (id) +Has OIDs: no +Options: user_catalog_table=false + +INSERT INTO replication_metadata(relation, options) +VALUES ('zaphod', NULL); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +------------------------------------------------------------------------------------------------------------------------------------ + BEGIN + COMMIT + BEGIN + table public.replication_metadata: INSERT: id[integer]:1 relation[name]:'foo' options[text[]]:'{a,b}' + COMMIT + BEGIN + COMMIT + BEGIN + table public.replication_metadata: INSERT: id[integer]:2 relation[name]:'bar' options[text[]]:'{a,b}' + COMMIT + BEGIN + COMMIT + BEGIN + table public.replication_metadata: INSERT: id[integer]:3 relation[name]:'blub' options[text[]]:null + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + table public.replication_metadata: INSERT: id[integer]:4 relation[name]:'zaphod' options[text[]]:null rewritemeornot[integer]:null + COMMIT +(22 rows) + +/* + * check whether we handle updates/deletes correct with & without a pkey + */ +/* we should handle the case without a key at all more gracefully */ +CREATE TABLE table_without_key(id serial, data int); +INSERT INTO table_without_key(data) VALUES(1),(2); +DELETE FROM table_without_key WHERE data = 1; +-- won't log old keys +UPDATE table_without_key SET data = 3 WHERE data = 2; +UPDATE table_without_key SET id = -id; +UPDATE table_without_key SET id = -id; +-- should log the full old row now +ALTER TABLE table_without_key REPLICA IDENTITY FULL; +UPDATE table_without_key SET data = 3 WHERE data = 2; +UPDATE table_without_key SET id = -id; +UPDATE table_without_key SET id = -id; +DELETE FROM table_without_key WHERE data = 3; +CREATE TABLE table_with_pkey(id serial primary key, data int); +INSERT INTO table_with_pkey(data) VALUES(1), (2); +DELETE FROM table_with_pkey WHERE data = 1; +-- should log the old pkey +UPDATE table_with_pkey SET data = 3 WHERE data = 2; +UPDATE table_with_pkey SET id = -id; +UPDATE table_with_pkey SET id = -id; +-- check that we log nothing despite having a pkey +ALTER TABLE table_without_key REPLICA IDENTITY NOTHING; +UPDATE table_with_pkey SET id = -id; +-- check that we log everything despite having a pkey +ALTER TABLE table_without_key REPLICA IDENTITY FULL; +UPDATE table_with_pkey SET id = -id; +DELETE FROM table_with_pkey WHERE data = 3; +CREATE TABLE table_with_unique_not_null(id serial unique, data int); +ALTER TABLE table_with_unique_not_null ALTER COLUMN id SET NOT NULL; --already set +-- won't log anything, replica identity not setup +INSERT INTO table_with_unique_not_null(data) VALUES(1), (2); +DELETE FROM table_with_unique_not_null WHERE data = 1; +UPDATE table_with_unique_not_null SET data = 3 WHERE data = 2; +UPDATE table_with_unique_not_null SET id = -id; +UPDATE table_with_unique_not_null SET id = -id; +DELETE FROM table_with_unique_not_null WHERE data = 3; +-- should log old key +ALTER TABLE table_with_unique_not_null REPLICA IDENTITY USING INDEX table_with_unique_not_null_id_key; +INSERT INTO table_with_unique_not_null(data) VALUES(1), (2); +DELETE FROM table_with_unique_not_null WHERE data = 1; +UPDATE table_with_unique_not_null SET data = 3 WHERE data = 2; +UPDATE table_with_unique_not_null SET id = -id; +UPDATE table_with_unique_not_null SET id = -id; +DELETE FROM table_with_unique_not_null WHERE data = 3; +-- check toast support +BEGIN; +CREATE SEQUENCE toasttable_rand_seq START 79 INCREMENT 1499; -- portable "random" +CREATE TABLE toasttable( + id serial primary key, + toasted_col1 text, + rand1 float8 DEFAULT nextval('toasttable_rand_seq'), + toasted_col2 text, + rand2 float8 DEFAULT nextval('toasttable_rand_seq') + ); +COMMIT; +-- uncompressed external toast data +INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i); +-- compressed external toast data +INSERT INTO toasttable(toasted_col2) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); +-- update of existing column +UPDATE toasttable + SET toasted_col1 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i)) +WHERE id = 1; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + datatable public.table_without_key: INSERT: id[integer]:1 data[integer]:1 + table public.table_without_key: INSERT: id[integer]:2 data[integer]:2 + COMMIT + BEGIN + table public.table_without_key: DELETE: (no-tuple-data) + COMMIT + BEGIN + table public.table_without_key: UPDATE: id[integer]:2 data[integer]:3 + COMMIT + BEGIN + table public.table_without_key: UPDATE: id[integer]:-2 data[integer]:3 + COMMIT + BEGIN + table public.table_without_key: UPDATE: id[integer]:2 data[integer]:3 + COMMIT + BEGIN + COMMIT + BEGIN + table public.table_without_key: UPDATE: old-key: id[integer]:2 data[integer]:3 new-tuple: id[integer]:-2 data[integer]:3 + COMMIT + BEGIN + table public.table_without_key: UPDATE: old-key: id[integer]:-2 data[integer]:3 new-tuple: id[integer]:2 data[integer]:3 + COMMIT + BEGIN + table public.table_without_key: DELETE: id[integer]:2 data[integer]:3 + COMMIT + BEGIN + COMMIT + BEGIN + table public.table_with_pkey: INSERT: id[integer]:1 data[integer]:1 + table public.table_with_pkey: INSERT: id[integer]:2 data[integer]:2 + COMMIT + BEGIN + table public.table_with_pkey: DELETE: id[integer]:1 + COMMIT + BEGIN + table public.table_with_pkey: UPDATE: id[integer]:2 data[integer]:3 + COMMIT + BEGIN + table public.table_with_pkey: UPDATE: old-key: id[integer]:2 new-tuple: id[integer]:-2 data[integer]:3 + COMMIT + BEGIN + table public.table_with_pkey: UPDATE: old-key: id[integer]:-2 new-tuple: id[integer]:2 data[integer]:3 + COMMIT + BEGIN + COMMIT + BEGIN + table public.table_with_pkey: UPDATE: old-key: id[integer]:2 new-tuple: id[integer]:-2 data[integer]:3 + COMMIT + BEGIN + COMMIT + BEGIN + table public.table_with_pkey: UPDATE: old-key: id[integer]:-2 new-tuple: id[integer]:2 data[integer]:3 + COMMIT + BEGIN + table public.table_with_pkey: DELETE: id[integer]:2 + COMMIT + BEGIN + COMMIT + BEGIN + table public.table_with_unique_not_null: INSERT: id[integer]:1 data[integer]:1 + table public.table_with_unique_not_null: INSERT: id[integer]:2 data[integer]:2 + COMMIT + BEGIN + table public.table_with_unique_not_null: DELETE: (no-tuple-data) + COMMIT + BEGIN + table public.table_with_unique_not_null: UPDATE: id[integer]:2 data[integer]:3 + COMMIT + BEGIN + table public.table_with_unique_not_null: UPDATE: id[integer]:-2 data[integer]:3 + COMMIT + BEGIN + table public.table_with_unique_not_null: UPDATE: id[integer]:2 data[integer]:3 + COMMIT + BEGIN + table public.table_with_unique_not_null: DELETE: (no-tuple-data) + COMMIT + BEGIN + COMMIT + BEGIN + table public.table_with_unique_not_null: INSERT: id[integer]:3 data[integer]:1 + table public.table_with_unique_not_null: INSERT: id[integer]:4 data[integer]:2 + COMMIT + BEGIN + table public.table_with_unique_not_null: DELETE: id[integer]:3 + COMMIT + BEGIN + table public.table_with_unique_not_null: UPDATE: id[integer]:4 data[integer]:3 + COMMIT + BEGIN + table public.table_with_unique_not_null: UPDATE: old-key: id[integer]:4 new-tuple: id[integer]:-4 data[integer]:3 + COMMIT + BEGIN + table public.table_with_unique_not_null: UPDATE: old-key: id[integer]:-4 new-tuple: id[integer]:4 data[integer]:3 + COMMIT + BEGIN + table public.table_with_unique_not_null: DELETE: id[integer]:4 + COMMIT + BEGIN + COMMIT + BEGIN + table public.toasttable: INSERT: id[integer]:1 toasted_col1[text]:'12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000' rand1[double precision]:79 toasted_col2[text]:null rand2[double precision]:1578 + COMMIT + BEGIN + table public.toasttable: INSERT: id[integer]:2 toasted_col1[text]:null rand1[double precision]:3077 toasted_col2[text]:'' rand2[double precision]:4576 + COMMIT + BEGIN + table public.toasttable: UPDATE: id[integer]:1 toasted_col1[text]:'12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000' rand1[double precision]:79 toasted_col2[text]:null rand2[double precision]:1578 + COMMIT +(113 rows) + +INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i); +-- update of second column, first column unchanged +UPDATE toasttable + SET toasted_col2 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i)) +WHERE id = 1; +-- make sure we decode correctly even if the toast table is gone +DROP TABLE toasttable; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + datatable public.toasttable: INSERT: id[integer]:3 toasted_col1[text]:'12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000' rand1[double precision]:6075 toasted_col2[text]:null rand2[double precision]:7574 + COMMIT + BEGIN + table public.toasttable: UPDATE: id[integer]:1 toasted_col1[text]:unchanged-toast-datum rand1[double precision]:79 toasted_col2[text]:'12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000' rand2[double precision]:1578 + COMMIT + BEGIN + COMMIT +(8 rows) + +-- done, free logical replication slot +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +------ +(0 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +/* check that we aren't visible anymore now */ +SELECT * FROM pg_stat_replication; + pid | usesysid | usename | application_name | client_addr | client_hostname | client_port | backend_start | backend_xmin | state | sent_location | write_location | flush_location | replay_location | sync_priority | sync_state +-----+----------+---------+------------------+-------------+-----------------+-------------+---------------+--------------+-------+---------------+----------------+----------------+-----------------+---------------+------------ +(0 rows) + diff --git a/contrib/test_decoding/expected/decoding_in_xact.out b/contrib/test_decoding/expected/decoding_in_xact.out new file mode 100644 index 00000000000..d15b0b542ba --- /dev/null +++ b/contrib/test_decoding/expected/decoding_in_xact.out @@ -0,0 +1,89 @@ +-- predictability +SET synchronous_commit = on; +-- fail because we're creating a slot while in an xact with xid +BEGIN; +SELECT txid_current() = 0; + ?column? +---------- + f +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +ERROR: cannot create logical replication slot in transaction that has performed writes +ROLLBACK; +-- fail because we're creating a slot while in an subxact whose topxact has a xid +BEGIN; +SELECT txid_current() = 0; + ?column? +---------- + f +(1 row) + +SAVEPOINT barf; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +ERROR: cannot create logical replication slot in transaction that has performed writes +ROLLBACK TO SAVEPOINT barf; +ROLLBACK; +-- succeed, outside tx. +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + +-- succeed, in tx without xid. +BEGIN; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +COMMIT; +CREATE TABLE nobarf(id serial primary key, data text); +INSERT INTO nobarf(data) VALUES('1'); +-- decoding works in transaction with xid +BEGIN; +SELECT txid_current() = 0; + ?column? +---------- + f +(1 row) + +-- don't show yet, haven't committed +INSERT INTO nobarf(data) VALUES('2'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +----------------------------------------------------------- + BEGIN + COMMIT + BEGIN + table public.nobarf: INSERT: id[integer]:1 data[text]:'1' + COMMIT +(5 rows) + +COMMIT; +INSERT INTO nobarf(data) VALUES('3'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +----------------------------------------------------------- + BEGIN + table public.nobarf: INSERT: id[integer]:2 data[text]:'2' + COMMIT + BEGIN + table public.nobarf: INSERT: id[integer]:3 data[text]:'3' + COMMIT +(6 rows) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + diff --git a/contrib/test_decoding/expected/delayed_startup.out b/contrib/test_decoding/expected/delayed_startup.out new file mode 100644 index 00000000000..db8c525ac40 --- /dev/null +++ b/contrib/test_decoding/expected/delayed_startup.out @@ -0,0 +1,38 @@ +Parsed test spec with 2 sessions + +starting permutation: s1b s1w s2init s1c s2start s1b s1w s1c s2start s1b s1w s2start s1c s2start +step s1b: BEGIN ISOLATION LEVEL SERIALIZABLE; +step s1w: INSERT INTO do_write DEFAULT VALUES; +step s2init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...> +step s1c: COMMIT; +step s2init: <... completed> +?column? + +init +step s2start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +step s1b: BEGIN ISOLATION LEVEL SERIALIZABLE; +step s1w: INSERT INTO do_write DEFAULT VALUES; +step s1c: COMMIT; +step s2start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +BEGIN +table public.do_write: INSERT: id[integer]:2 +COMMIT +step s1b: BEGIN ISOLATION LEVEL SERIALIZABLE; +step s1w: INSERT INTO do_write DEFAULT VALUES; +step s2start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +step s1c: COMMIT; +step s2start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +BEGIN +table public.do_write: INSERT: id[integer]:3 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/expected/mxact.out b/contrib/test_decoding/expected/mxact.out new file mode 100644 index 00000000000..f0d96cc67d0 --- /dev/null +++ b/contrib/test_decoding/expected/mxact.out @@ -0,0 +1,66 @@ +Parsed test spec with 3 sessions + +starting permutation: s0init s0start s1begin s1sharepgclass s2begin s2sharepgclass s0w s0start s2commit s1commit +step s0init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s0start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +step s1begin: BEGIN; +step s1sharepgclass: SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR SHARE) s; +?column? + +t +step s2begin: BEGIN; +step s2sharepgclass: SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR SHARE) s; +?column? + +t +step s0w: INSERT INTO do_write DEFAULT VALUES; +step s0start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +BEGIN +table public.do_write: INSERT: id[integer]:1 +COMMIT +step s2commit: COMMIT; +step s1commit: COMMIT; +?column? + +stop + +starting permutation: s0init s0start s1begin s1keysharepgclass s2begin s2keysharepgclass s0alter s0w s0start s2commit s1commit +step s0init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? + +init +step s0start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +step s1begin: BEGIN; +step s1keysharepgclass: SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR KEY SHARE) s; +?column? + +t +step s2begin: BEGIN; +step s2keysharepgclass: SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR KEY SHARE) s; +?column? + +t +step s0alter: ALTER TABLE do_write ADD column ts timestamptz; +step s0w: INSERT INTO do_write DEFAULT VALUES; +step s0start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +BEGIN +COMMIT +BEGIN +table public.do_write: INSERT: id[integer]:1 ts[timestamp with time zone]:null +COMMIT +step s2commit: COMMIT; +step s1commit: COMMIT; +?column? + +stop diff --git a/contrib/test_decoding/expected/permissions.out b/contrib/test_decoding/expected/permissions.out new file mode 100644 index 00000000000..85b7f5d6257 --- /dev/null +++ b/contrib/test_decoding/expected/permissions.out @@ -0,0 +1,130 @@ +-- predictability +SET synchronous_commit = on; +-- setup +CREATE ROLE lr_normal; +CREATE ROLE lr_superuser SUPERUSER; +CREATE ROLE lr_replication REPLICATION; +CREATE TABLE lr_test(data text); +-- superuser can control replication +SET ROLE lr_superuser; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +INSERT INTO lr_test VALUES('lr_superuser_init'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +-------------------------------------------------------------- + BEGIN + table public.lr_test: INSERT: data[text]:'lr_superuser_init' + COMMIT +(3 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +RESET ROLE; +-- replication user can control replication +SET ROLE lr_replication; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +INSERT INTO lr_test VALUES('lr_superuser_init'); +ERROR: permission denied for relation lr_test +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +------ +(0 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +RESET ROLE; +-- plain user *can't* can control replication +SET ROLE lr_normal; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +ERROR: must be superuser or replication role to use replication slots +INSERT INTO lr_test VALUES('lr_superuser_init'); +ERROR: permission denied for relation lr_test +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); +ERROR: must be superuser or replication role to use replication slots +SELECT pg_drop_replication_slot('regression_slot'); +ERROR: must be superuser or replication role to use replication slots +RESET ROLE; +-- replication users can drop superuser created slots +SET ROLE lr_superuser; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +RESET ROLE; +SET ROLE lr_replication; +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +RESET ROLE; +-- normal users can't drop existing slots +SET ROLE lr_superuser; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +RESET ROLE; +SET ROLE lr_normal; +SELECT pg_drop_replication_slot('regression_slot'); +ERROR: must be superuser or replication role to use replication slots +RESET ROLE; +-- all users can see existing slots +SET ROLE lr_superuser; +SELECT slot_name, plugin FROM pg_replication_slots; + slot_name | plugin +-----------------+--------------- + regression_slot | test_decoding +(1 row) + +RESET ROLE; +SET ROLE lr_replication; +SELECT slot_name, plugin FROM pg_replication_slots; + slot_name | plugin +-----------------+--------------- + regression_slot | test_decoding +(1 row) + +RESET ROLE; +SET ROLE lr_normal; +SELECT slot_name, plugin FROM pg_replication_slots; + slot_name | plugin +-----------------+--------------- + regression_slot | test_decoding +(1 row) + +RESET ROLE; +-- cleanup +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +DROP ROLE lr_normal; +DROP ROLE lr_superuser; +DROP ROLE lr_replication; +DROP TABLE lr_test; diff --git a/contrib/test_decoding/expected/rewrite.out b/contrib/test_decoding/expected/rewrite.out new file mode 100644 index 00000000000..ec23ab9024a --- /dev/null +++ b/contrib/test_decoding/expected/rewrite.out @@ -0,0 +1,107 @@ +-- predictability +SET synchronous_commit = on; +DROP TABLE IF EXISTS replication_example; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120)); +INSERT INTO replication_example(somedata) VALUES (1); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +---------------------------------------------------------------------------------------------------------- + BEGIN + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:1 somedata[integer]:1 text[character varying]:null + COMMIT +(5 rows) + +BEGIN; +INSERT INTO replication_example(somedata) VALUES (2); +ALTER TABLE replication_example ADD COLUMN testcolumn1 int; +INSERT INTO replication_example(somedata, testcolumn1) VALUES (3, 1); +COMMIT; +BEGIN; +INSERT INTO replication_example(somedata) VALUES (3); +ALTER TABLE replication_example ADD COLUMN testcolumn2 int; +INSERT INTO replication_example(somedata, testcolumn1, testcolumn2) VALUES (4, 2, 1); +COMMIT; +VACUUM FULL pg_am; +VACUUM FULL pg_amop; +VACUUM FULL pg_proc; +VACUUM FULL pg_opclass; +VACUUM FULL pg_type; +VACUUM FULL pg_index; +VACUUM FULL pg_database; +-- repeated rewrites that fail +BEGIN; +CLUSTER pg_class USING pg_class_oid_index; +CLUSTER pg_class USING pg_class_oid_index; +ROLLBACK; +-- repeated rewrites that succeed +BEGIN; +CLUSTER pg_class USING pg_class_oid_index; +CLUSTER pg_class USING pg_class_oid_index; +CLUSTER pg_class USING pg_class_oid_index; +COMMIT; + -- repeated rewrites in different transactions +VACUUM FULL pg_class; +VACUUM FULL pg_class; +INSERT INTO replication_example(somedata, testcolumn1) VALUES (5, 3); +BEGIN; +INSERT INTO replication_example(somedata, testcolumn1) VALUES (6, 4); +ALTER TABLE replication_example ADD COLUMN testcolumn3 int; +INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (7, 5, 1); +COMMIT; +-- make old files go away +CHECKPOINT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + data +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + table public.replication_example: INSERT: id[integer]:2 somedata[integer]:2 text[character varying]:null + table public.replication_example: INSERT: id[integer]:3 somedata[integer]:3 text[character varying]:null testcolumn1[integer]:1 + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:4 somedata[integer]:3 text[character varying]:null testcolumn1[integer]:null + table public.replication_example: INSERT: id[integer]:5 somedata[integer]:4 text[character varying]:null testcolumn1[integer]:2 testcolumn2[integer]:1 + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:6 somedata[integer]:5 text[character varying]:null testcolumn1[integer]:3 testcolumn2[integer]:null + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:7 somedata[integer]:6 text[character varying]:null testcolumn1[integer]:4 testcolumn2[integer]:null + table public.replication_example: INSERT: id[integer]:8 somedata[integer]:7 text[character varying]:null testcolumn1[integer]:5 testcolumn2[integer]:null testcolumn3[integer]:1 + COMMIT +(35 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +DROP TABLE IF EXISTS replication_example; diff --git a/contrib/test_decoding/expected/toast.out b/contrib/test_decoding/expected/toast.out new file mode 100644 index 00000000000..6adef83f029 --- /dev/null +++ b/contrib/test_decoding/expected/toast.out @@ -0,0 +1,90 @@ +-- predictability +SET synchronous_commit = on; +DROP TABLE IF EXISTS xpto; +NOTICE: table "xpto" does not exist, skipping +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE SEQUENCE xpto_rand_seq START 79 INCREMENT 1499; -- portable "random" +CREATE TABLE xpto ( + id serial primary key, + toasted_col1 text, + rand1 float8 DEFAULT nextval('xpto_rand_seq'), + toasted_col2 text, + rand2 float8 DEFAULT nextval('xpto_rand_seq') +); +-- uncompressed external toast data +INSERT INTO xpto (toasted_col1, toasted_col2) SELECT string_agg(g.i::text, ''), string_agg((g.i*2)::text, '') FROM generate_series(1, 2000) g(i); +-- compressed external toast data +INSERT INTO xpto (toasted_col2) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); +-- update of existing column +UPDATE xpto SET toasted_col1 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i)) WHERE id = 1; +UPDATE xpto SET rand1 = 123.456 WHERE id = 1; +DELETE FROM xpto WHERE id = 1; +DROP TABLE IF EXISTS toasted_key; +NOTICE: table "toasted_key" does not exist, skipping +CREATE TABLE toasted_key ( + id serial, + toasted_key text PRIMARY KEY, + toasted_col1 text, + toasted_col2 text +); +ALTER TABLE toasted_key ALTER COLUMN toasted_key SET STORAGE EXTERNAL; +ALTER TABLE toasted_key ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL; +INSERT INTO toasted_key(toasted_key, toasted_col1) VALUES(repeat('1234567890', 200), repeat('9876543210', 200)); +-- test update of a toasted key without changing it +UPDATE toasted_key SET toasted_col2 = toasted_col1; +-- test update of a toasted key, changing it +UPDATE toasted_key SET toasted_key = toasted_key || '1'; +DELETE FROM toasted_key; +SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + substr +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + table public.xpto: INSERT: id[integer]:1 toasted_col1[text]:'1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 + COMMIT + BEGIN + table public.xpto: INSERT: id[integer]:2 toasted_col1[text]:null rand1[double precision]:3077 toasted_col2[text]:'00010002000300040005000600070008000900100011001200130014001500160017001800190020002100 + COMMIT + BEGIN + table public.xpto: UPDATE: id[integer]:1 toasted_col1[text]:'1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 + COMMIT + BEGIN + table public.xpto: UPDATE: id[integer]:1 toasted_col1[text]:unchanged-toast-datum rand1[double precision]:123.456 toasted_col2[text]:unchanged-toast-datum rand2[double precision]:1578 + COMMIT + BEGIN + table public.xpto: DELETE: id[integer]:1 + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + COMMIT + BEGIN + table public.toasted_key: INSERT: id[integer]:1 toasted_key[text]:'1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123 + COMMIT + BEGIN + table public.toasted_key: UPDATE: id[integer]:1 toasted_key[text]:unchanged-toast-datum toasted_col1[text]:unchanged-toast-datum toasted_col2[text]:'987654321098765432109876543210987654321098765432109 + COMMIT + BEGIN + table public.toasted_key: UPDATE: old-key: toasted_key[text]:'123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678 + COMMIT + BEGIN + table public.toasted_key: DELETE: toasted_key[text]:'123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567 + COMMIT +(37 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/logical.conf b/contrib/test_decoding/logical.conf new file mode 100644 index 00000000000..367f7066514 --- /dev/null +++ b/contrib/test_decoding/logical.conf @@ -0,0 +1,2 @@ +wal_level = logical +max_replication_slots = 4 diff --git a/contrib/test_decoding/specs/concurrent_ddl_dml.spec b/contrib/test_decoding/specs/concurrent_ddl_dml.spec new file mode 100644 index 00000000000..7c8a7c7977f --- /dev/null +++ b/contrib/test_decoding/specs/concurrent_ddl_dml.spec @@ -0,0 +1,94 @@ +setup +{ + DROP TABLE IF EXISTS tbl1; + DROP TABLE IF EXISTS tbl2; + CREATE TABLE tbl1(val1 integer, val2 integer); + CREATE TABLE tbl2(val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + DROP TABLE tbl2; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s1" +step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s1_begin" { BEGIN; } +step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); } +step "s1_insert_tbl1_3col" { INSERT INTO tbl1 (val1, val2, val3) VALUES (1, 1, 1); } +step "s1_insert_tbl2" { INSERT INTO tbl2 (val1, val2) VALUES (1, 1); } +step "s1_insert_tbl2_3col" { INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); } +step "s1_commit" { COMMIT; } + +session "s2" +step "s2_alter_tbl1_float" { ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; } +step "s2_alter_tbl1_char" { ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; } +step "s2_alter_tbl1_text" { ALTER TABLE tbl1 ALTER COLUMN val2 TYPE text; } +step "s2_alter_tbl1_boolean" { ALTER TABLE tbl1 ALTER COLUMN val2 TYPE boolean; } + +step "s2_alter_tbl1_add_int" { ALTER TABLE tbl1 ADD COLUMN val3 INTEGER; } +step "s2_alter_tbl1_add_float" { ALTER TABLE tbl1 ADD COLUMN val3 FLOAT; } +step "s2_alter_tbl1_add_char" { ALTER TABLE tbl1 ADD COLUMN val3 character varying; } +step "s2_alter_tbl1_add_boolean" { ALTER TABLE tbl1 ADD COLUMN val3 BOOLEAN; } +step "s2_alter_tbl1_add_text" { ALTER TABLE tbl1 ADD COLUMN val3 TEXT; } + +step "s2_alter_tbl2_float" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE float; } +step "s2_alter_tbl2_char" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE character varying; } +step "s2_alter_tbl2_text" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE text; } +step "s2_alter_tbl2_boolean" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE boolean; } +step "s2_alter_tbl2_text" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE boolean; } + +step "s2_alter_tbl2_add_int" { ALTER TABLE tbl2 ADD COLUMN val3 INTEGER; } +step "s2_alter_tbl2_add_float" { ALTER TABLE tbl2 ADD COLUMN val3 FLOAT; } +step "s2_alter_tbl2_add_char" { ALTER TABLE tbl2 ADD COLUMN val3 character varying; } +step "s2_alter_tbl2_add_boolean" { ALTER TABLE tbl2 ADD COLUMN val3 BOOLEAN; } +step "s2_alter_tbl2_add_text" { ALTER TABLE tbl2 ADD COLUMN val3 TEXT; } +step "s2_alter_tbl2_drop_3rd_col" { ALTER TABLE tbl2 DROP COLUMN val3; } +step "s2_alter_tbl2_3rd_char" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE character varying; } +step "s2_alter_tbl2_3rd_text" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; } +step "s2_alter_tbl2_3rd_int" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; } + +step "s2_get_changes" { SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); } + + + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_float" "s1_insert_tbl2" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl1_float" "s1_insert_tbl2" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_char" "s1_insert_tbl2" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl1_char" "s1_insert_tbl2" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s2_alter_tbl1_float" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s2_alter_tbl1_char" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_float" "s1_insert_tbl2" "s2_alter_tbl1_float" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_char" "s1_insert_tbl2" "s2_alter_tbl1_char" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_text" "s1_insert_tbl2" "s1_commit" "s2_get_changes" +permutation "s1_init" "s2_alter_tbl2_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_text" "s1_insert_tbl2" "s2_alter_tbl1_char" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_boolean" "s1_insert_tbl2" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_boolean" "s1_insert_tbl2" "s2_alter_tbl1_boolean" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_add_int" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s1_commit" "s1_begin" "s2_alter_tbl2_add_int" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_add_float" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s1_commit" "s1_begin" "s2_alter_tbl2_add_float" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_add_char" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s1_commit" "s1_begin" "s2_alter_tbl2_add_char" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_add_int" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_drop_3rd_col" "s1_commit" "s2_get_changes" +permutation "s1_init" "s2_alter_tbl2_add_int" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl2" "s1_commit" "s1_insert_tbl2" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_add_int" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_drop_3rd_col" "s1_commit" "s2_get_changes" "s2_alter_tbl2_add_text" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_3rd_char" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" "s2_alter_tbl2_3rd_int" "s1_insert_tbl2_3col" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_add_char" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2_3col" "s2_alter_tbl2_3rd_text" "s1_insert_tbl2_3col" "s1_commit" "s1_insert_tbl2_3col" "s2_get_changes" +permutation "s1_init" "s2_alter_tbl2_add_text" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2_3col" "s2_alter_tbl2_3rd_char" "s1_insert_tbl2_3col" "s1_commit" "s1_insert_tbl2_3col" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_add_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_3rd_text" "s1_insert_tbl2_3col" "s1_commit" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl2" "s2_get_changes" +permutation "s1_init" "s2_alter_tbl2_add_text" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_3rd_char" "s1_insert_tbl2_3col" "s1_commit" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl2" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_add_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl1" "s1_commit" "s2_get_changes" diff --git a/contrib/test_decoding/specs/delayed_startup.spec b/contrib/test_decoding/specs/delayed_startup.spec new file mode 100644 index 00000000000..b7fe8148ce5 --- /dev/null +++ b/contrib/test_decoding/specs/delayed_startup.spec @@ -0,0 +1,24 @@ +setup +{ + DROP TABLE IF EXISTS do_write; + CREATE TABLE do_write(id serial primary key); +} + +teardown +{ + DROP TABLE do_write; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s1" +setup { SET synchronous_commit=on; } +step "s1b" { BEGIN ISOLATION LEVEL SERIALIZABLE; } +step "s1w" { INSERT INTO do_write DEFAULT VALUES; } +step "s1c" { COMMIT; } +session "s2" +setup { SET synchronous_commit=on; } +step "s2init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');} +step "s2start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');} + + +permutation "s1b" "s1w" "s2init" "s1c" "s2start" "s1b" "s1w" "s1c" "s2start" "s1b" "s1w" "s2start" "s1c" "s2start" diff --git a/contrib/test_decoding/specs/mxact.spec b/contrib/test_decoding/specs/mxact.spec new file mode 100644 index 00000000000..ea5b1aa2d67 --- /dev/null +++ b/contrib/test_decoding/specs/mxact.spec @@ -0,0 +1,38 @@ +setup +{ + DROP TABLE IF EXISTS do_write; + CREATE TABLE do_write(id serial primary key); +} + +teardown +{ + DROP TABLE IF EXISTS do_write; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');} +step "s0start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');} +step "s0alter" {ALTER TABLE do_write ADD column ts timestamptz; } +step "s0w" { INSERT INTO do_write DEFAULT VALUES; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1begin" {BEGIN;} +step "s1sharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR SHARE) s; } +step "s1keysharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR KEY SHARE) s; } +step "s1commit" {COMMIT;} + +session "s2" +setup { SET synchronous_commit=on; } +step "s2begin" {BEGIN;} +step "s2sharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR SHARE) s; } +step "s2keysharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR KEY SHARE) s; } +step "s2commit" {COMMIT;} + +# test that we're handling an update-only mxact xmax correctly +permutation "s0init" "s0start" "s1begin" "s1sharepgclass" "s2begin" "s2sharepgclass" "s0w" "s0start" "s2commit" "s1commit" + +# test that we're handling an update-only mxact xmax correctly +permutation "s0init" "s0start" "s1begin" "s1keysharepgclass" "s2begin" "s2keysharepgclass" "s0alter" "s0w" "s0start" "s2commit" "s1commit" diff --git a/contrib/test_decoding/sql/binary.sql b/contrib/test_decoding/sql/binary.sql new file mode 100644 index 00000000000..619f00b3bc8 --- /dev/null +++ b/contrib/test_decoding/sql/binary.sql @@ -0,0 +1,14 @@ +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +-- succeeds, textual plugin, textual consumer +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0'); +-- fails, binary plugin, textual consumer +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '1'); +-- succeeds, textual plugin, binary consumer +SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot', NULL, NULL, 'force-binary', '0'); +-- succeeds, binary plugin, binary consumer +SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot', NULL, NULL, 'force-binary', '1'); + +SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql new file mode 100644 index 00000000000..b4807e991b7 --- /dev/null +++ b/contrib/test_decoding/sql/ddl.sql @@ -0,0 +1,337 @@ +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +-- fail because of an already existing slot +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +-- fail because of an invalid name +SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding'); + +-- fail twice because of an invalid parameter values +SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 'frakbar'); +SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'nonexistant-option', 'frakbar'); +SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 'frakbar'); + +-- succeed once +SELECT pg_drop_replication_slot('regression_slot'); +-- fail +SELECT pg_drop_replication_slot('regression_slot'); + +-- check that we're detecting a streaming rep slot used for logical decoding +SELECT 'init' FROM pg_create_physical_replication_slot('repl'); +SELECT data FROM pg_logical_slot_get_changes('repl', NULL, NULL, 'include-xids', '0'); +SELECT pg_drop_replication_slot('repl'); + + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +/* check whether status function reports us, only reproduceable columns */ +SELECT slot_name, plugin, slot_type, active, + NOT catalog_xmin IS NULL AS catalog_xmin_set, + xmin IS NULl AS data_xmin_not_set, + pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal +FROM pg_replication_slots; + +/* + * Check that changes are handled correctly when interleaved with ddl + */ +CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120)); +BEGIN; +INSERT INTO replication_example(somedata, text) VALUES (1, 1); +INSERT INTO replication_example(somedata, text) VALUES (1, 2); +COMMIT; + +ALTER TABLE replication_example ADD COLUMN bar int; + +INSERT INTO replication_example(somedata, text, bar) VALUES (2, 1, 4); + +BEGIN; +INSERT INTO replication_example(somedata, text, bar) VALUES (2, 2, 4); +INSERT INTO replication_example(somedata, text, bar) VALUES (2, 3, 4); +INSERT INTO replication_example(somedata, text, bar) VALUES (2, 4, NULL); +COMMIT; + +ALTER TABLE replication_example DROP COLUMN bar; +INSERT INTO replication_example(somedata, text) VALUES (3, 1); + +BEGIN; +INSERT INTO replication_example(somedata, text) VALUES (3, 2); +INSERT INTO replication_example(somedata, text) VALUES (3, 3); +COMMIT; + +ALTER TABLE replication_example RENAME COLUMN text TO somenum; + +INSERT INTO replication_example(somedata, somenum) VALUES (4, 1); + +-- collect all changes +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4); +-- throw away changes, they contain oids +SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +INSERT INTO replication_example(somedata, somenum) VALUES (5, 1); + +BEGIN; +INSERT INTO replication_example(somedata, somenum) VALUES (6, 1); +ALTER TABLE replication_example ADD COLUMN zaphod1 int; +INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 2, 1); +ALTER TABLE replication_example ADD COLUMN zaphod2 int; +INSERT INTO replication_example(somedata, somenum, zaphod2) VALUES (6, 3, 1); +INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 4, 2); +COMMIT; + +-- show changes +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +-- hide changes bc of oid visible in full table rewrites +CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int); +INSERT INTO tr_unique(data) VALUES(10); +ALTER TABLE tr_unique RENAME TO tr_pkey; +ALTER TABLE tr_pkey ADD COLUMN id serial primary key; +SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +INSERT INTO tr_pkey(data) VALUES(1); +--show deletion with primary key +DELETE FROM tr_pkey; + +/* display results */ +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +/* + * check that disk spooling works + */ +BEGIN; +CREATE TABLE tr_etoomuch (id serial primary key, data int); +INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i); +DELETE FROM tr_etoomuch WHERE id < 5000; +UPDATE tr_etoomuch SET data = - data WHERE id > 5000; +COMMIT; + +/* display results, but hide most of the output */ +SELECT count(*), min(data), max(data) +FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0') +GROUP BY substring(data, 1, 24) +ORDER BY 1; + +/* + * check whether we decode subtransactions correctly in relation with each + * other + */ +CREATE TABLE tr_sub (id serial primary key, path text); + +-- toplevel, subtxn, toplevel, subtxn, subtxn +BEGIN; +INSERT INTO tr_sub(path) VALUES ('1-top-#1'); + +SAVEPOINT a; +INSERT INTO tr_sub(path) VALUES ('1-top-1-#1'); +INSERT INTO tr_sub(path) VALUES ('1-top-1-#2'); +RELEASE SAVEPOINT a; + +SAVEPOINT b; +SAVEPOINT c; +INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#1'); +INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#2'); +RELEASE SAVEPOINT c; +INSERT INTO tr_sub(path) VALUES ('1-top-2-#1'); +RELEASE SAVEPOINT b; +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +-- check that we handle xlog assignments correctly +BEGIN; +-- nest 80 subtxns +SAVEPOINT subtop;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; +-- assign xid by inserting +INSERT INTO tr_sub(path) VALUES ('2-top-1...--#1'); +INSERT INTO tr_sub(path) VALUES ('2-top-1...--#2'); +INSERT INTO tr_sub(path) VALUES ('2-top-1...--#3'); +RELEASE SAVEPOINT subtop; +INSERT INTO tr_sub(path) VALUES ('2-top-#1'); +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +-- make sure rollbacked subtransactions aren't decoded +BEGIN; +INSERT INTO tr_sub(path) VALUES ('3-top-2-#1'); +SAVEPOINT a; +INSERT INTO tr_sub(path) VALUES ('3-top-2-1-#1'); +SAVEPOINT b; +INSERT INTO tr_sub(path) VALUES ('3-top-2-2-#1'); +ROLLBACK TO SAVEPOINT b; +INSERT INTO tr_sub(path) VALUES ('3-top-2-#2'); +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +-- test whether a known, but not yet logged toplevel xact, followed by a +-- subxact commit is handled correctly +BEGIN; +SELECT txid_current() != 0; -- so no fixed xid apears in the outfile +SAVEPOINT a; +INSERT INTO tr_sub(path) VALUES ('4-top-1-#1'); +RELEASE SAVEPOINT a; +COMMIT; + +-- test whether a change in a subtransaction, in an unknown toplevel +-- xact is handled correctly. +BEGIN; +SAVEPOINT a; +INSERT INTO tr_sub(path) VALUES ('5-top-1-#1'); +COMMIT; + + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + + +/* + * Check whether treating a table as a catalog table works somewhat + */ +CREATE TABLE replication_metadata ( + id serial primary key, + relation name NOT NULL, + options text[] +) +WITH (user_catalog_table = true) +; +\d+ replication_metadata + +INSERT INTO replication_metadata(relation, options) +VALUES ('foo', ARRAY['a', 'b']); + +ALTER TABLE replication_metadata RESET (user_catalog_table); +\d+ replication_metadata + +INSERT INTO replication_metadata(relation, options) +VALUES ('bar', ARRAY['a', 'b']); + +ALTER TABLE replication_metadata SET (user_catalog_table = true); +\d+ replication_metadata + +INSERT INTO replication_metadata(relation, options) +VALUES ('blub', NULL); + +-- make sure rewrites don't work +ALTER TABLE replication_metadata ADD COLUMN rewritemeornot int; +ALTER TABLE replication_metadata ALTER COLUMN rewritemeornot TYPE text; + +ALTER TABLE replication_metadata SET (user_catalog_table = false); +\d+ replication_metadata + +INSERT INTO replication_metadata(relation, options) +VALUES ('zaphod', NULL); + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +/* + * check whether we handle updates/deletes correct with & without a pkey + */ + +/* we should handle the case without a key at all more gracefully */ +CREATE TABLE table_without_key(id serial, data int); +INSERT INTO table_without_key(data) VALUES(1),(2); +DELETE FROM table_without_key WHERE data = 1; +-- won't log old keys +UPDATE table_without_key SET data = 3 WHERE data = 2; +UPDATE table_without_key SET id = -id; +UPDATE table_without_key SET id = -id; +-- should log the full old row now +ALTER TABLE table_without_key REPLICA IDENTITY FULL; +UPDATE table_without_key SET data = 3 WHERE data = 2; +UPDATE table_without_key SET id = -id; +UPDATE table_without_key SET id = -id; +DELETE FROM table_without_key WHERE data = 3; + +CREATE TABLE table_with_pkey(id serial primary key, data int); +INSERT INTO table_with_pkey(data) VALUES(1), (2); +DELETE FROM table_with_pkey WHERE data = 1; +-- should log the old pkey +UPDATE table_with_pkey SET data = 3 WHERE data = 2; +UPDATE table_with_pkey SET id = -id; +UPDATE table_with_pkey SET id = -id; +-- check that we log nothing despite having a pkey +ALTER TABLE table_without_key REPLICA IDENTITY NOTHING; +UPDATE table_with_pkey SET id = -id; +-- check that we log everything despite having a pkey +ALTER TABLE table_without_key REPLICA IDENTITY FULL; +UPDATE table_with_pkey SET id = -id; +DELETE FROM table_with_pkey WHERE data = 3; + +CREATE TABLE table_with_unique_not_null(id serial unique, data int); +ALTER TABLE table_with_unique_not_null ALTER COLUMN id SET NOT NULL; --already set +-- won't log anything, replica identity not setup +INSERT INTO table_with_unique_not_null(data) VALUES(1), (2); +DELETE FROM table_with_unique_not_null WHERE data = 1; +UPDATE table_with_unique_not_null SET data = 3 WHERE data = 2; +UPDATE table_with_unique_not_null SET id = -id; +UPDATE table_with_unique_not_null SET id = -id; +DELETE FROM table_with_unique_not_null WHERE data = 3; +-- should log old key +ALTER TABLE table_with_unique_not_null REPLICA IDENTITY USING INDEX table_with_unique_not_null_id_key; +INSERT INTO table_with_unique_not_null(data) VALUES(1), (2); +DELETE FROM table_with_unique_not_null WHERE data = 1; +UPDATE table_with_unique_not_null SET data = 3 WHERE data = 2; +UPDATE table_with_unique_not_null SET id = -id; +UPDATE table_with_unique_not_null SET id = -id; +DELETE FROM table_with_unique_not_null WHERE data = 3; + +-- check toast support +BEGIN; +CREATE SEQUENCE toasttable_rand_seq START 79 INCREMENT 1499; -- portable "random" +CREATE TABLE toasttable( + id serial primary key, + toasted_col1 text, + rand1 float8 DEFAULT nextval('toasttable_rand_seq'), + toasted_col2 text, + rand2 float8 DEFAULT nextval('toasttable_rand_seq') + ); +COMMIT; +-- uncompressed external toast data +INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i); + +-- compressed external toast data +INSERT INTO toasttable(toasted_col2) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); + +-- update of existing column +UPDATE toasttable + SET toasted_col1 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i)) +WHERE id = 1; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i); + +-- update of second column, first column unchanged +UPDATE toasttable + SET toasted_col2 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i)) +WHERE id = 1; + +-- make sure we decode correctly even if the toast table is gone +DROP TABLE toasttable; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +-- done, free logical replication slot +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); +SELECT pg_drop_replication_slot('regression_slot'); + +/* check that we aren't visible anymore now */ +SELECT * FROM pg_stat_replication; diff --git a/contrib/test_decoding/sql/decoding_in_xact.sql b/contrib/test_decoding/sql/decoding_in_xact.sql new file mode 100644 index 00000000000..2771afee7a4 --- /dev/null +++ b/contrib/test_decoding/sql/decoding_in_xact.sql @@ -0,0 +1,41 @@ +-- predictability +SET synchronous_commit = on; + +-- fail because we're creating a slot while in an xact with xid +BEGIN; +SELECT txid_current() = 0; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +ROLLBACK; + +-- fail because we're creating a slot while in an subxact whose topxact has a xid +BEGIN; +SELECT txid_current() = 0; +SAVEPOINT barf; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +ROLLBACK TO SAVEPOINT barf; +ROLLBACK; + +-- succeed, outside tx. +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + +-- succeed, in tx without xid. +BEGIN; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +COMMIT; + +CREATE TABLE nobarf(id serial primary key, data text); +INSERT INTO nobarf(data) VALUES('1'); + +-- decoding works in transaction with xid +BEGIN; +SELECT txid_current() = 0; +-- don't show yet, haven't committed +INSERT INTO nobarf(data) VALUES('2'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); +COMMIT; + +INSERT INTO nobarf(data) VALUES('3'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/sql/permissions.sql b/contrib/test_decoding/sql/permissions.sql new file mode 100644 index 00000000000..39d70b56b00 --- /dev/null +++ b/contrib/test_decoding/sql/permissions.sql @@ -0,0 +1,69 @@ +-- predictability +SET synchronous_commit = on; + +-- setup +CREATE ROLE lr_normal; +CREATE ROLE lr_superuser SUPERUSER; +CREATE ROLE lr_replication REPLICATION; +CREATE TABLE lr_test(data text); + +-- superuser can control replication +SET ROLE lr_superuser; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +INSERT INTO lr_test VALUES('lr_superuser_init'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); +SELECT pg_drop_replication_slot('regression_slot'); +RESET ROLE; + +-- replication user can control replication +SET ROLE lr_replication; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +INSERT INTO lr_test VALUES('lr_superuser_init'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); +SELECT pg_drop_replication_slot('regression_slot'); +RESET ROLE; + +-- plain user *can't* can control replication +SET ROLE lr_normal; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +INSERT INTO lr_test VALUES('lr_superuser_init'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); +SELECT pg_drop_replication_slot('regression_slot'); +RESET ROLE; + +-- replication users can drop superuser created slots +SET ROLE lr_superuser; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +RESET ROLE; +SET ROLE lr_replication; +SELECT pg_drop_replication_slot('regression_slot'); +RESET ROLE; + +-- normal users can't drop existing slots +SET ROLE lr_superuser; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +RESET ROLE; +SET ROLE lr_normal; +SELECT pg_drop_replication_slot('regression_slot'); +RESET ROLE; + +-- all users can see existing slots +SET ROLE lr_superuser; +SELECT slot_name, plugin FROM pg_replication_slots; +RESET ROLE; + +SET ROLE lr_replication; +SELECT slot_name, plugin FROM pg_replication_slots; +RESET ROLE; + +SET ROLE lr_normal; +SELECT slot_name, plugin FROM pg_replication_slots; +RESET ROLE; + +-- cleanup +SELECT pg_drop_replication_slot('regression_slot'); + +DROP ROLE lr_normal; +DROP ROLE lr_superuser; +DROP ROLE lr_replication; +DROP TABLE lr_test; diff --git a/contrib/test_decoding/sql/rewrite.sql b/contrib/test_decoding/sql/rewrite.sql new file mode 100644 index 00000000000..9a3dcbf8579 --- /dev/null +++ b/contrib/test_decoding/sql/rewrite.sql @@ -0,0 +1,62 @@ +-- predictability +SET synchronous_commit = on; + +DROP TABLE IF EXISTS replication_example; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120)); +INSERT INTO replication_example(somedata) VALUES (1); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); + +BEGIN; +INSERT INTO replication_example(somedata) VALUES (2); +ALTER TABLE replication_example ADD COLUMN testcolumn1 int; +INSERT INTO replication_example(somedata, testcolumn1) VALUES (3, 1); +COMMIT; + +BEGIN; +INSERT INTO replication_example(somedata) VALUES (3); +ALTER TABLE replication_example ADD COLUMN testcolumn2 int; +INSERT INTO replication_example(somedata, testcolumn1, testcolumn2) VALUES (4, 2, 1); +COMMIT; + +VACUUM FULL pg_am; +VACUUM FULL pg_amop; +VACUUM FULL pg_proc; +VACUUM FULL pg_opclass; +VACUUM FULL pg_type; +VACUUM FULL pg_index; +VACUUM FULL pg_database; + +-- repeated rewrites that fail +BEGIN; +CLUSTER pg_class USING pg_class_oid_index; +CLUSTER pg_class USING pg_class_oid_index; +ROLLBACK; + +-- repeated rewrites that succeed +BEGIN; +CLUSTER pg_class USING pg_class_oid_index; +CLUSTER pg_class USING pg_class_oid_index; +CLUSTER pg_class USING pg_class_oid_index; +COMMIT; + + -- repeated rewrites in different transactions +VACUUM FULL pg_class; +VACUUM FULL pg_class; + +INSERT INTO replication_example(somedata, testcolumn1) VALUES (5, 3); + +BEGIN; +INSERT INTO replication_example(somedata, testcolumn1) VALUES (6, 4); +ALTER TABLE replication_example ADD COLUMN testcolumn3 int; +INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (7, 5, 1); +COMMIT; + +-- make old files go away +CHECKPOINT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); +SELECT pg_drop_replication_slot('regression_slot'); + +DROP TABLE IF EXISTS replication_example; diff --git a/contrib/test_decoding/sql/toast.sql b/contrib/test_decoding/sql/toast.sql new file mode 100644 index 00000000000..943db9d2eed --- /dev/null +++ b/contrib/test_decoding/sql/toast.sql @@ -0,0 +1,51 @@ +-- predictability +SET synchronous_commit = on; + +DROP TABLE IF EXISTS xpto; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE SEQUENCE xpto_rand_seq START 79 INCREMENT 1499; -- portable "random" +CREATE TABLE xpto ( + id serial primary key, + toasted_col1 text, + rand1 float8 DEFAULT nextval('xpto_rand_seq'), + toasted_col2 text, + rand2 float8 DEFAULT nextval('xpto_rand_seq') +); + +-- uncompressed external toast data +INSERT INTO xpto (toasted_col1, toasted_col2) SELECT string_agg(g.i::text, ''), string_agg((g.i*2)::text, '') FROM generate_series(1, 2000) g(i); + +-- compressed external toast data +INSERT INTO xpto (toasted_col2) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); + +-- update of existing column +UPDATE xpto SET toasted_col1 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i)) WHERE id = 1; + +UPDATE xpto SET rand1 = 123.456 WHERE id = 1; + +DELETE FROM xpto WHERE id = 1; + +DROP TABLE IF EXISTS toasted_key; +CREATE TABLE toasted_key ( + id serial, + toasted_key text PRIMARY KEY, + toasted_col1 text, + toasted_col2 text +); + +ALTER TABLE toasted_key ALTER COLUMN toasted_key SET STORAGE EXTERNAL; +ALTER TABLE toasted_key ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL; + +INSERT INTO toasted_key(toasted_key, toasted_col1) VALUES(repeat('1234567890', 200), repeat('9876543210', 200)); + +-- test update of a toasted key without changing it +UPDATE toasted_key SET toasted_col2 = toasted_col1; +-- test update of a toasted key, changing it +UPDATE toasted_key SET toasted_key = toasted_key || '1'; + +DELETE FROM toasted_key; + +SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c new file mode 100644 index 00000000000..ea463fb2e7c --- /dev/null +++ b/contrib/test_decoding/test_decoding.c @@ -0,0 +1,404 @@ +/*------------------------------------------------------------------------- + * + * test_decoding.c + * example logical decoding output plugin + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/test_decoding/test_decoding.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/sysattr.h" + +#include "catalog/pg_class.h" +#include "catalog/pg_type.h" + +#include "nodes/parsenodes.h" + +#include "replication/output_plugin.h" +#include "replication/logical.h" + +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include "utils/relcache.h" +#include "utils/syscache.h" +#include "utils/typcache.h" + + +PG_MODULE_MAGIC; + +extern void _PG_init(void); +extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); + +typedef struct +{ + MemoryContext context; + bool include_xids; + bool include_timestamp; +} TestDecodingData; + +/* These must be available to pg_dlsym() */ +static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, + bool is_init); +static void pg_decode_shutdown(LogicalDecodingContext *ctx); +static void pg_decode_begin_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pg_decode_commit_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pg_decode_change(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, Relation rel, + ReorderBufferChange *change); + +void +_PG_init(void) +{ + /* other plugins can perform things here */ +} + +/* specify output plugin callbacks */ +void +_PG_output_plugin_init(OutputPluginCallbacks *cb) +{ + AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit); + + cb->startup_cb = pg_decode_startup; + cb->begin_cb = pg_decode_begin_txn; + cb->change_cb = pg_decode_change; + cb->commit_cb = pg_decode_commit_txn; + cb->shutdown_cb = pg_decode_shutdown; +} + + +/* initialize this plugin */ +static void +pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, + bool is_init) +{ + ListCell *option; + TestDecodingData *data; + + data = palloc(sizeof(TestDecodingData)); + data->context = AllocSetContextCreate(ctx->context, + "text conversion context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + data->include_xids = true; + data->include_timestamp = false; + + ctx->output_plugin_private = data; + + opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; + + foreach(option, ctx->output_plugin_options) + { + DefElem *elem = lfirst(option); + + Assert(elem->arg == NULL || IsA(elem->arg, String)); + + if (strcmp(elem->defname, "include-xids") == 0) + { + /* if option does not provide a value, it means its value is true */ + if (elem->arg == NULL) + data->include_xids = true; + else if (!parse_bool(strVal(elem->arg), &data->include_xids)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } + else if (strcmp(elem->defname, "include-timestamp") == 0) + { + if (elem->arg == NULL) + data->include_timestamp = true; + else if (!parse_bool(strVal(elem->arg), &data->include_timestamp)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } + else if (strcmp(elem->defname, "force-binary") == 0) + { + bool force_binary; + + if (elem->arg == NULL) + continue; + else if (!parse_bool(strVal(elem->arg), &force_binary)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + + if (force_binary) + opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("option \"%s\" = \"%s\" is unknown", + elem->defname, + elem->arg ? strVal(elem->arg) : "(null)"))); + } + } +} + +/* cleanup this plugin's resources */ +static void +pg_decode_shutdown(LogicalDecodingContext *ctx) +{ + TestDecodingData *data = ctx->output_plugin_private; + + /* cleanup our own resources via memory context reset */ + MemoryContextDelete(data->context); +} + +/* BEGIN callback */ +static void +pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "BEGIN %u", txn->xid); + else + appendStringInfoString(ctx->out, "BEGIN"); + OutputPluginWrite(ctx, true); +} + +/* COMMIT callback */ +static void +pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "COMMIT %u", txn->xid); + else + appendStringInfoString(ctx->out, "COMMIT"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* + * Print literal `outputstr' already represented as string of type `typid' + * into stringbuf `s'. + * + * Some builtin types aren't quoted, the rest is quoted. Escaping is done as + * if standard_conforming_strings were enabled. + */ +static void +print_literal(StringInfo s, Oid typid, char *outputstr) +{ + const char *valptr; + + switch (typid) + { + case INT2OID: + case INT4OID: + case INT8OID: + case OIDOID: + case FLOAT4OID: + case FLOAT8OID: + case NUMERICOID: + /* NB: We don't care about Inf, NaN et al. */ + appendStringInfoString(s, outputstr); + break; + + case BITOID: + case VARBITOID: + appendStringInfo(s, "B'%s'", outputstr); + break; + + case BOOLOID: + if (strcmp(outputstr, "t") == 0) + appendStringInfoString(s, "true"); + else + appendStringInfoString(s, "false"); + break; + + default: + appendStringInfoChar(s, '\''); + for (valptr = outputstr; *valptr; valptr++) + { + char ch = *valptr; + + if (SQL_STR_DOUBLE(ch, false)) + appendStringInfoChar(s, ch); + appendStringInfoChar(s, ch); + } + appendStringInfoChar(s, '\''); + break; + } +} + +/* print the tuple 'tuple' into the StringInfo s */ +static void +tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls) +{ + int natt; + Oid oid; + + /* print oid of tuple, it's not included in the TupleDesc */ + if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid) + { + appendStringInfo(s, " oid[oid]:%u", oid); + } + + /* print all columns individually */ + for (natt = 0; natt < tupdesc->natts; natt++) + { + Form_pg_attribute attr; /* the attribute itself */ + Oid typid; /* type of current attribute */ + Oid typoutput; /* output function */ + bool typisvarlena; + Datum origval; /* possibly toasted Datum */ + bool isnull; /* column is null? */ + + attr = tupdesc->attrs[natt]; + + /* + * don't print dropped columns, we can't be sure everything is + * available for them + */ + if (attr->attisdropped) + continue; + + /* + * Don't print system columns, oid will already have been printed if + * present. + */ + if (attr->attnum < 0) + continue; + + typid = attr->atttypid; + + /* get Datum from tuple */ + origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull); + + if (isnull && skip_nulls) + continue; + + /* print attribute name */ + appendStringInfoChar(s, ' '); + appendStringInfoString(s, quote_identifier(NameStr(attr->attname))); + + /* print attribute type */ + appendStringInfoChar(s, '['); + appendStringInfoString(s, format_type_be(typid)); + appendStringInfoChar(s, ']'); + + /* query output function */ + getTypeOutputInfo(typid, + &typoutput, &typisvarlena); + + /* print separator */ + appendStringInfoChar(s, ':'); + + /* print data */ + if (isnull) + appendStringInfoString(s, "null"); + else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) + appendStringInfoString(s, "unchanged-toast-datum"); + else if (!typisvarlena) + print_literal(s, typid, + OidOutputFunctionCall(typoutput, origval)); + else + { + Datum val; /* definitely detoasted Datum */ + val = PointerGetDatum(PG_DETOAST_DATUM(origval)); + print_literal(s, typid, OidOutputFunctionCall(typoutput, val)); + } + } +} + +/* + * callback for individual changed tuples + */ +static void +pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + TestDecodingData *data; + Form_pg_class class_form; + TupleDesc tupdesc; + MemoryContext old; + + data = ctx->output_plugin_private; + class_form = RelationGetForm(relation); + tupdesc = RelationGetDescr(relation); + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfoString(ctx->out, "table "); + appendStringInfoString(ctx->out, + quote_qualified_identifier( + get_namespace_name( + get_rel_namespace(RelationGetRelid(relation))), + NameStr(class_form->relname))); + appendStringInfoString(ctx->out, ":"); + + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + appendStringInfoString(ctx->out, " INSERT:"); + if (change->tp.newtuple == NULL) + appendStringInfoString(ctx->out, " (no-tuple-data)"); + else + tuple_to_stringinfo(ctx->out, tupdesc, + &change->tp.newtuple->tuple, + false); + break; + case REORDER_BUFFER_CHANGE_UPDATE: + appendStringInfoString(ctx->out, " UPDATE:"); + if (change->tp.oldtuple != NULL) + { + appendStringInfoString(ctx->out, " old-key:"); + tuple_to_stringinfo(ctx->out, tupdesc, + &change->tp.oldtuple->tuple, + true); + appendStringInfoString(ctx->out, " new-tuple:"); + } + + if (change->tp.newtuple == NULL) + appendStringInfoString(ctx->out, " (no-tuple-data)"); + else + tuple_to_stringinfo(ctx->out, tupdesc, + &change->tp.newtuple->tuple, + false); + break; + case REORDER_BUFFER_CHANGE_DELETE: + appendStringInfoString(ctx->out, " DELETE:"); + + /* if there was no PK, we only know that a delete happened */ + if (change->tp.oldtuple == NULL) + appendStringInfoString(ctx->out, " (no-tuple-data)"); + /* In DELETE, only the replica identity is present; display that */ + else + tuple_to_stringinfo(ctx->out, tupdesc, + &change->tp.oldtuple->tuple, + true); + break; + } + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + + OutputPluginWrite(ctx, true); +} |
