Commit 86b85044 authored by Andres Freund's avatar Andres Freund

tableam: Add table_multi_insert() and revamp/speed-up COPY FROM buffering.

This adds table_multi_insert(), and converts COPY FROM, the only user
of heap_multi_insert, to it.

A simple conversion of COPY FROM to use slots would have yielded a
slowdown when inserting into a partitioned table for some
workloads. Different partitions might need different slots (both slot
types and their descriptors), and dropping / creating slots when
there's constant partition changes is measurable.

Thus instead revamp the COPY FROM buffering for partitioned tables to
allow to buffer inserts into multiple tables, flushing only when
limits are reached across all partition buffers. By only dropping
slots when there've been inserts into too many different partitions,
the aforementioned overhead is gone. By allowing larger batches, even
when there are frequent partition changes, we actually speed such cases
up significantly.

By using slots, COPY of very narrow rows into unlogged / temporary tables
might slow down very slightly (due to the indirect function calls).

Author: David Rowley, Andres Freund, Haribabu Kommi
Discussion:
    https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
    https://postgr.es/m/20190327054923.t3epfuewxfqdt22e@alap3.anarazel.de
parent 7bac3aca
...@@ -2106,7 +2106,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, ...@@ -2106,7 +2106,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
* temporary context before calling this, if that's a problem. * temporary context before calling this, if that's a problem.
*/ */
void void
heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
CommandId cid, int options, BulkInsertState bistate) CommandId cid, int options, BulkInsertState bistate)
{ {
TransactionId xid = GetCurrentTransactionId(); TransactionId xid = GetCurrentTransactionId();
...@@ -2127,11 +2127,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, ...@@ -2127,11 +2127,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
saveFreeSpace = RelationGetTargetPageFreeSpace(relation, saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
HEAP_DEFAULT_FILLFACTOR); HEAP_DEFAULT_FILLFACTOR);
/* Toast and set header data in all the tuples */ /* Toast and set header data in all the slots */
heaptuples = palloc(ntuples * sizeof(HeapTuple)); heaptuples = palloc(ntuples * sizeof(HeapTuple));
for (i = 0; i < ntuples; i++) for (i = 0; i < ntuples; i++)
heaptuples[i] = heap_prepare_insert(relation, tuples[i], {
xid, cid, options); HeapTuple tuple;
tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL);
slots[i]->tts_tableOid = RelationGetRelid(relation);
tuple->t_tableOid = slots[i]->tts_tableOid;
heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid,
options);
}
/* /*
* We're about to do the actual inserts -- but check for conflict first, * We're about to do the actual inserts -- but check for conflict first,
...@@ -2361,13 +2368,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, ...@@ -2361,13 +2368,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
CacheInvalidateHeapTuple(relation, heaptuples[i], NULL); CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
} }
/* /* copy t_self fields back to the caller's slots */
* Copy t_self fields back to the caller's original tuples. This does
* nothing for untoasted tuples (tuples[i] == heaptuples[i)], but it's
* probably faster to always copy than check.
*/
for (i = 0; i < ntuples; i++) for (i = 0; i < ntuples; i++)
tuples[i]->t_self = heaptuples[i]->t_self; slots[i]->tts_tid = heaptuples[i]->t_self;
pgstat_count_heap_insert(relation, ntuples); pgstat_count_heap_insert(relation, ntuples);
} }
......
...@@ -2516,6 +2516,7 @@ static const TableAmRoutine heapam_methods = { ...@@ -2516,6 +2516,7 @@ static const TableAmRoutine heapam_methods = {
.tuple_insert = heapam_tuple_insert, .tuple_insert = heapam_tuple_insert,
.tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_insert_speculative = heapam_tuple_insert_speculative,
.tuple_complete_speculative = heapam_tuple_complete_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative,
.multi_insert = heap_multi_insert,
.tuple_delete = heapam_tuple_delete, .tuple_delete = heapam_tuple_delete,
.tuple_update = heapam_tuple_update, .tuple_update = heapam_tuple_update,
.tuple_lock = heapam_tuple_lock, .tuple_lock = heapam_tuple_lock,
......
...@@ -90,9 +90,9 @@ typedef enum EolType ...@@ -90,9 +90,9 @@ typedef enum EolType
*/ */
typedef enum CopyInsertMethod typedef enum CopyInsertMethod
{ {
CIM_SINGLE, /* use heap_insert or fdw routine */ CIM_SINGLE, /* use table_insert or fdw routine */
CIM_MULTI, /* always use heap_multi_insert */ CIM_MULTI, /* always use table_multi_insert */
CIM_MULTI_CONDITIONAL /* use heap_multi_insert only if valid */ CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */
} CopyInsertMethod; } CopyInsertMethod;
/* /*
...@@ -236,6 +236,55 @@ typedef struct ...@@ -236,6 +236,55 @@ typedef struct
} DR_copy; } DR_copy;
/*
 * No more than this many tuples per CopyMultiInsertBuffer.
 *
 * This value sizes the per-buffer slots[] and linenos[] arrays, and it is
 * also the tuple-count threshold CopyMultiInsertInfoIsFull() compares the
 * total number of buffered tuples (summed over all buffers) against.
 *
 * Caution: Don't make this too big, as we could end up with this many
 * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
 * multiInsertBuffers list.  Increasing this can cause quadratic growth in
 * memory requirements during copies into partitioned tables with a large
 * number of partitions.
 */
#define MAX_BUFFERED_TUPLES 1000
/*
 * Flush the buffers once the tuples stored in them add up to at least this
 * many bytes, where a tuple's size is counted as the size of its input.
 */
#define MAX_BUFFERED_BYTES 65535
/*
 * After a flush, trim the list of tracked buffers back down to at most this
 * many entries.
 */
#define MAX_PARTITION_BUFFERS 32
/*
 * Stores multi-insert data related to a single relation in CopyFrom.
 *
 * slots[0 .. nused-1] hold buffered tuples; linenos[] is parallel to slots[]
 * and records the input line each buffered tuple came from.  Slots are
 * created on demand and kept for reuse after a flush, so entries at or past
 * 'nused' may be non-NULL but empty.
 */
typedef struct CopyMultiInsertBuffer
{
TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo of the relation this buffer
* inserts into */
BulkInsertState bistate; /* BulkInsertState for this rel */
int nused; /* number of 'slots' containing tuples */
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
* stream, used for error context */
} CopyMultiInsertBuffer;
/*
 * Stores one or many CopyMultiInsertBuffers and details about the size and
 * number of tuples which are stored in them.  This allows multiple buffers to
 * exist at once when COPYing into a partitioned table.
 *
 * bufferedTuples and bufferedBytes are totals across every tracked buffer;
 * they are what decides when a flush is due, and both are reset to zero once
 * all buffers have been flushed.
 */
typedef struct CopyMultiInsertInfo
{
List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
int bufferedTuples; /* number of tuples buffered over all buffers */
int bufferedBytes; /* number of bytes from all buffered tuples */
int nbuffers; /* number of buffers we're tracking.
* NOTE(review): only ever set to 0 in the code
* visible here; the trimming logic uses
* list_length(multiInsertBuffers) -- confirm
* whether this field is still needed */
CopyState cstate; /* Copy state for this CopyMultiInsertInfo */
EState *estate; /* Executor state used for COPY */
CommandId mycid; /* Command Id used for COPY */
int ti_options; /* table insert options */
} CopyMultiInsertInfo;
/* /*
* These macros centralize code used to process line_buf and raw_buf buffers. * These macros centralize code used to process line_buf and raw_buf buffers.
* They are macros because they often do continue/break control and to avoid * They are macros because they often do continue/break control and to avoid
...@@ -316,14 +365,7 @@ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, ...@@ -316,14 +365,7 @@ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
static void EndCopyTo(CopyState cstate); static void EndCopyTo(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate); static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
Datum *values, bool *nulls);
static void CopyFromInsertBatch(CopyState cstate, EState *estate,
CommandId mycid, int ti_options,
ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
BulkInsertState bistate,
int nBufferedTuples, HeapTuple *bufferedTuples,
uint64 firstBufferedLineNo);
static bool CopyReadLine(CopyState cstate); static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate); static bool CopyReadLineText(CopyState cstate);
static int CopyReadAttributesText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate);
...@@ -2073,33 +2115,27 @@ CopyTo(CopyState cstate) ...@@ -2073,33 +2115,27 @@ CopyTo(CopyState cstate)
if (cstate->rel) if (cstate->rel)
{ {
Datum *values; TupleTableSlot *slot;
bool *nulls;
TableScanDesc scandesc; TableScanDesc scandesc;
HeapTuple tuple;
values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL); scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
slot = table_slot_create(cstate->rel, NULL);
processed = 0; processed = 0;
while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL) while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
{ {
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/* Deconstruct the tuple ... faster than repeated heap_getattr */ /* Deconstruct the tuple ... */
heap_deform_tuple(tuple, tupDesc, values, nulls); slot_getallattrs(slot);
/* Format and send the data */ /* Format and send the data */
CopyOneRowTo(cstate, values, nulls); CopyOneRowTo(cstate, slot);
processed++; processed++;
} }
ExecDropSingleTupleTableSlot(slot);
table_endscan(scandesc); table_endscan(scandesc);
pfree(values);
pfree(nulls);
} }
else else
{ {
...@@ -2125,7 +2161,7 @@ CopyTo(CopyState cstate) ...@@ -2125,7 +2161,7 @@ CopyTo(CopyState cstate)
* Emit one row during CopyTo(). * Emit one row during CopyTo().
*/ */
static void static void
CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls) CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
{ {
bool need_delim = false; bool need_delim = false;
FmgrInfo *out_functions = cstate->out_functions; FmgrInfo *out_functions = cstate->out_functions;
...@@ -2142,11 +2178,14 @@ CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls) ...@@ -2142,11 +2178,14 @@ CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
CopySendInt16(cstate, list_length(cstate->attnumlist)); CopySendInt16(cstate, list_length(cstate->attnumlist));
} }
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
foreach(cur, cstate->attnumlist) foreach(cur, cstate->attnumlist)
{ {
int attnum = lfirst_int(cur); int attnum = lfirst_int(cur);
Datum value = values[attnum - 1]; Datum value = slot->tts_values[attnum - 1];
bool isnull = nulls[attnum - 1]; bool isnull = slot->tts_isnull[attnum - 1];
if (!cstate->binary) if (!cstate->binary)
{ {
...@@ -2305,47 +2344,337 @@ limit_printout_length(const char *str) ...@@ -2305,47 +2344,337 @@ limit_printout_length(const char *str)
return res; return res;
} }
/*
 * Build a new, empty CopyMultiInsertBuffer for the given ResultRelInfo.
 *
 * The buffer is palloc'd in the current memory context.  It starts with no
 * used slots, an all-NULL slot array (slots are created lazily), and its own
 * freshly obtained BulkInsertState.  linenos[] is left uninitialized; entries
 * are written as tuples are stored.
 */
static CopyMultiInsertBuffer *
CopyMultiInsertBufferInit(ResultRelInfo *rri)
{
    CopyMultiInsertBuffer *newbuf = palloc(sizeof(*newbuf));

    /* No slot exists yet; NULL marks "not created" */
    memset(newbuf->slots, 0, sizeof(newbuf->slots));

    newbuf->resultRelInfo = rri;
    newbuf->bistate = GetBulkInsertState();
    newbuf->nused = 0;

    return newbuf;
}
/*
 * Create a CopyMultiInsertBuffer for 'rri' and register it with 'miinfo'.
 *
 * Afterwards the buffer is reachable both from the ResultRelInfo (through
 * ri_CopyMultiInsertBuffer) and from miinfo's multiInsertBuffers list.
 */
static inline void
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
                               ResultRelInfo *rri)
{
    CopyMultiInsertBuffer *newbuf = CopyMultiInsertBufferInit(rri);

    /* Back-link from the relation, so the buffer can be found cheaply */
    rri->ri_CopyMultiInsertBuffer = newbuf;

    /* Append to the tracked list; newest buffers live at the tail */
    miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, newbuf);
}
/*
 * Initialize a caller-allocated CopyMultiInsertInfo.
 *
 * The COPY-wide parameters (cstate, estate, mycid, ti_options) are stashed
 * for later flushes, and the tracking state starts out empty.  When 'rri' is
 * not a partitioned table, its one and only buffer is created right away.
 */
static void
CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
                        CopyState cstate, EState *estate, CommandId mycid,
                        int ti_options)
{
    /* Parameters every flush will need */
    miinfo->cstate = cstate;
    miinfo->estate = estate;
    miinfo->mycid = mycid;
    miinfo->ti_options = ti_options;

    /* Nothing is tracked or buffered yet */
    miinfo->multiInsertBuffers = NIL;
    miinfo->nbuffers = 0;
    miinfo->bufferedTuples = 0;
    miinfo->bufferedBytes = 0;

    /*
     * For a partitioned table there is nothing more to do here: a buffer is
     * set up for each partition the first time a tuple is sent its way.
     */
    if (rri->ri_RelationDesc->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        return;

    CopyMultiInsertInfoSetupBuffer(miinfo, rri);
}
/*
 * Report whether it is time to flush: true once the total number of buffered
 * tuples reaches MAX_BUFFERED_TUPLES or their total size reaches
 * MAX_BUFFERED_BYTES.  Both totals span all tracked buffers.
 */
static inline bool
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
{
    return (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
            miinfo->bufferedBytes >= MAX_BUFFERED_BYTES);
}
/*
 * Report whether no tuple at all is currently buffered, in any buffer.
 */
static inline bool
CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
{
    if (miinfo->bufferedTuples != 0)
        return false;

    return true;
}
/*
 * Write the tuples stored in 'buffer' out to the table.
 *
 * All buffered slots are handed to table_multi_insert() in one call.  Then,
 * for each tuple, index entries are inserted and AFTER ROW INSERT triggers
 * are fired as required, and the slot is cleared for reuse.  On return the
 * buffer is marked empty (nused == 0); the slots themselves are kept.
 *
 * The totals in 'miinfo' (bufferedTuples / bufferedBytes) are NOT adjusted
 * here; that is left to the caller.
 */
static inline void
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
                           CopyMultiInsertBuffer *buffer)
{
    MemoryContext oldcontext;
    int         i;
    uint64      save_cur_lineno;
    CopyState   cstate = miinfo->cstate;
    EState     *estate = miinfo->estate;
    CommandId   mycid = miinfo->mycid;
    int         ti_options = miinfo->ti_options;
    bool        line_buf_valid = cstate->line_buf_valid;
    int         nused = buffer->nused;
    ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
    ResultRelInfo *save_result_relation_info = estate->es_result_relation_info;
    TupleTableSlot **slots = buffer->slots;

    /*
     * ExecInsertIndexTuples() takes no relation argument; it works on
     * whatever estate->es_result_relation_info points at.  With several
     * partition buffers alive at once, that need not be the relation this
     * buffer belongs to, so point it at ours for the duration of the flush.
     * Otherwise index entries could be added to another partition's indexes.
     */
    estate->es_result_relation_info = resultRelInfo;

    /*
     * Print error context information correctly, if one of the operations
     * below fail.
     */
    cstate->line_buf_valid = false;
    save_cur_lineno = cstate->cur_lineno;

    /*
     * table_multi_insert may leak memory, so switch to short-lived memory
     * context before calling it.
     */
    oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    table_multi_insert(resultRelInfo->ri_RelationDesc,
                       slots,
                       nused,
                       mycid,
                       ti_options,
                       buffer->bistate);
    MemoryContextSwitchTo(oldcontext);

    for (i = 0; i < nused; i++)
    {
        /*
         * If there are any indexes, update them for all the inserted tuples,
         * and run AFTER ROW INSERT triggers.
         */
        if (resultRelInfo->ri_NumIndices > 0)
        {
            List       *recheckIndexes;

            /* make errors raised below report this tuple's input line */
            cstate->cur_lineno = buffer->linenos[i];
            recheckIndexes =
                ExecInsertIndexTuples(slots[i], estate, false, NULL,
                                      NIL);
            ExecARInsertTriggers(estate, resultRelInfo,
                                 slots[i], recheckIndexes,
                                 cstate->transition_capture);
            list_free(recheckIndexes);
        }

        /*
         * There's no indexes, but see if we need to run AFTER ROW INSERT
         * triggers anyway.
         */
        else if (resultRelInfo->ri_TrigDesc != NULL &&
                 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
                  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
        {
            cstate->cur_lineno = buffer->linenos[i];
            ExecARInsertTriggers(estate, resultRelInfo,
                                 slots[i], NIL, cstate->transition_capture);
        }

        ExecClearTuple(slots[i]);
    }

    /* Mark that all slots are free */
    buffer->nused = 0;

    /* reset cur_lineno and line_buf_valid to what they were */
    cstate->line_buf_valid = line_buf_valid;
    cstate->cur_lineno = save_cur_lineno;

    /* hand the executor state back the way the caller had it */
    estate->es_result_relation_info = save_result_relation_info;
}
/*
 * Release everything owned by 'buffer', including the buffer itself.
 *
 * The buffer must already have been flushed (nused == 0).  The owning
 * ResultRelInfo's back-link is cleared so it no longer points at freed
 * memory.
 */
static inline void
CopyMultiInsertBufferCleanup(CopyMultiInsertBuffer *buffer)
{
    int         slotno = 0;

    /* Dropping slots that still hold unflushed tuples would lose data */
    Assert(buffer->nused == 0);

    /* Detach from the relation before anything is freed */
    buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;

    FreeBulkInsertState(buffer->bistate);

    /*
     * Slots are created on demand starting at index 0, so the first NULL
     * entry marks the end of the ones that exist.
     */
    while (slotno < MAX_BUFFERED_TUPLES && buffer->slots[slotno] != NULL)
    {
        ExecDropSingleTupleTableSlot(buffer->slots[slotno]);
        slotno++;
    }

    pfree(buffer);
}
/*
 * Flush every tracked buffer out to its table, then reset the totals.
 *
 * Once everything is flushed, the list of tracked buffers is trimmed back to
 * at most MAX_PARTITION_BUFFERS entries, discarding the earliest-created
 * buffers first.
 *
 * Callers should pass as 'curr_rri' the ResultRelInfo that's currently being
 * used.  The buffer belonging to 'curr_rri' is never discarded.
 */
static inline void
CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
{
    ListCell   *cell;

    foreach(cell, miinfo->multiInsertBuffers)
        CopyMultiInsertBufferFlush(miinfo,
                                   (CopyMultiInsertBuffer *) lfirst(cell));

    /* Everything has been written out */
    miinfo->bufferedBytes = 0;
    miinfo->bufferedTuples = 0;

    /*
     * Discard surplus buffers from the head of the list, i.e. oldest first,
     * on the theory that older buffers are less likely to be needed again
     * than recently created ones.
     */
    while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
    {
        CopyMultiInsertBuffer *victim;

        victim = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);

        if (victim->resultRelInfo == curr_rri)
        {
            /*
             * The buffer in use right now must survive: rotate it to the
             * tail of the list and take the new head instead.
             */
            miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
            miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, victim);
            victim = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
        }

        CopyMultiInsertBufferCleanup(victim);
        miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
    }
}
/*
 * Tear down all tracked buffers and free the list that holds them.
 *
 * Every buffer is expected to have been flushed already;
 * CopyMultiInsertBufferCleanup() asserts as much.
 */
static inline void
CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
{
    ListCell   *cell;

    foreach(cell, miinfo->multiInsertBuffers)
    {
        CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(cell);

        CopyMultiInsertBufferCleanup(buffer);
    }

    list_free(miinfo->multiInsertBuffers);
}
/*
 * Get the next TupleTableSlot that the next tuple should be stored in.
 *
 * Returns the slot at position 'nused' of the buffer attached to 'rri',
 * creating it first if it does not exist yet.  The slot is only reserved
 * here; it does not count as used until CopyMultiInsertInfoStore() is
 * called for it.
 *
 * Callers must ensure that 'rri' has a buffer set up and that the buffer is
 * not full.
 */
static inline TupleTableSlot *
CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
                                ResultRelInfo *rri)
{
    CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
    int         nused;

    /* Check the back-link before dereferencing it, not after */
    Assert(buffer != NULL);

    nused = buffer->nused;
    Assert(nused < MAX_BUFFERED_TUPLES);

    /* Slots are created lazily and then reused across flushes */
    if (buffer->slots[nused] == NULL)
        buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);

    return buffer->slots[nused];
}
/*
 * Mark the slot previously handed out by CopyMultiInsertInfoNextFreeSlot as
 * consumed.
 *
 * 'slot' must be the buffer's next free slot.  'lineno' is remembered so
 * that errors raised while flushing can name the right input line, and
 * 'tuplen' is added to the running byte total used to decide when to flush.
 */
static inline void
CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
                         TupleTableSlot *slot, int tuplen, uint64 lineno)
{
    CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;

    Assert(buffer != NULL);
    Assert(slot == buffer->slots[buffer->nused]);

    /* Note the tuple's line number and claim its slot in one step */
    buffer->linenos[buffer->nused++] = lineno;

    /* Keep the cross-buffer totals in step */
    miinfo->bufferedBytes += tuplen;
    miinfo->bufferedTuples++;
}
/* /*
* Copy FROM file to relation. * Copy FROM file to relation.
*/ */
uint64 uint64
CopyFrom(CopyState cstate) CopyFrom(CopyState cstate)
{ {
HeapTuple tuple;
TupleDesc tupDesc;
Datum *values;
bool *nulls;
ResultRelInfo *resultRelInfo; ResultRelInfo *resultRelInfo;
ResultRelInfo *target_resultRelInfo; ResultRelInfo *target_resultRelInfo;
ResultRelInfo *prevResultRelInfo = NULL; ResultRelInfo *prevResultRelInfo = NULL;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */ EState *estate = CreateExecutorState(); /* for ExecConstraints() */
ModifyTableState *mtstate; ModifyTableState *mtstate;
ExprContext *econtext; ExprContext *econtext;
TupleTableSlot *myslot; TupleTableSlot *singleslot = NULL;
MemoryContext oldcontext = CurrentMemoryContext; MemoryContext oldcontext = CurrentMemoryContext;
MemoryContext batchcontext;
PartitionTupleRouting *proute = NULL; PartitionTupleRouting *proute = NULL;
ErrorContextCallback errcallback; ErrorContextCallback errcallback;
CommandId mycid = GetCurrentCommandId(true); CommandId mycid = GetCurrentCommandId(true);
int ti_options = 0; /* start with default table_insert options */ int ti_options = 0; /* start with default table_insert options */
BulkInsertState bistate; BulkInsertState bistate = NULL;
CopyInsertMethod insertMethod; CopyInsertMethod insertMethod;
CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
uint64 processed = 0; uint64 processed = 0;
int nBufferedTuples = 0;
bool has_before_insert_row_trig; bool has_before_insert_row_trig;
bool has_instead_insert_row_trig; bool has_instead_insert_row_trig;
bool leafpart_use_multi_insert = false; bool leafpart_use_multi_insert = false;
#define MAX_BUFFERED_TUPLES 1000
#define RECHECK_MULTI_INSERT_THRESHOLD 1000
HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
Size bufferedTuplesSize = 0;
uint64 firstBufferedLineNo = 0;
uint64 lastPartitionSampleLineNo = 0;
uint64 nPartitionChanges = 0;
double avgTuplesPerPartChange = 0;
Assert(cstate->rel); Assert(cstate->rel);
/* /*
...@@ -2382,8 +2711,6 @@ CopyFrom(CopyState cstate) ...@@ -2382,8 +2711,6 @@ CopyFrom(CopyState cstate)
RelationGetRelationName(cstate->rel)))); RelationGetRelationName(cstate->rel))));
} }
tupDesc = RelationGetDescr(cstate->rel);
/*---------- /*----------
* Check to see if we can avoid writing WAL * Check to see if we can avoid writing WAL
* *
...@@ -2412,7 +2739,7 @@ CopyFrom(CopyState cstate) ...@@ -2412,7 +2739,7 @@ CopyFrom(CopyState cstate)
* FSM for free space is a waste of time, even if we must use WAL because * FSM for free space is a waste of time, even if we must use WAL because
* of archiving. This could possibly be wrong, but it's unlikely. * of archiving. This could possibly be wrong, but it's unlikely.
* *
* The comments for heap_insert and RelationGetBufferForTuple specify that * The comments for table_insert and RelationGetBufferForTuple specify that
* skipping WAL logging is only safe if we ensure that our tuples do not * skipping WAL logging is only safe if we ensure that our tuples do not
* go into pages containing tuples from any other transactions --- but this * go into pages containing tuples from any other transactions --- but this
* must be the case if we have a new table or new relfilenode, so we need * must be the case if we have a new table or new relfilenode, so we need
...@@ -2518,10 +2845,6 @@ CopyFrom(CopyState cstate) ...@@ -2518,10 +2845,6 @@ CopyFrom(CopyState cstate)
ExecInitRangeTable(estate, cstate->range_table); ExecInitRangeTable(estate, cstate->range_table);
/* Set up a tuple slot too */
myslot = ExecInitExtraTupleSlot(estate, tupDesc,
&TTSOpsHeapTuple);
/* /*
* Set up a ModifyTableState so we can let FDW(s) init themselves for * Set up a ModifyTableState so we can let FDW(s) init themselves for
* foreign-table result relation(s). * foreign-table result relation(s).
...@@ -2565,10 +2888,11 @@ CopyFrom(CopyState cstate) ...@@ -2565,10 +2888,11 @@ CopyFrom(CopyState cstate)
&mtstate->ps); &mtstate->ps);
/* /*
* It's more efficient to prepare a bunch of tuples for insertion, and * It's generally more efficient to prepare a bunch of tuples for
* insert them in one heap_multi_insert() call, than call heap_insert() * insertion, and insert them in one table_multi_insert() call, than call
* separately for every tuple. However, there are a number of reasons why * table_insert() separately for every tuple. However, there are a number
* we might not be able to do this. These are explained below. * of reasons why we might not be able to do this. These are explained
* below.
*/ */
if (resultRelInfo->ri_TrigDesc != NULL && if (resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_before_row || (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
...@@ -2589,8 +2913,8 @@ CopyFrom(CopyState cstate) ...@@ -2589,8 +2913,8 @@ CopyFrom(CopyState cstate)
* For partitioned tables we can't support multi-inserts when there * For partitioned tables we can't support multi-inserts when there
* are any statement level insert triggers. It might be possible to * are any statement level insert triggers. It might be possible to
* allow partitioned tables with such triggers in the future, but for * allow partitioned tables with such triggers in the future, but for
* now, CopyFromInsertBatch expects that any before row insert and * now, CopyMultiInsertInfoFlush expects that any before row insert
* statement level insert triggers are on the same relation. * and statement level insert triggers are on the same relation.
*/ */
insertMethod = CIM_SINGLE; insertMethod = CIM_SINGLE;
} }
...@@ -2622,8 +2946,7 @@ CopyFrom(CopyState cstate) ...@@ -2622,8 +2946,7 @@ CopyFrom(CopyState cstate)
{ {
/* /*
* For partitioned tables, we may still be able to perform bulk * For partitioned tables, we may still be able to perform bulk
* inserts for sets of consecutive tuples which belong to the same * inserts. However, the possibility of this depends on which types
* partition. However, the possibility of this depends on which types
* of triggers exist on the partition. We must disable bulk inserts * of triggers exist on the partition. We must disable bulk inserts
* if the partition is a foreign table or it has any before row insert * if the partition is a foreign table or it has any before row insert
* or insert instead triggers (same as we checked above for the parent * or insert instead triggers (same as we checked above for the parent
...@@ -2632,18 +2955,27 @@ CopyFrom(CopyState cstate) ...@@ -2632,18 +2955,27 @@ CopyFrom(CopyState cstate)
* have the intermediate insert method of CIM_MULTI_CONDITIONAL to * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
* flag that we must later determine if we can use bulk-inserts for * flag that we must later determine if we can use bulk-inserts for
* the partition being inserted into. * the partition being inserted into.
*
* Normally, when performing bulk inserts we just flush the insert
* buffer whenever it becomes full, but for the partitioned table
* case, we flush it whenever the current tuple does not belong to the
* same partition as the previous tuple.
*/ */
if (proute) if (proute)
insertMethod = CIM_MULTI_CONDITIONAL; insertMethod = CIM_MULTI_CONDITIONAL;
else else
insertMethod = CIM_MULTI; insertMethod = CIM_MULTI;
bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple)); CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
estate, mycid, ti_options);
}
/*
* If not using batch mode (which allocates slots as needed) set up a
* tuple slot too. When inserting into a partitioned table, we also need
* one, even if we might batch insert, to read the tuple in the root
* partition's form.
*/
if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
{
singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
&estate->es_tupleTable);
bistate = GetBulkInsertState();
} }
has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
...@@ -2660,10 +2992,6 @@ CopyFrom(CopyState cstate) ...@@ -2660,10 +2992,6 @@ CopyFrom(CopyState cstate)
*/ */
ExecBSInsertTriggers(estate, resultRelInfo); ExecBSInsertTriggers(estate, resultRelInfo);
values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
bistate = GetBulkInsertState();
econtext = GetPerTupleExprContext(estate); econtext = GetPerTupleExprContext(estate);
/* Set up callback to identify error line number */ /* Set up callback to identify error line number */
...@@ -2672,17 +3000,9 @@ CopyFrom(CopyState cstate) ...@@ -2672,17 +3000,9 @@ CopyFrom(CopyState cstate)
errcallback.previous = error_context_stack; errcallback.previous = error_context_stack;
error_context_stack = &errcallback; error_context_stack = &errcallback;
/*
* Set up memory context for batches. For cases without batching we could
* use the per-tuple context, but it's simpler to just use it every time.
*/
batchcontext = AllocSetContextCreate(CurrentMemoryContext,
"batch context",
ALLOCSET_DEFAULT_SIZES);
for (;;) for (;;)
{ {
TupleTableSlot *slot; TupleTableSlot *myslot;
bool skip_tuple; bool skip_tuple;
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
...@@ -2693,42 +3013,53 @@ CopyFrom(CopyState cstate) ...@@ -2693,42 +3013,53 @@ CopyFrom(CopyState cstate)
*/ */
ResetPerTupleExprContext(estate); ResetPerTupleExprContext(estate);
/* select slot to (initially) load row into */
if (insertMethod == CIM_SINGLE || proute)
{
myslot = singleslot;
Assert(myslot != NULL);
}
else
{
Assert(resultRelInfo == target_resultRelInfo);
Assert(insertMethod == CIM_MULTI);
myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
resultRelInfo);
}
/* /*
* Switch to per-tuple context before calling NextCopyFrom, which does * Switch to per-tuple context before calling NextCopyFrom, which does
* evaluate default expressions etc. and requires per-tuple context. * evaluate default expressions etc. and requires per-tuple context.
*/ */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
if (!NextCopyFrom(cstate, econtext, values, nulls)) ExecClearTuple(myslot);
break;
/* Switch into per-batch memory context before forming the tuple. */ /* Directly store the values/nulls array in the slot */
MemoryContextSwitchTo(batchcontext); if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
break;
/* And now we can form the input tuple. */ ExecStoreVirtualTuple(myslot);
tuple = heap_form_tuple(tupDesc, values, nulls);
/* /*
* Constraints might reference the tableoid column, so (re-)initialize * Constraints and where clause might reference the tableoid column,
* tts_tableOid before evaluating them. * so (re-)initialize tts_tableOid before evaluating them.
*/ */
myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc); myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
/* Triggers and stuff need to be invoked in query context. */ /* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
/* Place tuple in tuple slot --- but slot shouldn't free it */
slot = myslot;
ExecStoreHeapTuple(tuple, slot, false);
if (cstate->whereClause) if (cstate->whereClause)
{ {
econtext->ecxt_scantuple = myslot; econtext->ecxt_scantuple = myslot;
/* Skip items that don't match COPY's WHERE clause */
if (!ExecQual(cstate->qualexpr, econtext)) if (!ExecQual(cstate->qualexpr, econtext))
continue; continue;
} }
/* Determine the partition to heap_insert the tuple into */ /* Determine the partition to insert the tuple into */
if (proute) if (proute)
{ {
TupleConversionMap *map; TupleConversionMap *map;
...@@ -2739,80 +3070,10 @@ CopyFrom(CopyState cstate) ...@@ -2739,80 +3070,10 @@ CopyFrom(CopyState cstate)
* if the found partition is not suitable for INSERTs. * if the found partition is not suitable for INSERTs.
*/ */
resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo, resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
proute, slot, estate); proute, myslot, estate);
if (prevResultRelInfo != resultRelInfo) if (prevResultRelInfo != resultRelInfo)
{ {
/* Check if we can multi-insert into this partition */
if (insertMethod == CIM_MULTI_CONDITIONAL)
{
/*
* When performing bulk-inserts into partitioned tables we
* must insert the tuples seen so far to the heap whenever
* the partition changes.
*/
if (nBufferedTuples > 0)
{
MemoryContext oldcontext;
CopyFromInsertBatch(cstate, estate, mycid, ti_options,
prevResultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples,
firstBufferedLineNo);
nBufferedTuples = 0;
bufferedTuplesSize = 0;
/*
* The tuple is already allocated in the batch context, which
* we want to reset. So to keep the tuple we copy it into the
* short-lived (per-tuple) context, reset the batch context
* and then copy it back into the per-batch one.
*/
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
tuple = heap_copytuple(tuple);
MemoryContextSwitchTo(oldcontext);
/* cleanup the old batch */
MemoryContextReset(batchcontext);
/* copy the tuple back to the per-batch context */
oldcontext = MemoryContextSwitchTo(batchcontext);
tuple = heap_copytuple(tuple);
MemoryContextSwitchTo(oldcontext);
/*
* Also push the tuple copy to the slot (resetting the context
* invalidated the slot contents).
*/
ExecStoreHeapTuple(tuple, slot, false);
}
nPartitionChanges++;
/*
* Here we adaptively enable multi-inserts based on the
* average number of tuples from recent multi-insert
* batches. We recalculate the average every
* RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
* the average over the whole copy. This allows us to
* enable multi-inserts when we get periods in the copy
* stream that have tuples commonly belonging to the same
* partition, and disable when the partition is changing
* too often.
*/
if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
RECHECK_MULTI_INSERT_THRESHOLD)
&& cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
{
avgTuplesPerPartChange =
(cstate->cur_lineno - lastPartitionSampleLineNo) /
(double) nPartitionChanges;
lastPartitionSampleLineNo = cstate->cur_lineno;
nPartitionChanges = 0;
}
}
/* Determine which triggers exist on this partition */ /* Determine which triggers exist on this partition */
has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row); resultRelInfo->ri_TrigDesc->trig_insert_before_row);
...@@ -2821,22 +3082,32 @@ CopyFrom(CopyState cstate) ...@@ -2821,22 +3082,32 @@ CopyFrom(CopyState cstate)
resultRelInfo->ri_TrigDesc->trig_insert_instead_row); resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
/* /*
* Tests have shown that using multi-inserts when the * Disable multi-inserts when the partition has BEFORE/INSTEAD
* partition changes on every tuple slightly decreases the * OF triggers, or if the partition is a foreign partition.
* performance, however, there are benefits even when only
* some batches have just 2 tuples, so let's enable
* multi-inserts even when the average is quite low.
*/ */
leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
avgTuplesPerPartChange >= 1.3 &&
!has_before_insert_row_trig && !has_before_insert_row_trig &&
!has_instead_insert_row_trig && !has_instead_insert_row_trig &&
resultRelInfo->ri_FdwRoutine == NULL; resultRelInfo->ri_FdwRoutine == NULL;
/* Set the multi-insert buffer to use for this partition. */
if (leafpart_use_multi_insert)
{
if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
resultRelInfo);
}
else if (insertMethod == CIM_MULTI_CONDITIONAL &&
!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
{
/* /*
* We'd better make the bulk insert mechanism gets a new * Flush pending inserts if this partition can't use
* buffer when the partition being inserted into changes. * batching, so rows are visible to triggers etc.
*/ */
CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
}
if (bistate != NULL)
ReleaseBulkInsertStatePin(bistate); ReleaseBulkInsertStatePin(bistate);
prevResultRelInfo = resultRelInfo; prevResultRelInfo = resultRelInfo;
} }
...@@ -2879,26 +3150,50 @@ CopyFrom(CopyState cstate) ...@@ -2879,26 +3150,50 @@ CopyFrom(CopyState cstate)
* rowtype. * rowtype.
*/ */
map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap; map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
{
/* non batch insert */
if (map != NULL) if (map != NULL)
{ {
TupleTableSlot *new_slot; TupleTableSlot *new_slot;
MemoryContext oldcontext;
new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot; new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
Assert(new_slot != NULL); myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
}
}
else
{
/*
* Prepare to queue up tuple for later batch insert into
* current partition.
*/
TupleTableSlot *batchslot;
/* no other path available for partitioned table */
Assert(insertMethod == CIM_MULTI_CONDITIONAL);
slot = execute_attr_map_slot(map->attrMap, slot, new_slot); batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
resultRelInfo);
if (map != NULL)
myslot = execute_attr_map_slot(map->attrMap, myslot,
batchslot);
else
{
/* /*
* Get the tuple in the per-batch context, so that it will be * This looks more expensive than it is (Believe me, I
* freed after each batch insert. * optimized it away. Twice.). The input is in virtual
* form, and we'll materialize the slot below - for most
* slot types the copy performs the work materialization
* would later require anyway.
*/ */
oldcontext = MemoryContextSwitchTo(batchcontext); ExecCopySlot(batchslot, myslot);
tuple = ExecCopySlotHeapTuple(slot); myslot = batchslot;
MemoryContextSwitchTo(oldcontext); }
} }
slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); /* ensure that triggers etc see the right relation */
myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
} }
skip_tuple = false; skip_tuple = false;
...@@ -2906,7 +3201,7 @@ CopyFrom(CopyState cstate) ...@@ -2906,7 +3201,7 @@ CopyFrom(CopyState cstate)
/* BEFORE ROW INSERT Triggers */ /* BEFORE ROW INSERT Triggers */
if (has_before_insert_row_trig) if (has_before_insert_row_trig)
{ {
if (!ExecBRInsertTriggers(estate, resultRelInfo, slot)) if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
skip_tuple = true; /* "do nothing" */ skip_tuple = true; /* "do nothing" */
} }
...@@ -2919,7 +3214,7 @@ CopyFrom(CopyState cstate) ...@@ -2919,7 +3214,7 @@ CopyFrom(CopyState cstate)
*/ */
if (has_instead_insert_row_trig) if (has_instead_insert_row_trig)
{ {
ExecIRInsertTriggers(estate, resultRelInfo, slot); ExecIRInsertTriggers(estate, resultRelInfo, myslot);
} }
else else
{ {
...@@ -2931,12 +3226,7 @@ CopyFrom(CopyState cstate) ...@@ -2931,12 +3226,7 @@ CopyFrom(CopyState cstate)
*/ */
if (resultRelInfo->ri_RelationDesc->rd_att->constr && if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored) resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
{ ExecComputeStoredGenerated(estate, myslot);
ExecComputeStoredGenerated(estate, slot);
MemoryContextSwitchTo(batchcontext);
tuple = ExecCopySlotHeapTuple(slot);
MemoryContextSwitchTo(oldcontext);
}
/* /*
* If the target is a plain table, check the constraints of * If the target is a plain table, check the constraints of
...@@ -2944,7 +3234,7 @@ CopyFrom(CopyState cstate) ...@@ -2944,7 +3234,7 @@ CopyFrom(CopyState cstate)
*/ */
if (resultRelInfo->ri_FdwRoutine == NULL && if (resultRelInfo->ri_FdwRoutine == NULL &&
resultRelInfo->ri_RelationDesc->rd_att->constr) resultRelInfo->ri_RelationDesc->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate); ExecConstraints(resultRelInfo, myslot, estate);
/* /*
* Also check the tuple against the partition constraint, if * Also check the tuple against the partition constraint, if
...@@ -2954,40 +3244,29 @@ CopyFrom(CopyState cstate) ...@@ -2954,40 +3244,29 @@ CopyFrom(CopyState cstate)
*/ */
if (resultRelInfo->ri_PartitionCheck && if (resultRelInfo->ri_PartitionCheck &&
(proute == NULL || has_before_insert_row_trig)) (proute == NULL || has_before_insert_row_trig))
ExecPartitionCheck(resultRelInfo, slot, estate, true); ExecPartitionCheck(resultRelInfo, myslot, estate, true);
/* /* Store the slot in the multi-insert buffer, when enabled. */
* Perform multi-inserts when enabled, or when loading a
* partitioned table that can support multi-inserts as
* determined above.
*/
if (insertMethod == CIM_MULTI || leafpart_use_multi_insert) if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
{ {
/*
* The slot previously might point into the per-tuple
* context. For batching it needs to be longer lived.
*/
ExecMaterializeSlot(myslot);
/* Add this tuple to the tuple buffer */ /* Add this tuple to the tuple buffer */
if (nBufferedTuples == 0) CopyMultiInsertInfoStore(&multiInsertInfo,
firstBufferedLineNo = cstate->cur_lineno; resultRelInfo, myslot,
bufferedTuples[nBufferedTuples++] = tuple; cstate->line_buf.len,
bufferedTuplesSize += tuple->t_len; cstate->cur_lineno);
/* /*
* If the buffer filled up, flush it. Also flush if the * If enough inserts have queued up, then flush all
* total size of all the tuples in the buffer becomes * buffers out to their tables.
* large, to avoid using large amounts of memory for the
* buffer when the tuples are exceptionally wide.
*/ */
if (nBufferedTuples == MAX_BUFFERED_TUPLES || if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
bufferedTuplesSize > 65535) CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
{
CopyFromInsertBatch(cstate, estate, mycid, ti_options,
resultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples,
firstBufferedLineNo);
nBufferedTuples = 0;
bufferedTuplesSize = 0;
/* free memory occupied by tuples from the batch */
MemoryContextReset(batchcontext);
}
} }
else else
{ {
...@@ -2996,12 +3275,12 @@ CopyFrom(CopyState cstate) ...@@ -2996,12 +3275,12 @@ CopyFrom(CopyState cstate)
/* OK, store the tuple */ /* OK, store the tuple */
if (resultRelInfo->ri_FdwRoutine != NULL) if (resultRelInfo->ri_FdwRoutine != NULL)
{ {
slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate, myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
resultRelInfo, resultRelInfo,
slot, myslot,
NULL); NULL);
if (slot == NULL) /* "do nothing" */ if (myslot == NULL) /* "do nothing" */
continue; /* next tuple please */ continue; /* next tuple please */
/* /*
...@@ -3009,27 +3288,24 @@ CopyFrom(CopyState cstate) ...@@ -3009,27 +3288,24 @@ CopyFrom(CopyState cstate)
* column, so (re-)initialize tts_tableOid before * column, so (re-)initialize tts_tableOid before
* evaluating them. * evaluating them.
*/ */
slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
} }
else else
{ {
tuple = ExecFetchSlotHeapTuple(slot, true, NULL); /* OK, store the tuple and create index entries for it */
heap_insert(resultRelInfo->ri_RelationDesc, tuple, table_insert(resultRelInfo->ri_RelationDesc, myslot,
mycid, ti_options, bistate); mycid, ti_options, bistate);
ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
/* And create index entries for it */
if (resultRelInfo->ri_NumIndices > 0) if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(slot, recheckIndexes = ExecInsertIndexTuples(myslot,
estate, estate,
false, false,
NULL, NULL,
NIL); NIL);
}
/* AFTER ROW INSERT Triggers */ /* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, slot, ExecARInsertTriggers(estate, resultRelInfo, myslot,
recheckIndexes, cstate->transition_capture); recheckIndexes, cstate->transition_capture);
list_free(recheckIndexes); list_free(recheckIndexes);
...@@ -3046,31 +3322,23 @@ CopyFrom(CopyState cstate) ...@@ -3046,31 +3322,23 @@ CopyFrom(CopyState cstate)
} }
/* Flush any remaining buffered tuples */ /* Flush any remaining buffered tuples */
if (nBufferedTuples > 0) if (insertMethod != CIM_SINGLE)
{ {
if (insertMethod == CIM_MULTI_CONDITIONAL) if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
{ CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
CopyFromInsertBatch(cstate, estate, mycid, ti_options,
prevResultRelInfo, myslot, bistate, /* Tear down the multi-insert buffer data */
nBufferedTuples, bufferedTuples, CopyMultiInsertInfoCleanup(&multiInsertInfo);
firstBufferedLineNo);
}
else
CopyFromInsertBatch(cstate, estate, mycid, ti_options,
resultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples,
firstBufferedLineNo);
} }
/* Done, clean up */ /* Done, clean up */
error_context_stack = errcallback.previous; error_context_stack = errcallback.previous;
if (bistate != NULL)
FreeBulkInsertState(bistate); FreeBulkInsertState(bistate);
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
MemoryContextDelete(batchcontext);
/* /*
* In the old protocol, tell pqcomm that we can process normal protocol * In the old protocol, tell pqcomm that we can process normal protocol
* messages again. * messages again.
...@@ -3084,9 +3352,6 @@ CopyFrom(CopyState cstate) ...@@ -3084,9 +3352,6 @@ CopyFrom(CopyState cstate)
/* Handle queued AFTER triggers */ /* Handle queued AFTER triggers */
AfterTriggerEndQuery(estate); AfterTriggerEndQuery(estate);
pfree(values);
pfree(nulls);
ExecResetTupleTable(estate->es_tupleTable, false); ExecResetTupleTable(estate->es_tupleTable, false);
/* Allow the FDW to shut down */ /* Allow the FDW to shut down */
...@@ -3111,88 +3376,6 @@ CopyFrom(CopyState cstate) ...@@ -3111,88 +3376,6 @@ CopyFrom(CopyState cstate)
return processed; return processed;
} }
/*
 * CopyFromInsertBatch
 *
 * Helper for CopyFrom: hands the currently buffered heap tuples to
 * heap_multi_insert() in one call, then, tuple by tuple, inserts index
 * entries (when the target has indexes) and fires AFTER ROW INSERT
 * triggers.
 *
 * cstate->cur_lineno and cstate->line_buf_valid are adjusted while the
 * batch is processed and put back to their entry values before returning.
 */
static void
CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
					int ti_options, ResultRelInfo *resultRelInfo,
					TupleTableSlot *myslot, BulkInsertState bistate,
					int nBufferedTuples, HeapTuple *bufferedTuples,
					uint64 firstBufferedLineNo)
{
	bool		entry_line_buf_valid = cstate->line_buf_valid;
	uint64		entry_lineno;
	MemoryContext callercontext;
	bool		has_indexes;
	bool		need_per_tuple_work;
	int			tupno;

	/*
	 * Mark the line buffer invalid and remember the current line number, so
	 * that error context information is printed correctly if anything below
	 * fails.
	 */
	cstate->line_buf_valid = false;
	entry_lineno = cstate->cur_lineno;

	/*
	 * heap_multi_insert leaks memory, so run it inside the short-lived
	 * per-tuple memory context.
	 */
	callercontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	heap_multi_insert(resultRelInfo->ri_RelationDesc,
					  bufferedTuples,
					  nBufferedTuples,
					  mycid,
					  ti_options,
					  bistate);
	MemoryContextSwitchTo(callercontext);

	/*
	 * Per-tuple follow-up work is needed when there are indexes to update,
	 * or, failing that, when AFTER ROW INSERT triggers (or a new-table
	 * transition) exist on the relation.
	 */
	has_indexes = (resultRelInfo->ri_NumIndices > 0);
	need_per_tuple_work = has_indexes ||
		(resultRelInfo->ri_TrigDesc != NULL &&
		 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
		  resultRelInfo->ri_TrigDesc->trig_insert_new_table));

	if (need_per_tuple_work)
	{
		for (tupno = 0; tupno < nBufferedTuples; tupno++)
		{
			List	   *recheckIndexes = NIL;

			/* make error reports point at the line this tuple came from */
			cstate->cur_lineno = firstBufferedLineNo + tupno;
			ExecStoreHeapTuple(bufferedTuples[tupno], myslot, false);

			if (has_indexes)
				recheckIndexes = ExecInsertIndexTuples(myslot,
													   estate, false, NULL, NIL);

			ExecARInsertTriggers(estate, resultRelInfo,
								 myslot,
								 recheckIndexes, cstate->transition_capture);

			/* only the index path produced a list to release */
			if (has_indexes)
				list_free(recheckIndexes);
		}
	}

	/* put cur_lineno and line_buf_valid back to their entry values */
	cstate->line_buf_valid = entry_line_buf_valid;
	cstate->cur_lineno = entry_lineno;
}
/* /*
* Setup to read tuples from a file for COPY FROM. * Setup to read tuples from a file for COPY FROM.
* *
...@@ -4990,11 +5173,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) ...@@ -4990,11 +5173,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
DR_copy *myState = (DR_copy *) self; DR_copy *myState = (DR_copy *) self;
CopyState cstate = myState->cstate; CopyState cstate = myState->cstate;
/* Make sure the tuple is fully deconstructed */ /* Send the data */
slot_getallattrs(slot); CopyOneRowTo(cstate, slot);
/* And send the data */
CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
myState->processed++; myState->processed++;
return true; return true;
......
...@@ -1346,6 +1346,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, ...@@ -1346,6 +1346,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_PartitionCheck = partition_check; resultRelInfo->ri_PartitionCheck = partition_check;
resultRelInfo->ri_PartitionRoot = partition_root; resultRelInfo->ri_PartitionRoot = partition_root;
resultRelInfo->ri_PartitionInfo = NULL; /* may be set later */ resultRelInfo->ri_PartitionInfo = NULL; /* may be set later */
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
} }
/* /*
......
...@@ -947,6 +947,7 @@ ExecInitRoutingInfo(ModifyTableState *mtstate, ...@@ -947,6 +947,7 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo); partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
partRelInfo->ri_PartitionInfo = partrouteinfo; partRelInfo->ri_PartitionInfo = partrouteinfo;
partRelInfo->ri_CopyMultiInsertBuffer = NULL;
/* /*
* Keep track of it in the PartitionTupleRouting->partitions array. * Keep track of it in the PartitionTupleRouting->partitions array.
......
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#define HEAP_INSERT_SPECULATIVE 0x0010 #define HEAP_INSERT_SPECULATIVE 0x0010
typedef struct BulkInsertStateData *BulkInsertState; typedef struct BulkInsertStateData *BulkInsertState;
struct TupleTableSlot;
#define MaxLockTupleMode LockTupleExclusive #define MaxLockTupleMode LockTupleExclusive
...@@ -143,8 +144,9 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate); ...@@ -143,8 +144,9 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
int options, BulkInsertState bistate); int options, BulkInsertState bistate);
extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
CommandId cid, int options, BulkInsertState bistate); int ntuples, CommandId cid, int options,
BulkInsertState bistate);
extern TM_Result heap_delete(Relation relation, ItemPointer tid, extern TM_Result heap_delete(Relation relation, ItemPointer tid,
CommandId cid, Snapshot crosscheck, bool wait, CommandId cid, Snapshot crosscheck, bool wait,
struct TM_FailureData *tmfd, bool changingPart); struct TM_FailureData *tmfd, bool changingPart);
......
...@@ -350,6 +350,10 @@ typedef struct TableAmRoutine ...@@ -350,6 +350,10 @@ typedef struct TableAmRoutine
uint32 specToken, uint32 specToken,
bool succeeded); bool succeeded);
/* see table_multi_insert() for reference about parameters */
void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
CommandId cid, int options, struct BulkInsertStateData *bistate);
/* see table_delete() for reference about parameters */ /* see table_delete() for reference about parameters */
TM_Result (*tuple_delete) (Relation rel, TM_Result (*tuple_delete) (Relation rel,
ItemPointer tid, ItemPointer tid,
...@@ -1077,6 +1081,28 @@ table_complete_speculative(Relation rel, TupleTableSlot *slot, ...@@ -1077,6 +1081,28 @@ table_complete_speculative(Relation rel, TupleTableSlot *slot,
succeeded); succeeded);
} }
/*
 * Insert multiple tuples into a table.
 *
 * This is like table_insert(), but inserts multiple tuples in one
 * operation. That's often faster than calling table_insert() in a loop,
 * because e.g. the AM can reduce WAL logging and page locking overhead.
 *
 * Parameters:
 *   rel     - target relation; the call is dispatched through its
 *             rd_tableam->multi_insert callback.
 *   slots   - array of TupleTableSlots holding the tuples to insert.
 *   nslots  - number of entries in `slots`.
 *   cid, options, bistate - same meaning as for table_insert().
 *
 * Returns nothing.
 *
 * Note: this leaks memory into the current memory context. You can create a
 * temporary context before calling this, if that's a problem.
 */
static inline void
table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
CommandId cid, int options, struct BulkInsertStateData *bistate)
{
/* forward unchanged to the relation's table access method */
rel->rd_tableam->multi_insert(rel, slots, nslots,
cid, options, bistate);
}
/* /*
* Delete a tuple. * Delete a tuple.
* *
......
...@@ -40,6 +40,7 @@ struct ExprState; ...@@ -40,6 +40,7 @@ struct ExprState;
struct ExprContext; struct ExprContext;
struct RangeTblEntry; /* avoid including parsenodes.h here */ struct RangeTblEntry; /* avoid including parsenodes.h here */
struct ExprEvalStep; /* avoid including execExpr.h everywhere */ struct ExprEvalStep; /* avoid including execExpr.h everywhere */
struct CopyMultiInsertBuffer;
/* ---------------- /* ----------------
...@@ -481,6 +482,9 @@ typedef struct ResultRelInfo ...@@ -481,6 +482,9 @@ typedef struct ResultRelInfo
/* Additional information specific to partition tuple routing */ /* Additional information specific to partition tuple routing */
struct PartitionRoutingInfo *ri_PartitionInfo; struct PartitionRoutingInfo *ri_PartitionInfo;
/* For use by copy.c when performing multi-inserts */
struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
} ResultRelInfo; } ResultRelInfo;
/* ---------------- /* ----------------
......
...@@ -402,6 +402,9 @@ ConversionLocation ...@@ -402,6 +402,9 @@ ConversionLocation
ConvertRowtypeExpr ConvertRowtypeExpr
CookedConstraint CookedConstraint
CopyDest CopyDest
CopyInsertMethod
CopyMultiInsertBuffer
CopyMultiInsertInfo
CopyState CopyState
CopyStateData CopyStateData
CopyStmt CopyStmt
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment