diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
| -rw-r--r-- | src/backend/replication/logical/logical.c | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 9bc3a2d8deb..934aa13f2d3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, bool is_called); /* streaming callbacks */ static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -90,6 +94,10 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, bool is_called); static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); @@ -218,6 +226,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->sequence = sequence_cb_wrapper; /* * To support streaming, we require start/stop/abort/commit/change @@ -234,6 +243,7 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_commit_cb != NULL) || (ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || + (ctx->callbacks.stream_sequence_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); /* @@ -251,6 +261,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; + ctx->reorder->stream_sequence = stream_sequence_cb_wrapper; ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; @@ -1204,6 +1215,42 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (ctx->callbacks.sequence_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "sequence"; + state.report_location = sequence_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = sequence_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional, + last_value, log_cnt, is_called); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn) { @@ -1509,6 +1556,47 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (ctx->callbacks.stream_sequence_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_sequence"; + state.report_location = sequence_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = sequence_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional, + last_value, log_cnt, is_called); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) |
