summaryrefslogtreecommitdiff
path: root/src/backend/libpq/pqmq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/libpq/pqmq.c')
-rw-r--r--src/backend/libpq/pqmq.c33
1 files changed, 32 insertions, 1 deletions
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 307fb60665c..f12f2d582e8 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -16,12 +16,15 @@
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
+#include "miscadmin.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
static shm_mq *pq_mq;
static shm_mq_handle *pq_mq_handle;
static bool pq_mq_busy = false;
+static pid_t pq_mq_parallel_master_pid = 0;
+static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId;
static void mq_comm_reset(void);
static int mq_flush(void);
@@ -57,6 +60,18 @@ pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
FrontendProtocol = PG_PROTOCOL_LATEST;
}
+/*
+ * Arrange to SendProcSignal() to the parallel master each time we transmit
+ * message data via the shm_mq.
+ */
+void
+pq_set_parallel_master(pid_t pid, BackendId backend_id)
+{
+ Assert(PqCommMethods == &PqCommMqMethods);
+ pq_mq_parallel_master_pid = pid;
+ pq_mq_parallel_master_backend_id = backend_id;
+}
+
static void
mq_comm_reset(void)
{
@@ -120,7 +135,23 @@ mq_putmessage(char msgtype, const char *s, size_t len)
iov[1].len = len;
Assert(pq_mq_handle != NULL);
- result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+ for (;;)
+ {
+ result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
+
+ if (pq_mq_parallel_master_pid != 0)
+ SendProcSignal(pq_mq_parallel_master_pid,
+ PROCSIG_PARALLEL_MESSAGE,
+ pq_mq_parallel_master_backend_id);
+
+ if (result != SHM_MQ_WOULD_BLOCK)
+ break;
+
+ WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+ CHECK_FOR_INTERRUPTS();
+ ResetLatch(&MyProc->procLatch);
+ }
pq_mq_busy = false;