diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 203 |
1 files changed, 160 insertions, 43 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c3e54af2591..bbf3506be04 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -32,6 +32,7 @@ #include "catalog/namespace.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_rel.h" #include "commands/trigger.h" @@ -101,7 +102,7 @@ typedef struct SlotErrCallbackArg } SlotErrCallbackArg; static MemoryContext ApplyContext = NULL; -static MemoryContext ApplyCacheContext = NULL; +MemoryContext ApplyCacheContext = NULL; WalReceiverConn *wrconn = NULL; @@ -109,6 +110,7 @@ Subscription *MySubscription = NULL; bool MySubscriptionValid = false; bool in_remote_transaction = false; +static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -117,6 +119,30 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void reread_subscription(void); /* + * Should this worker apply changes for given relation. + * + * This is mainly needed for initial relation data sync as that runs in + * separate worker process running in parallel and we need some way to skip + * changes coming to the main apply worker during the sync of a table. + * + * Note we need to do smaller or equals comparison for SYNCDONE state because + * it might hold position of end of intitial slot consistent point WAL + * record + 1 (ie start of next record) and next record can be COMMIT of + * transaction we are now processing (which is what we set remote_final_lsn + * to in apply_handle_begin). + */ +static bool +should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) +{ + if (am_tablesync_worker()) + return MyLogicalRepWorker->relid == rel->localreloid; + else + return (rel->state == SUBREL_STATE_READY || + (rel->state == SUBREL_STATE_SYNCDONE && + rel->statelsn <= remote_final_lsn)); +} + +/* * Make sure that we started local transaction. * * Also switches to ApplyContext as necessary. @@ -398,6 +424,8 @@ apply_handle_begin(StringInfo s) replorigin_session_origin_timestamp = begin_data.committime; replorigin_session_origin_lsn = begin_data.final_lsn; + remote_final_lsn = begin_data.final_lsn; + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -418,7 +446,10 @@ apply_handle_commit(StringInfo s) Assert(commit_data.commit_lsn == replorigin_session_origin_lsn); Assert(commit_data.committime == replorigin_session_origin_timestamp); - if (IsTransactionState()) + Assert(commit_data.commit_lsn == remote_final_lsn); + + /* The synchronization worker runs in single transaction. */ + if (IsTransactionState() && !am_tablesync_worker()) { CommitTransactionCommand(); @@ -427,6 +458,9 @@ apply_handle_commit(StringInfo s) in_remote_transaction = false; + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + pgstat_report_activity(STATE_IDLE, NULL); } @@ -442,7 +476,8 @@ apply_handle_origin(StringInfo s) * ORIGIN message can only come inside remote transaction and before * any actual writes. */ - if (!in_remote_transaction || IsTransactionState()) + if (!in_remote_transaction || + (IsTransactionState() && !am_tablesync_worker())) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("ORIGIN message sent out of order"))); @@ -515,6 +550,15 @@ apply_handle_insert(StringInfo s) relid = logicalrep_read_insert(s, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + return; + } /* Initialize the executor state. */ estate = create_estate_for_relation(rel); @@ -607,6 +651,15 @@ apply_handle_update(StringInfo s) relid = logicalrep_read_update(s, &has_oldtup, &oldtup, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + return; + } /* Check if we can do the update. */ check_relation_updatable(rel); @@ -716,6 +769,15 @@ apply_handle_delete(StringInfo s) relid = logicalrep_read_delete(s, &oldtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + return; + } /* Check if we can do the delete. */ check_relation_updatable(rel); @@ -927,10 +989,8 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) * Apply main loop. */ static void -ApplyLoop(void) +LogicalRepApplyLoop(XLogRecPtr last_received) { - XLogRecPtr last_received = InvalidXLogRecPtr; - /* Init the ApplyContext which we use for easier cleanup. */ ApplyContext = AllocSetContextCreate(TopMemoryContext, "ApplyContext", @@ -1014,15 +1074,18 @@ ApplyLoop(void) } else if (c == 'k') { - XLogRecPtr endpos; + XLogRecPtr end_lsn; TimestampTz timestamp; bool reply_requested; - endpos = pq_getmsgint64(&s); + end_lsn = pq_getmsgint64(&s); timestamp = pq_getmsgint64(&s); reply_requested = pq_getmsgbyte(&s); - send_feedback(endpos, reply_requested, false); + if (last_received < end_lsn) + last_received = end_lsn; + + send_feedback(last_received, reply_requested, false); UpdateWorkerStats(last_received, timestamp, true); } /* other message types are purposefully ignored */ @@ -1030,6 +1093,9 @@ ApplyLoop(void) len = walrcv_receive(wrconn, &buf, &fd); } + + /* confirm all writes at once */ + send_feedback(last_received, false, false); } if (!in_remote_transaction) @@ -1038,15 +1104,13 @@ ApplyLoop(void) * If we didn't get any transactions for a while there might be * unconsumed invalidation messages in the queue, consume them now. */ - StartTransactionCommand(); - /* Check for subscription change */ + AcceptInvalidationMessages(); if (!MySubscriptionValid) reread_subscription(); - CommitTransactionCommand(); - } - /* confirm all writes at once */ - send_feedback(last_received, false, false); + /* Process any table synchronization changes. */ + process_syncing_tables(last_received); + } /* Cleanup the memory. */ MemoryContextResetAndDeleteChildren(ApplyContext); @@ -1054,7 +1118,11 @@ ApplyLoop(void) /* Check if we need to exit the streaming loop. */ if (endofstream) + { + TimeLineID tli; + walrcv_endstreaming(wrconn, &tli); break; + } /* * Wait for more data or latch. @@ -1222,6 +1290,14 @@ reread_subscription(void) { MemoryContext oldctx; Subscription *newsub; + bool started_tx = false; + + /* This function might be called inside or outside of transaction. */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } /* Ensure allocations in permanent context. */ oldctx = MemoryContextSwitchTo(ApplyCacheContext); @@ -1319,6 +1395,9 @@ reread_subscription(void) MemoryContextSwitchTo(oldctx); + if (started_tx) + CommitTransactionCommand(); + MySubscriptionValid = true; } @@ -1339,11 +1418,8 @@ ApplyWorkerMain(Datum main_arg) int worker_slot = DatumGetObjectId(main_arg); MemoryContext oldctx; char originname[NAMEDATALEN]; - RepOriginId originid; XLogRecPtr origin_startpos; - char *err; - int server_version; - TimeLineID startpointTLI; + char *myslotname; WalRcvStreamOptions options; /* Attach to slot */ @@ -1402,49 +1478,90 @@ ApplyWorkerMain(Datum main_arg) subscription_change_cb, (Datum) 0); - ereport(LOG, - (errmsg("logical replication apply for subscription \"%s\" has started", - MySubscription->name))); - - /* Setup replication origin tracking. */ - snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid); - originid = replorigin_by_name(originname, true); - if (!OidIsValid(originid)) - originid = replorigin_create(originname); - replorigin_session_setup(originid); - replorigin_session_origin = originid; - origin_startpos = replorigin_session_get_progress(false); + if (am_tablesync_worker()) + elog(LOG, "logical replication sync for subscription %s, table %s started", + MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)); + else + elog(LOG, "logical replication apply for subscription %s started", + MySubscription->name); CommitTransactionCommand(); /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); - wrconn = walrcv_connect(MySubscription->conninfo, true, - MySubscription->name, &err); - if (wrconn == NULL) - ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); + + if (am_tablesync_worker()) + { + char *syncslotname; + + /* This is table synchroniation worker, call initial sync. */ + syncslotname = LogicalRepSyncTableStart(&origin_startpos); + + /* The slot name needs to be allocated in permanent memory context. */ + oldctx = MemoryContextSwitchTo(ApplyCacheContext); + myslotname = pstrdup(syncslotname); + MemoryContextSwitchTo(oldctx); + + pfree(syncslotname); + } + else + { + /* This is main apply worker */ + RepOriginId originid; + TimeLineID startpointTLI; + char *err; + int server_version; + + myslotname = MySubscription->slotname; + + /* Setup replication origin tracking. */ + StartTransactionCommand(); + snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid); + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + originid = replorigin_create(originname); + replorigin_session_setup(originid); + replorigin_session_origin = originid; + origin_startpos = replorigin_session_get_progress(false); + CommitTransactionCommand(); + + wrconn = walrcv_connect(MySubscription->conninfo, true, myslotname, + &err); + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + /* + * We don't really use the output identify_system for anything + * but it does some initializations on the upstream so let's still + * call it. + */ + (void) walrcv_identify_system(wrconn, &startpointTLI, + &server_version); + + } /* - * We don't really use the output identify_system for anything - * but it does some initializations on the upstream so let's still - * call it. + * Setup callback for syscache so that we know when something + * changes in the subscription relation state. */ - (void) walrcv_identify_system(wrconn, &startpointTLI, &server_version); + CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, + invalidate_syncing_table_states, + (Datum) 0); /* Build logical replication streaming options. */ options.logical = true; options.startpoint = origin_startpos; - options.slotname = MySubscription->slotname; + options.slotname = myslotname; options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; - /* Start streaming from the slot. */ + /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); /* Run the main loop. */ - ApplyLoop(); + LogicalRepApplyLoop(origin_startpos); walrcv_disconnect(wrconn); |