Diffstat (limited to 'src/backend/executor/nodeAppend.c')
-rw-r--r--	src/backend/executor/nodeAppend.c	461
1 file changed, 448 insertions, 13 deletions
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 15e4115bd6d..7da8ffe0652 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -57,10 +57,13 @@
#include "postgres.h"
+#include "executor/execAsync.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
@@ -78,12 +81,18 @@ struct ParallelAppendState
};
#define INVALID_SUBPLAN_INDEX -1
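+/* Maximum number of wait events processed per WaitEventSetWait() call */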
+#define EVENT_BUFFER_SIZE 16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
+static void ExecAppendAsyncBegin(AppendState *node);
+static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
+static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
+static void ExecAppendAsyncEventWait(AppendState *node);
+static void classify_matching_subplans(AppendState *node);
/* ----------------------------------------------------------------
* ExecInitAppend
@@ -102,7 +111,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
AppendState *appendstate = makeNode(AppendState);
PlanState **appendplanstates;
Bitmapset *validsubplans;
+ Bitmapset *asyncplans;
int nplans;
+ int nasyncplans;
int firstvalid;
int i,
j;
@@ -119,6 +130,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/* Let choose_next_subplan_* function handle setting the first subplan */
appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+ appendstate->as_syncdone = false;
+ appendstate->as_begun = false;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_info != NULL)
@@ -191,6 +204,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
* While at it, find out the first valid partial plan.
*/
j = 0;
+ asyncplans = NULL;
+ nasyncplans = 0;
firstvalid = nplans;
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
@@ -198,6 +213,17 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
Plan *initNode = (Plan *) list_nth(node->appendplans, i);
/*
+ * Record async subplans. When executing EvalPlanQual, we treat them
+ * as sync ones; don't do this when initializing an EvalPlanQual plan
+ * tree.
+ */
+ if (initNode->async_capable && estate->es_epq_active == NULL)
+ {
+ asyncplans = bms_add_member(asyncplans, j);
+ nasyncplans++;
+ }
+
+ /*
* Record the lowest appendplans index which is a valid partial plan.
*/
if (i >= node->first_partial_plan && j < firstvalid)
@@ -210,6 +236,37 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
+ /* Initialize async state */
+ appendstate->as_asyncplans = asyncplans;
+ appendstate->as_nasyncplans = nasyncplans;
+ appendstate->as_asyncrequests = NULL;
+ appendstate->as_asyncresults = (TupleTableSlot **)
+ palloc0(nasyncplans * sizeof(TupleTableSlot *));
+ appendstate->as_needrequest = NULL;
+ appendstate->as_eventset = NULL;
+
+ if (nasyncplans > 0)
+ {
+ appendstate->as_asyncrequests = (AsyncRequest **)
+ palloc0(nplans * sizeof(AsyncRequest *));
+
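+ /* Build an AsyncRequest for each async subplan, linking it to this node. */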
+ i = -1;
+ while ((i = bms_next_member(asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq;
+
+ areq = palloc(sizeof(AsyncRequest));
+ areq->requestor = (PlanState *) appendstate;
+ areq->requestee = appendplanstates[i];
+ areq->request_index = i;
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+
+ appendstate->as_asyncrequests[i] = areq;
+ }
+ }
+
/*
* Miscellaneous initialization
*/
@@ -232,31 +289,59 @@ static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ TupleTableSlot *result;
- if (node->as_whichplan < 0)
+ /*
+ * If this is the first call after Init or ReScan, we need to do the
+ * initialization work.
+ */
+ if (!node->as_begun)
{
+ Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
+ Assert(!node->as_syncdone);
+
/* Nothing to do if there are no subplans */
if (node->as_nplans == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ /* If there are any async subplans, begin executing them. */
+ if (node->as_nasyncplans > 0)
+ ExecAppendAsyncBegin(node);
+
/*
- * If no subplan has been chosen, we must choose one before
+ * If no sync subplan has been chosen, we must choose one before
* proceeding.
*/
- if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
- !node->choose_next_subplan(node))
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
+ Assert(node->as_syncdone ||
+ (node->as_whichplan >= 0 &&
+ node->as_whichplan < node->as_nplans));
+
+ /* And we're initialized. */
+ node->as_begun = true;
}
for (;;)
{
PlanState *subnode;
- TupleTableSlot *result;
CHECK_FOR_INTERRUPTS();
/*
- * figure out which subplan we are currently processing
+ * try to get a tuple from an async subplan if any
+ */
+ if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
+ {
+ if (ExecAppendAsyncGetNext(node, &result))
+ return result;
+ Assert(!node->as_syncdone);
+ Assert(bms_is_empty(node->as_needrequest));
+ }
+
+ /*
+ * figure out which sync subplan we are currently processing
*/
Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
subnode = node->appendplans[node->as_whichplan];
@@ -276,8 +361,16 @@ ExecAppend(PlanState *pstate)
return result;
}
- /* choose new subplan; if none, we're done */
- if (!node->choose_next_subplan(node))
+ /*
+ * wait or poll async events if any. We do this before checking for
+ * the end of iteration, because it might drain the remaining async
+ * subplans.
+ */
+ if (node->as_nasyncremain > 0)
+ ExecAppendAsyncEventWait(node);
+
+ /* choose new sync subplan; if no sync/async subplans, we're done */
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
}
@@ -313,6 +406,7 @@ ExecEndAppend(AppendState *node)
void
ExecReScanAppend(AppendState *node)
{
+ int nasyncplans = node->as_nasyncplans;
int i;
/*
@@ -326,6 +420,11 @@ ExecReScanAppend(AppendState *node)
{
bms_free(node->as_valid_subplans);
node->as_valid_subplans = NULL;
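+ /* The cached set of valid async subplans must be recomputed as well. */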
+ if (nasyncplans > 0)
+ {
+ bms_free(node->as_valid_asyncplans);
+ node->as_valid_asyncplans = NULL;
+ }
}
for (i = 0; i < node->as_nplans; i++)
@@ -347,8 +446,27 @@ ExecReScanAppend(AppendState *node)
ExecReScan(subnode);
}
+ /* Reset async state */
+ if (nasyncplans > 0)
+ {
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+ }
+
+ bms_free(node->as_needrequest);
+ node->as_needrequest = NULL;
+ }
+
/* Let choose_next_subplan_* function handle setting the first subplan */
node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_syncdone = false;
+ node->as_begun = false;
}
/* ----------------------------------------------------------------
@@ -429,7 +547,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
/* ----------------------------------------------------------------
* choose_next_subplan_locally
*
- * Choose next subplan for a non-parallel-aware Append,
+ * Choose next sync subplan for a non-parallel-aware Append,
* returning false if there are no more.
* ----------------------------------------------------------------
*/
@@ -442,16 +560,25 @@ choose_next_subplan_locally(AppendState *node)
/* We should never be called when there are no subplans */
Assert(node->as_nplans > 0);
+ /* Nothing to do if syncdone */
+ if (node->as_syncdone)
+ return false;
+
/*
* If first call then have the bms member function choose the first valid
- * subplan by initializing whichplan to -1. If there happen to be no
- * valid subplans then the bms member function will handle that by
- * returning a negative number which will allow us to exit returning a
+ * sync subplan by initializing whichplan to -1. If there happen to be
+ * no valid sync subplans then the bms member function will handle that
+ * by returning a negative number which will allow us to exit returning a
* false value.
*/
if (whichplan == INVALID_SUBPLAN_INDEX)
{
- if (node->as_valid_subplans == NULL)
+ if (node->as_nasyncplans > 0)
+ {
+ /* We'd have filled as_valid_subplans already */
+ Assert(node->as_valid_subplans);
+ }
+ else if (node->as_valid_subplans == NULL)
node->as_valid_subplans =
ExecFindMatchingSubPlans(node->as_prune_state);
@@ -467,7 +594,12 @@ choose_next_subplan_locally(AppendState *node)
nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
if (nextplan < 0)
+ {
+ /* Set as_syncdone if in async mode */
+ if (node->as_nasyncplans > 0)
+ node->as_syncdone = true;
return false;
+ }
node->as_whichplan = nextplan;
@@ -709,3 +841,306 @@ mark_invalid_subplans_as_finished(AppendState *node)
node->as_pstate->pa_finished[i] = true;
}
}
+
+/* ----------------------------------------------------------------
+ * Asynchronous Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncBegin
+ *
+ * Begin executing designated async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncBegin(AppendState *node)
+{
+ int i;
+
+ /* Backward scan is not supported by async-aware Appends. */
+ Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+ /* We should never be called when there are no async subplans. */
+ Assert(node->as_nasyncplans > 0);
+
+ /* If we've yet to determine the valid subplans then do so now. */
+ if (node->as_valid_subplans == NULL)
+ node->as_valid_subplans =
+ ExecFindMatchingSubPlans(node->as_prune_state);
+
+ classify_matching_subplans(node);
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (node->as_nasyncremain == 0)
+ return;
+
+ /* Make a request for each of the valid async subplans. */
+ i = -1;
+ while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ Assert(areq->request_index == i);
+ Assert(!areq->callback_pending);
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncGetNext
+ *
+ * Get the next tuple from any of the asynchronous subplans.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
+{
+ *result = NULL;
+
+ /* We should never be called when there are no valid async subplans. */
+ Assert(node->as_nasyncremain > 0);
+
+ /* Request a tuple asynchronously. */
+ if (ExecAppendAsyncRequest(node, result))
+ return true;
+
+ while (node->as_nasyncremain > 0)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Wait or poll async events. */
+ ExecAppendAsyncEventWait(node);
+
+ /* Request a tuple asynchronously. */
+ if (ExecAppendAsyncRequest(node, result))
+ return true;
+
+ /* Break from loop if there's any sync subplan that isn't complete. */
+ if (!node->as_syncdone)
+ break;
+ }
+
+ /*
+ * If all sync subplans are complete, we're totally done scanning the
+ * given node. Otherwise, we're done with the asynchronous stuff but
+ * must continue scanning the sync subplans.
+ */
+ if (node->as_syncdone)
+ {
+ Assert(node->as_nasyncremain == 0);
+ *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncRequest
+ *
+ * Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
+{
+ Bitmapset *needrequest;
+ int i;
+
+ /* Nothing to do if there are no async subplans needing a new request. */
+ if (bms_is_empty(node->as_needrequest))
+ return false;
+
+ /*
+ * If there are any asynchronously-generated results that have not yet
+ * been returned, we have nothing to do; just return one of them.
+ */
+ if (node->as_nasyncresults > 0)
+ {
+ --node->as_nasyncresults;
+ *result = node->as_asyncresults[node->as_nasyncresults];
+ return true;
+ }
+
+ /* Make a new request for each of the async subplans that need it. */
+ needrequest = node->as_needrequest;
+ node->as_needrequest = NULL;
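+
+ /*
+ * Note: we work on a detached copy here, since the requests fired below
+ * may add members back to as_needrequest via ExecAsyncAppendResponse.
+ */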
+ i = -1;
+ while ((i = bms_next_member(needrequest, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+ bms_free(needrequest);
+
+ /* Return one of the asynchronously-generated results if any. */
+ if (node->as_nasyncresults > 0)
+ {
+ --node->as_nasyncresults;
+ *result = node->as_asyncresults[node->as_nasyncresults];
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncEventWait
+ *
+ * Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncEventWait(AppendState *node)
+{
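+ /*
+ * If the sync subplans are exhausted, we can block until some async
+ * subplan has work ready (timeout -1); otherwise just poll without
+ * waiting (timeout 0) so that the sync subplans keep producing tuples.
+ */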
+ long timeout = node->as_syncdone ? -1 : 0;
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred;
+ int i;
+
+ /* We should never be called when there are no valid async subplans. */
+ Assert(node->as_nasyncremain > 0);
+
+ node->as_eventset = CreateWaitEventSet(CurrentMemoryContext,
+ node->as_nasyncplans + 1);
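+ /* Arrange for the process to exit if the postmaster dies while we wait. */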
+ AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ NULL, NULL);
+
+ /* Give each waiting subplan a chance to add an event. */
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ if (areq->callback_pending)
+ ExecAsyncConfigureWait(areq);
+ }
+
+ /* Wait for at least one event to occur. */
+ noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+ EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY);
+ FreeWaitEventSet(node->as_eventset);
+ node->as_eventset = NULL;
+ if (noccurred == 0)
+ return;
+
+ /* Deliver notifications. */
+ for (i = 0; i < noccurred; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ /*
+ * Each waiting subplan should have registered its wait event with
+ * user_data pointing back to its AsyncRequest.
+ */
+ if ((w->events & WL_SOCKET_READABLE) != 0)
+ {
+ AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+ /*
+ * Mark it as no longer needing a callback. We must do this
+ * before dispatching the callback in case the callback resets
+ * the flag.
+ */
+ Assert(areq->callback_pending);
+ areq->callback_pending = false;
+
+ /* Do the actual work. */
+ ExecAsyncNotify(areq);
+ }
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncAppendResponse
+ *
+ * Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncAppendResponse(AsyncRequest *areq)
+{
+ AppendState *node = (AppendState *) areq->requestor;
+ TupleTableSlot *slot = areq->result;
+
+ /* The result should be a TupleTableSlot or NULL. */
+ Assert(slot == NULL || IsA(slot, TupleTableSlot));
+
+ /* Nothing to do if the request is pending. */
+ if (!areq->request_complete)
+ {
+ /* The request would have been pending for a callback */
+ Assert(areq->callback_pending);
+ return;
+ }
+
+ /* If the result is NULL or an empty slot, there's nothing more to do. */
+ if (TupIsNull(slot))
+ {
+ /* The ending subplan wouldn't have been pending for a callback. */
+ Assert(!areq->callback_pending);
+ --node->as_nasyncremain;
+ return;
+ }
+
+ /* Save result so we can return it. */
+ Assert(node->as_nasyncresults < node->as_nasyncplans);
+ node->as_asyncresults[node->as_nasyncresults++] = slot;
+
+ /*
+ * Mark the subplan that returned a result as ready for a new request. We
+ * don't launch another one here immediately because it might complete.
+ */
+ node->as_needrequest = bms_add_member(node->as_needrequest,
+ areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ * classify_matching_subplans
+ *
+ * Classify the node's as_valid_subplans into sync ones and
+ * async ones, adjust it to contain sync ones only, and save
+ * async ones in the node's as_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(AppendState *node)
+{
+ Bitmapset *valid_asyncplans;
+
+ Assert(node->as_valid_asyncplans == NULL);
+
+ /* Nothing to do if there are no valid subplans. */
+ if (bms_is_empty(node->as_valid_subplans))
+ {
+ node->as_syncdone = true;
+ node->as_nasyncremain = 0;
+ return;
+ }
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
+ {
+ node->as_nasyncremain = 0;
+ return;
+ }
+
+ /* Get valid async subplans. */
+ valid_asyncplans = bms_copy(node->as_asyncplans);
+ valid_asyncplans = bms_int_members(valid_asyncplans,
+ node->as_valid_subplans);
+
+ /* Adjust the valid subplans to contain sync subplans only. */
+ node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
+ valid_asyncplans);
+ node->as_syncdone = bms_is_empty(node->as_valid_subplans);
+
+ /* Save valid async subplans. */
+ node->as_valid_asyncplans = valid_asyncplans;
+ node->as_nasyncremain = bms_num_members(valid_asyncplans);
+}