diff options
author | Bruce Momjian <bruce@momjian.us> | 2014-05-06 12:12:18 -0400 |
---|---|---|
committer | Bruce Momjian <bruce@momjian.us> | 2014-05-06 12:12:18 -0400 |
commit | 0a7832005792fa6dad171f9cadb8d587fe0dd800 (patch) | |
tree | 365cfc42c521a52607e41394b08ef44d338d8fc1 /src/backend/replication/logical/logical.c | |
parent | fb85cd4320414c3f6e9c8bc69ec944200ae1e493 (diff) |
pgindent run for 9.4
This includes removing tabs after periods in C comments, which was
applied to back branches, so this change should not effect backpatching.
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 135 |
1 files changed, 73 insertions, 62 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1d08b50da39..438a3fb152d 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -8,21 +8,21 @@ * src/backend/replication/logical/logical.c * * NOTES - * This file coordinates interaction between the various modules that - * together provide logical decoding, primarily by providing so - * called LogicalDecodingContexts. The goal is to encapsulate most of the - * internal complexity for consumers of logical decoding, so they can - * create and consume a changestream with a low amount of code. Builtin - * consumers are the walsender and SQL SRF interface, but it's possible to - * add further ones without changing core code, e.g. to consume changes in - * a bgworker. + * This file coordinates interaction between the various modules that + * together provide logical decoding, primarily by providing so + * called LogicalDecodingContexts. The goal is to encapsulate most of the + * internal complexity for consumers of logical decoding, so they can + * create and consume a changestream with a low amount of code. Builtin + * consumers are the walsender and SQL SRF interface, but it's possible to + * add further ones without changing core code, e.g. to consume changes in + * a bgworker. * - * The idea is that a consumer provides three callbacks, one to read WAL, - * one to prepare a data write, and a final one for actually writing since - * their implementation depends on the type of consumer. Check - * logicalfuncs.c for an example implementation of a fairly simple consumer - * and a implementation of a WAL reading callback that's suitable for - * simple consumers. + * The idea is that a consumer provides three callbacks, one to read WAL, + * one to prepare a data write, and a final one for actually writing since + * their implementation depends on the type of consumer. Check + * logicalfuncs.c for an example implementation of a fairly simple consumer + * and a implementation of a WAL reading callback that's suitable for + * simple consumers. *------------------------------------------------------------------------- */ @@ -56,13 +56,13 @@ typedef struct LogicalErrorCallbackState /* wrappers around output plugin callbacks */ static void output_plugin_error_callback(void *arg); static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, - bool is_init); + bool is_init); static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change); + Relation relation, ReorderBufferChange *change); static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); @@ -90,18 +90,18 @@ CheckLogicalDecodingRequirements(void) * * There's basically three things missing to allow this: * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to + * LSN belongs to * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. + * the primary cannot remove rows we need. * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. + * dbase_redo. There can't be any active ones due to HS recovery + * conflicts, so that should be relatively easy. * ---- */ if (RecoveryInProgress()) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + errmsg("logical decoding cannot be used while in recovery"))); } /* @@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options, LogicalOutputPluginWriterWrite do_write) { ReplicationSlot *slot; - MemoryContext context, old_context; + MemoryContext context, + old_context; LogicalDecodingContext *ctx; /* shorter lines... */ @@ -133,7 +134,10 @@ StartupDecodingContext(List *output_plugin_options, ctx->context = context; - /* (re-)load output plugins, so we detect a bad (removed) output plugin now. */ + /* + * (re-)load output plugins, so we detect a bad (removed) output plugin + * now. + */ LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin)); /* @@ -195,10 +199,10 @@ CreateInitDecodingContext(char *plugin, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write) { - TransactionId xmin_horizon = InvalidTransactionId; + TransactionId xmin_horizon = InvalidTransactionId; ReplicationSlot *slot; LogicalDecodingContext *ctx; - MemoryContext old_context; + MemoryContext old_context; /* shorter lines... */ slot = MyReplicationSlot; @@ -219,8 +223,8 @@ CreateInitDecodingContext(char *plugin, if (slot->data.database != MyDatabaseId) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("replication slot \"%s\" was not created in this database", - NameStr(slot->data.name)))); + errmsg("replication slot \"%s\" was not created in this database", + NameStr(slot->data.name)))); if (IsTransactionState() && GetTopTransactionIdIfAny() != InvalidTransactionId) @@ -252,9 +256,9 @@ CreateInitDecodingContext(char *plugin, */ if (!RecoveryInProgress()) { - XLogRecPtr flushptr; + XLogRecPtr flushptr; - /* start at current insert position*/ + /* start at current insert position */ slot->data.restart_lsn = GetXLogInsertRecPtr(); /* make sure we have enough information to start */ @@ -307,8 +311,8 @@ CreateInitDecodingContext(char *plugin, LWLockRelease(ProcArrayLock); /* - * tell the snapshot builder to only assemble snapshot once reaching - * the a running_xact's record with the respective xmin. + * tell the snapshot builder to only assemble snapshot once reaching the a + * running_xact's record with the respective xmin. */ xmin_horizon = slot->data.catalog_xmin; @@ -316,7 +320,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, - read_page, prepare_write, do_write); + read_page, prepare_write, do_write); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -352,7 +356,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, { LogicalDecodingContext *ctx; ReplicationSlot *slot; - MemoryContext old_context; + MemoryContext old_context; /* shorter lines... */ slot = MyReplicationSlot; @@ -370,8 +374,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, if (slot->data.database != MyDatabaseId) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - (errmsg("replication slot \"%s\" was not created in this database", - NameStr(slot->data.name))))); + (errmsg("replication slot \"%s\" was not created in this database", + NameStr(slot->data.name))))); if (start_lsn == InvalidXLogRecPtr) { @@ -385,14 +389,14 @@ CreateDecodingContext(XLogRecPtr start_lsn, * pretty common for a client to acknowledge a LSN it doesn't have to * do anything for, and thus didn't store persistently, because the * xlog records didn't result in anything relevant for logical - * decoding. Clients have to be able to do that to support - * synchronous replication. + * decoding. Clients have to be able to do that to support synchronous + * replication. */ start_lsn = slot->data.confirmed_flush; elog(DEBUG1, "cannot stream from %X/%X, minimum is %X/%X, forwarding", - (uint32)(start_lsn >> 32), (uint32)start_lsn, - (uint32)(slot->data.confirmed_flush >> 32), - (uint32)slot->data.confirmed_flush); + (uint32) (start_lsn >> 32), (uint32) start_lsn, + (uint32) (slot->data.confirmed_flush >> 32), + (uint32) slot->data.confirmed_flush); } ctx = StartupDecodingContext(output_plugin_options, @@ -409,10 +413,10 @@ CreateDecodingContext(XLogRecPtr start_lsn, (errmsg("starting logical decoding for slot %s", NameStr(slot->data.name)), errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X", - (uint32)(slot->data.confirmed_flush >> 32), - (uint32)slot->data.confirmed_flush, - (uint32)(slot->data.restart_lsn >> 32), - (uint32)slot->data.restart_lsn))); + (uint32) (slot->data.confirmed_flush >> 32), + (uint32) slot->data.confirmed_flush, + (uint32) (slot->data.restart_lsn >> 32), + (uint32) slot->data.restart_lsn))); return ctx; } @@ -438,8 +442,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) startptr = ctx->slot->data.restart_lsn; elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", - (uint32)(ctx->slot->data.restart_lsn >> 32), - (uint32)ctx->slot->data.restart_lsn); + (uint32) (ctx->slot->data.restart_lsn >> 32), + (uint32) ctx->slot->data.restart_lsn); /* Wait for a consistent starting point */ for (;;) @@ -543,14 +547,15 @@ static void output_plugin_error_callback(void *arg) { LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg; + /* not all callbacks have an associated LSN */ if (state->report_location != InvalidXLogRecPtr) errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X", NameStr(state->ctx->slot->data.name), NameStr(state->ctx->slot->data.plugin), state->callback_name, - (uint32)(state->report_location >> 32), - (uint32)state->report_location); + (uint32) (state->report_location >> 32), + (uint32) state->report_location); else errcontext("slot \"%s\", output plugin \"%s\", in the %s callback", NameStr(state->ctx->slot->data.name), @@ -643,7 +648,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -652,7 +657,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "commit"; - state.report_location = txn->final_lsn; /* beginning of commit record */ + state.report_location = txn->final_lsn; /* beginning of commit record */ errcallback.callback = output_plugin_error_callback; errcallback.arg = (void *) &state; errcallback.previous = error_context_stack; @@ -672,7 +677,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change) + Relation relation, ReorderBufferChange *change) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -690,6 +695,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* set output state */ ctx->accept_writes = true; ctx->write_xid = txn->xid; + /* * report this change's lsn so replies from clients can give an up2date * answer. This won't ever be enough (and shouldn't be!) to confirm @@ -715,7 +721,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) { - bool updated_xmin = false; + bool updated_xmin = false; ReplicationSlot *slot; slot = MyReplicationSlot; @@ -725,16 +731,17 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) SpinLockAcquire(&slot->mutex); /* - * don't overwrite if we already have a newer xmin. This can - * happen if we restart decoding in a slot. + * don't overwrite if we already have a newer xmin. This can happen if we + * restart decoding in a slot. */ if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin)) { } + /* - * If the client has already confirmed up to this lsn, we directly - * can mark this as accepted. This can happen if we restart - * decoding in a slot. + * If the client has already confirmed up to this lsn, we directly can + * mark this as accepted. This can happen if we restart decoding in a + * slot. */ else if (current_lsn <= slot->data.confirmed_flush) { @@ -744,6 +751,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) /* our candidate can directly be used */ updated_xmin = true; } + /* * Only increase if the previous values have been applied, otherwise we * might never end up updating if the receiver acks too slowly. @@ -770,7 +778,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn) { - bool updated_lsn = false; + bool updated_lsn = false; ReplicationSlot *slot; slot = MyReplicationSlot; @@ -781,13 +789,14 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart SpinLockAcquire(&slot->mutex); - /* don't overwrite if have a newer restart lsn*/ + /* don't overwrite if have a newer restart lsn */ if (restart_lsn <= slot->data.restart_lsn) { } + /* - * We might have already flushed far enough to directly accept this lsn, in - * this case there is no need to check for existing candidate LSNs + * We might have already flushed far enough to directly accept this lsn, + * in this case there is no need to check for existing candidate LSNs */ else if (current_lsn <= slot->data.confirmed_flush) { @@ -797,6 +806,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart /* our candidate can directly be used */ updated_lsn = true; } + /* * Only increase if the previous values have been applied, otherwise we * might never end up updating if the receiver acks too slowly. A missed @@ -896,6 +906,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) ReplicationSlotSave(); elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart); } + /* * Now the new xmin is safely on disk, we can let the global value * advance. We do not take ProcArrayLock or similar since we only |