summaryrefslogtreecommitdiff
path: root/src/backend/storage/ipc/shm_mq.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2017-08-31 15:10:24 -0400
committerTom Lane <tgl@sss.pgh.pa.us>2017-08-31 15:10:24 -0400
commit6708e447efb5046c95bdcf900b6da97f56f97ae8 (patch)
treee3d331eec9f26181d2a0983f303bd8eb49e9fea8 /src/backend/storage/ipc/shm_mq.c
parent4b1dd62a257a469f92fef4f4cce37beab6c0b98b (diff)
Clean up shm_mq cleanup.
The logic around shm_mq_detach was a few bricks shy of a load, because (contrary to the comments for shm_mq_attach) all it did was update the shared shm_mq state. That left us leaking a bit of process-local memory, but much worse, the on_dsm_detach callback for shm_mq_detach was still armed. That means that whenever we ultimately detach from the DSM segment, we'd run shm_mq_detach again for already-detached, possibly long-dead queues. This accidentally fails to fail today, because we only ever re-use a shm_mq's memory for another shm_mq, and multiple detach attempts on the last such shm_mq are fairly harmless. But it's gonna bite us someday, so let's clean it up. To do that, change shm_mq_detach's API so it takes a shm_mq_handle not the underlying shm_mq. This makes the callers simpler in most cases anyway. Also fix a few places in parallel.c that were just pfree'ing the handle structs rather than doing proper cleanup. Back-patch to v10 because of the risk that the revenant shm_mq_detach callbacks would cause a live bug sometime. Since this is an API change, it's too late to do it in 9.6. (We could make a variant patch that preserves API, but I'm not excited enough to do that.) Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
Diffstat (limited to 'src/backend/storage/ipc/shm_mq.c')
-rw-r--r--src/backend/storage/ipc/shm_mq.c43
1 files changed, 36 insertions, 7 deletions
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index f45a67cc278..770559a03e3 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -83,7 +83,9 @@ struct shm_mq
* This structure is a backend-private handle for access to a queue.
*
* mqh_queue is a pointer to the queue we've attached, and mqh_segment is
- * a pointer to the dynamic shared memory segment that contains it.
+ * an optional pointer to the dynamic shared memory segment that contains it.
+ * (If mqh_segment is provided, we register an on_dsm_detach callback to
+ * make sure we detach from the queue before detaching from DSM.)
*
* If this queue is intended to connect the current process with a background
* worker that started it, the user can pass a pointer to the worker handle
@@ -139,6 +141,7 @@ struct shm_mq_handle
MemoryContext mqh_context;
};
+static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
@@ -288,14 +291,15 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
mqh->mqh_queue = mq;
mqh->mqh_segment = seg;
- mqh->mqh_buffer = NULL;
mqh->mqh_handle = handle;
+ mqh->mqh_buffer = NULL;
mqh->mqh_buflen = 0;
mqh->mqh_consume_pending = 0;
- mqh->mqh_context = CurrentMemoryContext;
mqh->mqh_partial_bytes = 0;
+ mqh->mqh_expected_bytes = 0;
mqh->mqh_length_word_complete = false;
mqh->mqh_counterparty_attached = false;
+ mqh->mqh_context = CurrentMemoryContext;
if (seg != NULL)
on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
@@ -765,7 +769,28 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
}
/*
- * Detach a shared message queue.
+ * Detach from a shared message queue, and destroy the shm_mq_handle.
+ */
+void
+shm_mq_detach(shm_mq_handle *mqh)
+{
+ /* Notify counterparty that we're outta here. */
+ shm_mq_detach_internal(mqh->mqh_queue);
+
+ /* Cancel on_dsm_detach callback, if any. */
+ if (mqh->mqh_segment)
+ cancel_on_dsm_detach(mqh->mqh_segment,
+ shm_mq_detach_callback,
+ PointerGetDatum(mqh->mqh_queue));
+
+ /* Release local memory associated with handle. */
+ if (mqh->mqh_buffer != NULL)
+ pfree(mqh->mqh_buffer);
+ pfree(mqh);
+}
+
+/*
+ * Notify counterparty that we're detaching from shared message queue.
*
* The purpose of this function is to make sure that the process
* with which we're communicating doesn't block forever waiting for us to
@@ -773,9 +798,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
* detaches, the receiver can read any messages remaining in the queue;
* further reads will return SHM_MQ_DETACHED. If the receiver detaches,
* further attempts to send messages will likewise return SHM_MQ_DETACHED.
+ *
+ * This is separated out from shm_mq_detach() because if the on_dsm_detach
+ * callback fires, we only want to do this much. We do not try to touch
+ * the local shm_mq_handle, as it may have been pfree'd already.
*/
-void
-shm_mq_detach(shm_mq *mq)
+static void
+shm_mq_detach_internal(shm_mq *mq)
{
volatile shm_mq *vmq = mq;
PGPROC *victim;
@@ -1193,5 +1222,5 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
{
shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
- shm_mq_detach(mq);
+ shm_mq_detach_internal(mq);
}