diff options
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/vacuum.c | 135 |
1 file changed, 130 insertions, 5 deletions
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index bb34e252e45..d625d17bf46 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -42,6 +42,7 @@ #include "nodes/makefuncs.h" #include "pgstat.h" #include "postmaster/autovacuum.h" +#include "postmaster/bgworker_internals.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -68,6 +69,14 @@ static MemoryContext vac_context = NULL; static BufferAccessStrategy vac_strategy; +/* + * Variables for cost-based parallel vacuum. See comments atop + * compute_parallel_delay to understand how it works. + */ +pg_atomic_uint32 *VacuumSharedCostBalance = NULL; +pg_atomic_uint32 *VacuumActiveNWorkers = NULL; +int VacuumCostBalanceLocal = 0; + /* non-export function prototypes */ static List *expand_vacuum_rel(VacuumRelation *vrel, int options); static List *get_all_vacuum_rels(int options); @@ -76,6 +85,7 @@ static void vac_truncate_clog(TransactionId frozenXID, TransactionId lastSaneFrozenXid, MultiXactId lastSaneMinMulti); static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params); +static double compute_parallel_delay(void); static VacOptTernaryValue get_vacopt_ternary_value(DefElem *def); /* @@ -94,12 +104,16 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel) bool freeze = false; bool full = false; bool disable_page_skipping = false; + bool parallel_option = false; ListCell *lc; /* Set default value */ params.index_cleanup = VACOPT_TERNARY_DEFAULT; params.truncate = VACOPT_TERNARY_DEFAULT; + /* By default parallel vacuum is enabled */ + params.nworkers = 0; + /* Parse options list */ foreach(lc, vacstmt->options) { @@ -129,6 +143,39 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel) params.index_cleanup = get_vacopt_ternary_value(opt); else if (strcmp(opt->defname, "truncate") == 0) params.truncate = get_vacopt_ternary_value(opt); + else if (strcmp(opt->defname, "parallel") == 0) + 
{ + parallel_option = true; + if (opt->arg == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parallel option requires a value between 0 and %d", + MAX_PARALLEL_WORKER_LIMIT), + parser_errposition(pstate, opt->location))); + } + else + { + int nworkers; + + nworkers = defGetInt32(opt); + if (nworkers < 0 || nworkers > MAX_PARALLEL_WORKER_LIMIT) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parallel vacuum degree must be between 0 and %d", + MAX_PARALLEL_WORKER_LIMIT), + parser_errposition(pstate, opt->location))); + + /* + * Disable parallel vacuum, if user has specified parallel + * degree as zero. + */ + if (nworkers == 0) + params.nworkers = -1; + else + params.nworkers = nworkers; + } + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -152,6 +199,11 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel) !(params.options & (VACOPT_FULL | VACOPT_FREEZE))); Assert(!(params.options & VACOPT_SKIPTOAST)); + if ((params.options & VACOPT_FULL) && parallel_option) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot specify both FULL and PARALLEL options"))); + /* * Make sure VACOPT_ANALYZE is specified if any column lists are present. */ @@ -383,6 +435,9 @@ vacuum(List *relations, VacuumParams *params, VacuumPageHit = 0; VacuumPageMiss = 0; VacuumPageDirty = 0; + VacuumCostBalanceLocal = 0; + VacuumSharedCostBalance = NULL; + VacuumActiveNWorkers = NULL; /* * Loop to process each selected relation. @@ -1941,16 +1996,26 @@ vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode) void vacuum_delay_point(void) { + double msec = 0; + /* Always check for interrupts */ CHECK_FOR_INTERRUPTS(); - /* Nap if appropriate */ - if (VacuumCostActive && !InterruptPending && - VacuumCostBalance >= VacuumCostLimit) - { - double msec; + if (!VacuumCostActive || InterruptPending) + return; + /* + * For parallel vacuum, the delay is computed based on the shared cost + * balance. 
See compute_parallel_delay. + */ + if (VacuumSharedCostBalance != NULL) + msec = compute_parallel_delay(); + else if (VacuumCostBalance >= VacuumCostLimit) msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit; + + /* Nap if appropriate */ + if (msec > 0) + { if (msec > VacuumCostDelay * 4) msec = VacuumCostDelay * 4; @@ -1967,6 +2032,66 @@ vacuum_delay_point(void) } /* + * Computes the vacuum delay for parallel workers. + * + * The basic idea of a cost-based vacuum delay for parallel vacuum is to allow + * each worker to sleep proportional to the work done by it. We achieve this + * by allowing all parallel vacuum workers including the leader process to + * have a shared view of cost related parameters (mainly VacuumCostBalance). + * We allow each worker to update it as and when it has incurred any cost and + * then based on that decide whether it needs to sleep. We compute the time + * to sleep for a worker based on the cost it has incurred + * (VacuumCostBalanceLocal) and then reduce the VacuumSharedCostBalance by + * that amount. This avoids letting the workers sleep who have done less or + * no I/O as compared to other workers and therefore can ensure that workers + * who are doing more I/O got throttled more. + * + * We allow any worker to sleep only if it has performed the I/O above a + * certain threshold, which is calculated based on the number of active + * workers (VacuumActiveNWorkers), and the overall cost balance is more than + * VacuumCostLimit set by the system. The testing reveals that we achieve + * the required throttling if we allow a worker that has done more than 50% + * of its share of work to sleep. 
+ */ +static double +compute_parallel_delay(void) +{ + double msec = 0; + uint32 shared_balance; + int nworkers; + + /* Parallel vacuum must be active */ + Assert(VacuumSharedCostBalance); + + nworkers = pg_atomic_read_u32(VacuumActiveNWorkers); + + /* At least count itself */ + Assert(nworkers >= 1); + + /* Update the shared cost balance value atomically */ + shared_balance = pg_atomic_add_fetch_u32(VacuumSharedCostBalance, VacuumCostBalance); + + /* Compute the total local balance for the current worker */ + VacuumCostBalanceLocal += VacuumCostBalance; + + if ((shared_balance >= VacuumCostLimit) && + (VacuumCostBalanceLocal > 0.5 * (VacuumCostLimit / nworkers))) + { + /* Compute sleep time based on the local cost balance */ + msec = VacuumCostDelay * VacuumCostBalanceLocal / VacuumCostLimit; + pg_atomic_sub_fetch_u32(VacuumSharedCostBalance, VacuumCostBalanceLocal); + VacuumCostBalanceLocal = 0; + } + + /* + * Reset the local balance as we accumulated it into the shared value. + */ + VacuumCostBalance = 0; + + return msec; +} + +/* * A wrapper function of defGetBoolean(). * * This function returns VACOPT_TERNARY_ENABLED and VACOPT_TERNARY_DISABLED |