summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c203
1 files changed, 160 insertions, 43 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c3e54af2591..bbf3506be04 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -32,6 +32,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_rel.h"
#include "commands/trigger.h"
@@ -101,7 +102,7 @@ typedef struct SlotErrCallbackArg
} SlotErrCallbackArg;
static MemoryContext ApplyContext = NULL;
-static MemoryContext ApplyCacheContext = NULL;
+MemoryContext ApplyCacheContext = NULL;
WalReceiverConn *wrconn = NULL;
@@ -109,6 +110,7 @@ Subscription *MySubscription = NULL;
bool MySubscriptionValid = false;
bool in_remote_transaction = false;
+static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
@@ -117,6 +119,30 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void reread_subscription(void);
/*
+ * Should this worker apply changes for the given relation?
+ *
+ * This is mainly needed for initial relation data sync as that runs in
+ * separate worker process running in parallel and we need some way to skip
+ * changes coming to the main apply worker during the sync of a table.
+ *
+ * Note we need to do a less-than-or-equal comparison for the SYNCDONE state
+ * because it might hold the position of the end of the initial slot
+ * consistent point WAL record + 1 (i.e. the start of the next record), and
+ * the next record can be the COMMIT of the transaction we are now processing
+ * (which is what we set remote_final_lsn to in apply_handle_begin).
+ */
+static bool
+should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
+{
+ if (am_tablesync_worker()) /* table sync worker: apply only for the one relation it was started for */
+ return MyLogicalRepWorker->relid == rel->localreloid;
+ else /* otherwise: apply only if the relation is READY, or SYNCDONE as checked below */
+ return (rel->state == SUBREL_STATE_READY ||
+ (rel->state == SUBREL_STATE_SYNCDONE &&
+ rel->statelsn <= remote_final_lsn)); /* SYNCDONE position is at or before the current remote_final_lsn */
+}
+
+/*
* Make sure that we started local transaction.
*
* Also switches to ApplyContext as necessary.
@@ -398,6 +424,8 @@ apply_handle_begin(StringInfo s)
replorigin_session_origin_timestamp = begin_data.committime;
replorigin_session_origin_lsn = begin_data.final_lsn;
+ remote_final_lsn = begin_data.final_lsn;
+
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
@@ -418,7 +446,10 @@ apply_handle_commit(StringInfo s)
Assert(commit_data.commit_lsn == replorigin_session_origin_lsn);
Assert(commit_data.committime == replorigin_session_origin_timestamp);
- if (IsTransactionState())
+ Assert(commit_data.commit_lsn == remote_final_lsn);
+
+ /* The synchronization worker runs in a single transaction. */
+ if (IsTransactionState() && !am_tablesync_worker())
{
CommitTransactionCommand();
@@ -427,6 +458,9 @@ apply_handle_commit(StringInfo s)
in_remote_transaction = false;
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(commit_data.end_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
}
@@ -442,7 +476,8 @@ apply_handle_origin(StringInfo s)
* ORIGIN message can only come inside remote transaction and before
* any actual writes.
*/
- if (!in_remote_transaction || IsTransactionState())
+ if (!in_remote_transaction ||
+ (IsTransactionState() && !am_tablesync_worker()))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("ORIGIN message sent out of order")));
@@ -515,6 +550,15 @@ apply_handle_insert(StringInfo s)
relid = logicalrep_read_insert(s, &newtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, RowExclusiveLock);
+ return;
+ }
/* Initialize the executor state. */
estate = create_estate_for_relation(rel);
@@ -607,6 +651,15 @@ apply_handle_update(StringInfo s)
relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
&newtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, RowExclusiveLock);
+ return;
+ }
/* Check if we can do the update. */
check_relation_updatable(rel);
@@ -716,6 +769,15 @@ apply_handle_delete(StringInfo s)
relid = logicalrep_read_delete(s, &oldtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, RowExclusiveLock);
+ return;
+ }
/* Check if we can do the delete. */
check_relation_updatable(rel);
@@ -927,10 +989,8 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
* Apply main loop.
*/
static void
-ApplyLoop(void)
+LogicalRepApplyLoop(XLogRecPtr last_received)
{
- XLogRecPtr last_received = InvalidXLogRecPtr;
-
/* Init the ApplyContext which we use for easier cleanup. */
ApplyContext = AllocSetContextCreate(TopMemoryContext,
"ApplyContext",
@@ -1014,15 +1074,18 @@ ApplyLoop(void)
}
else if (c == 'k')
{
- XLogRecPtr endpos;
+ XLogRecPtr end_lsn;
TimestampTz timestamp;
bool reply_requested;
- endpos = pq_getmsgint64(&s);
+ end_lsn = pq_getmsgint64(&s);
timestamp = pq_getmsgint64(&s);
reply_requested = pq_getmsgbyte(&s);
- send_feedback(endpos, reply_requested, false);
+ if (last_received < end_lsn)
+ last_received = end_lsn;
+
+ send_feedback(last_received, reply_requested, false);
UpdateWorkerStats(last_received, timestamp, true);
}
/* other message types are purposefully ignored */
@@ -1030,6 +1093,9 @@ ApplyLoop(void)
len = walrcv_receive(wrconn, &buf, &fd);
}
+
+ /* confirm all writes at once */
+ send_feedback(last_received, false, false);
}
if (!in_remote_transaction)
@@ -1038,15 +1104,13 @@ ApplyLoop(void)
* If we didn't get any transactions for a while there might be
* unconsumed invalidation messages in the queue, consume them now.
*/
- StartTransactionCommand();
- /* Check for subscription change */
+ AcceptInvalidationMessages();
if (!MySubscriptionValid)
reread_subscription();
- CommitTransactionCommand();
- }
- /* confirm all writes at once */
- send_feedback(last_received, false, false);
+ /* Process any table synchronization changes. */
+ process_syncing_tables(last_received);
+ }
/* Cleanup the memory. */
MemoryContextResetAndDeleteChildren(ApplyContext);
@@ -1054,7 +1118,11 @@ ApplyLoop(void)
/* Check if we need to exit the streaming loop. */
if (endofstream)
+ {
+ TimeLineID tli;
+ walrcv_endstreaming(wrconn, &tli);
break;
+ }
/*
* Wait for more data or latch.
@@ -1222,6 +1290,14 @@ reread_subscription(void)
{
MemoryContext oldctx;
Subscription *newsub;
+ bool started_tx = false;
+
+ /* This function might be called inside or outside of transaction. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
/* Ensure allocations in permanent context. */
oldctx = MemoryContextSwitchTo(ApplyCacheContext);
@@ -1319,6 +1395,9 @@ reread_subscription(void)
MemoryContextSwitchTo(oldctx);
+ if (started_tx)
+ CommitTransactionCommand();
+
MySubscriptionValid = true;
}
@@ -1339,11 +1418,8 @@ ApplyWorkerMain(Datum main_arg)
int worker_slot = DatumGetObjectId(main_arg);
MemoryContext oldctx;
char originname[NAMEDATALEN];
- RepOriginId originid;
XLogRecPtr origin_startpos;
- char *err;
- int server_version;
- TimeLineID startpointTLI;
+ char *myslotname;
WalRcvStreamOptions options;
/* Attach to slot */
@@ -1402,49 +1478,90 @@ ApplyWorkerMain(Datum main_arg)
subscription_change_cb,
(Datum) 0);
- ereport(LOG,
- (errmsg("logical replication apply for subscription \"%s\" has started",
- MySubscription->name)));
-
- /* Setup replication origin tracking. */
- snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
- originid = replorigin_by_name(originname, true);
- if (!OidIsValid(originid))
- originid = replorigin_create(originname);
- replorigin_session_setup(originid);
- replorigin_session_origin = originid;
- origin_startpos = replorigin_session_get_progress(false);
+ if (am_tablesync_worker())
+ elog(LOG, "logical replication sync for subscription %s, table %s started",
+ MySubscription->name, get_rel_name(MyLogicalRepWorker->relid));
+ else
+ elog(LOG, "logical replication apply for subscription %s started",
+ MySubscription->name);
CommitTransactionCommand();
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
- wrconn = walrcv_connect(MySubscription->conninfo, true,
- MySubscription->name, &err);
- if (wrconn == NULL)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
+
+ if (am_tablesync_worker())
+ {
+ char *syncslotname;
+
+ /* This is the table synchronization worker; call initial sync. */
+ syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+
+ /* The slot name needs to be allocated in permanent memory context. */
+ oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ myslotname = pstrdup(syncslotname);
+ MemoryContextSwitchTo(oldctx);
+
+ pfree(syncslotname);
+ }
+ else
+ {
+ /* This is main apply worker */
+ RepOriginId originid;
+ TimeLineID startpointTLI;
+ char *err;
+ int server_version;
+
+ myslotname = MySubscription->slotname;
+
+ /* Setup replication origin tracking. */
+ StartTransactionCommand();
+ snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+ originid = replorigin_by_name(originname, true);
+ if (!OidIsValid(originid))
+ originid = replorigin_create(originname);
+ replorigin_session_setup(originid);
+ replorigin_session_origin = originid;
+ origin_startpos = replorigin_session_get_progress(false);
+ CommitTransactionCommand();
+
+ wrconn = walrcv_connect(MySubscription->conninfo, true, myslotname,
+ &err);
+ if (wrconn == NULL)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+
+ /*
+ * We don't really use the output of identify_system for anything,
+ * but it does some initializations on the upstream so let's still
+ * call it.
+ */
+ (void) walrcv_identify_system(wrconn, &startpointTLI,
+ &server_version);
+
+ }
/*
- * We don't really use the output identify_system for anything
- * but it does some initializations on the upstream so let's still
- * call it.
+ * Setup callback for syscache so that we know when something
+ * changes in the subscription relation state.
*/
- (void) walrcv_identify_system(wrconn, &startpointTLI, &server_version);
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+ invalidate_syncing_table_states,
+ (Datum) 0);
/* Build logical replication streaming options. */
options.logical = true;
options.startpoint = origin_startpos;
- options.slotname = MySubscription->slotname;
+ options.slotname = myslotname;
options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
- /* Start streaming from the slot. */
+ /* Start normal logical streaming replication. */
walrcv_startstreaming(wrconn, &options);
/* Run the main loop. */
- ApplyLoop();
+ LogicalRepApplyLoop(origin_startpos);
walrcv_disconnect(wrconn);