diff options
author | Robert Haas <rhaas@postgresql.org> | 2016-02-03 12:46:18 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2016-02-03 12:49:46 -0500 |
commit | 69d34408e5e7adcef8ef2f4e9c4f2919637e9a06 (patch) | |
tree | bce5efdc1891f9a86505228090b8838fba710833 /src | |
parent | 25e44518c16461d66fb6cec2063035d591db1def (diff) |
Allow parallel custom and foreign scans.
This patch doesn't put the new infrastructure to use anywhere, and
indeed it's not clear how it could ever be used for something like
postgres_fdw which has to send an SQL query and wait for a reply,
but there might be FDWs or custom scan providers that are CPU-bound,
so let's give them a way to join club parallel.
KaiGai Kohei, reviewed by me.
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/executor/execParallel.c | 26 | ||||
-rw-r--r-- | src/backend/executor/nodeCustom.c | 45 | ||||
-rw-r--r-- | src/backend/executor/nodeForeignscan.c | 62 | ||||
-rw-r--r-- | src/include/executor/nodeCustom.h | 11 | ||||
-rw-r--r-- | src/include/executor/nodeForeignscan.h | 8 | ||||
-rw-r--r-- | src/include/foreign/fdwapi.h | 14 | ||||
-rw-r--r-- | src/include/nodes/execnodes.h | 14 |
7 files changed, 179 insertions, 1 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 29e450a571c..95e8e41d2bb 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -25,6 +25,8 @@ #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeCustom.h" +#include "executor/nodeForeignscan.h" #include "executor/nodeSeqscan.h" #include "executor/tqueue.h" #include "nodes/nodeFuncs.h" @@ -176,6 +178,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecSeqScanEstimate((SeqScanState *) planstate, e->pcxt); break; + case T_ForeignScanState: + ExecForeignScanEstimate((ForeignScanState *) planstate, + e->pcxt); + break; + case T_CustomScanState: + ExecCustomScanEstimate((CustomScanState *) planstate, + e->pcxt); + break; default: break; } @@ -220,6 +230,14 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecSeqScanInitializeDSM((SeqScanState *) planstate, d->pcxt); break; + case T_ForeignScanState: + ExecForeignScanInitializeDSM((ForeignScanState *) planstate, + d->pcxt); + break; + case T_CustomScanState: + ExecCustomScanInitializeDSM((CustomScanState *) planstate, + d->pcxt); + break; default: break; } @@ -642,6 +660,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) case T_SeqScanState: ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc); break; + case T_ForeignScanState: + ExecForeignScanInitializeWorker((ForeignScanState *) planstate, + toc); + break; + case T_CustomScanState: + ExecCustomScanInitializeWorker((CustomScanState *) planstate, + toc); + break; default: break; } diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index 640289e2773..322abca282a 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -10,6 +10,7 @@ */ #include "postgres.h" +#include "access/parallel.h" #include "executor/executor.h" #include "executor/nodeCustom.h" #include "nodes/execnodes.h" @@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node) node->methods->CustomName))); node->methods->RestrPosCustomScan(node); } + +void +ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->EstimateDSMCustomScan) + { + node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +void +ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->InitializeDSMCustomScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); + methods->InitializeDSMCustomScan(node, pcxt, coordinate); + shm_toc_insert(pcxt->toc, plan_node_id, coordinate); + } +} + +void +ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->InitializeWorkerCustomScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(toc, plan_node_id); + methods->InitializeWorkerCustomScan(node, toc, coordinate); + } +} diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 64a07bcc771..388c9227498 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node) ExecScanReScan(&node->ss); } + +/* ---------------------------------------------------------------- + * ExecForeignScanEstimate + * + * Informs size of the parallel coordination information, if any + * ---------------------------------------------------------------- + */ +void +ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->EstimateDSMForeignScan) + { + node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* ---------------------------------------------------------------- + * ExecForeignScanInitializeDSM + * + * Initialize the parallel coordination information + * ---------------------------------------------------------------- + */ +void +ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->InitializeDSMForeignScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); + fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate); + shm_toc_insert(pcxt->toc, plan_node_id, coordinate); + } +} + +/* ---------------------------------------------------------------- + * ExecForeignScanInitializeDSM + * + * Initialization according to the parallel coordination information + * ---------------------------------------------------------------- + */ +void +ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->InitializeWorkerForeignScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(toc, plan_node_id); + fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate); + } +} diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h index e244942d79a..410a3ad14db 100644 --- a/src/include/executor/nodeCustom.h +++ b/src/include/executor/nodeCustom.h @@ -12,6 +12,7 @@ #ifndef NODECUSTOM_H #define NODECUSTOM_H +#include "access/parallel.h" #include "nodes/execnodes.h" /* @@ -26,4 +27,14 @@ extern void ExecReScanCustomScan(CustomScanState *node); extern void ExecCustomMarkPos(CustomScanState *node); extern void ExecCustomRestrPos(CustomScanState *node); +/* + * Parallel execution support + */ +extern void ExecCustomScanEstimate(CustomScanState *node, + ParallelContext *pcxt); +extern void ExecCustomScanInitializeDSM(CustomScanState *node, + ParallelContext *pcxt); +extern void ExecCustomScanInitializeWorker(CustomScanState *node, + shm_toc *toc); + #endif /* NODECUSTOM_H */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index a92ce5c22a3..c2553295fab 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -14,6 +14,7 @@ #ifndef NODEFOREIGNSCAN_H #define NODEFOREIGNSCAN_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags); @@ -21,4 +22,11 @@ extern TupleTableSlot *ExecForeignScan(ForeignScanState *node); extern void ExecEndForeignScan(ForeignScanState *node); extern void ExecReScanForeignScan(ForeignScanState *node); +extern void ExecForeignScanEstimate(ForeignScanState *node, + ParallelContext *pcxt); +extern void ExecForeignScanInitializeDSM(ForeignScanState *node, + ParallelContext *pcxt); +extern void ExecForeignScanInitializeWorker(ForeignScanState *node, + shm_toc *toc); + #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index db73233f65b..e16fbf34ec8 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -12,6 +12,7 @@ #ifndef FDWAPI_H #define FDWAPI_H +#include "access/parallel.h" #include "nodes/execnodes.h" #include "nodes/relation.h" @@ -122,6 +123,14 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation, typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt, Oid serverOid); +typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node, + ParallelContext *pcxt); +typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node, + ParallelContext *pcxt, + void *coordinate); +typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node, + shm_toc *toc, + void *coordinate); /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -177,6 +186,11 @@ typedef struct FdwRoutine /* Support functions for IMPORT FOREIGN SCHEMA */ ImportForeignSchema_function ImportForeignSchema; + + /* Support functions for parallelism under Gather node */ + EstimateDSMForeignScan_function EstimateDSMForeignScan; + InitializeDSMForeignScan_function InitializeDSMForeignScan; + InitializeWorkerForeignScan_function InitializeWorkerForeignScan; } FdwRoutine; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 07cd20ac504..064a0509c4d 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1585,6 +1585,7 @@ typedef struct ForeignScanState { ScanState ss; /* its first field is NodeTag */ List *fdw_recheck_quals; /* original quals not in ss.ps.qual */ + Size pscan_len; /* size of parallel coordination information */ /* use struct pointer to avoid including fdwapi.h here */ struct FdwRoutine *fdwroutine; void *fdw_state; /* foreign-data wrapper can keep state here */ @@ -1603,6 +1604,8 @@ typedef struct ForeignScanState * the BeginCustomScan method. * ---------------- */ +struct ParallelContext; /* avoid including parallel.h here */ +struct shm_toc; /* avoid including shm_toc.h here */ struct ExplainState; /* avoid including explain.h here */ struct CustomScanState; @@ -1619,7 +1622,15 @@ typedef struct CustomExecMethods void (*ReScanCustomScan) (struct CustomScanState *node); void (*MarkPosCustomScan) (struct CustomScanState *node); void (*RestrPosCustomScan) (struct CustomScanState *node); - + /* Optional: parallel execution support */ + Size (*EstimateDSMCustomScan) (struct CustomScanState *node, + struct ParallelContext *pcxt); + void (*InitializeDSMCustomScan) (struct CustomScanState *node, + struct ParallelContext *pcxt, + void *coordinate); + void (*InitializeWorkerCustomScan) (struct CustomScanState *node, + struct shm_toc *toc, + void *coordinate); /* Optional: print additional information in EXPLAIN */ void (*ExplainCustomScan) (struct CustomScanState *node, List *ancestors, @@ -1631,6 +1642,7 @@ typedef struct CustomScanState ScanState ss; uint32 flags; /* mask of CUSTOMPATH_* flags, see relation.h */ List *custom_ps; /* list of child PlanState nodes, if any */ + Size pscan_len; /* size of parallel coordination information */ const CustomExecMethods *methods; } CustomScanState; |