Commit af330393 authored by Tom Lane's avatar Tom Lane

Fix worst memory leaks in tqueue.c.

TupleQueueReaderNext() leaks like a sieve if it has to do any tuple
disassembly/reconstruction.  While we could try to clean up its allocations
piecemeal, it seems like a better idea just to insist that it should be run
in a short-lived memory context, so that any transient space goes away
automatically.  I chose to have nodeGather.c switch into its existing
per-tuple context before the call, rather than inventing a separate
context inside tqueue.c.

This is sufficient to stop all leakage in the simple case I exhibited
earlier today (see link below), but it does not deal with leaks induced
in more complex cases by tqueue.c's insistence on using TopMemoryContext
for data that it's not actually trying hard to keep track of.  That issue
is intertwined with another major source of inefficiency, namely failure
to cache lookup results across calls, so it seems best to deal with it
separately.

In passing, improve some comments, and modify gather_readnext's method for
deciding when it's visited all the readers so that it's more obviously
correct.  (I'm not actually convinced that the previous code *is*
correct in the case of a reader deletion; it certainly seems fragile.)

Discussion: <32763.1469821037@sss.pgh.pa.us>
parent bf4ae685
...@@ -214,8 +214,11 @@ ExecGather(GatherState *node) ...@@ -214,8 +214,11 @@ ExecGather(GatherState *node)
/* /*
* Reset per-tuple memory context to free any expression evaluation * Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous tuple cycle. Note we can't do this * storage allocated in the previous tuple cycle. Note we can't do this
* until we're done projecting. * until we're done projecting. This will also clear any previous tuple
* returned by a TupleQueueReader; to make sure we don't leave a dangling
* pointer around, clear the working slot first.
*/ */
ExecClearTuple(node->funnel_slot);
econtext = node->ps.ps_ExprContext; econtext = node->ps.ps_ExprContext;
ResetExprContext(econtext); ResetExprContext(econtext);
...@@ -274,13 +277,19 @@ gather_getnext(GatherState *gatherstate) ...@@ -274,13 +277,19 @@ gather_getnext(GatherState *gatherstate)
PlanState *outerPlan = outerPlanState(gatherstate); PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot; TupleTableSlot *outerTupleSlot;
TupleTableSlot *fslot = gatherstate->funnel_slot; TupleTableSlot *fslot = gatherstate->funnel_slot;
MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
HeapTuple tup; HeapTuple tup;
while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally) while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
{ {
if (gatherstate->reader != NULL) if (gatherstate->reader != NULL)
{ {
MemoryContext oldContext;
/* Run TupleQueueReaders in per-tuple context */
oldContext = MemoryContextSwitchTo(tupleContext);
tup = gather_readnext(gatherstate); tup = gather_readnext(gatherstate);
MemoryContextSwitchTo(oldContext);
if (HeapTupleIsValid(tup)) if (HeapTupleIsValid(tup))
{ {
...@@ -288,8 +297,7 @@ gather_getnext(GatherState *gatherstate) ...@@ -288,8 +297,7 @@ gather_getnext(GatherState *gatherstate)
fslot, /* slot in which to store the tuple */ fslot, /* slot in which to store the tuple */
InvalidBuffer, /* buffer associated with this InvalidBuffer, /* buffer associated with this
* tuple */ * tuple */
true); /* pfree this pointer if not from heap */ false); /* slot should not pfree tuple */
return fslot; return fslot;
} }
} }
...@@ -314,7 +322,7 @@ gather_getnext(GatherState *gatherstate) ...@@ -314,7 +322,7 @@ gather_getnext(GatherState *gatherstate)
static HeapTuple static HeapTuple
gather_readnext(GatherState *gatherstate) gather_readnext(GatherState *gatherstate)
{ {
int waitpos = gatherstate->nextreader; int nvisited = 0;
for (;;) for (;;)
{ {
...@@ -335,6 +343,7 @@ gather_readnext(GatherState *gatherstate) ...@@ -335,6 +343,7 @@ gather_readnext(GatherState *gatherstate)
*/ */
if (readerdone) if (readerdone)
{ {
Assert(!tup);
DestroyTupleQueueReader(reader); DestroyTupleQueueReader(reader);
--gatherstate->nreaders; --gatherstate->nreaders;
if (gatherstate->nreaders == 0) if (gatherstate->nreaders == 0)
...@@ -342,17 +351,12 @@ gather_readnext(GatherState *gatherstate) ...@@ -342,17 +351,12 @@ gather_readnext(GatherState *gatherstate)
ExecShutdownGatherWorkers(gatherstate); ExecShutdownGatherWorkers(gatherstate);
return NULL; return NULL;
} }
else
{
memmove(&gatherstate->reader[gatherstate->nextreader], memmove(&gatherstate->reader[gatherstate->nextreader],
&gatherstate->reader[gatherstate->nextreader + 1], &gatherstate->reader[gatherstate->nextreader + 1],
sizeof(TupleQueueReader *) sizeof(TupleQueueReader *)
* (gatherstate->nreaders - gatherstate->nextreader)); * (gatherstate->nreaders - gatherstate->nextreader));
if (gatherstate->nextreader >= gatherstate->nreaders) if (gatherstate->nextreader >= gatherstate->nreaders)
gatherstate->nextreader = 0; gatherstate->nextreader = 0;
if (gatherstate->nextreader < waitpos)
--waitpos;
}
continue; continue;
} }
...@@ -367,11 +371,13 @@ gather_readnext(GatherState *gatherstate) ...@@ -367,11 +371,13 @@ gather_readnext(GatherState *gatherstate)
* every tuple, but it turns out to be much more efficient to keep * every tuple, but it turns out to be much more efficient to keep
* reading from the same queue until that would require blocking. * reading from the same queue until that would require blocking.
*/ */
gatherstate->nextreader = gatherstate->nextreader++;
(gatherstate->nextreader + 1) % gatherstate->nreaders; if (gatherstate->nextreader >= gatherstate->nreaders)
gatherstate->nextreader = 0;
/* Have we visited every TupleQueueReader? */ /* Have we visited every (surviving) TupleQueueReader? */
if (gatherstate->nextreader == waitpos) nvisited++;
if (nvisited >= gatherstate->nreaders)
{ {
/* /*
* If (still) running plan locally, return NULL so caller can * If (still) running plan locally, return NULL so caller can
...@@ -384,6 +390,7 @@ gather_readnext(GatherState *gatherstate) ...@@ -384,6 +390,7 @@ gather_readnext(GatherState *gatherstate)
WaitLatch(MyLatch, WL_LATCH_SET, 0); WaitLatch(MyLatch, WL_LATCH_SET, 0);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
ResetLatch(MyLatch); ResetLatch(MyLatch);
nvisited = 0;
} }
} }
} }
......
...@@ -524,13 +524,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader) ...@@ -524,13 +524,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
/* /*
* Fetch a tuple from a tuple queue reader. * Fetch a tuple from a tuple queue reader.
* *
* The return value is NULL if there are no remaining tuples or if
* nowait = true and no tuple is ready to return. *done, if not NULL,
* is set to true when there are no remaining tuples and otherwise to false.
*
* The returned tuple, if any, is allocated in CurrentMemoryContext.
* That should be a short-lived (tuple-lifespan) context, because we are
* pretty cavalier about leaking memory in that context if we have to do
* tuple remapping.
*
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
* accumulate bytes from a partially-read message, so it's useful to call * accumulate bytes from a partially-read message, so it's useful to call
* this with nowait = true even if nothing is returned. * this with nowait = true even if nothing is returned.
*
* The return value is NULL if there are no remaining queues or if
* nowait = true and no tuple is ready to return. *done, if not NULL,
* is set to true when queue is detached and otherwise to false.
*/ */
HeapTuple HeapTuple
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
...@@ -565,10 +570,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) ...@@ -565,10 +570,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
* OK, we got a message. Process it. * OK, we got a message. Process it.
* *
* One-byte messages are mode switch messages, so that we can switch * One-byte messages are mode switch messages, so that we can switch
* between "control" and "data" mode. When in "data" mode, each * between "control" and "data" mode. Otherwise, when in "data" mode,
* message (unless exactly one byte) is a tuple. When in "control" * each message is a tuple. When in "control" mode, each message
* mode, each message provides a transient-typmod-to-tupledesc mapping * provides a transient-typmod-to-tupledesc mapping to let us
* so we can interpret future tuples. * interpret future tuples. Both of those cases certainly require
* more than one byte, so no confusion is possible.
*/ */
if (nbytes == 1) if (nbytes == 1)
{ {
......
...@@ -17,15 +17,17 @@ ...@@ -17,15 +17,17 @@
#include "storage/shm_mq.h" #include "storage/shm_mq.h"
#include "tcop/dest.h" #include "tcop/dest.h"
/* Opaque struct, only known inside tqueue.c. */
typedef struct TupleQueueReader TupleQueueReader;
/* Use this to send tuples to a shm_mq. */ /* Use this to send tuples to a shm_mq. */
extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle); extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
/* Use these to receive tuples from a shm_mq. */ /* Use these to receive tuples from a shm_mq. */
typedef struct TupleQueueReader TupleQueueReader;
extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle, extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle,
TupleDesc tupledesc); TupleDesc tupledesc);
extern void DestroyTupleQueueReader(TupleQueueReader *funnel); extern void DestroyTupleQueueReader(TupleQueueReader *reader);
extern HeapTuple TupleQueueReaderNext(TupleQueueReader *, extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
bool nowait, bool *done); bool nowait, bool *done);
#endif /* TQUEUE_H */ #endif /* TQUEUE_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment