summaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/vacuum.c135
1 files changed, 130 insertions, 5 deletions
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index bb34e252e45..d625d17bf46 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -42,6 +42,7 @@
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgworker_internals.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
@@ -68,6 +69,14 @@ static MemoryContext vac_context = NULL;
static BufferAccessStrategy vac_strategy;
+/*
+ * Variables for cost-based parallel vacuum. See comments atop
+ * compute_parallel_delay to understand how it works.
+ */
+pg_atomic_uint32 *VacuumSharedCostBalance = NULL;
+pg_atomic_uint32 *VacuumActiveNWorkers = NULL;
+int VacuumCostBalanceLocal = 0;
+
/* non-export function prototypes */
static List *expand_vacuum_rel(VacuumRelation *vrel, int options);
static List *get_all_vacuum_rels(int options);
@@ -76,6 +85,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
TransactionId lastSaneFrozenXid,
MultiXactId lastSaneMinMulti);
static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
+static double compute_parallel_delay(void);
static VacOptTernaryValue get_vacopt_ternary_value(DefElem *def);
/*
@@ -94,12 +104,16 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
bool freeze = false;
bool full = false;
bool disable_page_skipping = false;
+ bool parallel_option = false;
ListCell *lc;
/* Set default value */
params.index_cleanup = VACOPT_TERNARY_DEFAULT;
params.truncate = VACOPT_TERNARY_DEFAULT;
+ /* By default parallel vacuum is enabled */
+ params.nworkers = 0;
+
/* Parse options list */
foreach(lc, vacstmt->options)
{
@@ -129,6 +143,39 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
params.index_cleanup = get_vacopt_ternary_value(opt);
else if (strcmp(opt->defname, "truncate") == 0)
params.truncate = get_vacopt_ternary_value(opt);
+ else if (strcmp(opt->defname, "parallel") == 0)
+ {
+ parallel_option = true;
+ if (opt->arg == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("parallel option requires a value between 0 and %d",
+ MAX_PARALLEL_WORKER_LIMIT),
+ parser_errposition(pstate, opt->location)));
+ }
+ else
+ {
+ int nworkers;
+
+ nworkers = defGetInt32(opt);
+ if (nworkers < 0 || nworkers > MAX_PARALLEL_WORKER_LIMIT)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("parallel vacuum degree must be between 0 and %d",
+ MAX_PARALLEL_WORKER_LIMIT),
+ parser_errposition(pstate, opt->location)));
+
+ /*
+ * Disable parallel vacuum, if user has specified parallel
+ * degree as zero.
+ */
+ if (nworkers == 0)
+ params.nworkers = -1;
+ else
+ params.nworkers = nworkers;
+ }
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -152,6 +199,11 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
!(params.options & (VACOPT_FULL | VACOPT_FREEZE)));
Assert(!(params.options & VACOPT_SKIPTOAST));
+ if ((params.options & VACOPT_FULL) && parallel_option)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot specify both FULL and PARALLEL options")));
+
/*
* Make sure VACOPT_ANALYZE is specified if any column lists are present.
*/
@@ -383,6 +435,9 @@ vacuum(List *relations, VacuumParams *params,
VacuumPageHit = 0;
VacuumPageMiss = 0;
VacuumPageDirty = 0;
+ VacuumCostBalanceLocal = 0;
+ VacuumSharedCostBalance = NULL;
+ VacuumActiveNWorkers = NULL;
/*
* Loop to process each selected relation.
@@ -1941,16 +1996,26 @@ vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
void
vacuum_delay_point(void)
{
+ double msec = 0;
+
/* Always check for interrupts */
CHECK_FOR_INTERRUPTS();
- /* Nap if appropriate */
- if (VacuumCostActive && !InterruptPending &&
- VacuumCostBalance >= VacuumCostLimit)
- {
- double msec;
+ if (!VacuumCostActive || InterruptPending)
+ return;
+ /*
+ * For parallel vacuum, the delay is computed based on the shared cost
+ * balance. See compute_parallel_delay.
+ */
+ if (VacuumSharedCostBalance != NULL)
+ msec = compute_parallel_delay();
+ else if (VacuumCostBalance >= VacuumCostLimit)
msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
+
+ /* Nap if appropriate */
+ if (msec > 0)
+ {
if (msec > VacuumCostDelay * 4)
msec = VacuumCostDelay * 4;
@@ -1967,6 +2032,66 @@ vacuum_delay_point(void)
}
/*
+ * Computes the vacuum delay for parallel workers.
+ *
+ * The basic idea of a cost-based vacuum delay for parallel vacuum is to allow
+ * each worker to sleep proportional to the work done by it. We achieve this
+ * by allowing all parallel vacuum workers including the leader process to
+ * have a shared view of cost related parameters (mainly VacuumCostBalance).
+ * We allow each worker to update it as and when it has incurred any cost and
+ * then based on that decide whether it needs to sleep. We compute the time
+ * to sleep for a worker based on the cost it has incurred
+ * (VacuumCostBalanceLocal) and then reduce the VacuumSharedCostBalance by
+ * that amount. This avoids letting the workers sleep who have done less or
+ * no I/O as compared to other workers and therefore can ensure that workers
+ * who are doing more I/O got throttled more.
+ *
+ * We allow any worker to sleep only if it has performed the I/O above a
+ * certain threshold, which is calculated based on the number of active
+ * workers (VacuumActiveNWorkers), and the overall cost balance is more than
+ * VacuumCostLimit set by the system. The testing reveals that we achieve
+ * the required throttling if we allow a worker that has done more than 50%
+ * of its share of work to sleep.
+ */
+static double
+compute_parallel_delay(void)
+{
+ double msec = 0;
+ uint32 shared_balance;
+ int nworkers;
+
+ /* Parallel vacuum must be active */
+ Assert(VacuumSharedCostBalance);
+
+ nworkers = pg_atomic_read_u32(VacuumActiveNWorkers);
+
+ /* At least count itself */
+ Assert(nworkers >= 1);
+
+ /* Update the shared cost balance value atomically */
+ shared_balance = pg_atomic_add_fetch_u32(VacuumSharedCostBalance, VacuumCostBalance);
+
+ /* Compute the total local balance for the current worker */
+ VacuumCostBalanceLocal += VacuumCostBalance;
+
+ if ((shared_balance >= VacuumCostLimit) &&
+ (VacuumCostBalanceLocal > 0.5 * (VacuumCostLimit / nworkers)))
+ {
+ /* Compute sleep time based on the local cost balance */
+ msec = VacuumCostDelay * VacuumCostBalanceLocal / VacuumCostLimit;
+ pg_atomic_sub_fetch_u32(VacuumSharedCostBalance, VacuumCostBalanceLocal);
+ VacuumCostBalanceLocal = 0;
+ }
+
+ /*
+ * Reset the local balance as we accumulated it into the shared value.
+ */
+ VacuumCostBalance = 0;
+
+ return msec;
+}
+
+/*
* A wrapper function of defGetBoolean().
*
* This function returns VACOPT_TERNARY_ENABLED and VACOPT_TERNARY_DISABLED