diff options
Diffstat (limited to 'src/include')
-rw-r--r-- | src/include/access/rmgrlist.h | 1 | ||||
-rw-r--r-- | src/include/catalog/pg_proc.h | 4 | ||||
-rw-r--r-- | src/include/replication/logicalfuncs.h | 2 | ||||
-rw-r--r-- | src/include/replication/message.h | 41 | ||||
-rw-r--r-- | src/include/replication/output_plugin.h | 13 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 22 | ||||
-rw-r--r-- | src/include/replication/snapbuild.h | 2 |
7 files changed, 85 insertions, 0 deletions
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 3cfe6f7b546..a7a0ae224fd 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -46,3 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL) PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL) PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL) diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index a0edf939b3a..6eab6c7c334 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5155,6 +5155,10 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 DESCR("peek at changes from replication slot"); DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ )); DESCR("peek at binary changes from replication slot"); +DATA(insert OID = 3577 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ )); +DESCR("emit a textual logical decoding message"); +DATA(insert OID = 3578 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ )); +DESCR("emit a binary logical decoding message"); /* event triggers */ DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ )); diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index c87a1dfebde..554041405c4 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -21,4 +21,6 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS); +extern Datum pg_logical_emit_message_bytea(PG_FUNCTION_ARGS); +extern Datum pg_logical_emit_message_text(PG_FUNCTION_ARGS); #endif diff --git a/src/include/replication/message.h b/src/include/replication/message.h new file mode 100644 index 00000000000..8b968d5288e --- /dev/null +++ b/src/include/replication/message.h @@ -0,0 +1,41 @@ +/*------------------------------------------------------------------------- + * message.h + * Exports from replication/logical/message.c + * + * Copyright (c) 2013-2016, PostgreSQL Global Development Group + * + * src/include/replication/message.h + *------------------------------------------------------------------------- + */ +#ifndef PG_LOGICAL_MESSAGE_H +#define PG_LOGICAL_MESSAGE_H + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "access/xlogreader.h" + +/* + * Generic logical decoding message wal record. + */ +typedef struct xl_logical_message +{ + bool transactional; /* is message transactional? */ + Size prefix_size; /* length of prefix */ + Size message_size; /* size of the message */ + char message[FLEXIBLE_ARRAY_MEMBER]; /* message including the null + * terminated prefix of length + * prefix_size */ +} xl_logical_message; + +#define SizeOfLogicalMessage (offsetof(xl_logical_message, message)) + +extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, + size_t size, bool transactional); + +/* RMGR API*/ +#define XLOG_LOGICAL_MESSAGE 0x00 +void logicalmsg_redo(XLogReaderState *record); +void logicalmsg_desc(StringInfo buf, XLogReaderState *record); +const char *logicalmsg_identify(uint8 info); + +#endif /* PG_LOGICAL_MESSAGE_H */ diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 577b12ea2ef..3a2ca985fbe 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -74,6 +74,18 @@ typedef void (*LogicalDecodeCommitCB) ( XLogRecPtr commit_lsn); /* + * Called for the generic logical decoding messages. + */ +typedef void (*LogicalDecodeMessageCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + +/* * Filter changes by origin. */ typedef bool (*LogicalDecodeFilterByOriginCB) ( @@ -96,6 +108,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index b52d06af928..4c54953a512 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -54,6 +54,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, + REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, @@ -98,6 +99,14 @@ typedef struct ReorderBufferChange ReorderBufferTupleBuf *newtuple; } tp; + /* Message with arbitrary data. */ + struct + { + char *prefix; + Size message_size; + char *message; + } msg; + /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ Snapshot snapshot; @@ -274,6 +283,15 @@ typedef void (*ReorderBufferCommitCB) ( ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* message callback signature */ +typedef void (*ReorderBufferMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, Size sz, + const char *message); + struct ReorderBuffer { /* @@ -300,6 +318,7 @@ struct ReorderBuffer ReorderBufferBeginCB begin; ReorderBufferApplyChangeCB apply_change; ReorderBufferCommitCB commit; + ReorderBufferMessageCB message; /* * Pointer that will be passed untouched to the callbacks. @@ -350,6 +369,9 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); +void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, + bool transactional, const char *prefix, + Size message_size, const char *message); void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 75955afd391..c4127a1cf75 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -63,6 +63,8 @@ extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate); extern void SnapBuildClearExportedSnapshot(void); extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate); +extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, + TransactionId xid); extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); |