Commit d326d9e8 authored by Heikki Linnakangas's avatar Heikki Linnakangas

In COPY, insert tuples to the heap in batches.

This greatly reduces the WAL volume, especially when the table is narrow.
The overhead of locking the heap page is also reduced. Reduced WAL traffic
also makes it scale a lot better, if you run multiple COPY processes at
the same time.
parent 2c30f961
This diff is collapsed.
......@@ -33,6 +33,7 @@
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
......@@ -149,6 +150,7 @@ typedef struct CopyStateData
Oid *typioparams; /* array of element types for in_functions */
int *defmap; /* array of default att numbers */
ExprState **defexprs; /* array of default att expressions */
bool volatile_defexprs; /* is any of defexprs volatile? */
/*
* These variables are used to reduce overhead in textual COPY FROM.
......@@ -277,6 +279,11 @@ static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
Datum *values, bool *nulls);
static uint64 CopyFrom(CopyState cstate);
static void CopyFromInsertBatch(CopyState cstate, EState *estate,
CommandId mycid, int hi_options,
ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
BulkInsertState bistate,
int nBufferedTuples, HeapTuple *bufferedTuples);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
static int CopyReadAttributesText(CopyState cstate);
......@@ -1842,11 +1849,17 @@ CopyFrom(CopyState cstate)
ExprContext *econtext;
TupleTableSlot *myslot;
MemoryContext oldcontext = CurrentMemoryContext;
ErrorContextCallback errcontext;
CommandId mycid = GetCurrentCommandId(true);
int hi_options = 0; /* start with default heap_insert options */
BulkInsertState bistate;
uint64 processed = 0;
bool useHeapMultiInsert;
int nBufferedTuples = 0;
#define MAX_BUFFERED_TUPLES 1000
HeapTuple *bufferedTuples;
Size bufferedTuplesSize = 0;
Assert(cstate->rel);
......@@ -1941,6 +1954,28 @@ CopyFrom(CopyState cstate)
/* Triggers might need a slot as well */
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
/*
* It's more efficient to prepare a bunch of tuples for insertion, and
* insert them in one heap_multi_insert() call, than call heap_insert()
* separately for every tuple. However, we can't do that if there are
* BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
* expressions. Such triggers or expressions might query the table we're
* inserting to, and act differently if the tuples that have already been
* processed and prepared for insertion are not there.
*/
if ((resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
cstate->volatile_defexprs)
{
useHeapMultiInsert = false;
}
else
{
useHeapMultiInsert = true;
bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
}
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
......@@ -1972,8 +2007,15 @@ CopyFrom(CopyState cstate)
CHECK_FOR_INTERRUPTS();
/* Reset the per-tuple exprcontext */
if (nBufferedTuples == 0)
{
/*
* Reset the per-tuple exprcontext. We can only do this if the
* tuple buffer is empty (calling the context the per-tuple memory
* context is a bit of a misnomer now
*/
ResetPerTupleExprContext(estate);
}
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
......@@ -2010,12 +2052,36 @@ CopyFrom(CopyState cstate)
if (!skip_tuple)
{
List *recheckIndexes = NIL;
/* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
if (useHeapMultiInsert)
{
/* Add this tuple to the tuple buffer */
bufferedTuples[nBufferedTuples++] = tuple;
bufferedTuplesSize += tuple->t_len;
/*
* If the buffer filled up, flush it. Also flush if the total
* size of all the tuples in the buffer becomes large, to
* avoid using large amounts of memory for the buffers when
* the tuples are exceptionally wide.
*/
if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
bufferedTuplesSize > 65535)
{
CopyFromInsertBatch(cstate, estate, mycid, hi_options,
resultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples);
nBufferedTuples = 0;
bufferedTuplesSize = 0;
}
}
else
{
List *recheckIndexes = NIL;
/* OK, store the tuple and create index entries for it */
heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
......@@ -2028,6 +2094,7 @@ CopyFrom(CopyState cstate)
recheckIndexes);
list_free(recheckIndexes);
}
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger;
......@@ -2038,6 +2105,12 @@ CopyFrom(CopyState cstate)
}
}
/* Flush any remaining buffered tuples */
if (nBufferedTuples > 0)
CopyFromInsertBatch(cstate, estate, mycid, hi_options,
resultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples);
/* Done, clean up */
error_context_stack = errcontext.previous;
......@@ -2070,6 +2143,67 @@ CopyFrom(CopyState cstate)
return processed;
}
/*
* A subroutine of CopyFrom, to write the current batch of buffered heap
* tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
* triggers.
*/
static void
CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
int hi_options, ResultRelInfo *resultRelInfo,
TupleTableSlot *myslot, BulkInsertState bistate,
int nBufferedTuples, HeapTuple *bufferedTuples)
{
MemoryContext oldcontext;
int i;
/*
* heap_multi_insert leaks memory, so switch to short-lived memory
* context before calling it.
*/
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
heap_multi_insert(cstate->rel,
bufferedTuples,
nBufferedTuples,
mycid,
hi_options,
bistate);
MemoryContextSwitchTo(oldcontext);
/*
* If there are any indexes, update them for all the inserted tuples,
* and run AFTER ROW INSERT triggers.
*/
if (resultRelInfo->ri_NumIndices > 0)
{
for (i = 0; i < nBufferedTuples; i++)
{
List *recheckIndexes;
ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
recheckIndexes =
ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
estate);
ExecARInsertTriggers(estate, resultRelInfo,
bufferedTuples[i],
recheckIndexes);
list_free(recheckIndexes);
}
}
/*
* There's no indexes, but see if we need to run AFTER ROW INSERT triggers
* anyway.
*/
else if (resultRelInfo->ri_TrigDesc != NULL &&
resultRelInfo->ri_TrigDesc->trig_insert_after_row)
{
for (i = 0; i < nBufferedTuples; i++)
ExecARInsertTriggers(estate, resultRelInfo,
bufferedTuples[i],
NIL);
}
}
/*
* Setup to read tuples from a file for COPY FROM.
*
......@@ -2099,6 +2233,7 @@ BeginCopyFrom(Relation rel,
int *defmap;
ExprState **defexprs;
MemoryContext oldcontext;
bool volatile_defexprs;
cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
......@@ -2122,6 +2257,7 @@ BeginCopyFrom(Relation rel,
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
num_defaults = 0;
volatile_defexprs = false;
/*
* Pick up the required catalog information for each attribute in the
......@@ -2163,6 +2299,9 @@ BeginCopyFrom(Relation rel,
expression_planner((Expr *) defexpr), NULL);
defmap[num_defaults] = attnum - 1;
num_defaults++;
if (!volatile_defexprs)
volatile_defexprs = contain_volatile_functions(defexpr);
}
}
}
......@@ -2172,6 +2311,7 @@ BeginCopyFrom(Relation rel,
cstate->typioparams = typioparams;
cstate->defmap = defmap;
cstate->defexprs = defexprs;
cstate->volatile_defexprs = volatile_defexprs;
cstate->num_defaults = num_defaults;
if (pipe)
......
......@@ -1677,10 +1677,10 @@ add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
}
/*
* pgstat_count_heap_insert - count a tuple insertion
* pgstat_count_heap_insert - count a tuple insertion of n tuples
*/
void
pgstat_count_heap_insert(Relation rel)
pgstat_count_heap_insert(Relation rel, int n)
{
PgStat_TableStatus *pgstat_info = rel->pgstat_info;
......@@ -1693,7 +1693,7 @@ pgstat_count_heap_insert(Relation rel)
pgstat_info->trans->nest_level != nest_level)
add_tabstat_xact_level(pgstat_info, nest_level);
pgstat_info->trans->tuples_inserted++;
pgstat_info->trans->tuples_inserted += n;
}
}
......
......@@ -97,6 +97,8 @@ extern void FreeBulkInsertState(BulkInsertState);
extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
int options, BulkInsertState bistate);
extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
CommandId cid, int options, BulkInsertState bistate);
extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
ItemPointer ctid, TransactionId *update_xmax,
CommandId cid, Snapshot crosscheck, bool wait);
......
......@@ -608,6 +608,7 @@ typedef HeapTupleData *HeapTuple;
/* 0x20 is free, was XLOG_HEAP2_CLEAN_MOVE */
#define XLOG_HEAP2_CLEANUP_INFO 0x30
#define XLOG_HEAP2_VISIBLE 0x40
#define XLOG_HEAP2_MULTI_INSERT 0x50
/*
* All what we need to find changed tuple
......@@ -661,6 +662,36 @@ typedef struct xl_heap_insert
#define SizeOfHeapInsert (offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
/*
* This is what we need to know about a multi-insert. The record consists of
* xl_heap_multi_insert header, followed by a xl_multi_insert_tuple and tuple
* data for each tuple. 'offsets' array is omitted if the whole page is
* reinitialized (XLOG_HEAP_INIT_PAGE)
*/
typedef struct xl_heap_multi_insert
{
RelFileNode node;
BlockNumber blkno;
bool all_visible_cleared;
uint16 ntuples;
OffsetNumber offsets[1];
/* TUPLE DATA (xl_multi_insert_tuples) FOLLOW AT END OF STRUCT */
} xl_heap_multi_insert;
#define SizeOfHeapMultiInsert offsetof(xl_heap_multi_insert, offsets)
typedef struct xl_multi_insert_tuple
{
uint16 datalen; /* size of tuple data that follows */
uint16 t_infomask2;
uint16 t_infomask;
uint8 t_hoff;
/* TUPLE DATA FOLLOWS AT END OF STRUCT */
} xl_multi_insert_tuple;
#define SizeOfMultiInsertTuple (offsetof(xl_multi_insert_tuple, t_hoff) + sizeof(uint8))
/* This is what we need to know about update|hot_update */
typedef struct xl_heap_update
{
......
......@@ -766,7 +766,7 @@ extern void pgstat_initstats(Relation rel);
(rel)->pgstat_info->t_counts.t_blocks_hit++; \
} while (0)
extern void pgstat_count_heap_insert(Relation rel);
extern void pgstat_count_heap_insert(Relation rel, int n);
extern void pgstat_count_heap_update(Relation rel, bool hot);
extern void pgstat_count_heap_delete(Relation rel);
extern void pgstat_update_heap_dead_tuples(Relation rel, int delta);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment