summaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2015-09-18 21:10:08 -0400
committerRobert Haas <rhaas@postgresql.org>2015-09-18 21:56:58 -0400
commit4a4e6893aa080b9094dadbe0e65f8a75fee41ac6 (patch)
treebdce94c526a3f138660c33f4fa39a2589c9175f6 /src/include
parentc00c3249e3247d24751d97ff6f26603810593414 (diff)
Glue layer to connect the executor to the shm_mq mechanism.
The shm_mq mechanism was built to send error (and notice) messages and tuples between backends. However, shm_mq itself only deals in raw bytes. Since commit 2bd9e412f92bc6a68f3e8bcb18e04955cc35001d, we have had infrastructure for one message to redirect protocol messages to a queue and for another backend to parse them and do useful things with them. This commit introduces a somewhat analogous facility for tuples by adding a new type of DestReceiver, DestTupleQueue, which writes each tuple generated by a query into a shm_mq, and a new TupleQueueFunnel facility which reads raw tuples out of the queue and reconstructs the HeapTuple format expected by the executor. The TupleQueueFunnel abstraction supports reading from multiple tuple streams at the same time, but only in round-robin fashion. Someone could imaginably want other policies, but this should be good enough to meet our short-term needs related to parallel query, and we can always extend it later. This also makes one minor addition to the shm_mq API that didn' seem worth breaking out as a separate patch. Extracted from Amit Kapila's parallel sequential scan patch. This code was originally written by me, and then it was revised by Amit, and then it was revised some more by me.
Diffstat (limited to 'src/include')
-rw-r--r--src/include/executor/tqueue.h31
-rw-r--r--src/include/storage/shm_mq.h3
-rw-r--r--src/include/tcop/dest.h3
3 files changed, 36 insertions, 1 deletions
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
new file mode 100644
index 00000000000..6f8eb73c9ae
--- /dev/null
+++ b/src/include/executor/tqueue.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * tqueue.h
+ * Use shm_mq to send & receive tuples between parallel backends
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/tqueue.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef TQUEUE_H
+#define TQUEUE_H
+
+#include "storage/shm_mq.h"
+#include "tcop/dest.h"
+
+/* Use this to send tuples to a shm_mq. */
+extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
+
+/* Use these to receive tuples from a shm_mq. */
+typedef struct TupleQueueFunnel TupleQueueFunnel;
+extern TupleQueueFunnel *CreateTupleQueueFunnel(void);
+extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel);
+extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *);
+extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait,
+ bool *done);
+
+#endif /* TQUEUE_H */
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 1a2ba040cb4..7621a358ab4 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -65,6 +65,9 @@ extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
/* Break connection. */
extern void shm_mq_detach(shm_mq *);
+/* Get the shm_mq from handle. */
+extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
+
/* Send or receive messages. */
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
Size nbytes, const void *data, bool nowait);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index 5bcca3fbcaa..b560672fd40 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -94,7 +94,8 @@ typedef enum
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code */
DestSQLFunction, /* results sent to SQL-language func mgr */
- DestTransientRel /* results sent to transient relation */
+ DestTransientRel, /* results sent to transient relation */
+ DestTupleQueue /* results sent to tuple queue */
} CommandDest;
/* ----------------