summaryrefslogtreecommitdiff
path: root/contrib/test_decoding
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-04-16 07:34:43 +0530
committerAmit Kapila <akapila@postgresql.org>2021-04-16 07:34:43 +0530
commitf5fc2f5b23d1b1dff60f8ca5dc211161df47eda4 (patch)
tree440595580ef83eac4609e3cd74c2fe43bc73f28b /contrib/test_decoding
parent1bf946bd43e545b86e567588b791311fe4e36a8c (diff)
Add information of total data processed to replication slot stats.
This adds the statistics about total transactions count and total transaction data logically sent to the decoding output plugin from ReorderBuffer. Users can query the pg_stat_replication_slots view to check these stats. Suggested-by: Andres Freund Author: Vignesh C and Amit Kapila Reviewed-by: Sawada Masahiko, Amit Kapila Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r--contrib/test_decoding/Makefile2
-rw-r--r--contrib/test_decoding/expected/stats.out79
-rw-r--r--contrib/test_decoding/sql/stats.sql48
-rw-r--r--contrib/test_decoding/t/001_repl_stats.pl76
4 files changed, 168 insertions, 37 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c5e28ce5cca..9a31e0b8795 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -17,6 +17,8 @@ ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
# typical installcheck users do not have (e.g. buildfarm clients).
NO_INSTALLCHECK = 1
+TAP_TESTS = 1
+
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index bca36fa9030..bc8e601eab6 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -8,7 +8,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
CREATE TABLE stats_test(data text);
-- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
DECLARE
start_time timestamptz := clock_timestamp();
updated bool;
@@ -16,12 +16,25 @@ BEGIN
-- we don't want to wait forever; loop will exit after 30 seconds
FOR i IN 1 .. 300 LOOP
- -- check to see if all updates have been reset/updated
- SELECT CASE WHEN check_reset THEN (spill_txns = 0)
- ELSE (spill_txns > 0)
- END
- INTO updated
- FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+ IF check_spill_txns THEN
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+ ELSE (spill_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ ELSE
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (total_txns = 0)
+ ELSE (total_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ END IF;
exit WHEN updated;
@@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
-- Check stats, wait for the stats collector to update. We can't test the
-- exact stats count as that can vary if any background transaction (say by
-- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
wait_for_decode_stats
-----------------------
(1 row)
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
- slot_name | spill_txns | spill_count
------------------+------------+-------------
- regression_slot | t | t
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t | t | t | t
(1 row)
-- reset the slot stats, and wait for stats collector to reset
@@ -70,16 +83,16 @@ SELECT pg_stat_reset_replication_slot('regression_slot');
(1 row)
-SELECT wait_for_decode_stats(true);
+SELECT wait_for_decode_stats(true, true);
wait_for_decode_stats
-----------------------
(1 row)
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
- slot_name | spill_txns | spill_count
------------------+------------+-------------
- regression_slot | 0 | 0
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | 0 | 0 | 0 | 0
(1 row)
-- decode and check stats again.
@@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
5002
(1 row)
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
+ wait_for_decode_stats
+-----------------------
+
+(1 row)
+
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t | t | t | t
+(1 row)
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+ pg_stat_reset_replication_slot
+--------------------------------
+
+(1 row)
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
wait_for_decode_stats
-----------------------
(1 row)
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
- slot_name | spill_txns | spill_count
------------------+------------+-------------
- regression_slot | t | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | f | f | t | t
(1 row)
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
@@ -117,7 +150,7 @@ SELECT slot_name FROM pg_stat_replication_slots;
(1 row)
COMMIT;
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 51294e48e87..8c34aeced1d 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -6,7 +6,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
CREATE TABLE stats_test(data text);
-- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
DECLARE
start_time timestamptz := clock_timestamp();
updated bool;
@@ -14,12 +14,25 @@ BEGIN
-- we don't want to wait forever; loop will exit after 30 seconds
FOR i IN 1 .. 300 LOOP
- -- check to see if all updates have been reset/updated
- SELECT CASE WHEN check_reset THEN (spill_txns = 0)
- ELSE (spill_txns > 0)
- END
- INTO updated
- FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+ IF check_spill_txns THEN
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+ ELSE (spill_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ ELSE
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (total_txns = 0)
+ ELSE (total_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ END IF;
exit WHEN updated;
@@ -46,18 +59,25 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
-- Check stats, wait for the stats collector to update. We can't test the
-- exact stats count as that can vary if any background transaction (say by
-- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
-- reset the slot stats, and wait for stats collector to reset
SELECT pg_stat_reset_replication_slot('regression_slot');
-SELECT wait_for_decode_stats(true);
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(true, true);
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
-- decode and check stats again.
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
@@ -66,6 +86,6 @@ SELECT slot_name FROM pg_stat_replication_slots;
SELECT slot_name FROM pg_stat_replication_slots;
COMMIT;
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
new file mode 100644
index 00000000000..11b6cd9b9c7
--- /dev/null
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -0,0 +1,76 @@
+# Test replication statistics data in pg_stat_replication_slots is sane after
+# drop replication slot and restart.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Test set-up
+my $node = get_new_node('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', 'synchronous_commit = on');
+$node->start;
+
+# Create table.
+$node->safe_psql('postgres',
+ "CREATE TABLE test_repl_stat(col1 int)");
+
+# Create replication slots.
+$node->safe_psql(
+ 'postgres', qq[
+ SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('regression_slot2', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('regression_slot3', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('regression_slot4', 'test_decoding');
+]);
+
+# Insert some data.
+$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
+
+$node->safe_psql(
+ 'postgres', qq[
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot3', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot4', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+]);
+
+# Wait for the statistics to be updated.
+$node->poll_query_until(
+ 'postgres', qq[
+ SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots
+ WHERE slot_name ~ 'regression_slot'
+ AND total_txns > 0 AND total_bytes > 0;
+]) or die "Timed out while waiting for statistics to be updated";
+
+# Test to drop one of the replication slot and verify replication statistics data is
+# fine after restart.
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
+
+$node->stop;
+$node->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# restart.
+my $result = $node->safe_psql('postgres',
+ "SELECT slot_name, total_txns > 0 AS total_txn,
+ total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
+ ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t
+regression_slot3|t|t), 'check replication statistics are updated');
+
+# cleanup
+$node->safe_psql('postgres', "DROP TABLE test_repl_stat");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
+
+# shutdown
+$node->stop;