summaryrefslogtreecommitdiff
path: root/src/backend/libpq/pqmq.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2015-04-30 15:02:14 -0400
committerRobert Haas <rhaas@postgresql.org>2015-04-30 15:02:14 -0400
commit924bcf4f16d54c55310b28f77686608684734f42 (patch)
tree79f35bf679c6a68dbe725cc90248a553f2b9e019 /src/backend/libpq/pqmq.c
parent669c7d20e6374850593cb430d332e11a3992bbcf (diff)
Create an infrastructure for parallel computation in PostgreSQL.
This does four basic things. First, it provides convenience routines to coordinate the startup and shutdown of parallel workers. Second, it synchronizes various pieces of state (e.g. GUCs, combo CID mappings, transaction snapshot) from the parallel group leader to the worker processes. Third, it prohibits various operations that would result in unsafe changes to that state while parallelism is active. Finally, it propagates events that would result in an ErrorResponse, NoticeResponse, or NotifyResponse message being sent to the client from the parallel workers back to the master, from which they can then be sent on to the client. Robert Haas, Amit Kapila, Noah Misch, Rushabh Lathia, Jeevan Chalke. Suggestions and review from Andres Freund, Heikki Linnakangas, Noah Misch, Simon Riggs, Euler Taveira, and Jim Nasby.
Diffstat (limited to 'src/backend/libpq/pqmq.c')
-rw-r--r--src/backend/libpq/pqmq.c33
1 files changed, 32 insertions, 1 deletions
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 307fb60665c..f12f2d582e8 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -16,12 +16,15 @@
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
+#include "miscadmin.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
static shm_mq *pq_mq;
static shm_mq_handle *pq_mq_handle;
static bool pq_mq_busy = false;
+static pid_t pq_mq_parallel_master_pid = 0;
+static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId;
static void mq_comm_reset(void);
static int mq_flush(void);
@@ -57,6 +60,18 @@ pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
FrontendProtocol = PG_PROTOCOL_LATEST;
}
+/*
+ * Arrange to SendProcSignal() to the parallel master each time we transmit
+ * message data via the shm_mq.
+ */
+void
+pq_set_parallel_master(pid_t pid, BackendId backend_id)
+{
+ Assert(PqCommMethods == &PqCommMqMethods);
+ pq_mq_parallel_master_pid = pid;
+ pq_mq_parallel_master_backend_id = backend_id;
+}
+
static void
mq_comm_reset(void)
{
@@ -120,7 +135,23 @@ mq_putmessage(char msgtype, const char *s, size_t len)
iov[1].len = len;
Assert(pq_mq_handle != NULL);
- result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+ for (;;)
+ {
+ result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
+
+ if (pq_mq_parallel_master_pid != 0)
+ SendProcSignal(pq_mq_parallel_master_pid,
+ PROCSIG_PARALLEL_MESSAGE,
+ pq_mq_parallel_master_backend_id);
+
+ if (result != SHM_MQ_WOULD_BLOCK)
+ break;
+
+ WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+ CHECK_FOR_INTERRUPTS();
+ ResetLatch(&MyProc->procLatch);
+ }
pq_mq_busy = false;