Commit 4de60244 authored by David Rowley's avatar David Rowley

Fix missing call to table_finish_bulk_insert during COPY

86b85044 abstracted calls to heap functions in COPY FROM to support a
generic table AM.  However, when performing a copy into a partitioned
table, this commit neglected to call table_finish_bulk_insert for each
partition.  Before 86b85044, when we always called the heap functions,
there was no need to call heapam_finish_bulk_insert for partitions since
it only did any work when performing a copy without WAL. For partitioned
tables, this was unsupported anyway, so there was no issue. With pluggable
storage, we can't make any assumptions about what the table AM might want
to do in its equivalent function, so we'd better ensure we always call
table_finish_bulk_insert each partition that's received a row.

For now, we make the table_finish_bulk_insert call whenever we evict a
CopyMultiInsertBuffer out of the CopyMultiInsertInfo.  This does mean
that it's possible that we call table_finish_bulk_insert multiple times
per partition, which is not a problem other than being an inefficiency.
Improving this requires a more invasive patch, so let's leave that for
another day.

In passing, move the table_finish_bulk_insert for the target of the COPY
command so that it's only called when we're actually performing bulk
inserts.  We don't need to call this when inserting 1 row at a time.

Reported-by: Robert Haas
Discussion: https://postgr.es/m/CA+TgmoYK=6BpxiJ0tN-p9wtH0BTAfbdxzHhwou0mdud4+BkYuQ@mail.gmail.com
parent 95bbe5d8
...@@ -2518,7 +2518,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, ...@@ -2518,7 +2518,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
* The buffer must be flushed before cleanup. * The buffer must be flushed before cleanup.
*/ */
static inline void static inline void
CopyMultiInsertBufferCleanup(CopyMultiInsertBuffer *buffer) CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
CopyMultiInsertBuffer *buffer)
{ {
int i; int i;
...@@ -2534,6 +2535,9 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertBuffer *buffer) ...@@ -2534,6 +2535,9 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertBuffer *buffer)
for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]); ExecDropSingleTupleTableSlot(buffer->slots[i]);
table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
miinfo->ti_options);
pfree(buffer); pfree(buffer);
} }
...@@ -2585,7 +2589,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) ...@@ -2585,7 +2589,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
} }
CopyMultiInsertBufferCleanup(buffer); CopyMultiInsertBufferCleanup(miinfo, buffer);
miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
} }
} }
...@@ -2599,7 +2603,7 @@ CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo) ...@@ -2599,7 +2603,7 @@ CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
ListCell *lc; ListCell *lc;
foreach(lc, miinfo->multiInsertBuffers) foreach(lc, miinfo->multiInsertBuffers)
CopyMultiInsertBufferCleanup(lfirst(lc)); CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
list_free(miinfo->multiInsertBuffers); list_free(miinfo->multiInsertBuffers);
} }
...@@ -3321,9 +3325,6 @@ CopyFrom(CopyState cstate) ...@@ -3321,9 +3325,6 @@ CopyFrom(CopyState cstate)
{ {
if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
/* Tear down the multi-insert buffer data */
CopyMultiInsertInfoCleanup(&multiInsertInfo);
} }
/* Done, clean up */ /* Done, clean up */
...@@ -3366,8 +3367,14 @@ CopyFrom(CopyState cstate) ...@@ -3366,8 +3367,14 @@ CopyFrom(CopyState cstate)
FreeExecutorState(estate); FreeExecutorState(estate);
if (insertMethod != CIM_SINGLE)
{
table_finish_bulk_insert(cstate->rel, ti_options); table_finish_bulk_insert(cstate->rel, ti_options);
/* Tear down the multi-insert buffer data */
CopyMultiInsertInfoCleanup(&multiInsertInfo);
}
return processed; return processed;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment