Commit 9f4992e2 authored by Robert Haas's avatar Robert Haas

Remove memory leak protection from Gather and Gather Merge nodes.

Before commit 6b65a7fe, tqueue.c could
perform tuple remapping and thus leak memory, which is why commit
af330393 made TupleQueueReaderNext
run in a short-lived context.  Now, however, tqueue.c has been reduced
to a shadow of its former self, and there shouldn't be any chance of
leaks any more.  Accordingly, remove some tuple copying and memory
context manipulation to speed up processing.

Patch by me, reviewed by Amit Kapila.  Some testing by Rafia Sabih.

Discussion: http://postgr.es/m/CAA4eK1LSDydwrNjmYSNkfJ3ZivGSWH9SVswh6QpNzsMdj_oOQA@mail.gmail.com
parent a852cfe9
...@@ -131,7 +131,6 @@ static TupleTableSlot * ...@@ -131,7 +131,6 @@ static TupleTableSlot *
ExecGather(PlanState *pstate) ExecGather(PlanState *pstate)
{ {
GatherState *node = castNode(GatherState, pstate); GatherState *node = castNode(GatherState, pstate);
TupleTableSlot *fslot = node->funnel_slot;
TupleTableSlot *slot; TupleTableSlot *slot;
ExprContext *econtext; ExprContext *econtext;
...@@ -205,11 +204,8 @@ ExecGather(PlanState *pstate) ...@@ -205,11 +204,8 @@ ExecGather(PlanState *pstate)
/* /*
* Reset per-tuple memory context to free any expression evaluation * Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous tuple cycle. This will also clear * storage allocated in the previous tuple cycle.
* any previous tuple returned by a TupleQueueReader; to make sure we
* don't leave a dangling pointer around, clear the working slot first.
*/ */
ExecClearTuple(fslot);
econtext = node->ps.ps_ExprContext; econtext = node->ps.ps_ExprContext;
ResetExprContext(econtext); ResetExprContext(econtext);
...@@ -258,7 +254,6 @@ gather_getnext(GatherState *gatherstate) ...@@ -258,7 +254,6 @@ gather_getnext(GatherState *gatherstate)
PlanState *outerPlan = outerPlanState(gatherstate); PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot; TupleTableSlot *outerTupleSlot;
TupleTableSlot *fslot = gatherstate->funnel_slot; TupleTableSlot *fslot = gatherstate->funnel_slot;
MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
HeapTuple tup; HeapTuple tup;
while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally) while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
...@@ -267,12 +262,7 @@ gather_getnext(GatherState *gatherstate) ...@@ -267,12 +262,7 @@ gather_getnext(GatherState *gatherstate)
if (gatherstate->nreaders > 0) if (gatherstate->nreaders > 0)
{ {
MemoryContext oldContext;
/* Run TupleQueueReaders in per-tuple context */
oldContext = MemoryContextSwitchTo(tupleContext);
tup = gather_readnext(gatherstate); tup = gather_readnext(gatherstate);
MemoryContextSwitchTo(oldContext);
if (HeapTupleIsValid(tup)) if (HeapTupleIsValid(tup))
{ {
...@@ -280,7 +270,7 @@ gather_getnext(GatherState *gatherstate) ...@@ -280,7 +270,7 @@ gather_getnext(GatherState *gatherstate)
fslot, /* slot in which to store the tuple */ fslot, /* slot in which to store the tuple */
InvalidBuffer, /* buffer associated with this InvalidBuffer, /* buffer associated with this
* tuple */ * tuple */
false); /* slot should not pfree tuple */ true); /* pfree tuple when done with it */
return fslot; return fslot;
} }
} }
......
...@@ -609,7 +609,7 @@ load_tuple_array(GatherMergeState *gm_state, int reader) ...@@ -609,7 +609,7 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
&tuple_buffer->done); &tuple_buffer->done);
if (!HeapTupleIsValid(tuple)) if (!HeapTupleIsValid(tuple))
break; break;
tuple_buffer->tuple[i] = heap_copytuple(tuple); tuple_buffer->tuple[i] = tuple;
tuple_buffer->nTuples++; tuple_buffer->nTuples++;
} }
} }
...@@ -673,7 +673,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) ...@@ -673,7 +673,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
&tuple_buffer->done); &tuple_buffer->done);
if (!HeapTupleIsValid(tup)) if (!HeapTupleIsValid(tup))
return false; return false;
tup = heap_copytuple(tup);
/* /*
* Attempt to read more tuples in nowait mode and store them in the * Attempt to read more tuples in nowait mode and store them in the
...@@ -703,20 +702,13 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, ...@@ -703,20 +702,13 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
{ {
TupleQueueReader *reader; TupleQueueReader *reader;
HeapTuple tup; HeapTuple tup;
MemoryContext oldContext;
MemoryContext tupleContext;
/* Check for async events, particularly messages from workers. */ /* Check for async events, particularly messages from workers. */
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/* Attempt to read a tuple. */ /* Attempt to read a tuple. */
reader = gm_state->reader[nreader - 1]; reader = gm_state->reader[nreader - 1];
/* Run TupleQueueReaders in per-tuple context */
tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
oldContext = MemoryContextSwitchTo(tupleContext);
tup = TupleQueueReaderNext(reader, nowait, done); tup = TupleQueueReaderNext(reader, nowait, done);
MemoryContextSwitchTo(oldContext);
return tup; return tup;
} }
......
...@@ -161,6 +161,8 @@ DestroyTupleQueueReader(TupleQueueReader *reader) ...@@ -161,6 +161,8 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
* is set to true when there are no remaining tuples and otherwise to false. * is set to true when there are no remaining tuples and otherwise to false.
* *
* The returned tuple, if any, is allocated in CurrentMemoryContext. * The returned tuple, if any, is allocated in CurrentMemoryContext.
* Note that this routine must not leak memory! (We used to allow that,
* but not any more.)
* *
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
* accumulate bytes from a partially-read message, so it's useful to call * accumulate bytes from a partially-read message, so it's useful to call
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment