Commit b1ecb9b3 authored by Robert Haas's avatar Robert Haas

Fix interaction of partitioned tables with BulkInsertState.

When copying into a partitioned table, the target heap may change from
one tuple to next.  We must ask ReadBufferBI() to get a new buffer
every time such change occurs.  To do that, use new function
ReleaseBulkInsertStatePin().  This fixes the bug that tuples ended up
being inserted into the wrong partition, which occurred exactly
because the wrong buffer was used.

Amit Langote, per a suggestion from Robert Haas.  Some cosmetic
adjustments by me.

Reports by 高增琦 (Gao Zengqi), Venkata B Nagothi, and
Ragnar Ouchterlony.

Discussion: http://postgr.es/m/CAFmBtr32FDOqofo8yG-4mjzL1HnYHxXK5S9OGFJ%3D%3DcJpgEW4vA%40mail.gmail.com
Discussion: http://postgr.es/m/CAEyp7J9WiX0L3DoiNcRrY-9iyw%3DqP%2Bj%3DDLsAnNFF1xT2J1ggfQ%40mail.gmail.com
Discussion: http://postgr.es/m/16d73804-c9cd-14c5-463e-5caad563ff77%40agama.tv
Discussion: http://postgr.es/m/CA+TgmoaiZpDVUUN8LZ4jv1qFE_QyR+H9ec+79f5vNczYarg5Zg@mail.gmail.com
parent 3eaf03b5
...@@ -2324,6 +2324,17 @@ FreeBulkInsertState(BulkInsertState bistate) ...@@ -2324,6 +2324,17 @@ FreeBulkInsertState(BulkInsertState bistate)
pfree(bistate); pfree(bistate);
} }
/*
* ReleaseBulkInsertStatePin - release a buffer currently held in bistate
*/
void
ReleaseBulkInsertStatePin(BulkInsertState bistate)
{
if (bistate->current_buf != InvalidBuffer)
ReleaseBuffer(bistate->current_buf);
bistate->current_buf = InvalidBuffer;
}
/* /*
* heap_insert - insert tuple into a heap * heap_insert - insert tuple into a heap
......
...@@ -2307,6 +2307,7 @@ CopyFrom(CopyState cstate) ...@@ -2307,6 +2307,7 @@ CopyFrom(CopyState cstate)
uint64 processed = 0; uint64 processed = 0;
bool useHeapMultiInsert; bool useHeapMultiInsert;
int nBufferedTuples = 0; int nBufferedTuples = 0;
int prev_leaf_part_index = -1;
#define MAX_BUFFERED_TUPLES 1000 #define MAX_BUFFERED_TUPLES 1000
HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */ HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
...@@ -2561,6 +2562,17 @@ CopyFrom(CopyState cstate) ...@@ -2561,6 +2562,17 @@ CopyFrom(CopyState cstate)
Assert(leaf_part_index >= 0 && Assert(leaf_part_index >= 0 &&
leaf_part_index < cstate->num_partitions); leaf_part_index < cstate->num_partitions);
/*
* If this tuple is mapped to a partition that is not same as the
* previous one, we'd better make the bulk insert mechanism gets a
* new buffer.
*/
if (prev_leaf_part_index != leaf_part_index)
{
ReleaseBulkInsertStatePin(bistate);
prev_leaf_part_index = leaf_part_index;
}
/* /*
* Save the old ResultRelInfo and switch to the one corresponding * Save the old ResultRelInfo and switch to the one corresponding
* to the selected partition. * to the selected partition.
......
...@@ -147,6 +147,7 @@ extern void setLastTid(const ItemPointer tid); ...@@ -147,6 +147,7 @@ extern void setLastTid(const ItemPointer tid);
extern BulkInsertState GetBulkInsertState(void); extern BulkInsertState GetBulkInsertState(void);
extern void FreeBulkInsertState(BulkInsertState); extern void FreeBulkInsertState(BulkInsertState);
extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
int options, BulkInsertState bistate); int options, BulkInsertState bistate);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment