summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c88
1 files changed, 88 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 9bc3a2d8deb..934aa13f2d3 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called);
/* streaming callbacks */
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -90,6 +94,10 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called);
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
@@ -218,6 +226,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ ctx->reorder->sequence = sequence_cb_wrapper;
/*
* To support streaming, we require start/stop/abort/commit/change
@@ -234,6 +243,7 @@ StartupDecodingContext(List *output_plugin_options,
(ctx->callbacks.stream_commit_cb != NULL) ||
(ctx->callbacks.stream_change_cb != NULL) ||
(ctx->callbacks.stream_message_cb != NULL) ||
+ (ctx->callbacks.stream_sequence_cb != NULL) ||
(ctx->callbacks.stream_truncate_cb != NULL);
/*
@@ -251,6 +261,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->stream_commit = stream_commit_cb_wrapper;
ctx->reorder->stream_change = stream_change_cb_wrapper;
ctx->reorder->stream_message = stream_message_cb_wrapper;
+ ctx->reorder->stream_sequence = stream_sequence_cb_wrapper;
ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
@@ -1204,6 +1215,42 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
}
static void
+sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel, bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ if (ctx->callbacks.sequence_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "sequence";
+ state.report_location = sequence_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = sequence_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+ last_value, log_cnt, is_called);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr first_lsn)
{
@@ -1509,6 +1556,47 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
}
static void
+stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* this callback is optional */
+ if (ctx->callbacks.stream_sequence_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_sequence";
+ state.report_location = sequence_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = sequence_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+ last_value, log_cnt, is_called);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change)