diff options
| author | Amit Kapila <akapila@postgresql.org> | 2021-11-30 08:54:30 +0530 |
|---|---|---|
| committer | Amit Kapila <akapila@postgresql.org> | 2021-11-30 08:54:30 +0530 |
| commit | 8d74fc96db5fd547e077bf9bf4c3b67f821d71cd (patch) | |
| tree | 3037345a7edabd025edcc5d9b431fb14f780e817 /src/test | |
| parent | 98105e53e0ab472b7721a3e8d7b9f1750a635120 (diff) | |
Add a view to show the stats of subscription workers.
This commit adds a new system view, pg_stat_subscription_workers, that
shows information about any errors that occur during the application of
logical replication changes as well as during initial table
synchronization. The subscription statistics entries are removed when the
corresponding subscription is removed.
It also adds an SQL function, pg_stat_reset_subscription_worker(), to reset
the error statistics of a single subscription worker.
The contents of this view can be used by an upcoming patch that skips the
particular transaction that conflicts with the existing data on the
subscriber.
This view can be extended in the future to track other transaction-related
statistics, such as the number of transactions committed/aborted by
subscription workers.
Author: Masahiko Sawada
Reviewed-by: Greg Nancarrow, Hou Zhijie, Tang Haiying, Vignesh C, Dilip Kumar, Takamichi Osumi, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
Diffstat (limited to 'src/test')
| -rw-r--r-- | src/test/regress/expected/rules.out | 18 | ||||
| -rw-r--r-- | src/test/subscription/t/026_worker_stats.pl | 154 |
2 files changed, 172 insertions, 0 deletions
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 2fa00a3c29a..b58b062b10d 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2094,6 +2094,24 @@ pg_stat_subscription| SELECT su.oid AS subid, st.latest_end_time FROM (pg_subscription su LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); +pg_stat_subscription_workers| SELECT w.subid, + s.subname, + w.subrelid, + w.last_error_relid, + w.last_error_command, + w.last_error_xid, + w.last_error_count, + w.last_error_message, + w.last_error_time + FROM ( SELECT pg_subscription.oid AS subid, + NULL::oid AS relid + FROM pg_subscription + UNION ALL + SELECT pg_subscription_rel.srsubid AS subid, + pg_subscription_rel.srrelid AS relid + FROM pg_subscription_rel) sr, + (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time) + JOIN pg_subscription s ON ((w.subid = s.oid))); pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, diff --git a/src/test/subscription/t/026_worker_stats.pl b/src/test/subscription/t/026_worker_stats.pl new file mode 100644 index 00000000000..e64e0a74b87 --- /dev/null +++ b/src/test/subscription/t/026_worker_stats.pl @@ -0,0 +1,154 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Tests for subscription error stats. +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More tests => 5; + +# Test if the error reported on pg_stat_subscription_workers view is expected. 
+sub test_subscription_error +{ + my ($node, $relname, $xid, $expected_error, $msg) = @_; + + my $check_sql = qq[ +SELECT count(1) > 0 FROM pg_stat_subscription_workers +WHERE last_error_relid = '$relname'::regclass]; + $check_sql .= " AND last_error_xid = '$xid'::xid;" if $xid ne ''; + + # Wait for the error statistics to be updated. + $node->poll_query_until( + 'postgres', $check_sql, +) or die "Timed out while waiting for statistics to be updated"; + + my $result = $node->safe_psql( + 'postgres', + qq[ +SELECT subname, last_error_command, last_error_relid::regclass, last_error_count > 0 +FROM pg_stat_subscription_workers +WHERE last_error_relid = '$relname'::regclass; +]); + is($result, $expected_error, $msg); +} + +# Create publisher node. +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node. +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); + +# The subscriber will enter an infinite error loop, so we don't want +# to overflow the server log with error messages. +$node_subscriber->append_conf('postgresql.conf', + qq[ +wal_retrieve_retry_interval = 2s +]); +$node_subscriber->start; + +# Initial table setup on both publisher and subscriber. On subscriber we +# create the same tables but with primary keys. Also, insert some data that +# will conflict with the data replicated from publisher later. +$node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +CREATE TABLE test_tab1 (a int); +CREATE TABLE test_tab2 (a int); +INSERT INTO test_tab1 VALUES (1); +INSERT INTO test_tab2 VALUES (1); +COMMIT; +]); +$node_subscriber->safe_psql( + 'postgres', + qq[ +BEGIN; +CREATE TABLE test_tab1 (a int primary key); +CREATE TABLE test_tab2 (a int primary key); +INSERT INTO test_tab2 VALUES (1); +COMMIT; +]); + +# Setup publications. +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql( + 'postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2;"); + +# There shouldn't be any subscription errors before starting logical replication. +my $result = $node_subscriber->safe_psql( + 'postgres', + "SELECT count(1) FROM pg_stat_subscription_workers"); +is($result, qq(0), 'check no subscription error'); + +# Create subscription. The table sync for test_tab2 on tap_sub will enter into +# infinite error loop due to violating the unique constraint. +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = off);"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Wait for initial table sync for test_tab1 to finish. +$node_subscriber->poll_query_until( + 'postgres', + qq[ +SELECT count(1) = 1 FROM pg_subscription_rel +WHERE srrelid = 'test_tab1'::regclass AND srsubstate in ('r', 's') +]) or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the initial data. +$result = $node_subscriber->safe_psql( + 'postgres', + "SELECT count(a) FROM test_tab1"); +is($result, q(1), 'check initial data are copied to subscriber'); + +# Insert more data to test_tab1, raising an error on the subscriber due to +# violation of the unique constraint on test_tab1. +my $xid = $node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +INSERT INTO test_tab1 VALUES (1); +SELECT pg_current_xact_id()::xid; +COMMIT; +]); +test_subscription_error($node_subscriber, 'test_tab1', $xid, + qq(tap_sub|INSERT|test_tab1|t), + 'check the error reported by the apply worker'); + +# Check the table sync worker's error in the view. +test_subscription_error($node_subscriber, 'test_tab2', '', + qq(tap_sub||test_tab2|t), + 'check the error reported by the table sync worker'); + +# Test for resetting subscription worker statistics. 
+# Truncate test_tab1 and test_tab2 so that applying changes and table sync can +# continue, respectively. +$node_subscriber->safe_psql( + 'postgres', + "TRUNCATE test_tab1, test_tab2;"); + +# Wait for the data to be replicated. +$node_subscriber->poll_query_until( + 'postgres', + "SELECT count(1) > 0 FROM test_tab1"); +$node_subscriber->poll_query_until( + 'postgres', + "SELECT count(1) > 0 FROM test_tab2"); + +# There shouldn't be any errors in the view after dropping the subscription. +$node_subscriber->safe_psql( + 'postgres', + "DROP SUBSCRIPTION tap_sub;"); +$result = $node_subscriber->safe_psql( + 'postgres', + "SELECT count(1) FROM pg_stat_subscription_workers"); +is($result, q(0), 'no error after dropping subscription'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); |
