summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2016-04-13 17:38:54 -0700
committerAndres Freund <andres@anarazel.de>2016-04-13 17:38:54 -0700
commitbe65eddd80093a923b091dc60776aa6f966d1f07 (patch)
treee8e58ef17ea0fddbf173b032039c99dd6ac92f09 /src
parent80abbeba23d466b6541cf95082a9e1f36704424e (diff)
Add required database and origin filtering for logical messages.
Logical messages, added in 3fe3511d05, during decoding failed to filter messages emitted in other databases and messages emitted "under" a replication origin the output plugin isn't interested in. Add tests to verify that both types of filtering actually work. While touching message.sql remove hunk obsoleted by d25379e. Bump XLOG_PAGE_MAGIC because xl_logical_message changed and because 3fe3511d05 had omitted doing so. 3fe3511d05 additionally didn't bump catversion, but 7a542700d has done so since. Author: Petr Jelinek Reported-By: Andres Freund Discussion: 20160406142513.wotqy3ba3kanr423@alap3.anarazel.de
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c23
-rw-r--r--src/backend/replication/logical/message.c6
-rw-r--r--src/include/access/xlog_internal.h2
-rw-r--r--src/include/replication/message.h1
4 files changed, 22 insertions, 10 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 3e80c4a0d86..0cdb0b8a92b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -464,6 +464,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
+static inline bool
+FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+ if (ctx->callbacks.filter_by_origin_cb == NULL)
+ return false;
+
+ return filter_by_origin_cb_wrapper(ctx, origin_id);
+}
+
/*
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
*/
@@ -474,6 +483,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogReaderState *r = buf->record;
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ RepOriginId origin_id = XLogRecGetOrigin(r);
Snapshot snapshot;
xl_logical_message *message;
@@ -488,6 +498,10 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
message = (xl_logical_message *) XLogRecGetData(r);
+ if (message->dbId != ctx->slot->data.database ||
+ FilterByOrigin(ctx, origin_id))
+ return;
+
if (message->transactional &&
!SnapBuildProcessChange(builder, xid, buf->origptr))
return;
@@ -504,15 +518,6 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
message->message + message->prefix_size);
}
-static inline bool
-FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
-{
- if (ctx->callbacks.filter_by_origin_cb == NULL)
- return false;
-
- return filter_by_origin_cb_wrapper(ctx, origin_id);
-}
-
/*
* Consolidated commit record handling between the different form of commit
* records.
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
index 684f7998263..efcc25ae957 100644
--- a/src/backend/replication/logical/message.c
+++ b/src/backend/replication/logical/message.c
@@ -31,6 +31,8 @@
#include "postgres.h"
+#include "miscadmin.h"
+
#include "access/xact.h"
#include "catalog/indexing.h"
@@ -60,6 +62,7 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
GetCurrentTransactionId();
}
+ xlrec.dbId = MyDatabaseId;
xlrec.transactional = transactional;
xlrec.prefix_size = strlen(prefix) + 1;
xlrec.message_size = size;
@@ -69,6 +72,9 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
XLogRegisterData((char *) prefix, xlrec.prefix_size);
XLogRegisterData((char *) message, size);
+ /* allow origin filtering */
+ XLogIncludeOrigin();
+
return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
}
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 749060f9c75..7089a1c48f7 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD090 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD091 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
index 8b968d5288e..23b9cdb268b 100644
--- a/src/include/replication/message.h
+++ b/src/include/replication/message.h
@@ -19,6 +19,7 @@
*/
typedef struct xl_logical_message
{
+ Oid dbId; /* database Oid emitted from */
bool transactional; /* is message transactional? */
Size prefix_size; /* length of prefix */
Size message_size; /* size of the message */