Commit 36a1281f authored by Tomas Vondra's avatar Tomas Vondra

Separate per-batch and per-tuple memory contexts in COPY

In batching mode, COPY was using the same (per-tuple) memory context for
allocations with longer lifetime. This was confusing but harmless, until
commit 31f38174 added COPY FROM ... WHERE feature, introducing a risk
of memory leak.

The "per-tuple" memory context was reset only when starting new batch,
but as the rows may be filtered out by the WHERE clauses, that may not
happen at all.  The WHERE clause however has to be evaluated for all
rows, before filtering them out.

This commit separates the per-tuple and per-batch contexts, removing the
ambiguity.  Expressions (both defaults and WHERE clause) are evaluated
in the per-tuple context, while tuples are formed in the batch context.
This allows resetting the contexts at appropriate times.

The main complexity is related to partitioning, in which case we need to
reset the batch context after forming the tuple (which happens before
routing to leaf partition).  Instead of switching between two contexts
as before, we simply copy the last tuple aside, reset the context and
then copy the tuple back.  The performance impact is negligible, and
juggling with two contexts is not free either.

Discussion: https://www.postgresql.org/message-id/flat/CALAY4q_DdpWDuB5-Zyi-oTtO2uSk8pmy+dupiRe3AvAc++1imA@mail.gmail.com
parent 4be058fe
...@@ -2323,9 +2323,9 @@ CopyFrom(CopyState cstate) ...@@ -2323,9 +2323,9 @@ CopyFrom(CopyState cstate)
ExprContext *econtext; ExprContext *econtext;
TupleTableSlot *myslot; TupleTableSlot *myslot;
MemoryContext oldcontext = CurrentMemoryContext; MemoryContext oldcontext = CurrentMemoryContext;
MemoryContext batchcontext;
PartitionTupleRouting *proute = NULL; PartitionTupleRouting *proute = NULL;
ExprContext *secondaryExprContext = NULL;
ErrorContextCallback errcallback; ErrorContextCallback errcallback;
CommandId mycid = GetCurrentCommandId(true); CommandId mycid = GetCurrentCommandId(true);
int hi_options = 0; /* start with default heap_insert options */ int hi_options = 0; /* start with default heap_insert options */
...@@ -2639,20 +2639,10 @@ CopyFrom(CopyState cstate) ...@@ -2639,20 +2639,10 @@ CopyFrom(CopyState cstate)
* Normally, when performing bulk inserts we just flush the insert * Normally, when performing bulk inserts we just flush the insert
* buffer whenever it becomes full, but for the partitioned table * buffer whenever it becomes full, but for the partitioned table
* case, we flush it whenever the current tuple does not belong to the * case, we flush it whenever the current tuple does not belong to the
* same partition as the previous tuple, and since we flush the * same partition as the previous tuple.
* previous partition's buffer once the new tuple has already been
* built, we're unable to reset the estate since we'd free the memory
* in which the new tuple is stored. To work around this we maintain
* a secondary expression context and alternate between these when the
* partition changes. This does mean we do store the first new tuple
* in a different context than subsequent tuples, but that does not
* matter, providing we don't free anything while it's still needed.
*/ */
if (proute) if (proute)
{
insertMethod = CIM_MULTI_CONDITIONAL; insertMethod = CIM_MULTI_CONDITIONAL;
secondaryExprContext = CreateExprContext(estate);
}
else else
insertMethod = CIM_MULTI; insertMethod = CIM_MULTI;
...@@ -2685,6 +2675,14 @@ CopyFrom(CopyState cstate) ...@@ -2685,6 +2675,14 @@ CopyFrom(CopyState cstate)
errcallback.previous = error_context_stack; errcallback.previous = error_context_stack;
error_context_stack = &errcallback; error_context_stack = &errcallback;
/*
* Set up memory context for batches. For cases without batching we could
* use the per-tuple context, but it's simpler to just use it every time.
*/
batchcontext = AllocSetContextCreate(CurrentMemoryContext,
"batch context",
ALLOCSET_DEFAULT_SIZES);
for (;;) for (;;)
{ {
TupleTableSlot *slot; TupleTableSlot *slot;
...@@ -2692,22 +2690,24 @@ CopyFrom(CopyState cstate) ...@@ -2692,22 +2690,24 @@ CopyFrom(CopyState cstate)
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
if (nBufferedTuples == 0)
{
/* /*
* Reset the per-tuple exprcontext. We can only do this if the * Reset the per-tuple exprcontext. We do this after every tuple, to
* tuple buffer is empty. (Calling the context the per-tuple * clean-up after expression evaluations etc.
* memory context is a bit of a misnomer now.)
*/ */
ResetPerTupleExprContext(estate); ResetPerTupleExprContext(estate);
}
/* Switch into its memory context */ /*
* Switch to per-tuple context before calling NextCopyFrom, which does
* evaluate default expressions etc. and requires per-tuple context.
*/
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
if (!NextCopyFrom(cstate, econtext, values, nulls)) if (!NextCopyFrom(cstate, econtext, values, nulls))
break; break;
/* Switch into per-batch memory context before forming the tuple. */
MemoryContextSwitchTo(batchcontext);
/* And now we can form the input tuple. */ /* And now we can form the input tuple. */
tuple = heap_form_tuple(tupDesc, values, nulls); tuple = heap_form_tuple(tupDesc, values, nulls);
...@@ -2756,7 +2756,7 @@ CopyFrom(CopyState cstate) ...@@ -2756,7 +2756,7 @@ CopyFrom(CopyState cstate)
*/ */
if (nBufferedTuples > 0) if (nBufferedTuples > 0)
{ {
ExprContext *swapcontext; MemoryContext oldcontext;
CopyFromInsertBatch(cstate, estate, mycid, hi_options, CopyFromInsertBatch(cstate, estate, mycid, hi_options,
prevResultRelInfo, myslot, bistate, prevResultRelInfo, myslot, bistate,
...@@ -2765,29 +2765,29 @@ CopyFrom(CopyState cstate) ...@@ -2765,29 +2765,29 @@ CopyFrom(CopyState cstate)
nBufferedTuples = 0; nBufferedTuples = 0;
bufferedTuplesSize = 0; bufferedTuplesSize = 0;
Assert(secondaryExprContext); /*
* The tuple is already allocated in the batch context, which
* we want to reset. So to keep the tuple we copy it into the
* short-lived (per-tuple) context, reset the batch context
* and then copy it back into the per-batch one.
*/
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
tuple = heap_copytuple(tuple);
MemoryContextSwitchTo(oldcontext);
/* cleanup the old batch */
MemoryContextReset(batchcontext);
/* copy the tuple back to the per-batch context */
oldcontext = MemoryContextSwitchTo(batchcontext);
tuple = heap_copytuple(tuple);
MemoryContextSwitchTo(oldcontext);
/* /*
* Normally we reset the per-tuple context whenever * Also push the tuple copy to the slot (resetting the context
* the bufferedTuples array is empty at the beginning * invalidated the slot contents).
* of the loop, however, it is possible since we flush */
* the buffer here that the buffer is never empty at ExecStoreHeapTuple(tuple, slot, false);
* the start of the loop. To prevent the per-tuple
* context from never being reset we maintain a second
* context and alternate between them when the
* partition changes. We can now reset
* secondaryExprContext as this is no longer needed,
* since we just flushed any tuples stored in it. We
* also now switch over to the other context. This
* does mean that the first tuple in the buffer won't
* be in the same context as the others, but that does
* not matter since we only reset it after the flush.
*/
ReScanExprContext(secondaryExprContext);
swapcontext = secondaryExprContext;
secondaryExprContext = estate->es_per_tuple_exprcontext;
estate->es_per_tuple_exprcontext = swapcontext;
} }
nPartitionChanges++; nPartitionChanges++;
...@@ -2893,10 +2893,10 @@ CopyFrom(CopyState cstate) ...@@ -2893,10 +2893,10 @@ CopyFrom(CopyState cstate)
slot = execute_attr_map_slot(map->attrMap, slot, new_slot); slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
/* /*
* Get the tuple in the per-tuple context, so that it will be * Get the tuple in the per-batch context, so that it will be
* freed after each batch insert. * freed after each batch insert.
*/ */
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); oldcontext = MemoryContextSwitchTo(batchcontext);
tuple = ExecCopySlotHeapTuple(slot); tuple = ExecCopySlotHeapTuple(slot);
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
} }
...@@ -2972,6 +2972,9 @@ CopyFrom(CopyState cstate) ...@@ -2972,6 +2972,9 @@ CopyFrom(CopyState cstate)
firstBufferedLineNo); firstBufferedLineNo);
nBufferedTuples = 0; nBufferedTuples = 0;
bufferedTuplesSize = 0; bufferedTuplesSize = 0;
/* free memory occupied by tuples from the batch */
MemoryContextReset(batchcontext);
} }
} }
else else
...@@ -3053,6 +3056,8 @@ CopyFrom(CopyState cstate) ...@@ -3053,6 +3056,8 @@ CopyFrom(CopyState cstate)
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
MemoryContextDelete(batchcontext);
/* /*
* In the old protocol, tell pqcomm that we can process normal protocol * In the old protocol, tell pqcomm that we can process normal protocol
* messages again. * messages again.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment