From 2c7ea57e56ca5f668c32d4266e0a3e45b455bef5 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 7 Apr 2022 18:13:13 +0200 Subject: Revert "Logical decoding of sequences" This reverts a sequence of commits, implementing features related to logical decoding and replication of sequences: - 0da92dc530c9251735fc70b20cd004d9630a1266 - 80901b32913ffa59bf157a4d88284b2b3a7511d9 - b779d7d8fdae088d70da5ed9fcd8205035676df3 - d5ed9da41d96988d905b49bebb273a9b2d6e2915 - a180c2b34de0989269fdb819bff241a249bf5380 - 75b1521dae1ff1fde17fda2e30e591f2e5d64b6a - 2d2232933b02d9396113662e44dca5f120d6830e - 002c9dd97a0c874fd1693a570383e2dd38cd40d5 - 05843b1aa49df2ecc9b97c693b755bd1b6f856a9 The implementation has issues, mostly due to combining transactional and non-transactional behavior of sequences. It's not clear how this could be fixed, but it'll require reworking significant part of the patch. Discussion: https://postgr.es/m/95345a19-d508-63d1-860a-f5c2f41e8d40@enterprisedb.com --- src/backend/replication/logical/decode.c | 131 ------------------------------- 1 file changed, 131 deletions(-) (limited to 'src/backend/replication/logical/decode.c') diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c6ea7c98e15..6303647fe0f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -42,7 +42,6 @@ #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" #include "storage/standby.h" -#include "commands/sequence.h" /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -64,7 +63,6 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); -static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple); /* helper functions for decoding transactions */ static inline bool FilterPrepare(LogicalDecodingContext *ctx, @@ -1252,132 +1250,3 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || ctx->fast_forward || FilterByOrigin(ctx, origin_id)); } - -/* - * DecodeSeqTuple - * decode tuple describing the sequence increment - * - * Sequences are represented as a table with a single row, which gets updated - * by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we - * simply copy it into the tuplebuf (similar to seq_redo). - */ -static void -DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) -{ - int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader; - - Assert(datalen >= 0); - - tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; - - ItemPointerSetInvalid(&tuple->tuple.t_self); - - tuple->tuple.t_tableOid = InvalidOid; - - memcpy(((char *) tuple->tuple.t_data), - data + sizeof(xl_seq_rec), - SizeofHeapTupleHeader); - - memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, - data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader, - datalen); -} - -/* - * Handle sequence decode - * - * Decoding sequences is a bit tricky, because while most sequence actions - * are non-transactional (not subject to rollback), some need to be handled - * as transactional. - * - * By default, a sequence increment is non-transactional - we must not queue - * it in a transaction as other changes, because the transaction might get - * rolled back and we'd discard the increment. The downstream would not be - * notified about the increment, which is wrong. - * - * On the other hand, the sequence may be created in a transaction. In this - * case we *should* queue the change as other changes in the transaction, - * because we don't want to send the increments for unknown sequence to the - * plugin - it might get confused about which sequence it's related to etc. - */ -void -sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) -{ - SnapBuild *builder = ctx->snapshot_builder; - ReorderBufferTupleBuf *tuplebuf; - RelFileNode target_node; - XLogReaderState *r = buf->record; - char *tupledata = NULL; - Size tuplelen; - Size datalen = 0; - TransactionId xid = XLogRecGetXid(r); - uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; - xl_seq_rec *xlrec; - Snapshot snapshot; - RepOriginId origin_id = XLogRecGetOrigin(r); - bool transactional; - - /* only decode changes flagged with XLOG_SEQ_LOG */ - if (info != XLOG_SEQ_LOG) - elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info); - - ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); - - /* - * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding messages. - */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) - return; - - /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); - if (target_node.dbNode != ctx->slot->data.database) - return; - - /* output plugin doesn't look for this origin, no need to queue */ - if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) - return; - - tupledata = XLogRecGetData(r); - datalen = XLogRecGetDataLen(r); - tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec); - - /* extract the WAL record, with "created" flag */ - xlrec = (xl_seq_rec *) XLogRecGetData(r); - - /* XXX how could we have sequence change without data? */ - if(!datalen || !tupledata) - return; - - tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); - DecodeSeqTuple(tupledata, datalen, tuplebuf); - - /* - * Should we handle the sequence increment as transactional or not? - * - * If the sequence was created in a still-running transaction, treat - * it as transactional and queue the increments. Otherwise it needs - * to be treated as non-transactional, in which case we send it to - * the plugin right away. - */ - transactional = ReorderBufferSequenceIsTransactional(ctx->reorder, - target_node, - xlrec->created); - - /* Skip the change if already processed (per the snapshot). */ - if (transactional && - !SnapBuildProcessChange(builder, xid, buf->origptr)) - return; - else if (!transactional && - (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || - SnapBuildXactNeedsSkip(builder, buf->origptr))) - return; - - /* Queue the increment (or send immediately if not transactional). */ - snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); - ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr, - origin_id, target_node, transactional, - xlrec->created, tuplebuf); -} -- cgit v1.2.3