diff options
| -rw-r--r-- | contrib/test_decoding/expected/rewrite.out | 75 | ||||
| -rw-r--r-- | contrib/test_decoding/sql/rewrite.sql | 42 | ||||
| -rw-r--r-- | src/backend/access/heap/heapam.c | 11 | ||||
| -rw-r--r-- | src/backend/access/heap/rewriteheap.c | 19 | ||||
| -rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 25 | ||||
| -rw-r--r-- | src/include/access/heapam.h | 1 | 
6 files changed, 163 insertions, 10 deletions
| diff --git a/contrib/test_decoding/expected/rewrite.out b/contrib/test_decoding/expected/rewrite.out index 4dcd4895438..3bf2afa9315 100644 --- a/contrib/test_decoding/expected/rewrite.out +++ b/contrib/test_decoding/expected/rewrite.out @@ -1,6 +1,61 @@  -- predictability  SET synchronous_commit = on;  DROP TABLE IF EXISTS replication_example; +-- Ensure there's tables with toast datums.  To do so, we dynamically +-- create a function returning a large textblob.  We want tables of +-- different kinds: mapped catalog table, unmapped catalog table, +-- shared catalog table and usertable. +CREATE FUNCTION exec(text) returns void language plpgsql volatile +  AS $f$ +    BEGIN +      EXECUTE $1; +    END; +$f$; +CREATE ROLE justforcomments NOLOGIN; +SELECT exec( +    format($outer$CREATE FUNCTION iamalongfunction() RETURNS TEXT IMMUTABLE LANGUAGE SQL AS $f$SELECT text %L$f$$outer$, +           (SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i)))); + exec  +------ +  +(1 row) + +SELECT exec( +    format($outer$COMMENT ON FUNCTION iamalongfunction() IS %L$outer$, +           iamalongfunction())); + exec  +------ +  +(1 row) + +SELECT exec( +    format($outer$COMMENT ON ROLE JUSTFORCOMMENTS IS %L$outer$, +           iamalongfunction())); + exec  +------ +  +(1 row) + +CREATE TABLE iamalargetable AS SELECT iamalongfunction() longfunctionoutput; +-- verify toast usage +SELECT pg_relation_size((SELECT reltoastrelid FROM pg_class WHERE oid = 'pg_proc'::regclass)) > 0; + ?column?  +---------- + t +(1 row) + +SELECT pg_relation_size((SELECT reltoastrelid FROM pg_class WHERE oid = 'pg_description'::regclass)) > 0; + ?column?  +---------- + t +(1 row) + +SELECT pg_relation_size((SELECT reltoastrelid FROM pg_class WHERE oid = 'pg_shdescription'::regclass)) > 0; + ?column?  +---------- + t +(1 row) +  SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');   ?column?   ---------- @@ -76,6 +131,23 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc   COMMIT  (15 rows) +-- trigger repeated rewrites of a system catalog with a toast table, +-- that previously was buggy: 20180914021046.oi7dm4ra3ot2g2kt@alap3.anarazel.de +VACUUM FULL pg_proc; VACUUM FULL pg_description; VACUUM FULL pg_shdescription; VACUUM FULL iamalargetable; +INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (8, 6, 1); +VACUUM FULL pg_proc; VACUUM FULL pg_description; VACUUM FULL pg_shdescription; VACUUM FULL iamalargetable; +INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (9, 7, 1); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +                                                                                       data                                                                                         +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + table public.replication_example: INSERT: id[integer]:9 somedata[integer]:8 text[character varying]:null testcolumn1[integer]:6 testcolumn2[integer]:null testcolumn3[integer]:1 + COMMIT + BEGIN + table public.replication_example: INSERT: id[integer]:10 somedata[integer]:9 text[character varying]:null testcolumn1[integer]:7 testcolumn2[integer]:null testcolumn3[integer]:1 + COMMIT +(6 rows) +  SELECT pg_drop_replication_slot('regression_slot');   pg_drop_replication_slot   -------------------------- @@ -83,3 +155,6 @@ SELECT pg_drop_replication_slot('regression_slot');  (1 row)  DROP TABLE IF EXISTS replication_example; +DROP FUNCTION iamalongfunction(); +DROP FUNCTION exec(text); +DROP ROLE justforcomments; diff --git a/contrib/test_decoding/sql/rewrite.sql b/contrib/test_decoding/sql/rewrite.sql index 8a7329423de..4271b82bead 100644 --- a/contrib/test_decoding/sql/rewrite.sql +++ b/contrib/test_decoding/sql/rewrite.sql @@ -3,6 +3,35 @@ SET synchronous_commit = on;  DROP TABLE IF EXISTS replication_example; +-- Ensure there's tables with toast datums.  To do so, we dynamically +-- create a function returning a large textblob.  We want tables of +-- different kinds: mapped catalog table, unmapped catalog table, +-- shared catalog table and usertable. +CREATE FUNCTION exec(text) returns void language plpgsql volatile +  AS $f$ +    BEGIN +      EXECUTE $1; +    END; +$f$; +CREATE ROLE justforcomments NOLOGIN; + +SELECT exec( +    format($outer$CREATE FUNCTION iamalongfunction() RETURNS TEXT IMMUTABLE LANGUAGE SQL AS $f$SELECT text %L$f$$outer$, +           (SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i)))); +SELECT exec( +    format($outer$COMMENT ON FUNCTION iamalongfunction() IS %L$outer$, +           iamalongfunction())); +SELECT exec( +    format($outer$COMMENT ON ROLE JUSTFORCOMMENTS IS %L$outer$, +           iamalongfunction())); +CREATE TABLE iamalargetable AS SELECT iamalongfunction() longfunctionoutput; + +-- verify toast usage +SELECT pg_relation_size((SELECT reltoastrelid FROM pg_class WHERE oid = 'pg_proc'::regclass)) > 0; +SELECT pg_relation_size((SELECT reltoastrelid FROM pg_class WHERE oid = 'pg_description'::regclass)) > 0; +SELECT pg_relation_size((SELECT reltoastrelid FROM pg_class WHERE oid = 'pg_shdescription'::regclass)) > 0; + +  SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');  CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));  INSERT INTO replication_example(somedata) VALUES (1); @@ -57,6 +86,17 @@ COMMIT;  CHECKPOINT;  SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -SELECT pg_drop_replication_slot('regression_slot'); +-- trigger repeated rewrites of a system catalog with a toast table, +-- that previously was buggy: 20180914021046.oi7dm4ra3ot2g2kt@alap3.anarazel.de +VACUUM FULL pg_proc; VACUUM FULL pg_description; VACUUM FULL pg_shdescription; VACUUM FULL iamalargetable; +INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (8, 6, 1); +VACUUM FULL pg_proc; VACUUM FULL pg_description; VACUUM FULL pg_shdescription; VACUUM FULL iamalargetable; +INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (9, 7, 1); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +SELECT pg_drop_replication_slot('regression_slot');  DROP TABLE IF EXISTS replication_example; +DROP FUNCTION iamalongfunction(); +DROP FUNCTION exec(text); +DROP ROLE justforcomments; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 56f1d82f962..5305db94609 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2423,6 +2423,11 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)   * Speculatively inserted tuples behave as "value locks" of short duration,   * used to implement INSERT .. ON CONFLICT.   * + * HEAP_INSERT_NO_LOGICAL force-disables the emitting of logical decoding + * information for the tuple. This should solely be used during table rewrites + * where RelationIsLogicallyLogged(relation) is not yet accurate for the new + * relation. + *   * Note that most of these options will be applied when inserting into the   * heap's TOAST table, too, if the tuple requires any out-of-line data.  Only   * HEAP_INSERT_SPECULATIVE is explicitly ignored, as the toast data does not @@ -2551,7 +2556,8 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,  		 * page write, so make sure it's included even if we take a full-page  		 * image. (XXX We could alternatively store a pointer into the FPW).  		 */ -		if (RelationIsLogicallyLogged(relation)) +		if (RelationIsLogicallyLogged(relation) && +			!(options & HEAP_INSERT_NO_LOGICAL))  		{  			xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;  			bufflags |= REGBUF_KEEP_DATA; @@ -2716,6 +2722,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,  	bool		need_tuple_data = RelationIsLogicallyLogged(relation);  	bool		need_cids = RelationIsAccessibleInLogicalDecoding(relation); +	/* currently not needed (thus unsupported) for heap_multi_insert() */ +	AssertArg(!(options & HEAP_INSERT_NO_LOGICAL)); +  	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);  	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,  												   HEAP_DEFAULT_FILLFACTOR); diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 85f92973c95..71277889649 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -652,10 +652,23 @@ raw_heap_insert(RewriteState state, HeapTuple tup)  		heaptup = tup;  	}  	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD) +	{ +		int options = HEAP_INSERT_SKIP_FSM; + +		if (!state->rs_use_wal) +			options |= HEAP_INSERT_SKIP_WAL; + +		/* +		 * The new relfilenode's relcache entrye doesn't have the necessary +		 * information to determine whether a relation should emit data for +		 * logical decoding.  Force it to off if necessary. +		 */ +		if (!RelationIsLogicallyLogged(state->rs_old_rel)) +			options |= HEAP_INSERT_NO_LOGICAL; +  		heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL, -										 HEAP_INSERT_SKIP_FSM | -										 (state->rs_use_wal ? -										  0 : HEAP_INSERT_SKIP_WAL)); +										 options); +	}  	else  		heaptup = tup; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7e349f4ca42..044d6baad7c 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1527,8 +1527,16 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,  												change->data.tp.relnode.relNode);  					/* -					 * Catalog tuple without data, emitted while catalog was -					 * in the process of being rewritten. +					 * Mapped catalog tuple without data, emitted while +					 * catalog table was in the process of being rewritten. We +					 * can fail to look up the relfilenode, because the the +					 * relmapper has no "historic" view, in contrast to normal +					 * the normal catalog during decoding. Thus repeated +					 * rewrites can cause a lookup failure. That's OK because +					 * we do not decode catalog changes anyway. Normally such +					 * tuples would be skipped over below, but we can't +					 * identify whether the table should be logically logged +					 * without mapping the relfilenode to the oid.  					 */  					if (reloid == InvalidOid &&  						change->data.tp.newtuple == NULL && @@ -1590,10 +1598,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,  						 * transaction's changes. Otherwise it will get  						 * freed/reused while restoring spooled data from  						 * disk. +						 * +						 * But skip doing so if there's no tuple-data. That +						 * happens if a non-mapped system catalog with a toast +						 * table is rewritten.  						 */ -						dlist_delete(&change->node); -						ReorderBufferToastAppendChunk(rb, txn, relation, -													  change); +						if (change->data.tp.newtuple != NULL) +						{ +							dlist_delete(&change->node); +							ReorderBufferToastAppendChunk(rb, txn, relation, +														  change); +						}  					}  			change_done: diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index ca5cad7497f..40e153f71ad 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -29,6 +29,7 @@  #define HEAP_INSERT_SKIP_FSM	0x0002  #define HEAP_INSERT_FROZEN		0x0004  #define HEAP_INSERT_SPECULATIVE 0x0008 +#define HEAP_INSERT_NO_LOGICAL	0x0010  typedef struct BulkInsertStateData *BulkInsertState; | 
