summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
authorBruce Momjian <bruce@momjian.us>2014-05-06 12:12:18 -0400
committerBruce Momjian <bruce@momjian.us>2014-05-06 12:12:18 -0400
commit0a7832005792fa6dad171f9cadb8d587fe0dd800 (patch)
tree365cfc42c521a52607e41394b08ef44d338d8fc1 /src/backend/replication/logical/logical.c
parentfb85cd4320414c3f6e9c8bc69ec944200ae1e493 (diff)
pgindent run for 9.4
This includes removing tabs after periods in C comments, which was applied to back branches, so this change should not effect backpatching.
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c135
1 files changed, 73 insertions, 62 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1d08b50da39..438a3fb152d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -8,21 +8,21 @@
* src/backend/replication/logical/logical.c
*
* NOTES
- * This file coordinates interaction between the various modules that
- * together provide logical decoding, primarily by providing so
- * called LogicalDecodingContexts. The goal is to encapsulate most of the
- * internal complexity for consumers of logical decoding, so they can
- * create and consume a changestream with a low amount of code. Builtin
- * consumers are the walsender and SQL SRF interface, but it's possible to
- * add further ones without changing core code, e.g. to consume changes in
- * a bgworker.
+ * This file coordinates interaction between the various modules that
+ * together provide logical decoding, primarily by providing so
+ * called LogicalDecodingContexts. The goal is to encapsulate most of the
+ * internal complexity for consumers of logical decoding, so they can
+ * create and consume a changestream with a low amount of code. Builtin
+ * consumers are the walsender and SQL SRF interface, but it's possible to
+ * add further ones without changing core code, e.g. to consume changes in
+ * a bgworker.
*
- * The idea is that a consumer provides three callbacks, one to read WAL,
- * one to prepare a data write, and a final one for actually writing since
- * their implementation depends on the type of consumer. Check
- * logicalfuncs.c for an example implementation of a fairly simple consumer
- * and a implementation of a WAL reading callback that's suitable for
- * simple consumers.
+ * The idea is that a consumer provides three callbacks, one to read WAL,
+ * one to prepare a data write, and a final one for actually writing since
+ * their implementation depends on the type of consumer. Check
+ * logicalfuncs.c for an example implementation of a fairly simple consumer
+ * and a implementation of a WAL reading callback that's suitable for
+ * simple consumers.
*-------------------------------------------------------------------------
*/
@@ -56,13 +56,13 @@ typedef struct LogicalErrorCallbackState
/* wrappers around output plugin callbacks */
static void output_plugin_error_callback(void *arg);
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
- bool is_init);
+ bool is_init);
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn);
+ XLogRecPtr commit_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- Relation relation, ReorderBufferChange *change);
+ Relation relation, ReorderBufferChange *change);
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
@@ -90,18 +90,18 @@ CheckLogicalDecodingRequirements(void)
*
* There's basically three things missing to allow this:
* 1) We need to be able to correctly and quickly identify the timeline a
- * LSN belongs to
+ * LSN belongs to
* 2) We need to force hot_standby_feedback to be enabled at all times so
- * the primary cannot remove rows we need.
+ * the primary cannot remove rows we need.
* 3) support dropping replication slots referring to a database, in
- * dbase_redo. There can't be any active ones due to HS recovery
- * conflicts, so that should be relatively easy.
+ * dbase_redo. There can't be any active ones due to HS recovery
+ * conflicts, so that should be relatively easy.
* ----
*/
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("logical decoding cannot be used while in recovery")));
+ errmsg("logical decoding cannot be used while in recovery")));
}
/*
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
LogicalOutputPluginWriterWrite do_write)
{
ReplicationSlot *slot;
- MemoryContext context, old_context;
+ MemoryContext context,
+ old_context;
LogicalDecodingContext *ctx;
/* shorter lines... */
@@ -133,7 +134,10 @@ StartupDecodingContext(List *output_plugin_options,
ctx->context = context;
- /* (re-)load output plugins, so we detect a bad (removed) output plugin now. */
+ /*
+ * (re-)load output plugins, so we detect a bad (removed) output plugin
+ * now.
+ */
LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
/*
@@ -195,10 +199,10 @@ CreateInitDecodingContext(char *plugin,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write)
{
- TransactionId xmin_horizon = InvalidTransactionId;
+ TransactionId xmin_horizon = InvalidTransactionId;
ReplicationSlot *slot;
LogicalDecodingContext *ctx;
- MemoryContext old_context;
+ MemoryContext old_context;
/* shorter lines... */
slot = MyReplicationSlot;
@@ -219,8 +223,8 @@ CreateInitDecodingContext(char *plugin,
if (slot->data.database != MyDatabaseId)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slot \"%s\" was not created in this database",
- NameStr(slot->data.name))));
+ errmsg("replication slot \"%s\" was not created in this database",
+ NameStr(slot->data.name))));
if (IsTransactionState() &&
GetTopTransactionIdIfAny() != InvalidTransactionId)
@@ -252,9 +256,9 @@ CreateInitDecodingContext(char *plugin,
*/
if (!RecoveryInProgress())
{
- XLogRecPtr flushptr;
+ XLogRecPtr flushptr;
- /* start at current insert position*/
+ /* start at current insert position */
slot->data.restart_lsn = GetXLogInsertRecPtr();
/* make sure we have enough information to start */
@@ -307,8 +311,8 @@ CreateInitDecodingContext(char *plugin,
LWLockRelease(ProcArrayLock);
/*
- * tell the snapshot builder to only assemble snapshot once reaching
- * the a running_xact's record with the respective xmin.
+ * tell the snapshot builder to only assemble snapshot once reaching the a
+ * running_xact's record with the respective xmin.
*/
xmin_horizon = slot->data.catalog_xmin;
@@ -316,7 +320,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
- read_page, prepare_write, do_write);
+ read_page, prepare_write, do_write);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -352,7 +356,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
{
LogicalDecodingContext *ctx;
ReplicationSlot *slot;
- MemoryContext old_context;
+ MemoryContext old_context;
/* shorter lines... */
slot = MyReplicationSlot;
@@ -370,8 +374,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
if (slot->data.database != MyDatabaseId)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- (errmsg("replication slot \"%s\" was not created in this database",
- NameStr(slot->data.name)))));
+ (errmsg("replication slot \"%s\" was not created in this database",
+ NameStr(slot->data.name)))));
if (start_lsn == InvalidXLogRecPtr)
{
@@ -385,14 +389,14 @@ CreateDecodingContext(XLogRecPtr start_lsn,
* pretty common for a client to acknowledge a LSN it doesn't have to
* do anything for, and thus didn't store persistently, because the
* xlog records didn't result in anything relevant for logical
- * decoding. Clients have to be able to do that to support
- * synchronous replication.
+ * decoding. Clients have to be able to do that to support synchronous
+ * replication.
*/
start_lsn = slot->data.confirmed_flush;
elog(DEBUG1, "cannot stream from %X/%X, minimum is %X/%X, forwarding",
- (uint32)(start_lsn >> 32), (uint32)start_lsn,
- (uint32)(slot->data.confirmed_flush >> 32),
- (uint32)slot->data.confirmed_flush);
+ (uint32) (start_lsn >> 32), (uint32) start_lsn,
+ (uint32) (slot->data.confirmed_flush >> 32),
+ (uint32) slot->data.confirmed_flush);
}
ctx = StartupDecodingContext(output_plugin_options,
@@ -409,10 +413,10 @@ CreateDecodingContext(XLogRecPtr start_lsn,
(errmsg("starting logical decoding for slot %s",
NameStr(slot->data.name)),
errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X",
- (uint32)(slot->data.confirmed_flush >> 32),
- (uint32)slot->data.confirmed_flush,
- (uint32)(slot->data.restart_lsn >> 32),
- (uint32)slot->data.restart_lsn)));
+ (uint32) (slot->data.confirmed_flush >> 32),
+ (uint32) slot->data.confirmed_flush,
+ (uint32) (slot->data.restart_lsn >> 32),
+ (uint32) slot->data.restart_lsn)));
return ctx;
}
@@ -438,8 +442,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
startptr = ctx->slot->data.restart_lsn;
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
- (uint32)(ctx->slot->data.restart_lsn >> 32),
- (uint32)ctx->slot->data.restart_lsn);
+ (uint32) (ctx->slot->data.restart_lsn >> 32),
+ (uint32) ctx->slot->data.restart_lsn);
/* Wait for a consistent starting point */
for (;;)
@@ -543,14 +547,15 @@ static void
output_plugin_error_callback(void *arg)
{
LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
+
/* not all callbacks have an associated LSN */
if (state->report_location != InvalidXLogRecPtr)
errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
NameStr(state->ctx->slot->data.name),
NameStr(state->ctx->slot->data.plugin),
state->callback_name,
- (uint32)(state->report_location >> 32),
- (uint32)state->report_location);
+ (uint32) (state->report_location >> 32),
+ (uint32) state->report_location);
else
errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
NameStr(state->ctx->slot->data.name),
@@ -643,7 +648,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
static void
commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
@@ -652,7 +657,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "commit";
- state.report_location = txn->final_lsn; /* beginning of commit record */
+ state.report_location = txn->final_lsn; /* beginning of commit record */
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
@@ -672,7 +677,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void
change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- Relation relation, ReorderBufferChange *change)
+ Relation relation, ReorderBufferChange *change)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
@@ -690,6 +695,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
+
/*
* report this change's lsn so replies from clients can give an up2date
* answer. This won't ever be enough (and shouldn't be!) to confirm
@@ -715,7 +721,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
void
LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
{
- bool updated_xmin = false;
+ bool updated_xmin = false;
ReplicationSlot *slot;
slot = MyReplicationSlot;
@@ -725,16 +731,17 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
SpinLockAcquire(&slot->mutex);
/*
- * don't overwrite if we already have a newer xmin. This can
- * happen if we restart decoding in a slot.
+ * don't overwrite if we already have a newer xmin. This can happen if we
+ * restart decoding in a slot.
*/
if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
{
}
+
/*
- * If the client has already confirmed up to this lsn, we directly
- * can mark this as accepted. This can happen if we restart
- * decoding in a slot.
+ * If the client has already confirmed up to this lsn, we directly can
+ * mark this as accepted. This can happen if we restart decoding in a
+ * slot.
*/
else if (current_lsn <= slot->data.confirmed_flush)
{
@@ -744,6 +751,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
/* our candidate can directly be used */
updated_xmin = true;
}
+
/*
* Only increase if the previous values have been applied, otherwise we
* might never end up updating if the receiver acks too slowly.
@@ -770,7 +778,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
void
LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
{
- bool updated_lsn = false;
+ bool updated_lsn = false;
ReplicationSlot *slot;
slot = MyReplicationSlot;
@@ -781,13 +789,14 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
SpinLockAcquire(&slot->mutex);
- /* don't overwrite if have a newer restart lsn*/
+ /* don't overwrite if have a newer restart lsn */
if (restart_lsn <= slot->data.restart_lsn)
{
}
+
/*
- * We might have already flushed far enough to directly accept this lsn, in
- * this case there is no need to check for existing candidate LSNs
+ * We might have already flushed far enough to directly accept this lsn,
+ * in this case there is no need to check for existing candidate LSNs
*/
else if (current_lsn <= slot->data.confirmed_flush)
{
@@ -797,6 +806,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
/* our candidate can directly be used */
updated_lsn = true;
}
+
/*
* Only increase if the previous values have been applied, otherwise we
* might never end up updating if the receiver acks too slowly. A missed
@@ -896,6 +906,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
ReplicationSlotSave();
elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
}
+
/*
* Now the new xmin is safely on disk, we can let the global value
* advance. We do not take ProcArrayLock or similar since we only