summaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
Diffstat (limited to 'src/include')
-rw-r--r--src/include/access/rmgrlist.h1
-rw-r--r--src/include/catalog/pg_proc.h4
-rw-r--r--src/include/replication/logicalfuncs.h2
-rw-r--r--src/include/replication/message.h41
-rw-r--r--src/include/replication/output_plugin.h13
-rw-r--r--src/include/replication/reorderbuffer.h22
-rw-r--r--src/include/replication/snapbuild.h2
7 files changed, 85 insertions, 0 deletions
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 3cfe6f7b546..a7a0ae224fd 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -46,3 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index a0edf939b3a..6eab6c7c334 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5155,6 +5155,10 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
DESCR("peek at changes from replication slot");
DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3577 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ ));
+DESCR("emit a textual logical decoding message");
+DATA(insert OID = 3578 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ ));
+DESCR("emit a binary logical decoding message");
/* event triggers */
DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index c87a1dfebde..554041405c4 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -21,4 +21,6 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
+extern Datum pg_logical_emit_message_bytea(PG_FUNCTION_ARGS);
+extern Datum pg_logical_emit_message_text(PG_FUNCTION_ARGS);
#endif
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
new file mode 100644
index 00000000000..8b968d5288e
--- /dev/null
+++ b/src/include/replication/message.h
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ * message.h
+ * Exports from replication/logical/message.c
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/message.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+ bool transactional; /* is message transactional? */
+ Size prefix_size; /* length of prefix */
+ Size message_size; /* size of the message */
+ char message[FLEXIBLE_ARRAY_MEMBER]; /* message including the null
+ * terminated prefix of length
+ * prefix_size */
+} xl_logical_message;
+
+#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+ size_t size, bool transactional);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_MESSAGE 0x00
+void logicalmsg_redo(XLogReaderState *record);
+void logicalmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalmsg_identify(uint8 info);
+
+#endif /* PG_LOGICAL_MESSAGE_H */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 577b12ea2ef..3a2ca985fbe 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -74,6 +74,18 @@ typedef void (*LogicalDecodeCommitCB) (
XLogRecPtr commit_lsn);
/*
+ * Called for the generic logical decoding messages.
+ */
+typedef void (*LogicalDecodeMessageCB) (
+ struct LogicalDecodingContext *,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+
+/*
* Filter changes by origin.
*/
typedef bool (*LogicalDecodeFilterByOriginCB) (
@@ -96,6 +108,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b52d06af928..4c54953a512 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -54,6 +54,7 @@ enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_INSERT,
REORDER_BUFFER_CHANGE_UPDATE,
REORDER_BUFFER_CHANGE_DELETE,
+ REORDER_BUFFER_CHANGE_MESSAGE,
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
@@ -98,6 +99,14 @@ typedef struct ReorderBufferChange
ReorderBufferTupleBuf *newtuple;
} tp;
+ /* Message with arbitrary data. */
+ struct
+ {
+ char *prefix;
+ Size message_size;
+ char *message;
+ } msg;
+
/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
Snapshot snapshot;
@@ -274,6 +283,15 @@ typedef void (*ReorderBufferCommitCB) (
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+/* message callback signature */
+typedef void (*ReorderBufferMessageCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix, Size sz,
+ const char *message);
+
struct ReorderBuffer
{
/*
@@ -300,6 +318,7 @@ struct ReorderBuffer
ReorderBufferBeginCB begin;
ReorderBufferApplyChangeCB apply_change;
ReorderBufferCommitCB commit;
+ ReorderBufferMessageCB message;
/*
* Pointer that will be passed untouched to the callbacks.
@@ -350,6 +369,9 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
+ bool transactional, const char *prefix,
+ Size message_size, const char *message);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 75955afd391..c4127a1cf75 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -63,6 +63,8 @@ extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
extern void SnapBuildClearExportedSnapshot(void);
extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
+extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
+ TransactionId xid);
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);