| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
 | -- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
-- fail because of an already existing slot
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
-- fail because of an invalid name
SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding');
-- fail twice because of an invalid parameter values
SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 'frakbar');
SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'nonexistant-option', 'frakbar');
SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 'frakbar');
-- succeed once
SELECT pg_drop_replication_slot('regression_slot');
-- fail
SELECT pg_drop_replication_slot('regression_slot');
-- check that we're detecting a streaming rep slot used for logical decoding
SELECT 'init' FROM pg_create_physical_replication_slot('repl');
SELECT data FROM pg_logical_slot_get_changes('repl', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('repl');
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
/* check whether status function reports us, only reproduceable columns */
SELECT slot_name, plugin, slot_type, active,
    NOT catalog_xmin IS NULL AS catalog_xmin_set,
    xmin IS NULl  AS data_xmin_not_set,
    pg_wal_lsn_diff(restart_lsn, '0/01000000') > 0 AS some_wal
FROM pg_replication_slots;
/*
 * Check that changes are handled correctly when interleaved with ddl
 */
CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));
BEGIN;
INSERT INTO replication_example(somedata, text) VALUES (1, 1);
INSERT INTO replication_example(somedata, text) VALUES (1, 2);
COMMIT;
ALTER TABLE replication_example ADD COLUMN bar int;
INSERT INTO replication_example(somedata, text, bar) VALUES (2, 1, 4);
BEGIN;
INSERT INTO replication_example(somedata, text, bar) VALUES (2, 2, 4);
INSERT INTO replication_example(somedata, text, bar) VALUES (2, 3, 4);
INSERT INTO replication_example(somedata, text, bar) VALUES (2, 4, NULL);
COMMIT;
ALTER TABLE replication_example DROP COLUMN bar;
INSERT INTO replication_example(somedata, text) VALUES (3, 1);
BEGIN;
INSERT INTO replication_example(somedata, text) VALUES (3, 2);
INSERT INTO replication_example(somedata, text) VALUES (3, 3);
COMMIT;
ALTER TABLE replication_example RENAME COLUMN text TO somenum;
INSERT INTO replication_example(somedata, somenum) VALUES (4, 1);
-- collect all changes
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
-- check that this doesn't produce any changes from the heap rewrite
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
BEGIN;
INSERT INTO replication_example(somedata, somenum) VALUES (6, 1);
ALTER TABLE replication_example ADD COLUMN zaphod1 int;
INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 2, 1);
ALTER TABLE replication_example ADD COLUMN zaphod2 int;
INSERT INTO replication_example(somedata, somenum, zaphod2) VALUES (6, 3, 1);
INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 4, 2);
COMMIT;
-- show changes
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- ON CONFLICT DO UPDATE support
BEGIN;
INSERT INTO replication_example(id, somedata, somenum) SELECT i, i, i FROM generate_series(-15, 15) i
  ON CONFLICT (id) DO UPDATE SET somenum = excluded.somenum + 1;
COMMIT;
/* display results */
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
INSERT INTO tr_unique(data) VALUES(10);
ALTER TABLE tr_unique RENAME TO tr_pkey;
ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1');
INSERT INTO tr_pkey(data) VALUES(1);
--show deletion with primary key
DELETE FROM tr_pkey;
/* display results */
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
/*
 * check that disk spooling works (also for logical messages)
 */
BEGIN;
CREATE TABLE tr_etoomuch (id serial primary key, data int);
INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
DELETE FROM tr_etoomuch WHERE id < 5000;
UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
CREATE TABLE tr_oddlength (id text primary key, data text);
INSERT INTO tr_oddlength VALUES('ab', 'foo');
COMMIT;
/* display results, but hide most of the output */
SELECT count(*), min(data), max(data)
FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
GROUP BY substring(data, 1, 24)
ORDER BY 1,2;
-- check updates of primary keys work correctly
BEGIN;
CREATE TABLE spoolme AS SELECT g.i FROM generate_series(1, 5000) g(i);
UPDATE tr_etoomuch SET id = -id WHERE id = 5000;
UPDATE tr_oddlength SET id = 'x', data = 'quux';
UPDATE tr_oddlength SET id = 'yy', data = 'a';
DELETE FROM spoolme;
DROP TABLE spoolme;
COMMIT;
SELECT data
FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
WHERE data ~ 'UPDATE';
-- check that a large, spooled, upsert works
INSERT INTO tr_etoomuch (id, data)
SELECT g.i, -g.i FROM generate_series(8000, 12000) g(i)
ON CONFLICT(id) DO UPDATE SET data = EXCLUDED.data;
SELECT substring(data, 1, 29), count(*)
FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') WITH ORDINALITY
GROUP BY 1
ORDER BY min(ordinality);
/*
 * check whether we decode subtransactions correctly in relation with each
 * other
 */
CREATE TABLE tr_sub (id serial primary key, path text);
-- toplevel, subtxn, toplevel, subtxn, subtxn
BEGIN;
INSERT INTO tr_sub(path) VALUES ('1-top-#1');
SAVEPOINT a;
INSERT INTO tr_sub(path) VALUES ('1-top-1-#1');
INSERT INTO tr_sub(path) VALUES ('1-top-1-#2');
RELEASE SAVEPOINT a;
SAVEPOINT b;
SAVEPOINT c;
INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#1');
INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#2');
RELEASE SAVEPOINT c;
INSERT INTO tr_sub(path) VALUES ('1-top-2-#1');
RELEASE SAVEPOINT b;
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- check that we handle xlog assignments correctly
BEGIN;
-- nest 80 subtxns
SAVEPOINT subtop;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
-- assign xid by inserting
INSERT INTO tr_sub(path) VALUES ('2-top-1...--#1');
INSERT INTO tr_sub(path) VALUES ('2-top-1...--#2');
INSERT INTO tr_sub(path) VALUES ('2-top-1...--#3');
RELEASE SAVEPOINT subtop;
INSERT INTO tr_sub(path) VALUES ('2-top-#1');
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- make sure rollbacked subtransactions aren't decoded
BEGIN;
INSERT INTO tr_sub(path) VALUES ('3-top-2-#1');
SAVEPOINT a;
INSERT INTO tr_sub(path) VALUES ('3-top-2-1-#1');
SAVEPOINT b;
INSERT INTO tr_sub(path) VALUES ('3-top-2-2-#1');
ROLLBACK TO SAVEPOINT b;
INSERT INTO tr_sub(path) VALUES ('3-top-2-#2');
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- test whether a known, but not yet logged toplevel xact, followed by a
-- subxact commit is handled correctly
BEGIN;
SELECT txid_current() != 0; -- so no fixed xid apears in the outfile
SAVEPOINT a;
INSERT INTO tr_sub(path) VALUES ('4-top-1-#1');
RELEASE SAVEPOINT a;
COMMIT;
-- test whether a change in a subtransaction, in an unknown toplevel
-- xact is handled correctly.
BEGIN;
SAVEPOINT a;
INSERT INTO tr_sub(path) VALUES ('5-top-1-#1');
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- check that DDL in aborted subtransactions handled correctly
CREATE TABLE tr_sub_ddl(data int);
BEGIN;
SAVEPOINT a;
ALTER TABLE tr_sub_ddl ALTER COLUMN data TYPE text;
INSERT INTO tr_sub_ddl VALUES ('blah-blah');
ROLLBACK TO SAVEPOINT a;
ALTER TABLE tr_sub_ddl ALTER COLUMN data TYPE bigint;
INSERT INTO tr_sub_ddl VALUES(43);
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
/*
 * Check whether treating a table as a catalog table works somewhat
 */
CREATE TABLE replication_metadata (
    id serial primary key,
    relation name NOT NULL,
    options text[]
)
WITH (user_catalog_table = true)
;
\d+ replication_metadata
INSERT INTO replication_metadata(relation, options)
VALUES ('foo', ARRAY['a', 'b']);
ALTER TABLE replication_metadata RESET (user_catalog_table);
\d+ replication_metadata
INSERT INTO replication_metadata(relation, options)
VALUES ('bar', ARRAY['a', 'b']);
ALTER TABLE replication_metadata SET (user_catalog_table = true);
\d+ replication_metadata
INSERT INTO replication_metadata(relation, options)
VALUES ('blub', NULL);
-- make sure rewrites don't work
ALTER TABLE replication_metadata ADD COLUMN rewritemeornot int;
ALTER TABLE replication_metadata ALTER COLUMN rewritemeornot TYPE text;
ALTER TABLE replication_metadata SET (user_catalog_table = false);
\d+ replication_metadata
INSERT INTO replication_metadata(relation, options)
VALUES ('zaphod', NULL);
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
/*
 * check whether we handle updates/deletes correct with & without a pkey
 */
/* we should handle the case without a key at all more gracefully */
CREATE TABLE table_without_key(id serial, data int);
INSERT INTO table_without_key(data) VALUES(1),(2);
DELETE FROM table_without_key WHERE data = 1;
-- won't log old keys
UPDATE table_without_key SET data = 3 WHERE data = 2;
UPDATE table_without_key SET id = -id;
UPDATE table_without_key SET id = -id;
-- should log the full old row now
ALTER TABLE table_without_key REPLICA IDENTITY FULL;
UPDATE table_without_key SET data = 3 WHERE data = 2;
UPDATE table_without_key SET id = -id;
UPDATE table_without_key SET id = -id;
-- ensure that FULL correctly deals with new columns
ALTER TABLE table_without_key ADD COLUMN new_column text;
UPDATE table_without_key SET id = -id;
UPDATE table_without_key SET id = -id, new_column = 'someval';
DELETE FROM table_without_key WHERE data = 3;
CREATE TABLE table_with_pkey(id serial primary key, data int);
INSERT INTO table_with_pkey(data) VALUES(1), (2);
DELETE FROM table_with_pkey WHERE data = 1;
-- should log the old pkey
UPDATE table_with_pkey SET data = 3 WHERE data = 2;
UPDATE table_with_pkey SET id = -id;
UPDATE table_with_pkey SET id = -id;
-- check that we log nothing despite having a pkey
ALTER TABLE table_without_key REPLICA IDENTITY NOTHING;
UPDATE table_with_pkey SET id = -id;
-- check that we log everything despite having a pkey
ALTER TABLE table_without_key REPLICA IDENTITY FULL;
UPDATE table_with_pkey SET id = -id;
DELETE FROM table_with_pkey WHERE data = 3;
CREATE TABLE table_with_unique_not_null(id serial unique, data int);
ALTER TABLE table_with_unique_not_null ALTER COLUMN id SET NOT NULL; --already set
-- won't log anything, replica identity not setup
INSERT INTO table_with_unique_not_null(data) VALUES(1), (2);
DELETE FROM table_with_unique_not_null WHERE data = 1;
UPDATE table_with_unique_not_null SET data = 3 WHERE data = 2;
UPDATE table_with_unique_not_null SET id = -id;
UPDATE table_with_unique_not_null SET id = -id;
DELETE FROM table_with_unique_not_null WHERE data = 3;
-- should log old key
ALTER TABLE table_with_unique_not_null REPLICA IDENTITY USING INDEX table_with_unique_not_null_id_key;
INSERT INTO table_with_unique_not_null(data) VALUES(1), (2);
DELETE FROM table_with_unique_not_null WHERE data = 1;
UPDATE table_with_unique_not_null SET data = 3 WHERE data = 2;
UPDATE table_with_unique_not_null SET id = -id;
UPDATE table_with_unique_not_null SET id = -id;
DELETE FROM table_with_unique_not_null WHERE data = 3;
-- check toast support
BEGIN;
CREATE SEQUENCE toasttable_rand_seq START 79 INCREMENT 1499; -- portable "random"
CREATE TABLE toasttable(
       id serial primary key,
       toasted_col1 text,
       rand1 float8 DEFAULT nextval('toasttable_rand_seq'),
       toasted_col2 text,
       rand2 float8 DEFAULT nextval('toasttable_rand_seq')
       );
COMMIT;
-- uncompressed external toast data
INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i);
-- compressed external toast data
INSERT INTO toasttable(toasted_col2) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
-- update of existing column
UPDATE toasttable
    SET toasted_col1 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i))
WHERE id = 1;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i);
-- update of second column, first column unchanged
UPDATE toasttable
    SET toasted_col2 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i))
WHERE id = 1;
-- make sure we decode correctly even if the toast table is gone
DROP TABLE toasttable;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- done, free logical replication slot
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot');
/* check that the slot is gone */
SELECT * FROM pg_replication_slots;
 |