Diffstat (limited to 'src/backend')
32 files changed, 2420 insertions, 240 deletions
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c index 7306c16f05c..0414ce1945c 100644 --- a/src/backend/access/heap/visibilitymap.c +++ b/src/backend/access/heap/visibilitymap.c @@ -270,7 +270,8 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf, if (BufferIsValid(heapBuf) && BufferGetBlockNumber(heapBuf) != heapBlk) elog(ERROR, "wrong heap buffer passed to visibilitymap_set"); - Assert(!BufferIsValid(heapBuf) || BufferIsExclusiveLocked(heapBuf)); + Assert(!BufferIsValid(heapBuf) || + BufferIsLockedByMeInMode(heapBuf, BUFFER_LOCK_EXCLUSIVE)); /* Check that we have the right VM page pinned */ if (!BufferIsValid(vmBuf) || BufferGetBlockNumber(vmBuf) != mapBlock) diff --git a/src/backend/access/rmgrdesc/gindesc.c b/src/backend/access/rmgrdesc/gindesc.c index 229675775ff..075c4a0ae93 100644 --- a/src/backend/access/rmgrdesc/gindesc.c +++ b/src/backend/access/rmgrdesc/gindesc.c @@ -130,6 +130,9 @@ gin_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, " isdata: %c isleaf: %c", (xlrec->flags & GIN_INSERT_ISDATA) ? 'T' : 'F', (xlrec->flags & GIN_INSERT_ISLEAF) ? 'T' : 'F'); + if (xlrec->leftChildBlkno != InvalidBlockNumber) + appendStringInfo(buf, " children: %u/%u", + xlrec->leftChildBlkno, xlrec->rightChildBlkno); } break; case XLOG_GIN_VACUUM_PAGE: @@ -150,10 +153,27 @@ gin_desc(StringInfo buf, XLogReaderState *record) /* no further information */ break; case XLOG_GIN_UPDATE_META_PAGE: - /* no further information */ + { + ginxlogUpdateMeta *xlrec = (ginxlogUpdateMeta *) rec; + + appendStringInfo(buf, "ntuples: %d", xlrec->ntuples); + if (xlrec->prevTail != InvalidBlockNumber) + appendStringInfo(buf, " prevTail: %u", + xlrec->prevTail); + if (xlrec->newRightlink != InvalidBlockNumber) + appendStringInfo(buf, " newRightLink: %u", + xlrec->newRightlink); + } break; case XLOG_GIN_INSERT_LISTPAGE: - /* no further information */ + { + ginxlogInsertListPage *xlrec = (ginxlogInsertListPage *) rec; + + appendStringInfo(buf, "ntuples: %d", xlrec->ntuples); + if (xlrec->rightlink != InvalidBlockNumber) + appendStringInfo(buf, " rightlink: %u", + xlrec->rightlink); + } break; case XLOG_GIN_DELETE_LISTPAGE: appendStringInfo(buf, "ndeleted: %d", diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index d8e2fce2c99..33369fbe23a 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2817,7 +2817,7 @@ LookupGXactBySubid(Oid subid) } /* - * TwoPhaseGetXidByLockingProc + * TwoPhaseGetOldestXidInCommit * Return the oldest transaction ID from prepared transactions that are * currently in the commit critical section. 
* diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index c7571429e8e..496e0fa4ac6 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -258,7 +258,8 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags) */ #ifdef USE_ASSERT_CHECKING if (!(flags & REGBUF_NO_CHANGE)) - Assert(BufferIsExclusiveLocked(buffer) && BufferIsDirty(buffer)); + Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE) && + BufferIsDirty(buffer)); #endif if (block_id >= max_registered_block_id) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 884b6a23817..fcc86fd43be 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1063,6 +1063,7 @@ CREATE VIEW pg_stat_replication_slots AS s.stream_txns, s.stream_count, s.stream_bytes, + s.mem_exceeded_count, s.total_txns, s.total_bytes, s.stats_reset @@ -1131,7 +1132,8 @@ CREATE VIEW pg_stat_user_functions AS P.proname AS funcname, pg_stat_get_function_calls(P.oid) AS calls, pg_stat_get_function_total_time(P.oid) AS total_time, - pg_stat_get_function_self_time(P.oid) AS self_time + pg_stat_get_function_self_time(P.oid) AS self_time, + pg_stat_get_function_stat_reset_time(P.oid) AS stats_reset FROM pg_proc P LEFT JOIN pg_namespace N ON (N.oid = P.pronamespace) WHERE P.prolang != 12 -- fast check to eliminate built-in functions AND pg_stat_get_function_calls(P.oid) IS NOT NULL; diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 67b94b91cae..e5781155cdf 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -796,7 +796,7 @@ BeginCopyTo(ParseState *pstate, /* plan the query */ plan = pg_plan_query(query, pstate->p_sourcetext, - CURSOR_OPT_PARALLEL_OK, NULL); + CURSOR_OPT_PARALLEL_OK, NULL, NULL); /* * With row-level security and a user using "COPY relation TO", we diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index dfd2ab8e862..1ccc2e55c64 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -321,7 +321,7 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, /* plan the query */ plan = pg_plan_query(query, pstate->p_sourcetext, - CURSOR_OPT_PARALLEL_OK, params); + CURSOR_OPT_PARALLEL_OK, params, NULL); /* * Use a snapshot with an updated command ID to ensure this query sees diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 06191cd8a85..e6edae0845c 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -351,7 +351,7 @@ standard_ExplainOneQuery(Query *query, int cursorOptions, INSTR_TIME_SET_CURRENT(planstart); /* plan the query */ - plan = pg_plan_query(query, queryString, cursorOptions, params); + plan = pg_plan_query(query, queryString, cursorOptions, params, es); INSTR_TIME_SET_CURRENT(planduration); INSTR_TIME_SUBTRACT(planduration, planstart); diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 188e26f0e6e..441de55ac24 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -426,7 +426,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, CHECK_FOR_INTERRUPTS(); /* Plan the query which will generate data for the refresh. 
*/ - plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL); + plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL, NULL); /* * Use a snapshot with an updated command ID to ensure this query sees diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index e7c8171c102..ec96c2efcd3 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -99,7 +99,8 @@ PerformCursorOpen(ParseState *pstate, DeclareCursorStmt *cstmt, ParamListInfo pa elog(ERROR, "non-SELECT statement in DECLARE CURSOR"); /* Plan the query, applying the specified options */ - plan = pg_plan_query(query, pstate->p_sourcetext, cstmt->options, params); + plan = pg_plan_query(query, pstate->p_sourcetext, cstmt->options, params, + NULL); /* * Create a portal and copy the plan and query string into its memory. diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index cf667c81211..0698aae37a7 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -1501,8 +1501,9 @@ row_is_in_frame(WindowObject winobj, int64 pos, TupleTableSlot *slot, /* following row that is not peer is out of frame */ if (pos > winstate->currentpos) { - if (fetch_tuple) - window_gettupleslot(winobj, pos, slot); + if (fetch_tuple) /* need to fetch tuple? */ + if (!window_gettupleslot(winobj, pos, slot)) + return -1; if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot)) return -1; } @@ -3721,6 +3722,7 @@ WinGetFuncArgInPartition(WindowObject winobj, int argno, int notnull_offset; int notnull_relpos; int forward; + bool myisout; Assert(WindowObjectIsValid(winobj)); winstate = winobj->winstate; @@ -3759,63 +3761,60 @@ WinGetFuncArgInPartition(WindowObject winobj, int argno, if (!null_treatment) /* IGNORE NULLS is not specified */ { + /* get tupple and evaluate in a partition */ datum = gettuple_eval_partition(winobj, argno, - abs_pos, isnull, isout); - if (!*isout && set_mark) + abs_pos, isnull, &myisout); + if (!myisout && set_mark) WinSetMarkPosition(winobj, abs_pos); + if (isout) + *isout = myisout; return datum; } + myisout = false; + datum = 0; + /* * Get the next nonnull value in the partition, moving forward or backward * until we find a value or reach the partition's end. */ do { + int nn_info; /* NOT NULL info */ + abs_pos += forward; - if (abs_pos < 0) - { - /* out of partition */ - if (isout) - *isout = true; - *isnull = true; - datum = 0; + if (abs_pos < 0) /* apparently out of partition */ break; - } - switch (get_notnull_info(winobj, abs_pos)) + /* check NOT NULL cached info */ + nn_info = get_notnull_info(winobj, abs_pos); + if (nn_info == NN_NOTNULL) /* this row is known to be NOT NULL */ + notnull_offset++; + + else if (nn_info == NN_NULL) /* this row is known to be NULL */ + continue; /* keep on moving forward or backward */ + + else /* need to check NULL or not */ { - case NN_NOTNULL: /* this row is known to be NOT NULL */ - notnull_offset++; - if (notnull_offset >= notnull_relpos) - { - /* prepare to exit this loop */ - datum = gettuple_eval_partition(winobj, argno, - abs_pos, isnull, isout); - } - break; - case NN_NULL: /* this row is known to be NULL */ - if (isout) - *isout = false; - *isnull = true; - datum = 0; - break; - default: /* need to check NULL or not */ - datum = gettuple_eval_partition(winobj, argno, - abs_pos, isnull, isout); - if (*isout) /* out of partition? 
*/ - return datum; - - if (!*isnull) - notnull_offset++; - /* record the row status */ - put_notnull_info(winobj, abs_pos, *isnull); + /* get tupple and evaluate in a partition */ + datum = gettuple_eval_partition(winobj, argno, + abs_pos, isnull, &myisout); + if (myisout) /* out of partition? */ break; + if (!*isnull) + notnull_offset++; + /* record the row status */ + put_notnull_info(winobj, abs_pos, *isnull); } } while (notnull_offset < notnull_relpos); - if (!*isout && set_mark) + /* get tupple and evaluate in a partition */ + datum = gettuple_eval_partition(winobj, argno, + abs_pos, isnull, &myisout); + if (!myisout && set_mark) WinSetMarkPosition(winobj, abs_pos); + if (isout) + *isout = myisout; return datum; } diff --git a/src/backend/optimizer/README b/src/backend/optimizer/README index 843368096fd..6c35baceedb 100644 --- a/src/backend/optimizer/README +++ b/src/backend/optimizer/README @@ -1500,3 +1500,113 @@ breaking down aggregation or grouping over a partitioned relation into aggregation or grouping over its partitions is called partitionwise aggregation. Especially when the partition keys match the GROUP BY clause, this can be significantly faster than the regular method. + +Eager aggregation +----------------- + +Eager aggregation is a query optimization technique that partially +pushes aggregation past a join, and finalizes it once all the +relations are joined. Eager aggregation may reduce the number of +input rows to the join and thus could result in a better overall plan. + +To prove that the transformation is correct, let's first consider the +case where only inner joins are involved. In this case, we partition +the tables in the FROM clause into two groups: those that contain at +least one aggregation column, and those that do not contain any +aggregation columns. Each group can be treated as a single relation +formed by the Cartesian product of the tables within that group. +Therefore, without loss of generality, we can assume that the FROM +clause contains exactly two relations, R1 and R2, where R1 represents +the relation containing all aggregation columns, and R2 represents the +relation without any aggregation columns. + +Let the query be of the form: + +SELECT G, AGG(A) +FROM R1 JOIN R2 ON J +GROUP BY G; + +where G is the set of grouping keys that may include columns from R1 +and/or R2; AGG(A) is an aggregate function over columns A from R1; J +is the join condition between R1 and R2. + +The transformation of eager aggregation is: + + GROUP BY G, AGG(A) on (R1 JOIN R2 ON J) + = + GROUP BY G, AGG(agg_A) on ((GROUP BY G1, AGG(A) AS agg_A on R1) JOIN R2 ON J) + +This equivalence holds under the following conditions: + +1) AGG is decomposable, meaning that it can be computed in two stages: +a partial aggregation followed by a final aggregation; +2) The set G1 used in the pre-aggregation of R1 includes: + * all columns from R1 that are part of the grouping keys G, and + * all columns from R1 that appear in the join condition J. +3) The grouping operator for any column in G1 must be compatible with +the operator used for that column in the join condition J. + +Since G1 includes all columns from R1 that appear in either the +grouping keys G or the join condition J, all rows within each partial +group have identical values for both the grouping keys and the +join-relevant columns from R1, assuming compatible operators are used. 
+As a result, the rows within a partial group are indistinguishable in +terms of their contribution to the aggregation and their behavior in +the join. This ensures that all rows in the same partial group share +the same "destiny": they either all match or all fail to match a given +row in R2. Because the aggregate function AGG is decomposable, +aggregating the partial results after the join yields the same final +result as aggregating after the full join, thereby preserving query +semantics. Q.E.D. + +In the case where there are any outer joins, the situation becomes +more complex due to join order constraints and the semantics of +null-extension in outer joins. If the relations that contain at least +one aggregation column cannot be treated as a single relation because +of the join order constraints, partial aggregation paths will not be +generated, and thus the transformation is not applicable. Otherwise, +let R1 be the relation containing all aggregation columns, and R2, R3, +... be the remaining relations. From the inner join case, under the +aforementioned conditions, we have the equivalence: + + GROUP BY G, AGG(A) on (R1 JOIN R2 JOIN R3 ...) + = + GROUP BY G, AGG(agg_A) on ((GROUP BY G1, AGG(A) AS agg_A on R1) JOIN R2 JOIN R3 ...) + +To preserve correctness when outer joins are involved, we require an +additional condition: + +4) R1 must not be on the nullable side of any outer join. + +This condition ensures that partial aggregation over R1 does not +suppress any null-extended rows that would be introduced by outer +joins. If R1 is on the nullable side of an outer join, the +NULL-extended rows produced by the outer join would not be available +when we perform the partial aggregation, while with a +non-eager-aggregation plan these rows are available for the top-level +aggregation. Pushing partial aggregation in this case may result in +the rows being grouped differently than expected, or produce incorrect +values from the aggregate functions. + +During the construction of the join tree, we evaluate each base or +join relation to determine if eager aggregation can be applied. If +feasible, we create a separate RelOptInfo called a "grouped relation" +and generate grouped paths by adding sorted and hashed partial +aggregation paths on top of the non-grouped paths. To limit planning +time, we consider only the cheapest or suitably-sorted non-grouped +paths in this step. + +Another way to generate grouped paths is to join a grouped relation +with a non-grouped relation. Joining two grouped relations is +currently not supported. + +To further limit planning time, we currently adopt a strategy where +partial aggregation is pushed only to the lowest feasible level in the +join tree where it provides a significant reduction in row count. +This strategy also helps ensure that all grouped paths for the same +grouped relation produce the same set of rows, which is important to +support a fundamental assumption of the planner. + +If we have generated a grouped relation for the topmost join relation, +we need to finalize its paths at the end. The final paths will +compete in the usual way with paths built from regular planning. 
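As a concrete illustration of the equivalence described in the README hunk above, consider two hypothetical tables: orders (playing the role of R1, holding the aggregated column amount and the join column cust_id) and customers (R2, holding the grouping column region). SUM is decomposable, so condition 1) holds, and G1 = {cust_id} because cust_id is the only orders column appearing in the grouping keys or the join condition. The rewritten query below spells out the eager-aggregation form by hand as a sketch of the transformation; with the patch applied, the planner considers this shape automatically when enable_eager_aggregate is on, so the manual rewrite is not needed in practice.

-- original aggregation over a join (hypothetical schema)
SELECT c.region, SUM(o.amount)
FROM orders o
     JOIN customers c ON o.cust_id = c.cust_id
GROUP BY c.region;

-- logically equivalent form with partial aggregation pushed below the join
-- (G1 = {cust_id}: the only orders column used in G or J)
SELECT c.region, SUM(o.partial_amount)
FROM (SELECT cust_id, SUM(amount) AS partial_amount
      FROM orders
      GROUP BY cust_id) o
     JOIN customers c ON o.cust_id = c.cust_id
GROUP BY c.region;

Each partially aggregated orders row either matches or fails to match a given customers row as a whole, so summing the partial sums after the join yields the same result as summing the raw rows after the join, while the join itself processes far fewer rows when orders has many rows per cust_id.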
diff --git a/src/backend/optimizer/geqo/geqo_eval.c b/src/backend/optimizer/geqo/geqo_eval.c index 7fcb1aa70d1..8005754c8c6 100644 --- a/src/backend/optimizer/geqo/geqo_eval.c +++ b/src/backend/optimizer/geqo/geqo_eval.c @@ -264,6 +264,9 @@ merge_clump(PlannerInfo *root, List *clumps, Clump *new_clump, int num_gene, /* Keep searching if join order is not valid */ if (joinrel) { + bool is_top_rel = bms_equal(joinrel->relids, + root->all_query_rels); + /* Create paths for partitionwise joins. */ generate_partitionwise_join_paths(root, joinrel); @@ -273,12 +276,28 @@ merge_clump(PlannerInfo *root, List *clumps, Clump *new_clump, int num_gene, * rel once we know the final targetlist (see * grouping_planner). */ - if (!bms_equal(joinrel->relids, root->all_query_rels)) + if (!is_top_rel) generate_useful_gather_paths(root, joinrel, false); /* Find and save the cheapest paths for this joinrel */ set_cheapest(joinrel); + /* + * Except for the topmost scan/join rel, consider generating + * partial aggregation paths for the grouped relation on top + * of the paths of this rel. After that, we're done creating + * paths for the grouped relation, so run set_cheapest(). + */ + if (joinrel->grouped_rel != NULL && !is_top_rel) + { + RelOptInfo *grouped_rel = joinrel->grouped_rel; + + Assert(IS_GROUPED_REL(grouped_rel)); + + generate_grouped_paths(root, grouped_rel, joinrel); + set_cheapest(grouped_rel); + } + /* Absorb new clump into old */ old_clump->joinrel = joinrel; old_clump->size += new_clump->size; diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 1f82239b4e0..ec27c1a4994 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -40,6 +40,7 @@ #include "optimizer/paths.h" #include "optimizer/plancat.h" #include "optimizer/planner.h" +#include "optimizer/prep.h" #include "optimizer/tlist.h" #include "parser/parse_clause.h" #include "parser/parsetree.h" @@ -47,6 +48,7 @@ #include "port/pg_bitutils.h" #include "rewrite/rewriteManip.h" #include "utils/lsyscache.h" +#include "utils/selfuncs.h" /* Bitmask flags for pushdown_safety_info.unsafeFlags */ @@ -77,7 +79,9 @@ typedef enum pushdown_safe_type /* These parameters are set by GUC */ bool enable_geqo = false; /* just in case GUC doesn't set it */ +bool enable_eager_aggregate = true; int geqo_threshold; +double min_eager_agg_group_size; int min_parallel_table_scan_size; int min_parallel_index_scan_size; @@ -90,6 +94,7 @@ join_search_hook_type join_search_hook = NULL; static void set_base_rel_consider_startup(PlannerInfo *root); static void set_base_rel_sizes(PlannerInfo *root); +static void setup_simple_grouped_rels(PlannerInfo *root); static void set_base_rel_pathlists(PlannerInfo *root); static void set_rel_size(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte); @@ -114,6 +119,7 @@ static void set_append_rel_size(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte); static void set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte); +static void set_grouped_rel_pathlist(PlannerInfo *root, RelOptInfo *rel); static void generate_orderedappend_paths(PlannerInfo *root, RelOptInfo *rel, List *live_childrels, List *all_child_pathkeys); @@ -183,6 +189,12 @@ make_one_rel(PlannerInfo *root, List *joinlist) set_base_rel_sizes(root); /* + * Build grouped relations for simple rels (i.e., base or "other" member + * relations) where possible. 
+ */ + setup_simple_grouped_rels(root); + + /* * We should now have size estimates for every actual table involved in * the query, and we also know which if any have been deleted from the * query by join removal, pruned by partition pruning, or eliminated by @@ -324,6 +336,39 @@ set_base_rel_sizes(PlannerInfo *root) } /* + * setup_simple_grouped_rels + * For each simple relation, build a grouped simple relation if eager + * aggregation is possible and if this relation can produce grouped paths. + */ +static void +setup_simple_grouped_rels(PlannerInfo *root) +{ + Index rti; + + /* + * If there are no aggregate expressions or grouping expressions, eager + * aggregation is not possible. + */ + if (root->agg_clause_list == NIL || + root->group_expr_list == NIL) + return; + + for (rti = 1; rti < root->simple_rel_array_size; rti++) + { + RelOptInfo *rel = root->simple_rel_array[rti]; + + /* there may be empty slots corresponding to non-baserel RTEs */ + if (rel == NULL) + continue; + + Assert(rel->relid == rti); /* sanity check on array */ + Assert(IS_SIMPLE_REL(rel)); /* sanity check on rel */ + + (void) build_simple_grouped_rel(root, rel); + } +} + +/* * set_base_rel_pathlists * Finds all paths available for scanning each base-relation entry. * Sequential scan and any available indices are considered. @@ -559,6 +604,15 @@ set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, /* Now find the cheapest of the paths for this rel */ set_cheapest(rel); + /* + * If a grouped relation for this rel exists, build partial aggregation + * paths for it. + * + * Note that this can only happen after we've called set_cheapest() for + * this base rel, because we need its cheapest paths. + */ + set_grouped_rel_pathlist(root, rel); + #ifdef OPTIMIZER_DEBUG pprint(rel); #endif @@ -1305,6 +1359,35 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, add_paths_to_append_rel(root, rel, live_childrels); } +/* + * set_grouped_rel_pathlist + * If a grouped relation for the given 'rel' exists, build partial + * aggregation paths for it. + */ +static void +set_grouped_rel_pathlist(PlannerInfo *root, RelOptInfo *rel) +{ + RelOptInfo *grouped_rel; + + /* + * If there are no aggregate expressions or grouping expressions, eager + * aggregation is not possible. + */ + if (root->agg_clause_list == NIL || + root->group_expr_list == NIL) + return; + + /* Add paths to the grouped base relation if one exists. */ + grouped_rel = rel->grouped_rel; + if (grouped_rel) + { + Assert(IS_GROUPED_REL(grouped_rel)); + + generate_grouped_paths(root, grouped_rel, rel); + set_cheapest(grouped_rel); + } +} + /* * add_paths_to_append_rel @@ -3335,6 +3418,345 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r } /* + * generate_grouped_paths + * Generate paths for a grouped relation by adding sorted and hashed + * partial aggregation paths on top of paths of the ungrouped relation. + * + * The information needed is provided by the RelAggInfo structure stored in + * "grouped_rel". 
+ */ +void +generate_grouped_paths(PlannerInfo *root, RelOptInfo *grouped_rel, + RelOptInfo *rel) +{ + RelAggInfo *agg_info = grouped_rel->agg_info; + AggClauseCosts agg_costs; + bool can_hash; + bool can_sort; + Path *cheapest_total_path = NULL; + Path *cheapest_partial_path = NULL; + double dNumGroups = 0; + double dNumPartialGroups = 0; + List *group_pathkeys = NIL; + + if (IS_DUMMY_REL(rel)) + { + mark_dummy_rel(grouped_rel); + return; + } + + /* + * We push partial aggregation only to the lowest possible level in the + * join tree that is deemed useful. + */ + if (!bms_equal(agg_info->apply_at, rel->relids) || + !agg_info->agg_useful) + return; + + MemSet(&agg_costs, 0, sizeof(AggClauseCosts)); + get_agg_clause_costs(root, AGGSPLIT_INITIAL_SERIAL, &agg_costs); + + /* + * Determine whether it's possible to perform sort-based implementations + * of grouping, and generate the pathkeys that represent the grouping + * requirements in that case. + */ + can_sort = grouping_is_sortable(agg_info->group_clauses); + if (can_sort) + { + RelOptInfo *top_grouped_rel; + List *top_group_tlist; + + top_grouped_rel = IS_OTHER_REL(rel) ? + rel->top_parent->grouped_rel : grouped_rel; + top_group_tlist = + make_tlist_from_pathtarget(top_grouped_rel->agg_info->target); + + group_pathkeys = + make_pathkeys_for_sortclauses(root, agg_info->group_clauses, + top_group_tlist); + } + + /* + * Determine whether we should consider hash-based implementations of + * grouping. + */ + Assert(root->numOrderedAggs == 0); + can_hash = (agg_info->group_clauses != NIL && + grouping_is_hashable(agg_info->group_clauses)); + + /* + * Consider whether we should generate partially aggregated non-partial + * paths. We can only do this if we have a non-partial path. + */ + if (rel->pathlist != NIL) + { + cheapest_total_path = rel->cheapest_total_path; + Assert(cheapest_total_path != NULL); + } + + /* + * If parallelism is possible for grouped_rel, then we should consider + * generating partially-grouped partial paths. However, if the ungrouped + * rel has no partial paths, then we can't. + */ + if (grouped_rel->consider_parallel && rel->partial_pathlist != NIL) + { + cheapest_partial_path = linitial(rel->partial_pathlist); + Assert(cheapest_partial_path != NULL); + } + + /* Estimate number of partial groups. */ + if (cheapest_total_path != NULL) + dNumGroups = estimate_num_groups(root, + agg_info->group_exprs, + cheapest_total_path->rows, + NULL, NULL); + if (cheapest_partial_path != NULL) + dNumPartialGroups = estimate_num_groups(root, + agg_info->group_exprs, + cheapest_partial_path->rows, + NULL, NULL); + + if (can_sort && cheapest_total_path != NULL) + { + ListCell *lc; + + /* + * Use any available suitably-sorted path as input, and also consider + * sorting the cheapest-total path and incremental sort on any paths + * with presorted keys. + * + * To save planning time, we ignore parameterized input paths unless + * they are the cheapest-total path. + */ + foreach(lc, rel->pathlist) + { + Path *input_path = (Path *) lfirst(lc); + Path *path; + bool is_sorted; + int presorted_keys; + + /* + * Ignore parameterized paths that are not the cheapest-total + * path. 
+ */ + if (input_path->param_info && + input_path != cheapest_total_path) + continue; + + is_sorted = pathkeys_count_contained_in(group_pathkeys, + input_path->pathkeys, + &presorted_keys); + + /* + * Ignore paths that are not suitably or partially sorted, unless + * they are the cheapest total path (no need to deal with paths + * which have presorted keys when incremental sort is disabled). + */ + if (!is_sorted && input_path != cheapest_total_path && + (presorted_keys == 0 || !enable_incremental_sort)) + continue; + + /* + * Since the path originates from a non-grouped relation that is + * not aware of eager aggregation, we must ensure that it provides + * the correct input for partial aggregation. + */ + path = (Path *) create_projection_path(root, + grouped_rel, + input_path, + agg_info->agg_input); + + if (!is_sorted) + { + /* + * We've no need to consider both a sort and incremental sort. + * We'll just do a sort if there are no presorted keys and an + * incremental sort when there are presorted keys. + */ + if (presorted_keys == 0 || !enable_incremental_sort) + path = (Path *) create_sort_path(root, + grouped_rel, + path, + group_pathkeys, + -1.0); + else + path = (Path *) create_incremental_sort_path(root, + grouped_rel, + path, + group_pathkeys, + presorted_keys, + -1.0); + } + + /* + * qual is NIL because the HAVING clause cannot be evaluated until + * the final value of the aggregate is known. + */ + path = (Path *) create_agg_path(root, + grouped_rel, + path, + agg_info->target, + AGG_SORTED, + AGGSPLIT_INITIAL_SERIAL, + agg_info->group_clauses, + NIL, + &agg_costs, + dNumGroups); + + add_path(grouped_rel, path); + } + } + + if (can_sort && cheapest_partial_path != NULL) + { + ListCell *lc; + + /* Similar to above logic, but for partial paths. */ + foreach(lc, rel->partial_pathlist) + { + Path *input_path = (Path *) lfirst(lc); + Path *path; + bool is_sorted; + int presorted_keys; + + is_sorted = pathkeys_count_contained_in(group_pathkeys, + input_path->pathkeys, + &presorted_keys); + + /* + * Ignore paths that are not suitably or partially sorted, unless + * they are the cheapest partial path (no need to deal with paths + * which have presorted keys when incremental sort is disabled). + */ + if (!is_sorted && input_path != cheapest_partial_path && + (presorted_keys == 0 || !enable_incremental_sort)) + continue; + + /* + * Since the path originates from a non-grouped relation that is + * not aware of eager aggregation, we must ensure that it provides + * the correct input for partial aggregation. + */ + path = (Path *) create_projection_path(root, + grouped_rel, + input_path, + agg_info->agg_input); + + if (!is_sorted) + { + /* + * We've no need to consider both a sort and incremental sort. + * We'll just do a sort if there are no presorted keys and an + * incremental sort when there are presorted keys. + */ + if (presorted_keys == 0 || !enable_incremental_sort) + path = (Path *) create_sort_path(root, + grouped_rel, + path, + group_pathkeys, + -1.0); + else + path = (Path *) create_incremental_sort_path(root, + grouped_rel, + path, + group_pathkeys, + presorted_keys, + -1.0); + } + + /* + * qual is NIL because the HAVING clause cannot be evaluated until + * the final value of the aggregate is known. 
+ */ + path = (Path *) create_agg_path(root, + grouped_rel, + path, + agg_info->target, + AGG_SORTED, + AGGSPLIT_INITIAL_SERIAL, + agg_info->group_clauses, + NIL, + &agg_costs, + dNumPartialGroups); + + add_partial_path(grouped_rel, path); + } + } + + /* + * Add a partially-grouped HashAgg Path where possible + */ + if (can_hash && cheapest_total_path != NULL) + { + Path *path; + + /* + * Since the path originates from a non-grouped relation that is not + * aware of eager aggregation, we must ensure that it provides the + * correct input for partial aggregation. + */ + path = (Path *) create_projection_path(root, + grouped_rel, + cheapest_total_path, + agg_info->agg_input); + + /* + * qual is NIL because the HAVING clause cannot be evaluated until the + * final value of the aggregate is known. + */ + path = (Path *) create_agg_path(root, + grouped_rel, + path, + agg_info->target, + AGG_HASHED, + AGGSPLIT_INITIAL_SERIAL, + agg_info->group_clauses, + NIL, + &agg_costs, + dNumGroups); + + add_path(grouped_rel, path); + } + + /* + * Now add a partially-grouped HashAgg partial Path where possible + */ + if (can_hash && cheapest_partial_path != NULL) + { + Path *path; + + /* + * Since the path originates from a non-grouped relation that is not + * aware of eager aggregation, we must ensure that it provides the + * correct input for partial aggregation. + */ + path = (Path *) create_projection_path(root, + grouped_rel, + cheapest_partial_path, + agg_info->agg_input); + + /* + * qual is NIL because the HAVING clause cannot be evaluated until the + * final value of the aggregate is known. + */ + path = (Path *) create_agg_path(root, + grouped_rel, + path, + agg_info->target, + AGG_HASHED, + AGGSPLIT_INITIAL_SERIAL, + agg_info->group_clauses, + NIL, + &agg_costs, + dNumPartialGroups); + + add_partial_path(grouped_rel, path); + } +} + +/* * make_rel_from_joinlist * Build access paths using a "joinlist" to guide the join path search. * @@ -3493,11 +3915,19 @@ standard_join_search(PlannerInfo *root, int levels_needed, List *initial_rels) * * After that, we're done creating paths for the joinrel, so run * set_cheapest(). + * + * In addition, we also run generate_grouped_paths() for the grouped + * relation of each just-processed joinrel, and run set_cheapest() for + * the grouped relation afterwards. */ foreach(lc, root->join_rel_level[lev]) { + bool is_top_rel; + rel = (RelOptInfo *) lfirst(lc); + is_top_rel = bms_equal(rel->relids, root->all_query_rels); + /* Create paths for partitionwise joins. */ generate_partitionwise_join_paths(root, rel); @@ -3507,12 +3937,28 @@ standard_join_search(PlannerInfo *root, int levels_needed, List *initial_rels) * once we know the final targetlist (see grouping_planner's and * its call to apply_scanjoin_target_to_paths). */ - if (!bms_equal(rel->relids, root->all_query_rels)) + if (!is_top_rel) generate_useful_gather_paths(root, rel, false); /* Find and save the cheapest paths for this rel */ set_cheapest(rel); + /* + * Except for the topmost scan/join rel, consider generating + * partial aggregation paths for the grouped relation on top of + * the paths of this rel. After that, we're done creating paths + * for the grouped relation, so run set_cheapest(). 
+ */ + if (rel->grouped_rel != NULL && !is_top_rel) + { + RelOptInfo *grouped_rel = rel->grouped_rel; + + Assert(IS_GROUPED_REL(grouped_rel)); + + generate_grouped_paths(root, grouped_rel, rel); + set_cheapest(grouped_rel); + } + #ifdef OPTIMIZER_DEBUG pprint(rel); #endif @@ -4382,6 +4828,25 @@ generate_partitionwise_join_paths(PlannerInfo *root, RelOptInfo *rel) if (IS_DUMMY_REL(child_rel)) continue; + /* + * Except for the topmost scan/join rel, consider generating partial + * aggregation paths for the grouped relation on top of the paths of + * this partitioned child-join. After that, we're done creating paths + * for the grouped relation, so run set_cheapest(). + */ + if (child_rel->grouped_rel != NULL && + !bms_equal(IS_OTHER_REL(rel) ? + rel->top_parent_relids : rel->relids, + root->all_query_rels)) + { + RelOptInfo *grouped_rel = child_rel->grouped_rel; + + Assert(IS_GROUPED_REL(grouped_rel)); + + generate_grouped_paths(root, grouped_rel, child_rel); + set_cheapest(grouped_rel); + } + #ifdef OPTIMIZER_DEBUG pprint(child_rel); #endif diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c index 535248aa525..43b84d239ed 100644 --- a/src/backend/optimizer/path/joinrels.c +++ b/src/backend/optimizer/path/joinrels.c @@ -16,6 +16,7 @@ #include "miscadmin.h" #include "optimizer/appendinfo.h" +#include "optimizer/cost.h" #include "optimizer/joininfo.h" #include "optimizer/pathnode.h" #include "optimizer/paths.h" @@ -36,6 +37,9 @@ static bool has_legal_joinclause(PlannerInfo *root, RelOptInfo *rel); static bool restriction_is_constant_false(List *restrictlist, RelOptInfo *joinrel, bool only_pushed_down); +static void make_grouped_join_rel(PlannerInfo *root, RelOptInfo *rel1, + RelOptInfo *rel2, RelOptInfo *joinrel, + SpecialJoinInfo *sjinfo, List *restrictlist); static void populate_joinrel_with_paths(PlannerInfo *root, RelOptInfo *rel1, RelOptInfo *rel2, RelOptInfo *joinrel, SpecialJoinInfo *sjinfo, List *restrictlist); @@ -762,6 +766,10 @@ make_join_rel(PlannerInfo *root, RelOptInfo *rel1, RelOptInfo *rel2) return joinrel; } + /* Build a grouped join relation for 'joinrel' if possible. */ + make_grouped_join_rel(root, rel1, rel2, joinrel, sjinfo, + restrictlist); + /* Add paths to the join relation. */ populate_joinrel_with_paths(root, rel1, rel2, joinrel, sjinfo, restrictlist); @@ -874,6 +882,186 @@ add_outer_joins_to_relids(PlannerInfo *root, Relids input_relids, } /* + * make_grouped_join_rel + * Build a grouped join relation for the given "joinrel" if eager + * aggregation is applicable and the resulting grouped paths are considered + * useful. + * + * There are two strategies for generating grouped paths for a join relation: + * + * 1. Join a grouped (partially aggregated) input relation with a non-grouped + * input (e.g., AGG(B) JOIN A). + * + * 2. Apply partial aggregation (sorted or hashed) on top of existing + * non-grouped join paths (e.g., AGG(A JOIN B)). + * + * To limit planning effort and avoid an explosion of alternatives, we adopt a + * strategy where partial aggregation is only pushed to the lowest possible + * level in the join tree that is deemed useful. That is, if grouped paths can + * be built using the first strategy, we skip consideration of the second + * strategy for the same join level. 
+ * + * Additionally, if there are multiple lowest useful levels where partial + * aggregation could be applied, such as in a join tree with relations A, B, + * and C where both "AGG(A JOIN B) JOIN C" and "A JOIN AGG(B JOIN C)" are valid + * placements, we choose only the first one encountered during join search. + * This avoids generating multiple versions of the same grouped relation based + * on different aggregation placements. + * + * These heuristics also ensure that all grouped paths for the same grouped + * relation produce the same set of rows, which is a basic assumption in the + * planner. + */ +static void +make_grouped_join_rel(PlannerInfo *root, RelOptInfo *rel1, + RelOptInfo *rel2, RelOptInfo *joinrel, + SpecialJoinInfo *sjinfo, List *restrictlist) +{ + RelOptInfo *grouped_rel; + RelOptInfo *grouped_rel1; + RelOptInfo *grouped_rel2; + bool rel1_empty; + bool rel2_empty; + Relids agg_apply_at; + + /* + * If there are no aggregate expressions or grouping expressions, eager + * aggregation is not possible. + */ + if (root->agg_clause_list == NIL || + root->group_expr_list == NIL) + return; + + /* Retrieve the grouped relations for the two input rels */ + grouped_rel1 = rel1->grouped_rel; + grouped_rel2 = rel2->grouped_rel; + + rel1_empty = (grouped_rel1 == NULL || IS_DUMMY_REL(grouped_rel1)); + rel2_empty = (grouped_rel2 == NULL || IS_DUMMY_REL(grouped_rel2)); + + /* Find or construct a grouped joinrel for this joinrel */ + grouped_rel = joinrel->grouped_rel; + if (grouped_rel == NULL) + { + RelAggInfo *agg_info = NULL; + + /* + * Prepare the information needed to create grouped paths for this + * join relation. + */ + agg_info = create_rel_agg_info(root, joinrel, rel1_empty == rel2_empty); + if (agg_info == NULL) + return; + + /* + * If grouped paths for the given join relation are not considered + * useful, and no grouped paths can be built by joining grouped input + * relations, skip building the grouped join relation. + */ + if (!agg_info->agg_useful && + (rel1_empty == rel2_empty)) + return; + + /* build the grouped relation */ + grouped_rel = build_grouped_rel(root, joinrel); + grouped_rel->reltarget = agg_info->target; + + if (rel1_empty != rel2_empty) + { + /* + * If there is exactly one grouped input relation, then we can + * build grouped paths by joining the input relations. Set size + * estimates for the grouped join relation based on the input + * relations, and update the set of relids where partial + * aggregation is applied to that of the grouped input relation. + */ + set_joinrel_size_estimates(root, grouped_rel, + rel1_empty ? rel1 : grouped_rel1, + rel2_empty ? rel2 : grouped_rel2, + sjinfo, restrictlist); + agg_info->apply_at = rel1_empty ? + grouped_rel2->agg_info->apply_at : + grouped_rel1->agg_info->apply_at; + } + else + { + /* + * Otherwise, grouped paths can be built by applying partial + * aggregation on top of existing non-grouped join paths. Set + * size estimates for the grouped join relation based on the + * estimated number of groups, and track the set of relids where + * partial aggregation is applied. Note that these values may be + * updated later if it is determined that grouped paths can be + * constructed by joining other input relations. + */ + grouped_rel->rows = agg_info->grouped_rows; + agg_info->apply_at = bms_copy(joinrel->relids); + } + + grouped_rel->agg_info = agg_info; + joinrel->grouped_rel = grouped_rel; + } + + Assert(IS_GROUPED_REL(grouped_rel)); + + /* We may have already proven this grouped join relation to be dummy. 
*/ + if (IS_DUMMY_REL(grouped_rel)) + return; + + /* + * Nothing to do if there's no grouped input relation. Also, joining two + * grouped relations is not currently supported. + */ + if (rel1_empty == rel2_empty) + return; + + /* + * Get the set of relids where partial aggregation is applied among the + * given input relations. + */ + agg_apply_at = rel1_empty ? + grouped_rel2->agg_info->apply_at : + grouped_rel1->agg_info->apply_at; + + /* + * If it's not the designated level, skip building grouped paths. + * + * One exception is when it is a subset of the previously recorded level. + * In that case, we need to update the designated level to this one, and + * adjust the size estimates for the grouped join relation accordingly. + * For example, suppose partial aggregation can be applied on top of (B + * JOIN C). If we first construct the join as ((A JOIN B) JOIN C), we'd + * record the designated level as including all three relations (A B C). + * Later, when we consider (A JOIN (B JOIN C)), we encounter the smaller + * (B C) join level directly. Since this is a subset of the previous + * level and still valid for partial aggregation, we update the designated + * level to (B C), and adjust the size estimates accordingly. + */ + if (!bms_equal(agg_apply_at, grouped_rel->agg_info->apply_at)) + { + if (bms_is_subset(agg_apply_at, grouped_rel->agg_info->apply_at)) + { + /* Adjust the size estimates for the grouped join relation. */ + set_joinrel_size_estimates(root, grouped_rel, + rel1_empty ? rel1 : grouped_rel1, + rel2_empty ? rel2 : grouped_rel2, + sjinfo, restrictlist); + grouped_rel->agg_info->apply_at = agg_apply_at; + } + else + return; + } + + /* Make paths for the grouped join relation. */ + populate_joinrel_with_paths(root, + rel1_empty ? rel1 : grouped_rel1, + rel2_empty ? rel2 : grouped_rel2, + grouped_rel, + sjinfo, + restrictlist); +} + +/* * populate_joinrel_with_paths * Add paths to the given joinrel for given pair of joining relations. The * SpecialJoinInfo provides details about the join and the restrictlist @@ -1615,6 +1803,11 @@ try_partitionwise_join(PlannerInfo *root, RelOptInfo *rel1, RelOptInfo *rel2, adjust_child_relids(joinrel->relids, nappinfos, appinfos))); + /* Build a grouped join relation for 'child_joinrel' if possible */ + make_grouped_join_rel(root, child_rel1, child_rel2, + child_joinrel, child_sjinfo, + child_restrictlist); + /* And make paths for the child join */ populate_joinrel_with_paths(root, child_rel1, child_rel2, child_joinrel, child_sjinfo, diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c index 8b04d40d36d..879dcb4608e 100644 --- a/src/backend/optimizer/path/pathkeys.c +++ b/src/backend/optimizer/path/pathkeys.c @@ -2154,14 +2154,31 @@ right_merge_direction(PlannerInfo *root, PathKey *pathkey) * Because we the have the possibility of incremental sort, a prefix list of * keys is potentially useful for improving the performance of the requested * ordering. Thus we return 0, if no valuable keys are found, or the number - * of leading keys shared by the list and the requested ordering.. + * of leading keys shared by the list and the requested ordering. 
*/ static int pathkeys_useful_for_ordering(PlannerInfo *root, List *pathkeys) { int n_common_pathkeys; - (void) pathkeys_count_contained_in(root->query_pathkeys, pathkeys, + (void) pathkeys_count_contained_in(root->sort_pathkeys, pathkeys, + &n_common_pathkeys); + + return n_common_pathkeys; +} + +/* + * pathkeys_useful_for_windowing + * Count the number of pathkeys that are useful for meeting the + * query's desired sort order for window function evaluation. + */ +static int +pathkeys_useful_for_windowing(PlannerInfo *root, List *pathkeys) +{ + int n_common_pathkeys; + + (void) pathkeys_count_contained_in(root->window_pathkeys, + pathkeys, &n_common_pathkeys); return n_common_pathkeys; @@ -2278,6 +2295,9 @@ truncate_useless_pathkeys(PlannerInfo *root, nuseful2 = pathkeys_useful_for_ordering(root, pathkeys); if (nuseful2 > nuseful) nuseful = nuseful2; + nuseful2 = pathkeys_useful_for_windowing(root, pathkeys); + if (nuseful2 > nuseful) + nuseful = nuseful2; nuseful2 = pathkeys_useful_for_grouping(root, pathkeys); if (nuseful2 > nuseful) nuseful = nuseful2; diff --git a/src/backend/optimizer/plan/initsplan.c b/src/backend/optimizer/plan/initsplan.c index 3e3fec89252..b8d1c7e88a3 100644 --- a/src/backend/optimizer/plan/initsplan.c +++ b/src/backend/optimizer/plan/initsplan.c @@ -14,6 +14,7 @@ */ #include "postgres.h" +#include "access/nbtree.h" #include "catalog/pg_constraint.h" #include "catalog/pg_type.h" #include "nodes/makefuncs.h" @@ -31,6 +32,7 @@ #include "optimizer/restrictinfo.h" #include "parser/analyze.h" #include "rewrite/rewriteManip.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/typcache.h" @@ -81,6 +83,12 @@ typedef struct JoinTreeItem } JoinTreeItem; +static bool is_partial_agg_memory_risky(PlannerInfo *root); +static void create_agg_clause_infos(PlannerInfo *root); +static void create_grouping_expr_infos(PlannerInfo *root); +static EquivalenceClass *get_eclass_for_sortgroupclause(PlannerInfo *root, + SortGroupClause *sgc, + Expr *expr); static void extract_lateral_references(PlannerInfo *root, RelOptInfo *brel, Index rtindex); static List *deconstruct_recurse(PlannerInfo *root, Node *jtnode, @@ -628,6 +636,368 @@ remove_useless_groupby_columns(PlannerInfo *root) } } +/* + * setup_eager_aggregation + * Check if eager aggregation is applicable, and if so collect suitable + * aggregate expressions and grouping expressions in the query. + */ +void +setup_eager_aggregation(PlannerInfo *root) +{ + /* + * Don't apply eager aggregation if disabled by user. + */ + if (!enable_eager_aggregate) + return; + + /* + * Don't apply eager aggregation if there are no available GROUP BY + * clauses. + */ + if (!root->processed_groupClause) + return; + + /* + * For now we don't try to support grouping sets. + */ + if (root->parse->groupingSets) + return; + + /* + * For now we don't try to support DISTINCT or ORDER BY aggregates. + */ + if (root->numOrderedAggs > 0) + return; + + /* + * If there are any aggregates that do not support partial mode, or any + * partial aggregates that are non-serializable, do not apply eager + * aggregation. + */ + if (root->hasNonPartialAggs || root->hasNonSerialAggs) + return; + + /* + * We don't try to apply eager aggregation if there are set-returning + * functions in targetlist. + */ + if (root->parse->hasTargetSRFs) + return; + + /* + * Eager aggregation only makes sense if there are multiple base rels in + * the query. 
+ */ + if (bms_membership(root->all_baserels) != BMS_MULTIPLE) + return; + + /* + * Don't apply eager aggregation if any aggregate poses a risk of + * excessive memory usage during partial aggregation. + */ + if (is_partial_agg_memory_risky(root)) + return; + + /* + * Collect aggregate expressions and plain Vars that appear in the + * targetlist and havingQual. + */ + create_agg_clause_infos(root); + + /* + * If there are no suitable aggregate expressions, we cannot apply eager + * aggregation. + */ + if (root->agg_clause_list == NIL) + return; + + /* + * Collect grouping expressions that appear in grouping clauses. + */ + create_grouping_expr_infos(root); +} + +/* + * is_partial_agg_memory_risky + * Check if any aggregate poses a risk of excessive memory usage during + * partial aggregation. + * + * We check if any aggregate has a negative aggtransspace value, which + * indicates that its transition state data can grow unboundedly in size. + * Applying eager aggregation in such cases risks high memory usage since + * partial aggregation results might be stored in join hash tables or + * materialized nodes. + */ +static bool +is_partial_agg_memory_risky(PlannerInfo *root) +{ + ListCell *lc; + + foreach(lc, root->aggtransinfos) + { + AggTransInfo *transinfo = lfirst_node(AggTransInfo, lc); + + if (transinfo->aggtransspace < 0) + return true; + } + + return false; +} + +/* + * create_agg_clause_infos + * Search the targetlist and havingQual for Aggrefs and plain Vars, and + * create an AggClauseInfo for each Aggref node. + */ +static void +create_agg_clause_infos(PlannerInfo *root) +{ + List *tlist_exprs; + List *agg_clause_list = NIL; + List *tlist_vars = NIL; + Relids aggregate_relids = NULL; + bool eager_agg_applicable = true; + ListCell *lc; + + Assert(root->agg_clause_list == NIL); + Assert(root->tlist_vars == NIL); + + tlist_exprs = pull_var_clause((Node *) root->processed_tlist, + PVC_INCLUDE_AGGREGATES | + PVC_RECURSE_WINDOWFUNCS | + PVC_RECURSE_PLACEHOLDERS); + + /* + * Aggregates within the HAVING clause need to be processed in the same + * way as those in the targetlist. Note that HAVING can contain Aggrefs + * but not WindowFuncs. + */ + if (root->parse->havingQual != NULL) + { + List *having_exprs; + + having_exprs = pull_var_clause((Node *) root->parse->havingQual, + PVC_INCLUDE_AGGREGATES | + PVC_RECURSE_PLACEHOLDERS); + if (having_exprs != NIL) + { + tlist_exprs = list_concat(tlist_exprs, having_exprs); + list_free(having_exprs); + } + } + + foreach(lc, tlist_exprs) + { + Expr *expr = (Expr *) lfirst(lc); + Aggref *aggref; + Relids agg_eval_at; + AggClauseInfo *ac_info; + + /* For now we don't try to support GROUPING() expressions */ + if (IsA(expr, GroupingFunc)) + { + eager_agg_applicable = false; + break; + } + + /* Collect plain Vars for future reference */ + if (IsA(expr, Var)) + { + tlist_vars = list_append_unique(tlist_vars, expr); + continue; + } + + aggref = castNode(Aggref, expr); + + Assert(aggref->aggorder == NIL); + Assert(aggref->aggdistinct == NIL); + + /* + * If there are any securityQuals, do not try to apply eager + * aggregation if any non-leakproof aggregate functions are present. + * This is overly strict, but for now... + */ + if (root->qual_security_level > 0 && + !get_func_leakproof(aggref->aggfnoid)) + { + eager_agg_applicable = false; + break; + } + + agg_eval_at = pull_varnos(root, (Node *) aggref); + + /* + * If all base relations in the query are referenced by aggregate + * functions, then eager aggregation is not applicable. 
+ */ + aggregate_relids = bms_add_members(aggregate_relids, agg_eval_at); + if (bms_is_subset(root->all_baserels, aggregate_relids)) + { + eager_agg_applicable = false; + break; + } + + /* OK, create the AggClauseInfo node */ + ac_info = makeNode(AggClauseInfo); + ac_info->aggref = aggref; + ac_info->agg_eval_at = agg_eval_at; + + /* ... and add it to the list */ + agg_clause_list = list_append_unique(agg_clause_list, ac_info); + } + + list_free(tlist_exprs); + + if (eager_agg_applicable) + { + root->agg_clause_list = agg_clause_list; + root->tlist_vars = tlist_vars; + } + else + { + list_free_deep(agg_clause_list); + list_free(tlist_vars); + } +} + +/* + * create_grouping_expr_infos + * Create a GroupingExprInfo for each expression usable as grouping key. + * + * If any grouping expression is not suitable, we will just return with + * root->group_expr_list being NIL. + */ +static void +create_grouping_expr_infos(PlannerInfo *root) +{ + List *exprs = NIL; + List *sortgrouprefs = NIL; + List *ecs = NIL; + ListCell *lc, + *lc1, + *lc2, + *lc3; + + Assert(root->group_expr_list == NIL); + + foreach(lc, root->processed_groupClause) + { + SortGroupClause *sgc = lfirst_node(SortGroupClause, lc); + TargetEntry *tle = get_sortgroupclause_tle(sgc, root->processed_tlist); + TypeCacheEntry *tce; + Oid equalimageproc; + + Assert(tle->ressortgroupref > 0); + + /* + * For now we only support plain Vars as grouping expressions. + */ + if (!IsA(tle->expr, Var)) + return; + + /* + * Eager aggregation is only possible if equality implies image + * equality for each grouping key. Otherwise, placing keys with + * different byte images into the same group may result in the loss of + * information that could be necessary to evaluate upper qual clauses. + * + * For instance, the NUMERIC data type is not supported, as values + * that are considered equal by the equality operator (e.g., 0 and + * 0.0) can have different scales. + */ + tce = lookup_type_cache(exprType((Node *) tle->expr), + TYPECACHE_BTREE_OPFAMILY); + if (!OidIsValid(tce->btree_opf) || + !OidIsValid(tce->btree_opintype)) + return; + + equalimageproc = get_opfamily_proc(tce->btree_opf, + tce->btree_opintype, + tce->btree_opintype, + BTEQUALIMAGE_PROC); + if (!OidIsValid(equalimageproc) || + !DatumGetBool(OidFunctionCall1Coll(equalimageproc, + tce->typcollation, + ObjectIdGetDatum(tce->btree_opintype)))) + return; + + exprs = lappend(exprs, tle->expr); + sortgrouprefs = lappend_int(sortgrouprefs, tle->ressortgroupref); + ecs = lappend(ecs, get_eclass_for_sortgroupclause(root, sgc, tle->expr)); + } + + /* + * Construct a GroupingExprInfo for each expression. + */ + forthree(lc1, exprs, lc2, sortgrouprefs, lc3, ecs) + { + Expr *expr = (Expr *) lfirst(lc1); + int sortgroupref = lfirst_int(lc2); + EquivalenceClass *ec = (EquivalenceClass *) lfirst(lc3); + GroupingExprInfo *ge_info; + + ge_info = makeNode(GroupingExprInfo); + ge_info->expr = (Expr *) copyObject(expr); + ge_info->sortgroupref = sortgroupref; + ge_info->ec = ec; + + root->group_expr_list = lappend(root->group_expr_list, ge_info); + } +} + +/* + * get_eclass_for_sortgroupclause + * Given a group clause and an expression, find an existing equivalence + * class that the expression is a member of; return NULL if none. 
+ */ +static EquivalenceClass * +get_eclass_for_sortgroupclause(PlannerInfo *root, SortGroupClause *sgc, + Expr *expr) +{ + Oid opfamily, + opcintype, + collation; + CompareType cmptype; + Oid equality_op; + List *opfamilies; + + /* Punt if the group clause is not sortable */ + if (!OidIsValid(sgc->sortop)) + return NULL; + + /* Find the operator in pg_amop --- failure shouldn't happen */ + if (!get_ordering_op_properties(sgc->sortop, + &opfamily, &opcintype, &cmptype)) + elog(ERROR, "operator %u is not a valid ordering operator", + sgc->sortop); + + /* Because SortGroupClause doesn't carry collation, consult the expr */ + collation = exprCollation((Node *) expr); + + /* + * EquivalenceClasses need to contain opfamily lists based on the family + * membership of mergejoinable equality operators, which could belong to + * more than one opfamily. So we have to look up the opfamily's equality + * operator and get its membership. + */ + equality_op = get_opfamily_member_for_cmptype(opfamily, + opcintype, + opcintype, + COMPARE_EQ); + if (!OidIsValid(equality_op)) /* shouldn't happen */ + elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", + COMPARE_EQ, opcintype, opcintype, opfamily); + opfamilies = get_mergejoin_opfamilies(equality_op); + if (!opfamilies) /* certainly should find some */ + elog(ERROR, "could not find opfamilies for equality operator %u", + equality_op); + + /* Now find a matching EquivalenceClass */ + return get_eclass_for_sort_expr(root, expr, opfamilies, opcintype, + collation, sgc->tleSortGroupRef, + NULL, false); +} + /***************************************************************************** * * LATERAL REFERENCES diff --git a/src/backend/optimizer/plan/planmain.c b/src/backend/optimizer/plan/planmain.c index 5467e094ca7..eefc486a566 100644 --- a/src/backend/optimizer/plan/planmain.c +++ b/src/backend/optimizer/plan/planmain.c @@ -76,6 +76,9 @@ query_planner(PlannerInfo *root, root->placeholder_list = NIL; root->placeholder_array = NULL; root->placeholder_array_size = 0; + root->agg_clause_list = NIL; + root->group_expr_list = NIL; + root->tlist_vars = NIL; root->fkey_list = NIL; root->initial_rels = NIL; @@ -266,6 +269,12 @@ query_planner(PlannerInfo *root, extract_restriction_or_clauses(root); /* + * Check if eager aggregation is applicable, and if so, set up + * root->agg_clause_list and root->group_expr_list. + */ + setup_eager_aggregation(root); + + /* * Now expand appendrels by adding "otherrels" for their children. We * delay this to the end so that we have as much information as possible * available for each baserel, including all restriction clauses. 
That diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 0c9397a36c3..e8ea78c0c97 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -73,6 +73,12 @@ bool enable_distinct_reordering = true; /* Hook for plugins to get control in planner() */ planner_hook_type planner_hook = NULL; +/* Hook for plugins to get control after PlannerGlobal is initialized */ +planner_setup_hook_type planner_setup_hook = NULL; + +/* Hook for plugins to get control before PlannerGlobal is discarded */ +planner_shutdown_hook_type planner_shutdown_hook = NULL; + /* Hook for plugins to get control when grouping_planner() plans upper rels */ create_upper_paths_hook_type create_upper_paths_hook = NULL; @@ -232,7 +238,6 @@ static void add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *partially_grouped_rel, const AggClauseCosts *agg_costs, grouping_sets_data *gd, - double dNumGroups, GroupPathExtraData *extra); static RelOptInfo *create_partial_grouping_paths(PlannerInfo *root, RelOptInfo *grouped_rel, @@ -280,6 +285,23 @@ static void create_partial_unique_paths(PlannerInfo *root, RelOptInfo *input_rel * * Query optimizer entry point * + * Inputs: + * parse: an analyzed-and-rewritten query tree for an optimizable statement + * query_string: source text for the query tree (used for error reports) + * cursorOptions: bitmask of CURSOR_OPT_XXX flags, see parsenodes.h + * boundParams: passed-in parameter values, or NULL if none + * es: ExplainState if being called from EXPLAIN, else NULL + * + * The result is a PlannedStmt tree. + * + * PARAM_EXTERN Param nodes within the parse tree can be replaced by Consts + * using values from boundParams, if those values are marked PARAM_FLAG_CONST. + * Parameter values not so marked are still relied on for estimation purposes. + * + * The ExplainState pointer is not currently used by the core planner, but it + * is passed through to some planner hooks so that they can report information + * back to EXPLAIN extension hooks. + * * To support loadable plugins that monitor or modify planner behavior, * we provide a hook variable that lets a plugin get control before and * after the standard planning process. 
The plugin would normally call @@ -291,14 +313,16 @@ static void create_partial_unique_paths(PlannerInfo *root, RelOptInfo *input_rel *****************************************************************************/ PlannedStmt * planner(Query *parse, const char *query_string, int cursorOptions, - ParamListInfo boundParams) + ParamListInfo boundParams, ExplainState *es) { PlannedStmt *result; if (planner_hook) - result = (*planner_hook) (parse, query_string, cursorOptions, boundParams); + result = (*planner_hook) (parse, query_string, cursorOptions, + boundParams, es); else - result = standard_planner(parse, query_string, cursorOptions, boundParams); + result = standard_planner(parse, query_string, cursorOptions, + boundParams, es); pgstat_report_plan_id(result->planId, false); @@ -307,7 +331,7 @@ planner(Query *parse, const char *query_string, int cursorOptions, PlannedStmt * standard_planner(Query *parse, const char *query_string, int cursorOptions, - ParamListInfo boundParams) + ParamListInfo boundParams, ExplainState *es) { PlannedStmt *result; PlannerGlobal *glob; @@ -438,6 +462,10 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, tuple_fraction = 0.0; } + /* Allow plugins to take control after we've initialized "glob" */ + if (planner_setup_hook) + (*planner_setup_hook) (glob, parse, query_string, &tuple_fraction, es); + /* primary planning entry point (may recurse for subqueries) */ root = subquery_planner(glob, parse, NULL, NULL, false, tuple_fraction, NULL); @@ -617,6 +645,10 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, result->jitFlags |= PGJIT_DEFORM; } + /* Allow plugins to take control before we discard "glob" */ + if (planner_shutdown_hook) + (*planner_shutdown_hook) (glob, parse, query_string, result); + if (glob->partition_directory != NULL) DestroyPartitionDirectory(glob->partition_directory); @@ -4014,9 +4046,7 @@ create_ordinary_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, GroupPathExtraData *extra, RelOptInfo **partially_grouped_rel_p) { - Path *cheapest_path = input_rel->cheapest_total_path; RelOptInfo *partially_grouped_rel = NULL; - double dNumGroups; PartitionwiseAggregateType patype = PARTITIONWISE_AGGREGATE_NONE; /* @@ -4098,23 +4128,16 @@ create_ordinary_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, /* Gather any partially grouped partial paths. */ if (partially_grouped_rel && partially_grouped_rel->partial_pathlist) - { gather_grouping_paths(root, partially_grouped_rel); - set_cheapest(partially_grouped_rel); - } - /* - * Estimate number of groups. - */ - dNumGroups = get_number_of_groups(root, - cheapest_path->rows, - gd, - extra->targetList); + /* Now choose the best path(s) for partially_grouped_rel. 
*/ + if (partially_grouped_rel && partially_grouped_rel->pathlist) + set_cheapest(partially_grouped_rel); /* Build final grouping paths */ add_paths_to_grouping_rel(root, input_rel, grouped_rel, partially_grouped_rel, agg_costs, gd, - dNumGroups, extra); + extra); /* Give a helpful error if we failed to find any implementation */ if (grouped_rel->pathlist == NIL) @@ -7059,16 +7082,42 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *grouped_rel, RelOptInfo *partially_grouped_rel, const AggClauseCosts *agg_costs, - grouping_sets_data *gd, double dNumGroups, + grouping_sets_data *gd, GroupPathExtraData *extra) { Query *parse = root->parse; Path *cheapest_path = input_rel->cheapest_total_path; + Path *cheapest_partially_grouped_path = NULL; ListCell *lc; bool can_hash = (extra->flags & GROUPING_CAN_USE_HASH) != 0; bool can_sort = (extra->flags & GROUPING_CAN_USE_SORT) != 0; List *havingQual = (List *) extra->havingQual; AggClauseCosts *agg_final_costs = &extra->agg_final_costs; + double dNumGroups = 0; + double dNumFinalGroups = 0; + + /* + * Estimate number of groups for non-split aggregation. + */ + dNumGroups = get_number_of_groups(root, + cheapest_path->rows, + gd, + extra->targetList); + + if (partially_grouped_rel && partially_grouped_rel->pathlist) + { + cheapest_partially_grouped_path = + partially_grouped_rel->cheapest_total_path; + + /* + * Estimate number of groups for final phase of partial aggregation. + */ + dNumFinalGroups = + get_number_of_groups(root, + cheapest_partially_grouped_path->rows, + gd, + extra->targetList); + } if (can_sort) { @@ -7181,7 +7230,7 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, path = make_ordered_path(root, grouped_rel, path, - partially_grouped_rel->cheapest_total_path, + cheapest_partially_grouped_path, info->pathkeys, -1.0); @@ -7199,7 +7248,7 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, info->clauses, havingQual, agg_final_costs, - dNumGroups)); + dNumFinalGroups)); else add_path(grouped_rel, (Path *) create_group_path(root, @@ -7207,7 +7256,7 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, path, info->clauses, havingQual, - dNumGroups)); + dNumFinalGroups)); } } @@ -7249,19 +7298,17 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, */ if (partially_grouped_rel && partially_grouped_rel->pathlist) { - Path *path = partially_grouped_rel->cheapest_total_path; - add_path(grouped_rel, (Path *) create_agg_path(root, grouped_rel, - path, + cheapest_partially_grouped_path, grouped_rel->reltarget, AGG_HASHED, AGGSPLIT_FINAL_DESERIAL, root->processed_groupClause, havingQual, agg_final_costs, - dNumGroups)); + dNumFinalGroups)); } } @@ -7301,6 +7348,7 @@ create_partial_grouping_paths(PlannerInfo *root, { Query *parse = root->parse; RelOptInfo *partially_grouped_rel; + RelOptInfo *eager_agg_rel = NULL; AggClauseCosts *agg_partial_costs = &extra->agg_partial_costs; AggClauseCosts *agg_final_costs = &extra->agg_final_costs; Path *cheapest_partial_path = NULL; @@ -7312,6 +7360,15 @@ create_partial_grouping_paths(PlannerInfo *root, bool can_sort = (extra->flags & GROUPING_CAN_USE_SORT) != 0; /* + * Check whether any partially aggregated paths have been generated + * through eager aggregation. 
+ */ + if (input_rel->grouped_rel && + !IS_DUMMY_REL(input_rel->grouped_rel) && + input_rel->grouped_rel->pathlist != NIL) + eager_agg_rel = input_rel->grouped_rel; + + /* * Consider whether we should generate partially aggregated non-partial * paths. We can only do this if we have a non-partial path, and only if * the parent of the input rel is performing partial partitionwise @@ -7332,11 +7389,13 @@ create_partial_grouping_paths(PlannerInfo *root, /* * If we can't partially aggregate partial paths, and we can't partially - * aggregate non-partial paths, then don't bother creating the new + * aggregate non-partial paths, and no partially aggregated paths were + * generated by eager aggregation, then don't bother creating the new * RelOptInfo at all, unless the caller specified force_rel_creation. */ if (cheapest_total_path == NULL && cheapest_partial_path == NULL && + eager_agg_rel == NULL && !force_rel_creation) return NULL; @@ -7562,6 +7621,51 @@ create_partial_grouping_paths(PlannerInfo *root, } /* + * Add any partially aggregated paths generated by eager aggregation to + * the new upper relation after applying projection steps as needed. + */ + if (eager_agg_rel) + { + /* Add the paths */ + foreach(lc, eager_agg_rel->pathlist) + { + Path *path = (Path *) lfirst(lc); + + /* Shouldn't have any parameterized paths anymore */ + Assert(path->param_info == NULL); + + path = (Path *) create_projection_path(root, + partially_grouped_rel, + path, + partially_grouped_rel->reltarget); + + add_path(partially_grouped_rel, path); + } + + /* + * Likewise add the partial paths, but only if parallelism is possible + * for partially_grouped_rel. + */ + if (partially_grouped_rel->consider_parallel) + { + foreach(lc, eager_agg_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + + /* Shouldn't have any parameterized paths anymore */ + Assert(path->param_info == NULL); + + path = (Path *) create_projection_path(root, + partially_grouped_rel, + path, + partially_grouped_rel->reltarget); + + add_partial_path(partially_grouped_rel, path); + } + } + } + + /* * If there is an FDW that's responsible for all baserels of the query, * let it consider adding partially grouped ForeignPaths. */ @@ -8124,13 +8228,6 @@ create_partitionwise_grouping_paths(PlannerInfo *root, add_paths_to_append_rel(root, partially_grouped_rel, partially_grouped_live_children); - - /* - * We need call set_cheapest, since the finalization step will use the - * cheapest path from the rel. - */ - if (partially_grouped_rel->pathlist) - set_cheapest(partially_grouped_rel); } /* If possible, create append paths for fully grouped children. */ diff --git a/src/backend/optimizer/util/appendinfo.c b/src/backend/optimizer/util/appendinfo.c index 5b3dc0d8653..69b8b0c2ae0 100644 --- a/src/backend/optimizer/util/appendinfo.c +++ b/src/backend/optimizer/util/appendinfo.c @@ -517,6 +517,57 @@ adjust_appendrel_attrs_mutator(Node *node, } /* + * We have to process RelAggInfo nodes specially. 
+ */ + if (IsA(node, RelAggInfo)) + { + RelAggInfo *oldinfo = (RelAggInfo *) node; + RelAggInfo *newinfo = makeNode(RelAggInfo); + + newinfo->target = (PathTarget *) + adjust_appendrel_attrs_mutator((Node *) oldinfo->target, + context); + + newinfo->agg_input = (PathTarget *) + adjust_appendrel_attrs_mutator((Node *) oldinfo->agg_input, + context); + + newinfo->group_clauses = oldinfo->group_clauses; + + newinfo->group_exprs = (List *) + adjust_appendrel_attrs_mutator((Node *) oldinfo->group_exprs, + context); + + return (Node *) newinfo; + } + + /* + * We have to process PathTarget nodes specially. + */ + if (IsA(node, PathTarget)) + { + PathTarget *oldtarget = (PathTarget *) node; + PathTarget *newtarget = makeNode(PathTarget); + + /* Copy all flat-copiable fields */ + memcpy(newtarget, oldtarget, sizeof(PathTarget)); + + newtarget->exprs = (List *) + adjust_appendrel_attrs_mutator((Node *) oldtarget->exprs, + context); + + if (oldtarget->sortgrouprefs) + { + Size nbytes = list_length(oldtarget->exprs) * sizeof(Index); + + newtarget->sortgrouprefs = (Index *) palloc(nbytes); + memcpy(newtarget->sortgrouprefs, oldtarget->sortgrouprefs, nbytes); + } + + return (Node *) newtarget; + } + + /* * NOTE: we do not need to recurse into sublinks, because they should * already have been converted to subplans before we see them. */ diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c index 0e523d2eb5b..cf1bc672137 100644 --- a/src/backend/optimizer/util/relnode.c +++ b/src/backend/optimizer/util/relnode.c @@ -16,6 +16,8 @@ #include <limits.h> +#include "access/nbtree.h" +#include "catalog/pg_constraint.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" #include "optimizer/appendinfo.h" @@ -27,12 +29,16 @@ #include "optimizer/paths.h" #include "optimizer/placeholder.h" #include "optimizer/plancat.h" +#include "optimizer/planner.h" #include "optimizer/restrictinfo.h" #include "optimizer/tlist.h" +#include "parser/parse_oper.h" #include "parser/parse_relation.h" #include "rewrite/rewriteManip.h" #include "utils/hsearch.h" #include "utils/lsyscache.h" +#include "utils/selfuncs.h" +#include "utils/typcache.h" typedef struct JoinHashEntry @@ -83,6 +89,14 @@ static void build_child_join_reltarget(PlannerInfo *root, RelOptInfo *childrel, int nappinfos, AppendRelInfo **appinfos); +static bool eager_aggregation_possible_for_relation(PlannerInfo *root, + RelOptInfo *rel); +static bool init_grouping_targets(PlannerInfo *root, RelOptInfo *rel, + PathTarget *target, PathTarget *agg_input, + List **group_clauses, List **group_exprs); +static bool is_var_in_aggref_only(PlannerInfo *root, Var *var); +static bool is_var_needed_by_join(PlannerInfo *root, Var *var, RelOptInfo *rel); +static Index get_expression_sortgroupref(PlannerInfo *root, Expr *expr); /* @@ -278,6 +292,8 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptInfo *parent) rel->joininfo = NIL; rel->has_eclass_joins = false; rel->consider_partitionwise_join = false; /* might get changed later */ + rel->agg_info = NULL; + rel->grouped_rel = NULL; rel->part_scheme = NULL; rel->nparts = -1; rel->boundinfo = NULL; @@ -409,6 +425,103 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptInfo *parent) } /* + * build_simple_grouped_rel + * Construct a new RelOptInfo representing a grouped version of the input + * simple relation. 
+ */ +RelOptInfo * +build_simple_grouped_rel(PlannerInfo *root, RelOptInfo *rel) +{ + RelOptInfo *grouped_rel; + RelAggInfo *agg_info; + + /* + * We should have available aggregate expressions and grouping + * expressions, otherwise we cannot reach here. + */ + Assert(root->agg_clause_list != NIL); + Assert(root->group_expr_list != NIL); + + /* nothing to do for dummy rel */ + if (IS_DUMMY_REL(rel)) + return NULL; + + /* + * Prepare the information needed to create grouped paths for this simple + * relation. + */ + agg_info = create_rel_agg_info(root, rel, true); + if (agg_info == NULL) + return NULL; + + /* + * If grouped paths for the given simple relation are not considered + * useful, skip building the grouped relation. + */ + if (!agg_info->agg_useful) + return NULL; + + /* Track the set of relids at which partial aggregation is applied */ + agg_info->apply_at = bms_copy(rel->relids); + + /* build the grouped relation */ + grouped_rel = build_grouped_rel(root, rel); + grouped_rel->reltarget = agg_info->target; + grouped_rel->rows = agg_info->grouped_rows; + grouped_rel->agg_info = agg_info; + + rel->grouped_rel = grouped_rel; + + return grouped_rel; +} + +/* + * build_grouped_rel + * Build a grouped relation by flat copying the input relation and resetting + * the necessary fields. + */ +RelOptInfo * +build_grouped_rel(PlannerInfo *root, RelOptInfo *rel) +{ + RelOptInfo *grouped_rel; + + grouped_rel = makeNode(RelOptInfo); + memcpy(grouped_rel, rel, sizeof(RelOptInfo)); + + /* + * clear path info + */ + grouped_rel->pathlist = NIL; + grouped_rel->ppilist = NIL; + grouped_rel->partial_pathlist = NIL; + grouped_rel->cheapest_startup_path = NULL; + grouped_rel->cheapest_total_path = NULL; + grouped_rel->cheapest_parameterized_paths = NIL; + + /* + * clear partition info + */ + grouped_rel->part_scheme = NULL; + grouped_rel->nparts = -1; + grouped_rel->boundinfo = NULL; + grouped_rel->partbounds_merged = false; + grouped_rel->partition_qual = NIL; + grouped_rel->part_rels = NULL; + grouped_rel->live_parts = NULL; + grouped_rel->all_partrels = NULL; + grouped_rel->partexprs = NULL; + grouped_rel->nullable_partexprs = NULL; + grouped_rel->consider_partitionwise_join = false; + + /* + * clear size estimates + */ + grouped_rel->rows = 0; + + return grouped_rel; +} + +/* * find_base_rel * Find a base or otherrel relation entry, which must already exist. */ @@ -759,6 +872,8 @@ build_join_rel(PlannerInfo *root, joinrel->joininfo = NIL; joinrel->has_eclass_joins = false; joinrel->consider_partitionwise_join = false; /* might get changed later */ + joinrel->agg_info = NULL; + joinrel->grouped_rel = NULL; joinrel->parent = NULL; joinrel->top_parent = NULL; joinrel->top_parent_relids = NULL; @@ -945,6 +1060,8 @@ build_child_join_rel(PlannerInfo *root, RelOptInfo *outer_rel, joinrel->joininfo = NIL; joinrel->has_eclass_joins = false; joinrel->consider_partitionwise_join = false; /* might get changed later */ + joinrel->agg_info = NULL; + joinrel->grouped_rel = NULL; joinrel->parent = parent_joinrel; joinrel->top_parent = parent_joinrel->top_parent ? parent_joinrel->top_parent : parent_joinrel; joinrel->top_parent_relids = joinrel->top_parent->relids; @@ -2523,3 +2640,536 @@ build_child_join_reltarget(PlannerInfo *root, childrel->reltarget->cost.per_tuple = parentrel->reltarget->cost.per_tuple; childrel->reltarget->width = parentrel->reltarget->width; } + +/* + * create_rel_agg_info + * Create the RelAggInfo structure for the given relation if it can produce + * grouped paths. 
The given relation is the non-grouped one which has the + * reltarget already constructed. + * + * calculate_grouped_rows: if true, calculate the estimated number of grouped + * rows for the relation. If false, skip the estimation to avoid unnecessary + * planning overhead. + */ +RelAggInfo * +create_rel_agg_info(PlannerInfo *root, RelOptInfo *rel, + bool calculate_grouped_rows) +{ + ListCell *lc; + RelAggInfo *result; + PathTarget *agg_input; + PathTarget *target; + List *group_clauses = NIL; + List *group_exprs = NIL; + + /* + * The lists of aggregate expressions and grouping expressions should have + * been constructed. + */ + Assert(root->agg_clause_list != NIL); + Assert(root->group_expr_list != NIL); + + /* + * If this is a child rel, the grouped rel for its parent rel must have + * been created if it can. So we can just use parent's RelAggInfo if + * there is one, with appropriate variable substitutions. + */ + if (IS_OTHER_REL(rel)) + { + RelOptInfo *grouped_rel; + RelAggInfo *agg_info; + + grouped_rel = rel->top_parent->grouped_rel; + if (grouped_rel == NULL) + return NULL; + + Assert(IS_GROUPED_REL(grouped_rel)); + + /* Must do multi-level transformation */ + agg_info = (RelAggInfo *) + adjust_appendrel_attrs_multilevel(root, + (Node *) grouped_rel->agg_info, + rel, + rel->top_parent); + + agg_info->apply_at = NULL; /* caller will change this later */ + + if (calculate_grouped_rows) + { + agg_info->grouped_rows = + estimate_num_groups(root, agg_info->group_exprs, + rel->rows, NULL, NULL); + + /* + * The grouped paths for the given relation are considered useful + * iff the average group size is no less than + * min_eager_agg_group_size. + */ + agg_info->agg_useful = + (rel->rows / agg_info->grouped_rows) >= min_eager_agg_group_size; + } + + return agg_info; + } + + /* Check if it's possible to produce grouped paths for this relation. */ + if (!eager_aggregation_possible_for_relation(root, rel)) + return NULL; + + /* + * Create targets for the grouped paths and for the input paths of the + * grouped paths. + */ + target = create_empty_pathtarget(); + agg_input = create_empty_pathtarget(); + + /* ... and initialize these targets */ + if (!init_grouping_targets(root, rel, target, agg_input, + &group_clauses, &group_exprs)) + return NULL; + + /* + * Eager aggregation is not applicable if there are no available grouping + * expressions. + */ + if (group_clauses == NIL) + return NULL; + + /* Add aggregates to the grouping target */ + foreach(lc, root->agg_clause_list) + { + AggClauseInfo *ac_info = lfirst_node(AggClauseInfo, lc); + Aggref *aggref; + + Assert(IsA(ac_info->aggref, Aggref)); + + aggref = (Aggref *) copyObject(ac_info->aggref); + mark_partial_aggref(aggref, AGGSPLIT_INITIAL_SERIAL); + + add_column_to_pathtarget(target, (Expr *) aggref, 0); + } + + /* Set the estimated eval cost and output width for both targets */ + set_pathtarget_cost_width(root, target); + set_pathtarget_cost_width(root, agg_input); + + /* build the RelAggInfo result */ + result = makeNode(RelAggInfo); + result->target = target; + result->agg_input = agg_input; + result->group_clauses = group_clauses; + result->group_exprs = group_exprs; + result->apply_at = NULL; /* caller will change this later */ + + if (calculate_grouped_rows) + { + result->grouped_rows = estimate_num_groups(root, result->group_exprs, + rel->rows, NULL, NULL); + + /* + * The grouped paths for the given relation are considered useful iff + * the average group size is no less than min_eager_agg_group_size. 
+ */ + result->agg_useful = + (rel->rows / result->grouped_rows) >= min_eager_agg_group_size; + } + + return result; +} + +/* + * eager_aggregation_possible_for_relation + * Check if it's possible to produce grouped paths for the given relation. + */ +static bool +eager_aggregation_possible_for_relation(PlannerInfo *root, RelOptInfo *rel) +{ + ListCell *lc; + int cur_relid; + + /* + * Check to see if the given relation is in the nullable side of an outer + * join. In this case, we cannot push a partial aggregation down to the + * relation, because the NULL-extended rows produced by the outer join + * would not be available when we perform the partial aggregation, while + * with a non-eager-aggregation plan these rows are available for the + * top-level aggregation. Doing so may result in the rows being grouped + * differently than expected, or produce incorrect values from the + * aggregate functions. + */ + cur_relid = -1; + while ((cur_relid = bms_next_member(rel->relids, cur_relid)) >= 0) + { + RelOptInfo *baserel = find_base_rel_ignore_join(root, cur_relid); + + if (baserel == NULL) + continue; /* ignore outer joins in rel->relids */ + + if (!bms_is_subset(baserel->nulling_relids, rel->relids)) + return false; + } + + /* + * For now we don't try to support PlaceHolderVars. + */ + foreach(lc, rel->reltarget->exprs) + { + Expr *expr = lfirst(lc); + + if (IsA(expr, PlaceHolderVar)) + return false; + } + + /* Caller should only pass base relations or joins. */ + Assert(rel->reloptkind == RELOPT_BASEREL || + rel->reloptkind == RELOPT_JOINREL); + + /* + * Check if all aggregate expressions can be evaluated on this relation + * level. + */ + foreach(lc, root->agg_clause_list) + { + AggClauseInfo *ac_info = lfirst_node(AggClauseInfo, lc); + + Assert(IsA(ac_info->aggref, Aggref)); + + /* + * Give up if any aggregate requires relations other than the current + * one. If the aggregate requires the current relation plus + * additional relations, grouping the current relation could make some + * input rows unavailable for the higher aggregate and may reduce the + * number of input rows it receives. If the aggregate does not + * require the current relation at all, it should not be grouped, as + * we do not support joining two grouped relations. + */ + if (!bms_is_subset(ac_info->agg_eval_at, rel->relids)) + return false; + } + + return true; +} + +/* + * init_grouping_targets + * Initialize the target for grouped paths (target) as well as the target + * for paths that generate input for the grouped paths (agg_input). + * + * We also construct the list of SortGroupClauses and the list of grouping + * expressions for the partial aggregation, and return them in *group_clause + * and *group_exprs. + * + * Return true if the targets could be initialized, false otherwise. + */ +static bool +init_grouping_targets(PlannerInfo *root, RelOptInfo *rel, + PathTarget *target, PathTarget *agg_input, + List **group_clauses, List **group_exprs) +{ + ListCell *lc; + List *possibly_dependent = NIL; + Index maxSortGroupRef; + + /* Identify the max sortgroupref */ + maxSortGroupRef = 0; + foreach(lc, root->processed_tlist) + { + Index ref = ((TargetEntry *) lfirst(lc))->ressortgroupref; + + if (ref > maxSortGroupRef) + maxSortGroupRef = ref; + } + + /* + * At this point, all Vars from this relation that are needed by upper + * joins or are required in the final targetlist should already be present + * in its reltarget. 
Therefore, we can safely iterate over this + * relation's reltarget->exprs to construct the PathTarget and grouping + * clauses for the grouped paths. + */ + foreach(lc, rel->reltarget->exprs) + { + Expr *expr = (Expr *) lfirst(lc); + Index sortgroupref; + + /* + * Given that PlaceHolderVar currently prevents us from doing eager + * aggregation, the source target cannot contain anything more complex + * than a Var. + */ + Assert(IsA(expr, Var)); + + /* + * Get the sortgroupref of the expr if it is found among, or can be + * deduced from, the original grouping expressions. + */ + sortgroupref = get_expression_sortgroupref(root, expr); + if (sortgroupref > 0) + { + SortGroupClause *sgc; + + /* Find the matching SortGroupClause */ + sgc = get_sortgroupref_clause(sortgroupref, root->processed_groupClause); + Assert(sgc->tleSortGroupRef <= maxSortGroupRef); + + /* + * If the target expression is to be used as a grouping key, it + * should be emitted by the grouped paths that have been pushed + * down to this relation level. + */ + add_column_to_pathtarget(target, expr, sortgroupref); + + /* + * ... and it also should be emitted by the input paths. + */ + add_column_to_pathtarget(agg_input, expr, sortgroupref); + + /* + * Record this SortGroupClause and grouping expression. Note that + * this SortGroupClause might have already been recorded. + */ + if (!list_member(*group_clauses, sgc)) + { + *group_clauses = lappend(*group_clauses, sgc); + *group_exprs = lappend(*group_exprs, expr); + } + } + else if (is_var_needed_by_join(root, (Var *) expr, rel)) + { + /* + * The expression is needed for an upper join but is neither in + * the GROUP BY clause nor derivable from it using EC (otherwise, + * it would have already been included in the targets above). We + * need to create a special SortGroupClause for this expression. + * + * It is important to include such expressions in the grouping + * keys. This is essential to ensure that an aggregated row from + * the partial aggregation matches the other side of the join if + * and only if each row in the partial group does. This ensures + * that all rows within the same partial group share the same + * 'destiny', which is crucial for maintaining correctness. + */ + SortGroupClause *sgc; + TypeCacheEntry *tce; + Oid equalimageproc; + + /* + * But first, check if equality implies image equality for this + * expression. If not, we cannot use it as a grouping key. See + * comments in create_grouping_expr_infos(). + */ + tce = lookup_type_cache(exprType((Node *) expr), + TYPECACHE_BTREE_OPFAMILY); + if (!OidIsValid(tce->btree_opf) || + !OidIsValid(tce->btree_opintype)) + return false; + + equalimageproc = get_opfamily_proc(tce->btree_opf, + tce->btree_opintype, + tce->btree_opintype, + BTEQUALIMAGE_PROC); + if (!OidIsValid(equalimageproc) || + !DatumGetBool(OidFunctionCall1Coll(equalimageproc, + tce->typcollation, + ObjectIdGetDatum(tce->btree_opintype)))) + return false; + + /* Create the SortGroupClause. */ + sgc = makeNode(SortGroupClause); + + /* Initialize the SortGroupClause. */ + sgc->tleSortGroupRef = ++maxSortGroupRef; + get_sort_group_operators(exprType((Node *) expr), + false, true, false, + &sgc->sortop, &sgc->eqop, NULL, + &sgc->hashable); + + /* This expression should be emitted by the grouped paths */ + add_column_to_pathtarget(target, expr, sgc->tleSortGroupRef); + + /* ... and it also should be emitted by the input paths. 
*/ + add_column_to_pathtarget(agg_input, expr, sgc->tleSortGroupRef); + + /* Record this SortGroupClause and grouping expression */ + *group_clauses = lappend(*group_clauses, sgc); + *group_exprs = lappend(*group_exprs, expr); + } + else if (is_var_in_aggref_only(root, (Var *) expr)) + { + /* + * The expression is referenced by an aggregate function pushed + * down to this relation and does not appear elsewhere in the + * targetlist or havingQual. Add it to 'agg_input' but not to + * 'target'. + */ + add_new_column_to_pathtarget(agg_input, expr); + } + else + { + /* + * The expression may be functionally dependent on other + * expressions in the target, but we cannot verify this until all + * target expressions have been constructed. + */ + possibly_dependent = lappend(possibly_dependent, expr); + } + } + + /* + * Now we can verify whether an expression is functionally dependent on + * others. + */ + foreach(lc, possibly_dependent) + { + Var *tvar; + List *deps = NIL; + RangeTblEntry *rte; + + tvar = lfirst_node(Var, lc); + rte = root->simple_rte_array[tvar->varno]; + + if (check_functional_grouping(rte->relid, tvar->varno, + tvar->varlevelsup, + target->exprs, &deps)) + { + /* + * The expression is functionally dependent on other target + * expressions, so it can be included in the targets. Since it + * will not be used as a grouping key, a sortgroupref is not + * needed for it. + */ + add_new_column_to_pathtarget(target, (Expr *) tvar); + add_new_column_to_pathtarget(agg_input, (Expr *) tvar); + } + else + { + /* + * We may arrive here with a grouping expression that is proven + * redundant by EquivalenceClass processing, such as 't1.a' in the + * query below. + * + * select max(t1.c) from t t1, t t2 where t1.a = 1 group by t1.a, + * t1.b; + * + * For now we just give up in this case. + */ + return false; + } + } + + return true; +} + +/* + * is_var_in_aggref_only + * Check whether the given Var appears in aggregate expressions and not + * elsewhere in the targetlist or havingQual. + */ +static bool +is_var_in_aggref_only(PlannerInfo *root, Var *var) +{ + ListCell *lc; + + /* + * Search the list of aggregate expressions for the Var. + */ + foreach(lc, root->agg_clause_list) + { + AggClauseInfo *ac_info = lfirst_node(AggClauseInfo, lc); + List *vars; + + Assert(IsA(ac_info->aggref, Aggref)); + + if (!bms_is_member(var->varno, ac_info->agg_eval_at)) + continue; + + vars = pull_var_clause((Node *) ac_info->aggref, + PVC_RECURSE_AGGREGATES | + PVC_RECURSE_WINDOWFUNCS | + PVC_RECURSE_PLACEHOLDERS); + + if (list_member(vars, var)) + { + list_free(vars); + break; + } + + list_free(vars); + } + + return (lc != NULL && !list_member(root->tlist_vars, var)); +} + +/* + * is_var_needed_by_join + * Check if the given Var is needed by joins above the current rel. + */ +static bool +is_var_needed_by_join(PlannerInfo *root, Var *var, RelOptInfo *rel) +{ + Relids relids; + int attno; + RelOptInfo *baserel; + + /* + * Note that when checking if the Var is needed by joins above, we want to + * exclude cases where the Var is only needed in the final targetlist. So + * include "relation 0" in the check. 
+ */ + relids = bms_copy(rel->relids); + relids = bms_add_member(relids, 0); + + baserel = find_base_rel(root, var->varno); + attno = var->varattno - baserel->min_attr; + + return bms_nonempty_difference(baserel->attr_needed[attno], relids); +} + +/* + * get_expression_sortgroupref + * Return the sortgroupref of the given "expr" if it is found among the + * original grouping expressions, or is known equal to any of the original + * grouping expressions due to equivalence relationships. Return 0 if no + * match is found. + */ +static Index +get_expression_sortgroupref(PlannerInfo *root, Expr *expr) +{ + ListCell *lc; + + Assert(IsA(expr, Var)); + + foreach(lc, root->group_expr_list) + { + GroupingExprInfo *ge_info = lfirst_node(GroupingExprInfo, lc); + ListCell *lc1; + + Assert(IsA(ge_info->expr, Var)); + Assert(ge_info->sortgroupref > 0); + + if (equal(expr, ge_info->expr)) + return ge_info->sortgroupref; + + if (ge_info->ec == NULL || + !bms_is_member(((Var *) expr)->varno, ge_info->ec->ec_relids)) + continue; + + /* + * Scan the EquivalenceClass, looking for a match to the given + * expression. We ignore child members here. + */ + foreach(lc1, ge_info->ec->ec_members) + { + EquivalenceMember *em = (EquivalenceMember *) lfirst(lc1); + + /* Child members should not exist in ec_members */ + Assert(!em->em_is_child); + + if (equal(expr, em->em_expr)) + return ge_info->sortgroupref; + } + } + + /* no match is found */ + return 0; +} diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c68c0481f42..93ed2eb368e 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1955,10 +1955,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) PgStat_StatReplSlotEntry repSlotStat; /* Nothing to do if we don't have any replication stats to be sent. 
*/ - if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) + if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0 && + rb->memExceededCount <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, rb, rb->spillTxns, rb->spillCount, @@ -1966,6 +1967,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamTxns, rb->streamCount, rb->streamBytes, + rb->memExceededCount, rb->totalTxns, rb->totalBytes); @@ -1975,6 +1977,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.stream_txns = rb->streamTxns; repSlotStat.stream_count = rb->streamCount; repSlotStat.stream_bytes = rb->streamBytes; + repSlotStat.mem_exceeded_count = rb->memExceededCount; repSlotStat.total_txns = rb->totalTxns; repSlotStat.total_bytes = rb->totalBytes; @@ -1986,6 +1989,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamTxns = 0; rb->streamCount = 0; rb->streamBytes = 0; + rb->memExceededCount = 0; rb->totalTxns = 0; rb->totalBytes = 0; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index a5e165fb123..a54434151de 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -390,6 +390,7 @@ ReorderBufferAllocate(void) buffer->streamTxns = 0; buffer->streamCount = 0; buffer->streamBytes = 0; + buffer->memExceededCount = 0; buffer->totalTxns = 0; buffer->totalBytes = 0; @@ -3898,14 +3899,26 @@ static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; + bool update_stats = true; - /* - * Bail out if debug_logical_replication_streaming is buffered and we - * haven't exceeded the memory limit. - */ - if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED && - rb->size < logical_decoding_work_mem * (Size) 1024) + if (rb->size >= logical_decoding_work_mem * (Size) 1024) + { + /* + * Update the statistics as the memory usage has reached the limit. We + * report the statistics update later in this function since we can + * update the slot statistics altogether while streaming or + * serializing transactions in most cases. + */ + rb->memExceededCount += 1; + } + else if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED) + { + /* + * Bail out if debug_logical_replication_streaming is buffered and we + * haven't exceeded the memory limit. + */ return; + } /* * If debug_logical_replication_streaming is immediate, loop until there's @@ -3965,8 +3978,17 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) */ Assert(txn->size == 0); Assert(txn->nentries_mem == 0); + + /* + * We've reported the memExceededCount update while streaming or + * serializing the transaction. + */ + update_stats = false; } + if (update_stats) + UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); + /* We must be under the memory limit now. 
*/ Assert(rb->size < logical_decoding_work_mem * (Size) 1024); } diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index fe470de63f2..ca62b9cdcf0 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -512,12 +512,12 @@ static BlockNumber ExtendBufferedRelShared(BufferManagerRelation bmr, BlockNumber extend_upto, Buffer *buffers, uint32 *extended_by); -static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy); +static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy, + bool skip_if_not_valid); static void PinBuffer_Locked(BufferDesc *buf); static void UnpinBuffer(BufferDesc *buf); static void UnpinBufferNoOwner(BufferDesc *buf); static void BufferSync(int flags); -static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); @@ -533,6 +533,8 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr, static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress); static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete); static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); +static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, + IOObject io_object, IOContext io_context); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); static void FindAndDropRelationBuffers(RelFileLocator rlocator, @@ -685,7 +687,6 @@ ReadRecentBuffer(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockN BufferDesc *bufHdr; BufferTag tag; uint32 buf_state; - bool have_private_ref; Assert(BufferIsValid(recent_buffer)); @@ -713,38 +714,24 @@ ReadRecentBuffer(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockN else { bufHdr = GetBufferDescriptor(recent_buffer - 1); - have_private_ref = GetPrivateRefCount(recent_buffer) > 0; /* - * Do we already have this buffer pinned with a private reference? If - * so, it must be valid and it is safe to check the tag without - * locking. If not, we have to lock the header first and then check. + * Is it still valid and holding the right tag? We do an unlocked tag + * comparison first, to make it unlikely that we'll increment the + * usage counter of the wrong buffer, if someone calls us with a very + * out of date recent_buffer. Then we'll check it again if we get the + * pin. */ - if (have_private_ref) - buf_state = pg_atomic_read_u32(&bufHdr->state); - else - buf_state = LockBufHdr(bufHdr); - - if ((buf_state & BM_VALID) && BufferTagsEqual(&tag, &bufHdr->tag)) + if (BufferTagsEqual(&tag, &bufHdr->tag) && + PinBuffer(bufHdr, NULL, true)) { - /* - * It's now safe to pin the buffer. We can't pin first and ask - * questions later, because it might confuse code paths like - * InvalidateBuffer() if we pinned a random non-matching buffer. - */ - if (have_private_ref) - PinBuffer(bufHdr, NULL); /* bump pin count */ - else - PinBuffer_Locked(bufHdr); /* pin for first time */ - - pgBufferUsage.shared_blks_hit++; - - return true; + if (BufferTagsEqual(&tag, &bufHdr->tag)) + { + pgBufferUsage.shared_blks_hit++; + return true; + } + UnpinBuffer(bufHdr); } - - /* If we locked the header above, now unlock. */ - if (!have_private_ref) - UnlockBufHdr(bufHdr, buf_state); } return false; @@ -1080,7 +1067,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid) * already valid.) 
*/ if (!isLocalBuf) - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); /* Set BM_VALID, terminate IO, and wake up any waiters */ if (isLocalBuf) @@ -2036,7 +2023,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, */ buf = GetBufferDescriptor(existing_buf_id); - valid = PinBuffer(buf, strategy); + valid = PinBuffer(buf, strategy, false); /* Can release the mapping lock as soon as we've pinned it */ LWLockRelease(newPartitionLock); @@ -2098,7 +2085,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, existing_buf_hdr = GetBufferDescriptor(existing_buf_id); - valid = PinBuffer(existing_buf_hdr, strategy); + valid = PinBuffer(existing_buf_hdr, strategy, false); /* Can release the mapping lock as soon as we've pinned it */ LWLockRelease(newPartitionLock); @@ -2338,8 +2325,8 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context) bool from_ring; /* - * Ensure, while the spinlock's not yet held, that there's a free refcount - * entry, and a resource owner slot for the pin. + * Ensure, before we pin a victim buffer, that there's a free refcount + * entry and resource owner slot for the pin. */ ReservePrivateRefCountEntry(); ResourceOwnerEnlarge(CurrentResourceOwner); @@ -2348,17 +2335,12 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context) again: /* - * Select a victim buffer. The buffer is returned with its header - * spinlock still held! + * Select a victim buffer. The buffer is returned pinned and owned by + * this backend. */ buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring); buf = BufferDescriptorGetBuffer(buf_hdr); - Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0); - - /* Pin the buffer and then release the buffer spinlock */ - PinBuffer_Locked(buf_hdr); - /* * We shouldn't have any other pins for this buffer. */ @@ -2736,7 +2718,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, * Pin the existing buffer before releasing the partition lock, * preventing it from being evicted. */ - valid = PinBuffer(existing_hdr, strategy); + valid = PinBuffer(existing_hdr, strategy, false); LWLockRelease(partition_lock); UnpinBuffer(victim_buf_hdr); @@ -2837,7 +2819,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, } if (lock) - LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE); + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); TerminateBufferIO(buf_hdr, false, BM_VALID, true, false); } @@ -2850,14 +2832,40 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, } /* - * BufferIsExclusiveLocked + * BufferIsLockedByMe + * + * Checks if this backend has the buffer locked in any mode. + * + * Buffer must be pinned. + */ +bool +BufferIsLockedByMe(Buffer buffer) +{ + BufferDesc *bufHdr; + + Assert(BufferIsPinned(buffer)); + + if (BufferIsLocal(buffer)) + { + /* Content locks are not maintained for local buffers. */ + return true; + } + else + { + bufHdr = GetBufferDescriptor(buffer - 1); + return LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)); + } +} + +/* + * BufferIsLockedByMeInMode * - * Checks if buffer is exclusive-locked. + * Checks if this backend has the buffer locked in the specified mode. * * Buffer must be pinned. 
*/ bool -BufferIsExclusiveLocked(Buffer buffer) +BufferIsLockedByMeInMode(Buffer buffer, int mode) { BufferDesc *bufHdr; @@ -2870,9 +2878,23 @@ BufferIsExclusiveLocked(Buffer buffer) } else { + LWLockMode lw_mode; + + switch (mode) + { + case BUFFER_LOCK_EXCLUSIVE: + lw_mode = LW_EXCLUSIVE; + break; + case BUFFER_LOCK_SHARE: + lw_mode = LW_SHARED; + break; + default: + pg_unreachable(); + } + bufHdr = GetBufferDescriptor(buffer - 1); return LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr), - LW_EXCLUSIVE); + lw_mode); } } @@ -2901,8 +2923,7 @@ BufferIsDirty(Buffer buffer) else { bufHdr = GetBufferDescriptor(buffer - 1); - Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr), - LW_EXCLUSIVE)); + Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE)); } return pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY; @@ -2936,8 +2957,7 @@ MarkBufferDirty(Buffer buffer) bufHdr = GetBufferDescriptor(buffer - 1); Assert(BufferIsPinned(buffer)); - Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr), - LW_EXCLUSIVE)); + Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE)); old_buf_state = pg_atomic_read_u32(&bufHdr->state); for (;;) @@ -3035,10 +3055,13 @@ ReleaseAndReadBuffer(Buffer buffer, * must have been done already. * * Returns true if buffer is BM_VALID, else false. This provision allows - * some callers to avoid an extra spinlock cycle. + * some callers to avoid an extra spinlock cycle. If skip_if_not_valid is + * true, then a false return value also indicates that the buffer was + * (recently) invalid and has not been pinned. */ static bool -PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) +PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy, + bool skip_if_not_valid) { Buffer b = BufferDescriptorGetBuffer(buf); bool result; @@ -3054,11 +3077,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) uint32 buf_state; uint32 old_buf_state; - ref = NewPrivateRefCountEntry(b); - old_buf_state = pg_atomic_read_u32(&buf->state); for (;;) { + if (unlikely(skip_if_not_valid && !(old_buf_state & BM_VALID))) + return false; + if (old_buf_state & BM_LOCKED) old_buf_state = WaitBufHdrUnlocked(buf); @@ -3088,6 +3112,8 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) { result = (buf_state & BM_VALID) != 0; + TrackNewBufferPin(b); + /* * Assume that we acquired a buffer pin for the purposes of * Valgrind buffer client checks (even in !result case) to @@ -3118,11 +3144,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) * cannot meddle with that. 
*/ result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0; + + Assert(ref->refcount > 0); + ref->refcount++; + ResourceOwnerRememberBuffer(CurrentResourceOwner, b); } - ref->refcount++; - Assert(ref->refcount > 0); - ResourceOwnerRememberBuffer(CurrentResourceOwner, b); return result; } @@ -3151,8 +3178,6 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) static void PinBuffer_Locked(BufferDesc *buf) { - Buffer b; - PrivateRefCountEntry *ref; uint32 buf_state; /* @@ -3177,12 +3202,7 @@ PinBuffer_Locked(BufferDesc *buf) buf_state += BUF_REFCOUNT_ONE; UnlockBufHdr(buf, buf_state); - b = BufferDescriptorGetBuffer(buf); - - ref = NewPrivateRefCountEntry(b); - ref->refcount++; - - ResourceOwnerRememberBuffer(CurrentResourceOwner, b); + TrackNewBufferPin(BufferDescriptorGetBuffer(buf)); } /* @@ -3265,7 +3285,10 @@ UnpinBufferNoOwner(BufferDesc *buf) */ VALGRIND_MAKE_MEM_NOACCESS(BufHdrGetBlock(buf), BLCKSZ); - /* I'd better not still hold the buffer content lock */ + /* + * I'd better not still hold the buffer content lock. Can't use + * BufferIsLockedByMe(), as that asserts the buffer is pinned. + */ Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf))); /* @@ -3297,6 +3320,17 @@ UnpinBufferNoOwner(BufferDesc *buf) } } +inline void +TrackNewBufferPin(Buffer buf) +{ + PrivateRefCountEntry *ref; + + ref = NewPrivateRefCountEntry(buf); + ref->refcount++; + + ResourceOwnerRememberBuffer(CurrentResourceOwner, buf); +} + #define ST_SORT sort_checkpoint_bufferids #define ST_ELEMENT_TYPE CkptSortItem #define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b) @@ -3327,7 +3361,7 @@ BufferSync(int flags) Oid last_tsid; binaryheap *ts_heap; int i; - int mask = BM_DIRTY; + uint32 mask = BM_DIRTY; WritebackContext wb_context; /* @@ -3935,11 +3969,8 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) * buffer is clean by the time we've locked it.) */ PinBuffer_Locked(bufHdr); - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); - - FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); - LWLockRelease(BufferDescriptorGetContentLock(bufHdr)); + FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); tag = bufHdr->tag; @@ -4387,6 +4418,19 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, } /* + * Convenience wrapper around FlushBuffer() that locks/unlocks the buffer + * before/after calling FlushBuffer(). + */ +static void +FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, + IOObject io_object, IOContext io_context) +{ + LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED); + FlushBuffer(buf, reln, IOOBJECT_RELATION, IOCONTEXT_NORMAL); + LWLockRelease(BufferDescriptorGetContentLock(buf)); +} + +/* * RelationGetNumberOfBlocksInFork * Determines the current number of pages in the specified relation fork. 
* @@ -4975,9 +5019,7 @@ FlushRelationBuffers(Relation rel) (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); - FlushBuffer(bufHdr, srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL); - LWLockRelease(BufferDescriptorGetContentLock(bufHdr)); + FlushUnlockedBuffer(bufHdr, srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL); UnpinBuffer(bufHdr); } else @@ -5072,9 +5114,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels) (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); - FlushBuffer(bufHdr, srelent->srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL); - LWLockRelease(BufferDescriptorGetContentLock(bufHdr)); + FlushUnlockedBuffer(bufHdr, srelent->srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL); UnpinBuffer(bufHdr); } else @@ -5300,9 +5340,7 @@ FlushDatabaseBuffers(Oid dbid) (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); - FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); - LWLockRelease(BufferDescriptorGetContentLock(bufHdr)); + FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); UnpinBuffer(bufHdr); } else @@ -5326,7 +5364,7 @@ FlushOneBuffer(Buffer buffer) bufHdr = GetBufferDescriptor(buffer - 1); - Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr))); + Assert(BufferIsLockedByMe(buffer)); FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); } @@ -5417,7 +5455,7 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) Assert(GetPrivateRefCount(buffer) > 0); /* here, either share or exclusive lock is OK */ - Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr))); + Assert(BufferIsLockedByMe(buffer)); /* * This routine might get called many times on the same page, if we are @@ -5900,8 +5938,7 @@ IsBufferCleanupOK(Buffer buffer) bufHdr = GetBufferDescriptor(buffer - 1); /* caller must hold exclusive lock on buffer */ - Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr), - LW_EXCLUSIVE)); + Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE)); buf_state = LockBufHdr(bufHdr); @@ -6250,7 +6287,7 @@ LockBufHdr(BufferDesc *desc) * Obviously the buffer could be locked by the time the value is returned, so * this is primarily useful in CAS style loops. */ -static uint32 +pg_noinline uint32 WaitBufHdrUnlocked(BufferDesc *buf) { SpinDelayStatus delayStatus; @@ -6577,10 +6614,8 @@ EvictUnpinnedBufferInternal(BufferDesc *desc, bool *buffer_flushed) /* If it was dirty, try to clean it once. */ if (buf_state & BM_DIRTY) { - LWLockAcquire(BufferDescriptorGetContentLock(desc), LW_SHARED); - FlushBuffer(desc, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); + FlushUnlockedBuffer(desc, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); *buffer_flushed = true; - LWLockRelease(BufferDescriptorGetContentLock(desc)); } /* This will return false if it becomes dirty or someone else pins it. */ diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 7d59a92bd1a..7fe34d3ef4c 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -159,13 +159,16 @@ ClockSweepTick(void) * StrategyGetBuffer * * Called by the bufmgr to get the next candidate buffer to use in - * BufferAlloc(). The only hard requirement BufferAlloc() has is that + * GetVictimBuffer(). 
The only hard requirement GetVictimBuffer() has is that * the selected buffer must not currently be pinned by anyone. * * strategy is a BufferAccessStrategy object, or NULL for default strategy. * - * To ensure that no one else can pin the buffer before we do, we must - * return the buffer with the buffer header spinlock still held. + * It is the callers responsibility to ensure the buffer ownership can be + * tracked via TrackNewBufferPin(). + * + * The buffer is pinned and marked as owned, using TrackNewBufferPin(), + * before returning. */ BufferDesc * StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring) @@ -173,7 +176,6 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r BufferDesc *buf; int bgwprocno; int trycounter; - uint32 local_buf_state; /* to avoid repeated (de-)referencing */ *from_ring = false; @@ -228,44 +230,79 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r trycounter = NBuffers; for (;;) { + uint32 old_buf_state; + uint32 local_buf_state; + buf = GetBufferDescriptor(ClockSweepTick()); /* - * If the buffer is pinned or has a nonzero usage_count, we cannot use - * it; decrement the usage_count (unless pinned) and keep scanning. + * Check whether the buffer can be used and pin it if so. Do this + * using a CAS loop, to avoid having to lock the buffer header. */ - local_buf_state = LockBufHdr(buf); - - if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0) + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) { + local_buf_state = old_buf_state; + + /* + * If the buffer is pinned or has a nonzero usage_count, we cannot + * use it; decrement the usage_count (unless pinned) and keep + * scanning. + */ + + if (BUF_STATE_GET_REFCOUNT(local_buf_state) != 0) + { + if (--trycounter == 0) + { + /* + * We've scanned all the buffers without making any state + * changes, so all the buffers are pinned (or were when we + * looked at them). We could hope that someone will free + * one eventually, but it's probably better to fail than + * to risk getting stuck in an infinite loop. + */ + elog(ERROR, "no unpinned buffers available"); + } + break; + } + + if (unlikely(local_buf_state & BM_LOCKED)) + { + old_buf_state = WaitBufHdrUnlocked(buf); + continue; + } + if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0) { local_buf_state -= BUF_USAGECOUNT_ONE; - trycounter = NBuffers; + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + local_buf_state)) + { + trycounter = NBuffers; + break; + } } else { - /* Found a usable buffer */ - if (strategy != NULL) - AddBufferToRing(strategy, buf); - *buf_state = local_buf_state; - return buf; + /* pin the buffer if the CAS succeeds */ + local_buf_state += BUF_REFCOUNT_ONE; + + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + local_buf_state)) + { + /* Found a usable buffer */ + if (strategy != NULL) + AddBufferToRing(strategy, buf); + *buf_state = local_buf_state; + + TrackNewBufferPin(BufferDescriptorGetBuffer(buf)); + + return buf; + } } + } - else if (--trycounter == 0) - { - /* - * We've scanned all the buffers without making any state changes, - * so all the buffers are pinned (or were when we looked at them). - * We could hope that someone will free one eventually, but it's - * probably better to fail than to risk getting stuck in an - * infinite loop. 
- */ - UnlockBufHdr(buf, local_buf_state); - elog(ERROR, "no unpinned buffers available"); - } - UnlockBufHdr(buf, local_buf_state); } } @@ -614,13 +651,15 @@ FreeAccessStrategy(BufferAccessStrategy strategy) * GetBufferFromRing -- returns a buffer from the ring, or NULL if the * ring is empty / not usable. * - * The bufhdr spin lock is held on the returned buffer. + * The buffer is pinned and marked as owned, using TrackNewBufferPin(), before + * returning. */ static BufferDesc * GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state) { BufferDesc *buf; Buffer bufnum; + uint32 old_buf_state; uint32 local_buf_state; /* to avoid repeated (de-)referencing */ @@ -637,24 +676,48 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state) if (bufnum == InvalidBuffer) return NULL; + buf = GetBufferDescriptor(bufnum - 1); + /* - * If the buffer is pinned we cannot use it under any circumstances. - * - * If usage_count is 0 or 1 then the buffer is fair game (we expect 1, - * since our own previous usage of the ring element would have left it - * there, but it might've been decremented by clock-sweep since then). A - * higher usage_count indicates someone else has touched the buffer, so we - * shouldn't re-use it. + * Check whether the buffer can be used and pin it if so. Do this using a + * CAS loop, to avoid having to lock the buffer header. */ - buf = GetBufferDescriptor(bufnum - 1); - local_buf_state = LockBufHdr(buf); - if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 - && BUF_STATE_GET_USAGECOUNT(local_buf_state) <= 1) + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) { - *buf_state = local_buf_state; - return buf; + local_buf_state = old_buf_state; + + /* + * If the buffer is pinned we cannot use it under any circumstances. + * + * If usage_count is 0 or 1 then the buffer is fair game (we expect 1, + * since our own previous usage of the ring element would have left it + * there, but it might've been decremented by clock-sweep since then). + * A higher usage_count indicates someone else has touched the buffer, + * so we shouldn't re-use it. 
+ */ + if (BUF_STATE_GET_REFCOUNT(local_buf_state) != 0 + || BUF_STATE_GET_USAGECOUNT(local_buf_state) > 1) + break; + + if (unlikely(local_buf_state & BM_LOCKED)) + { + old_buf_state = WaitBufHdrUnlocked(buf); + continue; + } + + /* pin the buffer if the CAS succeeds */ + local_buf_state += BUF_REFCOUNT_ONE; + + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + local_buf_state)) + { + *buf_state = local_buf_state; + + TrackNewBufferPin(BufferDescriptorGetBuffer(buf)); + return buf; + } } - UnlockBufHdr(buf, local_buf_state); /* * Tell caller to allocate a new buffer with the normal allocation diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index d356830f756..7dd75a490aa 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -37,6 +37,7 @@ #include "catalog/pg_type.h" #include "commands/async.h" #include "commands/event_trigger.h" +#include "commands/explain_state.h" #include "commands/prepare.h" #include "common/pg_prng.h" #include "jit/jit.h" @@ -884,7 +885,7 @@ pg_rewrite_query(Query *query) */ PlannedStmt * pg_plan_query(Query *querytree, const char *query_string, int cursorOptions, - ParamListInfo boundParams) + ParamListInfo boundParams, ExplainState *es) { PlannedStmt *plan; @@ -901,7 +902,7 @@ pg_plan_query(Query *querytree, const char *query_string, int cursorOptions, ResetUsage(); /* call the optimizer */ - plan = planner(querytree, query_string, cursorOptions, boundParams); + plan = planner(querytree, query_string, cursorOptions, boundParams, es); if (log_planner_stats) ShowUsage("PLANNER STATISTICS"); @@ -997,7 +998,7 @@ pg_plan_queries(List *querytrees, const char *query_string, int cursorOptions, else { stmt = pg_plan_query(query, query_string, cursorOptions, - boundParams); + boundParams, NULL); } stmt_list = lappend(stmt_list, stmt); diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c index 48f57e408e1..7ef06150df7 100644 --- a/src/backend/utils/activity/pgstat.c +++ b/src/backend/utils/activity/pgstat.c @@ -328,6 +328,7 @@ static const PgStat_KindInfo pgstat_kind_builtin_infos[PGSTAT_KIND_BUILTIN_SIZE] .pending_size = sizeof(PgStat_FunctionCounts), .flush_pending_cb = pgstat_function_flush_cb, + .reset_timestamp_cb = pgstat_function_reset_timestamp_cb, }, [PGSTAT_KIND_REPLSLOT] = { diff --git a/src/backend/utils/activity/pgstat_function.c b/src/backend/utils/activity/pgstat_function.c index 6214f93d36e..b5db9d15e07 100644 --- a/src/backend/utils/activity/pgstat_function.c +++ b/src/backend/utils/activity/pgstat_function.c @@ -214,6 +214,12 @@ pgstat_function_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) return true; } +void +pgstat_function_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts) +{ + ((PgStatShared_Function *) header)->stats.stat_reset_timestamp = ts; +} + /* * find any existing PgStat_FunctionCounts entry for specified function * diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index ccfb11c49bf..d210c261ac6 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -94,6 +94,7 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re REPLSLOT_ACC(stream_txns); REPLSLOT_ACC(stream_count); REPLSLOT_ACC(stream_bytes); + REPLSLOT_ACC(mem_exceeded_count); REPLSLOT_ACC(total_txns); REPLSLOT_ACC(total_bytes); #undef REPLSLOT_ACC diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 7e89a8048d5..1fe33df2756 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -204,6 +204,24 @@ PG_STAT_GET_FUNCENTRY_FLOAT8_MS(total_time)
 PG_STAT_GET_FUNCENTRY_FLOAT8_MS(self_time)
 
 Datum
+pg_stat_get_function_stat_reset_time(PG_FUNCTION_ARGS)
+{
+	Oid			funcid = PG_GETARG_OID(0);
+	TimestampTz result;
+	PgStat_StatFuncEntry *funcentry;
+
+	if ((funcentry = pgstat_fetch_stat_funcentry(funcid)) == NULL)
+		result = 0;
+	else
+		result = funcentry->stat_reset_timestamp;
+
+	if (result == 0)
+		PG_RETURN_NULL();
+	else
+		PG_RETURN_TIMESTAMPTZ(result);
+}
+
+Datum
 pg_stat_get_backend_idset(PG_FUNCTION_ARGS)
 {
 	FuncCallContext *funcctx;
@@ -2103,7 +2121,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 11
 	text	   *slotname_text = PG_GETARG_TEXT_P(0);
 	NameData	slotname;
 	TupleDesc	tupdesc;
@@ -2128,11 +2146,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "mem_exceeded_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_txns",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 
 	BlessTupleDesc(tupdesc);
@@ -2155,13 +2175,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	values[4] = Int64GetDatum(slotent->stream_txns);
 	values[5] = Int64GetDatum(slotent->stream_count);
 	values[6] = Int64GetDatum(slotent->stream_bytes);
-	values[7] = Int64GetDatum(slotent->total_txns);
-	values[8] = Int64GetDatum(slotent->total_bytes);
+	values[7] = Int64GetDatum(slotent->mem_exceeded_count);
+	values[8] = Int64GetDatum(slotent->total_txns);
+	values[9] = Int64GetDatum(slotent->total_bytes);
 
 	if (slotent->stat_reset_timestamp == 0)
-		nulls[9] = true;
+		nulls[10] = true;
 	else
-		values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+		values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
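Note: with PG_STAT_GET_REPLICATION_SLOT_COLS bumped from 10 to 11, pg_stat_get_replication_slot() now reports mem_exceeded_count between stream_bytes and total_txns. A quick way to inspect the new counter from SQL, assuming the matching output-column list in the catalogs is updated elsewhere in the patch; the slot name regress_slot is only a placeholder:

    -- 'regress_slot' is a hypothetical logical replication slot.
    SELECT slot_name, mem_exceeded_count, total_txns, total_bytes, stats_reset
    FROM pg_stat_get_replication_slot('regress_slot');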
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 6bc6be13d2a..b176d5130e4 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -145,6 +145,13 @@
   boot_val => 'false',
 },
 
+{ name => 'enable_eager_aggregate', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
+  short_desc => 'Enables eager aggregation.',
+  flags => 'GUC_EXPLAIN',
+  variable => 'enable_eager_aggregate',
+  boot_val => 'true',
+},
+
 { name => 'enable_parallel_append', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
   short_desc => 'Enables the planner\'s use of parallel append plans.',
   flags => 'GUC_EXPLAIN',
@@ -2427,6 +2434,15 @@
   max => 'DBL_MAX',
 },
 
+{ name => 'min_eager_agg_group_size', type => 'real', context => 'PGC_USERSET', group => 'QUERY_TUNING_COST',
+  short_desc => 'Sets the minimum average group size required to consider applying eager aggregation.',
+  flags => 'GUC_EXPLAIN',
+  variable => 'min_eager_agg_group_size',
+  boot_val => '8.0',
+  min => '0.0',
+  max => 'DBL_MAX',
+},
+
 { name => 'cursor_tuple_fraction', type => 'real', context => 'PGC_USERSET', group => 'QUERY_TUNING_OTHER',
   short_desc => 'Sets the planner\'s estimate of the fraction of a cursor\'s rows that will be retrieved.',
   flags => 'GUC_EXPLAIN',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c36fcb9ab61..c5d612ab552 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -428,6 +428,7 @@
 #enable_group_by_reordering = on
 #enable_distinct_reordering = on
 #enable_self_join_elimination = on
+#enable_eager_aggregate = on
 
 # - Planner Cost Constants -
 
@@ -441,6 +442,7 @@
 #min_parallel_table_scan_size = 8MB
 #min_parallel_index_scan_size = 512kB
 #effective_cache_size = 4GB
+#min_eager_agg_group_size = 8.0
 
 #jit_above_cost = 100000		# perform JIT compilation if available
 					# and query more expensive than this;
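Note: both new settings are PGC_USERSET and flagged GUC_EXPLAIN, so they can be changed per session and any non-default value is reported under EXPLAIN (SETTINGS). An illustrative session, with the table and column names invented purely for the example:

    SET enable_eager_aggregate = on;        -- default per boot_val above
    SET min_eager_agg_group_size = 4.0;     -- consider eager aggregation for smaller average groups
    EXPLAIN (SETTINGS, COSTS OFF)
    SELECT o.customer_id, sum(l.amount)     -- orders/order_lines are hypothetical tables
    FROM orders o JOIN order_lines l ON l.order_id = o.id
    GROUP BY o.customer_id;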