Commit 6708e447 authored by Tom Lane

Clean up shm_mq cleanup.

The logic around shm_mq_detach was a few bricks shy of a load, because
(contrary to the comments for shm_mq_attach) all it did was update the
shared shm_mq state.  That left us leaking a bit of process-local
memory, but much worse, the on_dsm_detach callback for shm_mq_detach
was still armed.  That means that whenever we ultimately detach from
the DSM segment, we'd run shm_mq_detach again for already-detached,
possibly long-dead queues.  This accidentally fails to fail today,
because we only ever re-use a shm_mq's memory for another shm_mq, and
multiple detach attempts on the last such shm_mq are fairly harmless.
But it's gonna bite us someday, so let's clean it up.
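
To make the hazard concrete, here is a minimal, PostgreSQL-independent sketch (every type and function in it is a hypothetical stand-in for dsm_segment, on_dsm_detach, and shm_mq_detach_callback, not the real APIs): a manual detach that forgets to cancel its registered cleanup callback gets run a second time when the segment is finally torn down, against state that may already be gone.

/* Hypothetical stand-ins -- not PostgreSQL code. Compile with any C99 compiler. */
#include <stdio.h>
#include <stdlib.h>

typedef struct Queue
{
    int         detach_count;   /* how many times "detach" has run on this queue */
} Queue;

typedef void (*cleanup_fn) (void *arg);

/* Toy stand-in for a DSM segment that runs registered callbacks when detached. */
typedef struct Segment
{
    cleanup_fn  callbacks[8];
    void       *args[8];
    int         ncallbacks;
} Segment;

static void
on_segment_detach(Segment *seg, cleanup_fn fn, void *arg)
{
    /* No bounds check needed here: this sketch registers a single callback. */
    seg->callbacks[seg->ncallbacks] = fn;
    seg->args[seg->ncallbacks] = arg;
    seg->ncallbacks++;
}

static void
detach_segment(Segment *seg)
{
    /* Run callbacks in reverse registration order, as on_dsm_detach does. */
    for (int i = seg->ncallbacks - 1; i >= 0; i--)
        seg->callbacks[i] (seg->args[i]);
    seg->ncallbacks = 0;
}

static void
queue_detach_callback(void *arg)
{
    Queue      *q = (Queue *) arg;

    q->detach_count++;
    printf("detach #%d on queue %p\n", q->detach_count, (void *) q);
}

int
main(void)
{
    Segment     seg = {0};
    Queue      *q = calloc(1, sizeof(Queue));

    /* "Attach": register a cleanup callback, as shm_mq_attach does. */
    on_segment_detach(&seg, queue_detach_callback, q);

    /* Buggy manual detach: updates queue state but never cancels the callback. */
    queue_detach_callback(q);

    /*
     * Final segment detach re-runs the stale callback against a queue that was
     * already detached -- and that might by then be freed or reused entirely.
     */
    detach_segment(&seg);

    free(q);
    return 0;
}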

To do that, change shm_mq_detach's API so it takes a shm_mq_handle
not the underlying shm_mq.  This makes the callers simpler in most
cases anyway.  Also fix a few places in parallel.c that were just
pfree'ing the handle structs rather than doing proper cleanup.
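
For illustration only, a hedged sketch of what the call-site change looks like (modeled on the tqueue.c and parallel.c hunks below; surrounding setup and error handling are omitted, and mqh stands for any shm_mq_handle obtained from shm_mq_attach):

/* Old API: only the shared queue state was updated; the handle and its
 * buffer leaked, and the on_dsm_detach callback stayed armed. */
shm_mq_detach(shm_mq_get_queue(mqh));

/* New API: one call notifies the counterparty, cancels the DSM callback,
 * and pfree's the handle along with any buffer it holds. */
shm_mq_detach(mqh);
mqh = NULL;                     /* the handle is gone; don't reuse it */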

Back-patch to v10 because of the risk that the revenant shm_mq_detach
callbacks would cause a live bug sometime.  Since this is an API
change, it's too late to do it in 9.6.  (We could make a variant
patch that preserves API, but I'm not excited enough to do that.)

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
parent 4b1dd62a
@@ -480,7 +480,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
              */
             any_registrations_failed = true;
             pcxt->worker[i].bgwhandle = NULL;
-            pfree(pcxt->worker[i].error_mqh);
+            shm_mq_detach(pcxt->worker[i].error_mqh);
             pcxt->worker[i].error_mqh = NULL;
         }
     }
@@ -612,7 +612,7 @@ DestroyParallelContext(ParallelContext *pcxt)
         {
             TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
-            pfree(pcxt->worker[i].error_mqh);
+            shm_mq_detach(pcxt->worker[i].error_mqh);
             pcxt->worker[i].error_mqh = NULL;
         }
     }
@@ -861,7 +861,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
         case 'X':               /* Terminate, indicating clean exit */
             {
-                pfree(pcxt->worker[i].error_mqh);
+                shm_mq_detach(pcxt->worker[i].error_mqh);
                 pcxt->worker[i].error_mqh = NULL;
                 break;
             }
...
@@ -578,7 +578,9 @@ tqueueShutdownReceiver(DestReceiver *self)
 {
     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 
-    shm_mq_detach(shm_mq_get_queue(tqueue->queue));
+    if (tqueue->queue != NULL)
+        shm_mq_detach(tqueue->queue);
+    tqueue->queue = NULL;
 }
 
 /*
@@ -589,6 +591,9 @@ tqueueDestroyReceiver(DestReceiver *self)
 {
     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 
+    /* We probably already detached from queue, but let's be sure */
+    if (tqueue->queue != NULL)
+        shm_mq_detach(tqueue->queue);
     if (tqueue->tmpcontext != NULL)
         MemoryContextDelete(tqueue->tmpcontext);
     if (tqueue->recordhtab != NULL)
@@ -650,7 +655,7 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
 void
 DestroyTupleQueueReader(TupleQueueReader *reader)
 {
-    shm_mq_detach(shm_mq_get_queue(reader->queue));
+    shm_mq_detach(reader->queue);
     if (reader->typmodmap != NULL)
         hash_destroy(reader->typmodmap);
     /* Is it worth trying to free substructure of the remap tree? */
...
@@ -21,7 +21,6 @@
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 
-static shm_mq *pq_mq;
 static shm_mq_handle *pq_mq_handle;
 static bool pq_mq_busy = false;
 static pid_t pq_mq_parallel_master_pid = 0;
@@ -56,7 +55,6 @@ void
 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 {
     PqCommMethods = &PqCommMqMethods;
-    pq_mq = shm_mq_get_queue(mqh);
     pq_mq_handle = mqh;
     whereToSendOutput = DestRemote;
     FrontendProtocol = PG_PROTOCOL_LATEST;
@@ -70,7 +68,6 @@ pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 static void
 pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
 {
-    pq_mq = NULL;
     pq_mq_handle = NULL;
     whereToSendOutput = DestNone;
 }
@@ -135,9 +132,8 @@ mq_putmessage(char msgtype, const char *s, size_t len)
      */
     if (pq_mq_busy)
     {
-        if (pq_mq != NULL)
-            shm_mq_detach(pq_mq);
-        pq_mq = NULL;
+        if (pq_mq_handle != NULL)
+            shm_mq_detach(pq_mq_handle);
         pq_mq_handle = NULL;
         return EOF;
     }
@@ -148,7 +144,7 @@ mq_putmessage(char msgtype, const char *s, size_t len)
      * be generated late in the shutdown sequence, after all DSMs have already
      * been detached.
      */
-    if (pq_mq == NULL)
+    if (pq_mq_handle == NULL)
         return 0;
 
     pq_mq_busy = true;
...
@@ -83,7 +83,9 @@ struct shm_mq
  * This structure is a backend-private handle for access to a queue.
  *
  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
- * a pointer to the dynamic shared memory segment that contains it.
+ * an optional pointer to the dynamic shared memory segment that contains it.
+ * (If mqh_segment is provided, we register an on_dsm_detach callback to
+ * make sure we detach from the queue before detaching from DSM.)
  *
  * If this queue is intended to connect the current process with a background
  * worker that started it, the user can pass a pointer to the worker handle
@@ -139,6 +141,7 @@ struct shm_mq_handle
     MemoryContext mqh_context;
 };
 
+static void shm_mq_detach_internal(shm_mq *mq);
 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
                    const void *data, bool nowait, Size *bytes_written);
 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
@@ -288,14 +291,15 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
     Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
     mqh->mqh_queue = mq;
     mqh->mqh_segment = seg;
-    mqh->mqh_buffer = NULL;
     mqh->mqh_handle = handle;
+    mqh->mqh_buffer = NULL;
     mqh->mqh_buflen = 0;
     mqh->mqh_consume_pending = 0;
-    mqh->mqh_context = CurrentMemoryContext;
     mqh->mqh_partial_bytes = 0;
+    mqh->mqh_expected_bytes = 0;
     mqh->mqh_length_word_complete = false;
     mqh->mqh_counterparty_attached = false;
+    mqh->mqh_context = CurrentMemoryContext;
 
     if (seg != NULL)
         on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
@@ -765,7 +769,28 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 }
 
 /*
- * Detach a shared message queue.
+ * Detach from a shared message queue, and destroy the shm_mq_handle.
+ */
+void
+shm_mq_detach(shm_mq_handle *mqh)
+{
+    /* Notify counterparty that we're outta here. */
+    shm_mq_detach_internal(mqh->mqh_queue);
+
+    /* Cancel on_dsm_detach callback, if any. */
+    if (mqh->mqh_segment)
+        cancel_on_dsm_detach(mqh->mqh_segment,
+                             shm_mq_detach_callback,
+                             PointerGetDatum(mqh->mqh_queue));
+
+    /* Release local memory associated with handle. */
+    if (mqh->mqh_buffer != NULL)
+        pfree(mqh->mqh_buffer);
+    pfree(mqh);
+}
+
+/*
+ * Notify counterparty that we're detaching from shared message queue.
  *
  * The purpose of this function is to make sure that the process
  * with which we're communicating doesn't block forever waiting for us to
@@ -773,9 +798,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
  * detaches, the receiver can read any messages remaining in the queue;
  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
+ *
+ * This is separated out from shm_mq_detach() because if the on_dsm_detach
+ * callback fires, we only want to do this much.  We do not try to touch
+ * the local shm_mq_handle, as it may have been pfree'd already.
  */
-void
-shm_mq_detach(shm_mq *mq)
+static void
+shm_mq_detach_internal(shm_mq *mq)
 {
     volatile shm_mq *vmq = mq;
     PGPROC     *victim;
@@ -1193,5 +1222,5 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
 {
     shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);
 
-    shm_mq_detach(mq);
+    shm_mq_detach_internal(mq);
 }
...
@@ -62,8 +62,8 @@ extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
 /* Associate worker handle with shm_mq. */
 extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
 
-/* Break connection. */
-extern void shm_mq_detach(shm_mq *);
+/* Break connection, release handle resources. */
+extern void shm_mq_detach(shm_mq_handle *mqh);
 
 /* Get the shm_mq from handle. */
 extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
...