diff options
Diffstat (limited to 'src/backend/executor/nodeAppend.c')
-rw-r--r-- | src/backend/executor/nodeAppend.c | 461 |
1 files changed, 448 insertions, 13 deletions
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 15e4115bd6d..7da8ffe0652 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -57,10 +57,13 @@ #include "postgres.h" +#include "executor/execAsync.h" #include "executor/execdebug.h" #include "executor/execPartition.h" #include "executor/nodeAppend.h" #include "miscadmin.h" +#include "pgstat.h" +#include "storage/latch.h" /* Shared state for parallel-aware Append. */ struct ParallelAppendState @@ -78,12 +81,18 @@ struct ParallelAppendState }; #define INVALID_SUBPLAN_INDEX -1 +#define EVENT_BUFFER_SIZE 16 static TupleTableSlot *ExecAppend(PlanState *pstate); static bool choose_next_subplan_locally(AppendState *node); static bool choose_next_subplan_for_leader(AppendState *node); static bool choose_next_subplan_for_worker(AppendState *node); static void mark_invalid_subplans_as_finished(AppendState *node); +static void ExecAppendAsyncBegin(AppendState *node); +static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result); +static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result); +static void ExecAppendAsyncEventWait(AppendState *node); +static void classify_matching_subplans(AppendState *node); /* ---------------------------------------------------------------- * ExecInitAppend @@ -102,7 +111,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags) AppendState *appendstate = makeNode(AppendState); PlanState **appendplanstates; Bitmapset *validsubplans; + Bitmapset *asyncplans; int nplans; + int nasyncplans; int firstvalid; int i, j; @@ -119,6 +130,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* Let choose_next_subplan_* function handle setting the first subplan */ appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; + appendstate->as_syncdone = false; + appendstate->as_begun = false; /* If run-time partition pruning is enabled, then set that up now */ if (node->part_prune_info != NULL) @@ -191,6 +204,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags) * While at it, find out the first valid partial plan. */ j = 0; + asyncplans = NULL; + nasyncplans = 0; firstvalid = nplans; i = -1; while ((i = bms_next_member(validsubplans, i)) >= 0) @@ -198,6 +213,17 @@ ExecInitAppend(Append *node, EState *estate, int eflags) Plan *initNode = (Plan *) list_nth(node->appendplans, i); /* + * Record async subplans. When executing EvalPlanQual, we treat them + * as sync ones; don't do this when initializing an EvalPlanQual plan + * tree. + */ + if (initNode->async_capable && estate->es_epq_active == NULL) + { + asyncplans = bms_add_member(asyncplans, j); + nasyncplans++; + } + + /* * Record the lowest appendplans index which is a valid partial plan. */ if (i >= node->first_partial_plan && j < firstvalid) @@ -210,6 +236,37 @@ ExecInitAppend(Append *node, EState *estate, int eflags) appendstate->appendplans = appendplanstates; appendstate->as_nplans = nplans; + /* Initialize async state */ + appendstate->as_asyncplans = asyncplans; + appendstate->as_nasyncplans = nasyncplans; + appendstate->as_asyncrequests = NULL; + appendstate->as_asyncresults = (TupleTableSlot **) + palloc0(nasyncplans * sizeof(TupleTableSlot *)); + appendstate->as_needrequest = NULL; + appendstate->as_eventset = NULL; + + if (nasyncplans > 0) + { + appendstate->as_asyncrequests = (AsyncRequest **) + palloc0(nplans * sizeof(AsyncRequest *)); + + i = -1; + while ((i = bms_next_member(asyncplans, i)) >= 0) + { + AsyncRequest *areq; + + areq = palloc(sizeof(AsyncRequest)); + areq->requestor = (PlanState *) appendstate; + areq->requestee = appendplanstates[i]; + areq->request_index = i; + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + + appendstate->as_asyncrequests[i] = areq; + } + } + /* * Miscellaneous initialization */ @@ -232,31 +289,59 @@ static TupleTableSlot * ExecAppend(PlanState *pstate) { AppendState *node = castNode(AppendState, pstate); + TupleTableSlot *result; - if (node->as_whichplan < 0) + /* + * If this is the first call after Init or ReScan, we need to do the + * initialization work. + */ + if (!node->as_begun) { + Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX); + Assert(!node->as_syncdone); + /* Nothing to do if there are no subplans */ if (node->as_nplans == 0) return ExecClearTuple(node->ps.ps_ResultTupleSlot); + /* If there are any async subplans, begin executing them. */ + if (node->as_nasyncplans > 0) + ExecAppendAsyncBegin(node); + /* - * If no subplan has been chosen, we must choose one before + * If no sync subplan has been chosen, we must choose one before * proceeding. */ - if (node->as_whichplan == INVALID_SUBPLAN_INDEX && - !node->choose_next_subplan(node)) + if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) return ExecClearTuple(node->ps.ps_ResultTupleSlot); + + Assert(node->as_syncdone || + (node->as_whichplan >= 0 && + node->as_whichplan < node->as_nplans)); + + /* And we're initialized. */ + node->as_begun = true; } for (;;) { PlanState *subnode; - TupleTableSlot *result; CHECK_FOR_INTERRUPTS(); /* - * figure out which subplan we are currently processing + * try to get a tuple from an async subplan if any + */ + if (node->as_syncdone || !bms_is_empty(node->as_needrequest)) + { + if (ExecAppendAsyncGetNext(node, &result)) + return result; + Assert(!node->as_syncdone); + Assert(bms_is_empty(node->as_needrequest)); + } + + /* + * figure out which sync subplan we are currently processing */ Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); subnode = node->appendplans[node->as_whichplan]; @@ -276,8 +361,16 @@ ExecAppend(PlanState *pstate) return result; } - /* choose new subplan; if none, we're done */ - if (!node->choose_next_subplan(node)) + /* + * wait or poll async events if any. We do this before checking for + * the end of iteration, because it might drain the remaining async + * subplans. + */ + if (node->as_nasyncremain > 0) + ExecAppendAsyncEventWait(node); + + /* choose new sync subplan; if no sync/async subplans, we're done */ + if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) return ExecClearTuple(node->ps.ps_ResultTupleSlot); } } @@ -313,6 +406,7 @@ ExecEndAppend(AppendState *node) void ExecReScanAppend(AppendState *node) { + int nasyncplans = node->as_nasyncplans; int i; /* @@ -326,6 +420,11 @@ ExecReScanAppend(AppendState *node) { bms_free(node->as_valid_subplans); node->as_valid_subplans = NULL; + if (nasyncplans > 0) + { + bms_free(node->as_valid_asyncplans); + node->as_valid_asyncplans = NULL; + } } for (i = 0; i < node->as_nplans; i++) @@ -347,8 +446,27 @@ ExecReScanAppend(AppendState *node) ExecReScan(subnode); } + /* Reset async state */ + if (nasyncplans > 0) + { + i = -1; + while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + } + + bms_free(node->as_needrequest); + node->as_needrequest = NULL; + } + /* Let choose_next_subplan_* function handle setting the first subplan */ node->as_whichplan = INVALID_SUBPLAN_INDEX; + node->as_syncdone = false; + node->as_begun = false; } /* ---------------------------------------------------------------- @@ -429,7 +547,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) /* ---------------------------------------------------------------- * choose_next_subplan_locally * - * Choose next subplan for a non-parallel-aware Append, + * Choose next sync subplan for a non-parallel-aware Append, * returning false if there are no more. * ---------------------------------------------------------------- */ @@ -442,16 +560,25 @@ choose_next_subplan_locally(AppendState *node) /* We should never be called when there are no subplans */ Assert(node->as_nplans > 0); + /* Nothing to do if syncdone */ + if (node->as_syncdone) + return false; + /* * If first call then have the bms member function choose the first valid - * subplan by initializing whichplan to -1. If there happen to be no - * valid subplans then the bms member function will handle that by - * returning a negative number which will allow us to exit returning a + * sync subplan by initializing whichplan to -1. If there happen to be + * no valid sync subplans then the bms member function will handle that + * by returning a negative number which will allow us to exit returning a * false value. */ if (whichplan == INVALID_SUBPLAN_INDEX) { - if (node->as_valid_subplans == NULL) + if (node->as_nasyncplans > 0) + { + /* We'd have filled as_valid_subplans already */ + Assert(node->as_valid_subplans); + } + else if (node->as_valid_subplans == NULL) node->as_valid_subplans = ExecFindMatchingSubPlans(node->as_prune_state); @@ -467,7 +594,12 @@ choose_next_subplan_locally(AppendState *node) nextplan = bms_prev_member(node->as_valid_subplans, whichplan); if (nextplan < 0) + { + /* Set as_syncdone if in async mode */ + if (node->as_nasyncplans > 0) + node->as_syncdone = true; return false; + } node->as_whichplan = nextplan; @@ -709,3 +841,306 @@ mark_invalid_subplans_as_finished(AppendState *node) node->as_pstate->pa_finished[i] = true; } } + +/* ---------------------------------------------------------------- + * Asynchronous Append Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecAppendAsyncBegin + * + * Begin executing designed async-capable subplans. + * ---------------------------------------------------------------- + */ +static void +ExecAppendAsyncBegin(AppendState *node) +{ + int i; + + /* Backward scan is not supported by async-aware Appends. */ + Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + + /* We should never be called when there are no async subplans. */ + Assert(node->as_nasyncplans > 0); + + /* If we've yet to determine the valid subplans then do so now. */ + if (node->as_valid_subplans == NULL) + node->as_valid_subplans = + ExecFindMatchingSubPlans(node->as_prune_state); + + classify_matching_subplans(node); + + /* Nothing to do if there are no valid async subplans. */ + if (node->as_nasyncremain == 0) + return; + + /* Make a request for each of the valid async subplans. */ + i = -1; + while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + Assert(areq->request_index == i); + Assert(!areq->callback_pending); + + /* Do the actual work. */ + ExecAsyncRequest(areq); + } +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncGetNext + * + * Get the next tuple from any of the asynchronous subplans. + * ---------------------------------------------------------------- + */ +static bool +ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) +{ + *result = NULL; + + /* We should never be called when there are no valid async subplans. */ + Assert(node->as_nasyncremain > 0); + + /* Request a tuple asynchronously. */ + if (ExecAppendAsyncRequest(node, result)) + return true; + + while (node->as_nasyncremain > 0) + { + CHECK_FOR_INTERRUPTS(); + + /* Wait or poll async events. */ + ExecAppendAsyncEventWait(node); + + /* Request a tuple asynchronously. */ + if (ExecAppendAsyncRequest(node, result)) + return true; + + /* Break from loop if there's any sync subplan that isn't complete. */ + if (!node->as_syncdone) + break; + } + + /* + * If all sync subplans are complete, we're totally done scanning the + * given node. Otherwise, we're done with the asynchronous stuff but + * must continue scanning the sync subplans. + */ + if (node->as_syncdone) + { + Assert(node->as_nasyncremain == 0); + *result = ExecClearTuple(node->ps.ps_ResultTupleSlot); + return true; + } + + return false; +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncRequest + * + * Request a tuple asynchronously. + * ---------------------------------------------------------------- + */ +static bool +ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) +{ + Bitmapset *needrequest; + int i; + + /* Nothing to do if there are no async subplans needing a new request. */ + if (bms_is_empty(node->as_needrequest)) + return false; + + /* + * If there are any asynchronously-generated results that have not yet + * been returned, we have nothing to do; just return one of them. + */ + if (node->as_nasyncresults > 0) + { + --node->as_nasyncresults; + *result = node->as_asyncresults[node->as_nasyncresults]; + return true; + } + + /* Make a new request for each of the async subplans that need it. */ + needrequest = node->as_needrequest; + node->as_needrequest = NULL; + i = -1; + while ((i = bms_next_member(needrequest, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + /* Do the actual work. */ + ExecAsyncRequest(areq); + } + bms_free(needrequest); + + /* Return one of the asynchronously-generated results if any. */ + if (node->as_nasyncresults > 0) + { + --node->as_nasyncresults; + *result = node->as_asyncresults[node->as_nasyncresults]; + return true; + } + + return false; +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncEventWait + * + * Wait or poll for file descriptor events and fire callbacks. + * ---------------------------------------------------------------- + */ +static void +ExecAppendAsyncEventWait(AppendState *node) +{ + long timeout = node->as_syncdone ? -1 : 0; + WaitEvent occurred_event[EVENT_BUFFER_SIZE]; + int noccurred; + int i; + + /* We should never be called when there are no valid async subplans. */ + Assert(node->as_nasyncremain > 0); + + node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, + node->as_nasyncplans + 1); + AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); + + /* Give each waiting subplan a chance to add an event. */ + i = -1; + while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + if (areq->callback_pending) + ExecAsyncConfigureWait(areq); + } + + /* Wait for at least one event to occur. */ + noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, + EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY); + FreeWaitEventSet(node->as_eventset); + node->as_eventset = NULL; + if (noccurred == 0) + return; + + /* Deliver notifications. */ + for (i = 0; i < noccurred; i++) + { + WaitEvent *w = &occurred_event[i]; + + /* + * Each waiting subplan should have registered its wait event with + * user_data pointing back to its AsyncRequest. + */ + if ((w->events & WL_SOCKET_READABLE) != 0) + { + AsyncRequest *areq = (AsyncRequest *) w->user_data; + + /* + * Mark it as no longer needing a callback. We must do this + * before dispatching the callback in case the callback resets + * the flag. + */ + Assert(areq->callback_pending); + areq->callback_pending = false; + + /* Do the actual work. */ + ExecAsyncNotify(areq); + } + } +} + +/* ---------------------------------------------------------------- + * ExecAsyncAppendResponse + * + * Receive a response from an asynchronous request we made. + * ---------------------------------------------------------------- + */ +void +ExecAsyncAppendResponse(AsyncRequest *areq) +{ + AppendState *node = (AppendState *) areq->requestor; + TupleTableSlot *slot = areq->result; + + /* The result should be a TupleTableSlot or NULL. */ + Assert(slot == NULL || IsA(slot, TupleTableSlot)); + + /* Nothing to do if the request is pending. */ + if (!areq->request_complete) + { + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); + return; + } + + /* If the result is NULL or an empty slot, there's nothing more to do. */ + if (TupIsNull(slot)) + { + /* The ending subplan wouldn't have been pending for a callback. */ + Assert(!areq->callback_pending); + --node->as_nasyncremain; + return; + } + + /* Save result so we can return it. */ + Assert(node->as_nasyncresults < node->as_nasyncplans); + node->as_asyncresults[node->as_nasyncresults++] = slot; + + /* + * Mark the subplan that returned a result as ready for a new request. We + * don't launch another one here immediately because it might complete. + */ + node->as_needrequest = bms_add_member(node->as_needrequest, + areq->request_index); +} + +/* ---------------------------------------------------------------- + * classify_matching_subplans + * + * Classify the node's as_valid_subplans into sync ones and + * async ones, adjust it to contain sync ones only, and save + * async ones in the node's as_valid_asyncplans. + * ---------------------------------------------------------------- + */ +static void +classify_matching_subplans(AppendState *node) +{ + Bitmapset *valid_asyncplans; + + Assert(node->as_valid_asyncplans == NULL); + + /* Nothing to do if there are no valid subplans. */ + if (bms_is_empty(node->as_valid_subplans)) + { + node->as_syncdone = true; + node->as_nasyncremain = 0; + return; + } + + /* Nothing to do if there are no valid async subplans. */ + if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans)) + { + node->as_nasyncremain = 0; + return; + } + + /* Get valid async subplans. */ + valid_asyncplans = bms_copy(node->as_asyncplans); + valid_asyncplans = bms_int_members(valid_asyncplans, + node->as_valid_subplans); + + /* Adjust the valid subplans to contain sync subplans only. */ + node->as_valid_subplans = bms_del_members(node->as_valid_subplans, + valid_asyncplans); + node->as_syncdone = bms_is_empty(node->as_valid_subplans); + + /* Save valid async subplans. */ + node->as_valid_asyncplans = valid_asyncplans; + node->as_nasyncremain = bms_num_members(valid_asyncplans); +} |