Commit 1b0fc850 authored by Robert Haas's avatar Robert Haas

Properly adjust pointers when tuples are moved during CLUSTER.

Otherwise, when we abandon incremental memory accounting and use
batch allocation for the final merge pass, we might crash.  This
has been broken since 0011c009.

Peter Geoghegan, tested by Noah Misch
parent b22934dc
......@@ -300,10 +300,20 @@ struct Tuplesortstate
* the already-read length of the stored tuple. Create a palloc'd copy,
* initialize tuple/datum1/isnull1 in the target SortTuple struct, and
* decrease state->availMem by the amount of memory space consumed.
* (See batchUsed notes for details on how memory is handled when
* incremental accounting is abandoned.)
*/
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
/*
* Function to move a caller tuple. This is usually implemented as a
* memmove() shim, but function may also perform additional fix-up of
* caller tuple where needed. Batch memory support requires the
* movement of caller tuples from one location in memory to another.
*/
void (*movetup) (void *dest, void *src, unsigned int len);
/*
* This array holds the tuples now in sort memory. If we are in state
* INITIAL, the tuples are in no particular order; if we are in state
......@@ -475,6 +485,7 @@ struct Tuplesortstate
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define MOVETUP(dest,src,len) ((*(state)->movetup) (dest, src, len))
#define LACKMEM(state) ((state)->availMem < 0 && !(state)->batchUsed)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
......@@ -571,6 +582,7 @@ static void writetup_heap(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
static void movetup_heap(void *dest, void *src, unsigned int len);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
......@@ -578,6 +590,7 @@ static void writetup_cluster(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
static void movetup_cluster(void *dest, void *src, unsigned int len);
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
......@@ -587,6 +600,7 @@ static void writetup_index(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
static void movetup_index(void *dest, void *src, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
......@@ -594,6 +608,7 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
static void movetup_datum(void *dest, void *src, unsigned int len);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/*
......@@ -749,6 +764,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
state->copytup = copytup_heap;
state->writetup = writetup_heap;
state->readtup = readtup_heap;
state->movetup = movetup_heap;
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
state->abbrevNext = 10;
......@@ -821,6 +837,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
state->copytup = copytup_cluster;
state->writetup = writetup_cluster;
state->readtup = readtup_cluster;
state->movetup = movetup_cluster;
state->abbrevNext = 10;
state->indexInfo = BuildIndexInfo(indexRel);
......@@ -912,6 +929,7 @@ tuplesort_begin_index_btree(Relation heapRel,
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
state->movetup = movetup_index;
state->abbrevNext = 10;
state->heapRel = heapRel;
......@@ -979,6 +997,7 @@ tuplesort_begin_index_hash(Relation heapRel,
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
state->movetup = movetup_index;
state->heapRel = heapRel;
state->indexRel = indexRel;
......@@ -1021,6 +1040,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
state->copytup = copytup_datum;
state->writetup = writetup_datum;
state->readtup = readtup_datum;
state->movetup = movetup_datum;
state->abbrevNext = 10;
state->datumType = datumType;
......@@ -2975,7 +2995,7 @@ mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
*/
tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
state->mergecurrent[srcTape] = state->mergetuples[srcTape];
memmove(state->mergecurrent[srcTape], state->mergetail[srcTape],
MOVETUP(state->mergecurrent[srcTape], state->mergetail[srcTape],
tupLen);
/* Make SortTuple at top of the merge heap point to new tuple */
......@@ -3039,7 +3059,7 @@ mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
memcpy(rtup->tuple, oldTuple, tuplen);
MOVETUP(rtup->tuple, oldTuple, tuplen);
*should_free = true;
}
......@@ -4061,6 +4081,12 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
static void
movetup_heap(void *dest, void *src, unsigned int len)
{
memmove(dest, src, len);
}
/*
* Routines specialized for the CLUSTER case (HeapTuple data, with
* comparisons per a btree index definition)
......@@ -4302,6 +4328,18 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
static void
movetup_cluster(void *dest, void *src, unsigned int len)
{
HeapTuple tuple;
memmove(dest, src, len);
/* Repoint the HeapTupleData header */
tuple = (HeapTuple) dest;
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
}
/*
* Routines specialized for IndexTuple case
......@@ -4594,6 +4632,12 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
static void
movetup_index(void *dest, void *src, unsigned int len)
{
memmove(dest, src, len);
}
/*
* Routines specialized for DatumTuple case
*/
......@@ -4704,6 +4748,12 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
&tuplen, sizeof(tuplen));
}
static void
movetup_datum(void *dest, void *src, unsigned int len)
{
memmove(dest, src, len);
}
/*
* Convenience routine to free a tuple previously loaded into sort memory
*/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment