summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c77
1 files changed, 28 insertions, 49 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 58c98488d7b..e5a2856fd17 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -374,14 +374,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
};
static HTAB *last_start_times = NULL;
ListCell *lc;
- bool started_tx = false;
+ bool started_tx;
bool should_exit = false;
Relation rel = NULL;
Assert(!IsTransactionState());
/* We need up-to-date sync state info for subscription tables here. */
- FetchRelationStates(&started_tx);
+ FetchRelationStates(NULL, NULL, &started_tx);
/*
* Prepare a hash table for tracking last start times of workers, to avoid
@@ -415,6 +415,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ if (!started_tx)
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
+
if (rstate->state == SUBREL_STATE_SYNCDONE)
{
/*
@@ -428,11 +436,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
/*
* Remove the tablesync origin tracking if exists.
@@ -552,43 +555,19 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
*/
int nsyncworkers =
logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ struct tablesync_start_time_mapping *hentry;
+ bool found;
/* Now safe to release the LWLock */
LWLockRelease(LogicalRepWorkerLock);
- /*
- * If there are free sync worker slot(s), start a new sync
- * worker for the table.
- */
- if (nsyncworkers < max_sync_workers_per_subscription)
- {
- TimestampTz now = GetCurrentTimestamp();
- struct tablesync_start_time_mapping *hentry;
- bool found;
+ hentry = hash_search(last_start_times, &rstate->relid,
+ HASH_ENTER, &found);
+ if (!found)
+ hentry->last_start_time = 0;
- hentry = hash_search(last_start_times, &rstate->relid,
- HASH_ENTER, &found);
-
- if (!found ||
- TimestampDifferenceExceeds(hentry->last_start_time, now,
- wal_retrieve_retry_interval))
- {
- /*
- * Set the last_start_time even if we fail to start
- * the worker, so that we won't retry until
- * wal_retrieve_retry_interval has elapsed.
- */
- hentry->last_start_time = now;
- (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
- MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
- rstate->relid,
- DSM_HANDLE_INVALID,
- false);
- }
- }
+ launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
+ rstate->relid, &hentry->last_start_time);
}
}
}
@@ -1432,8 +1411,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
}
/*
- * Make sure that the copy command runs as the table owner, unless the
- * user has opted out of that behaviour.
+ * If the user did not opt to run as the owner of the subscription
+ * ('run_as_owner'), then copy the table as the owner of the table.
*/
run_as_owner = MySubscription->runasowner;
if (!run_as_owner)
@@ -1596,7 +1575,7 @@ run_tablesync_worker()
/* Logical Replication Tablesync worker entry point */
void
-TablesyncWorkerMain(Datum main_arg)
+TableSyncWorkerMain(Datum main_arg)
{
int worker_slot = DatumGetInt32(main_arg);
@@ -1618,11 +1597,11 @@ TablesyncWorkerMain(Datum main_arg)
bool
AllTablesyncsReady(void)
{
- bool started_tx = false;
- bool has_subrels = false;
+ bool started_tx;
+ bool has_tables;
/* We need up-to-date sync state info for subscription tables here. */
- has_subrels = FetchRelationStates(&started_tx);
+ FetchRelationStates(&has_tables, NULL, &started_tx);
if (started_tx)
{
@@ -1634,7 +1613,7 @@ AllTablesyncsReady(void)
* Return false when there are no tables in subscription or not all tables
* are in ready state; true otherwise.
*/
- return has_subrels && (table_states_not_ready == NIL);
+ return has_tables && (table_states_not_ready == NIL);
}
/*
@@ -1649,10 +1628,10 @@ bool
HasSubscriptionTablesCached(void)
{
bool started_tx;
- bool has_subrels;
+ bool has_tables;
/* We need up-to-date subscription tables info here */
- has_subrels = FetchRelationStates(&started_tx);
+ FetchRelationStates(&has_tables, NULL, &started_tx);
if (started_tx)
{
@@ -1660,7 +1639,7 @@ HasSubscriptionTablesCached(void)
pgstat_report_stat(true);
}
- return has_subrels;
+ return has_tables;
}
/*