Commit 5ea86e6e authored by Robert Haas's avatar Robert Haas

Use the sortsupport infrastructure in more cases.

This removes some fmgr overhead from cases such as btree index builds.

Peter Geoghegan, reviewed by Andreas Karlsson and me.
parent 99e8f08f
...@@ -686,6 +686,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) ...@@ -686,6 +686,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
int i, int i,
keysz = RelationGetNumberOfAttributes(wstate->index); keysz = RelationGetNumberOfAttributes(wstate->index);
ScanKey indexScanKey = NULL; ScanKey indexScanKey = NULL;
SortSupport sortKeys;
if (merge) if (merge)
{ {
...@@ -701,6 +702,31 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) ...@@ -701,6 +702,31 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
true, &should_free2); true, &should_free2);
indexScanKey = _bt_mkscankey_nodata(wstate->index); indexScanKey = _bt_mkscankey_nodata(wstate->index);
/* Prepare SortSupport data for each column */
sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
for (i = 0; i < keysz; i++)
{
SortSupport sortKey = sortKeys + i;
ScanKey scanKey = indexScanKey + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
sortKey->ssup_collation = scanKey->sk_collation;
sortKey->ssup_nulls_first =
(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
sortKey->ssup_attno = scanKey->sk_attno;
AssertState(sortKey->ssup_attno != 0);
strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
BTGreaterStrategyNumber : BTLessStrategyNumber;
PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
}
_bt_freeskey(indexScanKey);
for (;;) for (;;)
{ {
load1 = true; /* load BTSpool next ? */ load1 = true; /* load BTSpool next ? */
...@@ -713,43 +739,20 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) ...@@ -713,43 +739,20 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
{ {
for (i = 1; i <= keysz; i++) for (i = 1; i <= keysz; i++)
{ {
ScanKey entry; SortSupport entry;
Datum attrDatum1, Datum attrDatum1,
attrDatum2; attrDatum2;
bool isNull1, bool isNull1,
isNull2; isNull2;
int32 compare; int32 compare;
entry = indexScanKey + i - 1; entry = sortKeys + i - 1;
attrDatum1 = index_getattr(itup, i, tupdes, &isNull1); attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2); attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
if (isNull1)
{ compare = ApplySortComparator(attrDatum1, isNull1,
if (isNull2) attrDatum2, isNull2,
compare = 0; /* NULL "=" NULL */ entry);
else if (entry->sk_flags & SK_BT_NULLS_FIRST)
compare = -1; /* NULL "<" NOT_NULL */
else
compare = 1; /* NULL ">" NOT_NULL */
}
else if (isNull2)
{
if (entry->sk_flags & SK_BT_NULLS_FIRST)
compare = 1; /* NOT_NULL ">" NULL */
else
compare = -1; /* NOT_NULL "<" NULL */
}
else
{
compare =
DatumGetInt32(FunctionCall2Coll(&entry->sk_func,
entry->sk_collation,
attrDatum1,
attrDatum2));
if (entry->sk_flags & SK_BT_DESC)
compare = -compare;
}
if (compare > 0) if (compare > 0)
{ {
load1 = false; load1 = false;
...@@ -783,7 +786,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) ...@@ -783,7 +786,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
true, &should_free2); true, &should_free2);
} }
} }
_bt_freeskey(indexScanKey); pfree(sortKeys);
} }
else else
{ {
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "access/nbtree.h" #include "access/nbtree.h"
#include "fmgr.h" #include "fmgr.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/sortsupport.h" #include "utils/sortsupport.h"
...@@ -86,28 +87,14 @@ PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup) ...@@ -86,28 +87,14 @@ PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup)
} }
/* /*
* Fill in SortSupport given an ordering operator (btree "<" or ">" operator). * Look up and call sortsupport function to setup SortSupport comparator;
* * or if no such function exists or it declines to set up the appropriate
* Caller must previously have zeroed the SortSupportData structure and then * state, prepare a suitable shim.
* filled in ssup_cxt, ssup_collation, and ssup_nulls_first. This will fill
* in ssup_reverse as well as the comparator function pointer.
*/ */
void static void
PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup) FinishSortSupportFunction(Oid opfamily, Oid opcintype, SortSupport ssup)
{ {
Oid sortSupportFunction; Oid sortSupportFunction;
Oid opfamily;
Oid opcintype;
int16 strategy;
Assert(ssup->comparator == NULL);
/* Find the operator in pg_amop */
if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype,
&strategy))
elog(ERROR, "operator %u is not a valid ordering operator",
orderingOp);
ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
/* Look for a sort support function */ /* Look for a sort support function */
sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype, sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype,
...@@ -136,3 +123,57 @@ PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup) ...@@ -136,3 +123,57 @@ PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
PrepareSortSupportComparisonShim(sortFunction, ssup); PrepareSortSupportComparisonShim(sortFunction, ssup);
} }
} }
/*
* Fill in SortSupport given an ordering operator (btree "<" or ">" operator).
*
* Caller must previously have zeroed the SortSupportData structure and then
* filled in ssup_cxt, ssup_collation, and ssup_nulls_first. This will fill
* in ssup_reverse as well as the comparator function pointer.
*/
void
PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
{
Oid opfamily;
Oid opcintype;
int16 strategy;
Assert(ssup->comparator == NULL);
/* Find the operator in pg_amop */
if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype,
&strategy))
elog(ERROR, "operator %u is not a valid ordering operator",
orderingOp);
ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
FinishSortSupportFunction(opfamily, opcintype, ssup);
}
/*
* Fill in SortSupport given an index relation, attribute, and strategy.
*
* Caller must previously have zeroed the SortSupportData structure and then
* filled in ssup_cxt, ssup_attno, ssup_collation, and ssup_nulls_first. This
* will fill in ssup_reverse (based on the supplied strategy), as well as the
* comparator function pointer.
*/
void
PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy,
SortSupport ssup)
{
Oid opfamily = indexRel->rd_opfamily[ssup->ssup_attno - 1];
Oid opcintype = indexRel->rd_opcintype[ssup->ssup_attno - 1];
Assert(ssup->comparator == NULL);
/* Find the operator in pg_amop */
if (indexRel->rd_rel->relam != BTREE_AM_OID)
elog(ERROR, "unexpected non-btree AM: %u", indexRel->rd_rel->relam);
if (strategy != BTGreaterStrategyNumber &&
strategy != BTLessStrategyNumber)
elog(ERROR, "unexpected sort support strategy: %d", strategy);
ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
FinishSortSupportFunction(opfamily, opcintype, ssup);
}
...@@ -256,13 +256,6 @@ struct Tuplesortstate ...@@ -256,13 +256,6 @@ struct Tuplesortstate
void (*readtup) (Tuplesortstate *state, SortTuple *stup, void (*readtup) (Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len); int tapenum, unsigned int len);
/*
* Function to reverse the sort direction from its current state. (We
* could dispense with this if we wanted to enforce that all variants
* represent the sort key information alike.)
*/
void (*reversedirection) (Tuplesortstate *state);
/* /*
* This array holds the tuples now in sort memory. If we are in state * This array holds the tuples now in sort memory. If we are in state
* INITIAL, the tuples are in no particular order; if we are in state * INITIAL, the tuples are in no particular order; if we are in state
...@@ -340,8 +333,9 @@ struct Tuplesortstate ...@@ -340,8 +333,9 @@ struct Tuplesortstate
bool markpos_eof; /* saved "eof_reached" */ bool markpos_eof; /* saved "eof_reached" */
/* /*
* These variables are specific to the MinimalTuple case; they are set by * The sortKeys variable is used by every case other than the hash index
* tuplesort_begin_heap and used only by the MinimalTuple routines. * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the
* MinimalTuple and CLUSTER routines, though.
*/ */
TupleDesc tupDesc; TupleDesc tupDesc;
SortSupport sortKeys; /* array of length nKeys */ SortSupport sortKeys; /* array of length nKeys */
...@@ -354,8 +348,7 @@ struct Tuplesortstate ...@@ -354,8 +348,7 @@ struct Tuplesortstate
/* /*
* These variables are specific to the CLUSTER case; they are set by * These variables are specific to the CLUSTER case; they are set by
* tuplesort_begin_cluster. Note CLUSTER also uses tupDesc and * tuplesort_begin_cluster.
* indexScanKey.
*/ */
IndexInfo *indexInfo; /* info about index being used for reference */ IndexInfo *indexInfo; /* info about index being used for reference */
EState *estate; /* for evaluating index expressions */ EState *estate; /* for evaluating index expressions */
...@@ -368,7 +361,6 @@ struct Tuplesortstate ...@@ -368,7 +361,6 @@ struct Tuplesortstate
Relation indexRel; /* index being built */ Relation indexRel; /* index being built */
/* These are specific to the index_btree subcase: */ /* These are specific to the index_btree subcase: */
ScanKey indexScanKey;
bool enforceUnique; /* complain if we find duplicate tuples */ bool enforceUnique; /* complain if we find duplicate tuples */
/* These are specific to the index_hash subcase: */ /* These are specific to the index_hash subcase: */
...@@ -395,7 +387,6 @@ struct Tuplesortstate ...@@ -395,7 +387,6 @@ struct Tuplesortstate
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup)) #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup)) #define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len)) #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
#define LACKMEM(state) ((state)->availMem < 0) #define LACKMEM(state) ((state)->availMem < 0)
#define USEMEM(state,amt) ((state)->availMem -= (amt)) #define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt)) #define FREEMEM(state,amt) ((state)->availMem += (amt))
...@@ -464,6 +455,7 @@ static void sort_bounded_heap(Tuplesortstate *state); ...@@ -464,6 +455,7 @@ static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple, static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
int tupleindex, bool checkIndex); int tupleindex, bool checkIndex);
static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex); static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum); static void markrunend(Tuplesortstate *state, int tapenum);
static int comparetup_heap(const SortTuple *a, const SortTuple *b, static int comparetup_heap(const SortTuple *a, const SortTuple *b,
...@@ -473,7 +465,6 @@ static void writetup_heap(Tuplesortstate *state, int tapenum, ...@@ -473,7 +465,6 @@ static void writetup_heap(Tuplesortstate *state, int tapenum,
SortTuple *stup); SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup, static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len); int tapenum, unsigned int len);
static void reversedirection_heap(Tuplesortstate *state);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b, static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state); Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup); static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
...@@ -490,8 +481,6 @@ static void writetup_index(Tuplesortstate *state, int tapenum, ...@@ -490,8 +481,6 @@ static void writetup_index(Tuplesortstate *state, int tapenum,
SortTuple *stup); SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup, static void readtup_index(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len); int tapenum, unsigned int len);
static void reversedirection_index_btree(Tuplesortstate *state);
static void reversedirection_index_hash(Tuplesortstate *state);
static int comparetup_datum(const SortTuple *a, const SortTuple *b, static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state); Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup); static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
...@@ -499,7 +488,6 @@ static void writetup_datum(Tuplesortstate *state, int tapenum, ...@@ -499,7 +488,6 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
SortTuple *stup); SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup, static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len); int tapenum, unsigned int len);
static void reversedirection_datum(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup); static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/* /*
...@@ -629,7 +617,6 @@ tuplesort_begin_heap(TupleDesc tupDesc, ...@@ -629,7 +617,6 @@ tuplesort_begin_heap(TupleDesc tupDesc,
state->copytup = copytup_heap; state->copytup = copytup_heap;
state->writetup = writetup_heap; state->writetup = writetup_heap;
state->readtup = readtup_heap; state->readtup = readtup_heap;
state->reversedirection = reversedirection_heap;
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
...@@ -665,7 +652,9 @@ tuplesort_begin_cluster(TupleDesc tupDesc, ...@@ -665,7 +652,9 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
int workMem, bool randomAccess) int workMem, bool randomAccess)
{ {
Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
ScanKey indexScanKey;
MemoryContext oldcontext; MemoryContext oldcontext;
int i;
Assert(indexRel->rd_rel->relam == BTREE_AM_OID); Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
...@@ -691,13 +680,13 @@ tuplesort_begin_cluster(TupleDesc tupDesc, ...@@ -691,13 +680,13 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
state->copytup = copytup_cluster; state->copytup = copytup_cluster;
state->writetup = writetup_cluster; state->writetup = writetup_cluster;
state->readtup = readtup_cluster; state->readtup = readtup_cluster;
state->reversedirection = reversedirection_index_btree;
state->indexInfo = BuildIndexInfo(indexRel); state->indexInfo = BuildIndexInfo(indexRel);
state->indexScanKey = _bt_mkscankey_nodata(indexRel);
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
indexScanKey = _bt_mkscankey_nodata(indexRel);
if (state->indexInfo->ii_Expressions != NULL) if (state->indexInfo->ii_Expressions != NULL)
{ {
TupleTableSlot *slot; TupleTableSlot *slot;
...@@ -715,6 +704,32 @@ tuplesort_begin_cluster(TupleDesc tupDesc, ...@@ -715,6 +704,32 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
econtext->ecxt_scantuple = slot; econtext->ecxt_scantuple = slot;
} }
/* Prepare SortSupport data for each column */
state->sortKeys = (SortSupport) palloc0(state->nKeys *
sizeof(SortSupportData));
for (i = 0; i < state->nKeys; i++)
{
SortSupport sortKey = state->sortKeys + i;
ScanKey scanKey = indexScanKey + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
sortKey->ssup_collation = scanKey->sk_collation;
sortKey->ssup_nulls_first =
(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
sortKey->ssup_attno = scanKey->sk_attno;
AssertState(sortKey->ssup_attno != 0);
strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
BTGreaterStrategyNumber : BTLessStrategyNumber;
PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
}
_bt_freeskey(indexScanKey);
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
return state; return state;
...@@ -727,7 +742,9 @@ tuplesort_begin_index_btree(Relation heapRel, ...@@ -727,7 +742,9 @@ tuplesort_begin_index_btree(Relation heapRel,
int workMem, bool randomAccess) int workMem, bool randomAccess)
{ {
Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
ScanKey indexScanKey;
MemoryContext oldcontext; MemoryContext oldcontext;
int i;
oldcontext = MemoryContextSwitchTo(state->sortcontext); oldcontext = MemoryContextSwitchTo(state->sortcontext);
...@@ -751,13 +768,40 @@ tuplesort_begin_index_btree(Relation heapRel, ...@@ -751,13 +768,40 @@ tuplesort_begin_index_btree(Relation heapRel,
state->copytup = copytup_index; state->copytup = copytup_index;
state->writetup = writetup_index; state->writetup = writetup_index;
state->readtup = readtup_index; state->readtup = readtup_index;
state->reversedirection = reversedirection_index_btree;
state->heapRel = heapRel; state->heapRel = heapRel;
state->indexRel = indexRel; state->indexRel = indexRel;
state->indexScanKey = _bt_mkscankey_nodata(indexRel);
state->enforceUnique = enforceUnique; state->enforceUnique = enforceUnique;
indexScanKey = _bt_mkscankey_nodata(indexRel);
state->nKeys = RelationGetNumberOfAttributes(indexRel);
/* Prepare SortSupport data for each column */
state->sortKeys = (SortSupport) palloc0(state->nKeys *
sizeof(SortSupportData));
for (i = 0; i < state->nKeys; i++)
{
SortSupport sortKey = state->sortKeys + i;
ScanKey scanKey = indexScanKey + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
sortKey->ssup_collation = scanKey->sk_collation;
sortKey->ssup_nulls_first =
(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
sortKey->ssup_attno = scanKey->sk_attno;
AssertState(sortKey->ssup_attno != 0);
strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
BTGreaterStrategyNumber : BTLessStrategyNumber;
PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
}
_bt_freeskey(indexScanKey);
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
return state; return state;
...@@ -788,7 +832,6 @@ tuplesort_begin_index_hash(Relation heapRel, ...@@ -788,7 +832,6 @@ tuplesort_begin_index_hash(Relation heapRel,
state->copytup = copytup_index; state->copytup = copytup_index;
state->writetup = writetup_index; state->writetup = writetup_index;
state->readtup = readtup_index; state->readtup = readtup_index;
state->reversedirection = reversedirection_index_hash;
state->heapRel = heapRel; state->heapRel = heapRel;
state->indexRel = indexRel; state->indexRel = indexRel;
...@@ -831,7 +874,6 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, ...@@ -831,7 +874,6 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
state->copytup = copytup_datum; state->copytup = copytup_datum;
state->writetup = writetup_datum; state->writetup = writetup_datum;
state->readtup = readtup_datum; state->readtup = readtup_datum;
state->reversedirection = reversedirection_datum;
state->datumType = datumType; state->datumType = datumType;
...@@ -2599,7 +2641,7 @@ make_bounded_heap(Tuplesortstate *state) ...@@ -2599,7 +2641,7 @@ make_bounded_heap(Tuplesortstate *state)
Assert(tupcount >= state->bound); Assert(tupcount >= state->bound);
/* Reverse sort direction so largest entry will be at root */ /* Reverse sort direction so largest entry will be at root */
REVERSEDIRECTION(state); reversedirection(state);
state->memtupcount = 0; /* make the heap empty */ state->memtupcount = 0; /* make the heap empty */
for (i = 0; i < tupcount; i++) for (i = 0; i < tupcount; i++)
...@@ -2663,7 +2705,7 @@ sort_bounded_heap(Tuplesortstate *state) ...@@ -2663,7 +2705,7 @@ sort_bounded_heap(Tuplesortstate *state)
* Reverse sort direction back to the original state. This is not * Reverse sort direction back to the original state. This is not
* actually necessary but seems like a good idea for tidiness. * actually necessary but seems like a good idea for tidiness.
*/ */
REVERSEDIRECTION(state); reversedirection(state);
state->status = TSS_SORTEDINMEM; state->status = TSS_SORTEDINMEM;
state->boundUsed = true; state->boundUsed = true;
...@@ -2753,6 +2795,24 @@ tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex) ...@@ -2753,6 +2795,24 @@ tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
memtuples[i] = *tuple; memtuples[i] = *tuple;
} }
/*
* Function to reverse the sort direction from its current state
*
* It is not safe to call this when performing hash tuplesorts
*/
static void
reversedirection(Tuplesortstate *state)
{
SortSupport sortKey = state->sortKeys;
int nkey;
for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
{
sortKey->ssup_reverse = !sortKey->ssup_reverse;
sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
}
}
/* /*
* Tape interface routines * Tape interface routines
...@@ -2780,73 +2840,6 @@ markrunend(Tuplesortstate *state, int tapenum) ...@@ -2780,73 +2840,6 @@ markrunend(Tuplesortstate *state, int tapenum)
} }
/*
* Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
*/
static inline Datum
myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
{
FunctionCallInfoData fcinfo;
Datum result;
InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL);
fcinfo.arg[0] = arg1;
fcinfo.arg[1] = arg2;
fcinfo.argnull[0] = false;
fcinfo.argnull[1] = false;
result = FunctionCallInvoke(&fcinfo);
/* Check for null result, since caller is clearly not expecting one */
if (fcinfo.isnull)
elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
return result;
}
/*
* Apply a sort function (by now converted to fmgr lookup form)
* and return a 3-way comparison result. This takes care of handling
* reverse-sort and NULLs-ordering properly. We assume that DESC and
* NULLS_FIRST options are encoded in sk_flags the same way btree does it.
*/
static inline int32
inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
Datum datum1, bool isNull1,
Datum datum2, bool isNull2)
{
int32 compare;
if (isNull1)
{
if (isNull2)
compare = 0; /* NULL "=" NULL */
else if (sk_flags & SK_BT_NULLS_FIRST)
compare = -1; /* NULL "<" NOT_NULL */
else
compare = 1; /* NULL ">" NOT_NULL */
}
else if (isNull2)
{
if (sk_flags & SK_BT_NULLS_FIRST)
compare = 1; /* NOT_NULL ">" NULL */
else
compare = -1; /* NOT_NULL "<" NULL */
}
else
{
compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation,
datum1, datum2));
if (sk_flags & SK_BT_DESC)
compare = -compare;
}
return compare;
}
/* /*
* Routines specialized for HeapTuple (actually MinimalTuple) case * Routines specialized for HeapTuple (actually MinimalTuple) case
*/ */
...@@ -2972,20 +2965,6 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup, ...@@ -2972,20 +2965,6 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1); &stup->isnull1);
} }
static void
reversedirection_heap(Tuplesortstate *state)
{
SortSupport sortKey = state->sortKeys;
int nkey;
for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
{
sortKey->ssup_reverse = !sortKey->ssup_reverse;
sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
}
}
/* /*
* Routines specialized for the CLUSTER case (HeapTuple data, with * Routines specialized for the CLUSTER case (HeapTuple data, with
* comparisons per a btree index definition) * comparisons per a btree index definition)
...@@ -2995,7 +2974,7 @@ static int ...@@ -2995,7 +2974,7 @@ static int
comparetup_cluster(const SortTuple *a, const SortTuple *b, comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state) Tuplesortstate *state)
{ {
ScanKey scanKey = state->indexScanKey; SortSupport sortKey = state->sortKeys;
HeapTuple ltup; HeapTuple ltup;
HeapTuple rtup; HeapTuple rtup;
TupleDesc tupDesc; TupleDesc tupDesc;
...@@ -3005,14 +2984,13 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b, ...@@ -3005,14 +2984,13 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
/* Compare the leading sort key, if it's simple */ /* Compare the leading sort key, if it's simple */
if (state->indexInfo->ii_KeyAttrNumbers[0] != 0) if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
{ {
compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags, compare = ApplySortComparator(a->datum1, a->isnull1,
scanKey->sk_collation, b->datum1, b->isnull1,
a->datum1, a->isnull1, sortKey);
b->datum1, b->isnull1);
if (compare != 0 || state->nKeys == 1) if (compare != 0 || state->nKeys == 1)
return compare; return compare;
/* Compare additional columns the hard way */ /* Compare additional columns the hard way */
scanKey++; sortKey++;
nkey = 1; nkey = 1;
} }
else else
...@@ -3030,7 +3008,7 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b, ...@@ -3030,7 +3008,7 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
/* If not expression index, just compare the proper heap attrs */ /* If not expression index, just compare the proper heap attrs */
tupDesc = state->tupDesc; tupDesc = state->tupDesc;
for (; nkey < state->nKeys; nkey++, scanKey++) for (; nkey < state->nKeys; nkey++, sortKey++)
{ {
AttrNumber attno = state->indexInfo->ii_KeyAttrNumbers[nkey]; AttrNumber attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
Datum datum1, Datum datum1,
...@@ -3041,11 +3019,9 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b, ...@@ -3041,11 +3019,9 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1); datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2); datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
compare = inlineApplySortFunction(&scanKey->sk_func, compare = ApplySortComparator(datum1, isnull1,
scanKey->sk_flags, datum2, isnull2,
scanKey->sk_collation, sortKey);
datum1, isnull1,
datum2, isnull2);
if (compare != 0) if (compare != 0)
return compare; return compare;
} }
...@@ -3077,15 +3053,13 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b, ...@@ -3077,15 +3053,13 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
r_index_values, r_index_isnull); r_index_values, r_index_isnull);
for (; nkey < state->nKeys; nkey++, scanKey++) for (; nkey < state->nKeys; nkey++, sortKey++)
{ {
compare = inlineApplySortFunction(&scanKey->sk_func, compare = ApplySortComparator(l_index_values[nkey],
scanKey->sk_flags,
scanKey->sk_collation,
l_index_values[nkey],
l_index_isnull[nkey], l_index_isnull[nkey],
r_index_values[nkey], r_index_values[nkey],
r_index_isnull[nkey]); r_index_isnull[nkey],
sortKey);
if (compare != 0) if (compare != 0)
return compare; return compare;
} }
...@@ -3180,7 +3154,7 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b, ...@@ -3180,7 +3154,7 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
* is also special handling for enforcing uniqueness, and special treatment * is also special handling for enforcing uniqueness, and special treatment
* for equal keys at the end. * for equal keys at the end.
*/ */
ScanKey scanKey = state->indexScanKey; SortSupport sortKey = state->sortKeys;
IndexTuple tuple1; IndexTuple tuple1;
IndexTuple tuple2; IndexTuple tuple2;
int keysz; int keysz;
...@@ -3190,10 +3164,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b, ...@@ -3190,10 +3164,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
int32 compare; int32 compare;
/* Compare the leading sort key */ /* Compare the leading sort key */
compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags, compare = ApplySortComparator(a->datum1, a->isnull1,
scanKey->sk_collation, b->datum1, b->isnull1,
a->datum1, a->isnull1, sortKey);
b->datum1, b->isnull1);
if (compare != 0) if (compare != 0)
return compare; return compare;
...@@ -3206,8 +3179,8 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b, ...@@ -3206,8 +3179,8 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
tuple2 = (IndexTuple) b->tuple; tuple2 = (IndexTuple) b->tuple;
keysz = state->nKeys; keysz = state->nKeys;
tupDes = RelationGetDescr(state->indexRel); tupDes = RelationGetDescr(state->indexRel);
scanKey++; sortKey++;
for (nkey = 2; nkey <= keysz; nkey++, scanKey++) for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
{ {
Datum datum1, Datum datum1,
datum2; datum2;
...@@ -3217,10 +3190,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b, ...@@ -3217,10 +3190,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1); datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2); datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags, compare = ApplySortComparator(datum1, isnull1,
scanKey->sk_collation, datum2, isnull2,
datum1, isnull1, sortKey);
datum2, isnull2);
if (compare != 0) if (compare != 0)
return compare; /* done when we find unequal attributes */ return compare; /* done when we find unequal attributes */
...@@ -3394,26 +3366,6 @@ readtup_index(Tuplesortstate *state, SortTuple *stup, ...@@ -3394,26 +3366,6 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1); &stup->isnull1);
} }
static void
reversedirection_index_btree(Tuplesortstate *state)
{
ScanKey scanKey = state->indexScanKey;
int nkey;
for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
{
scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
}
}
static void
reversedirection_index_hash(Tuplesortstate *state)
{
/* We don't support reversing direction in a hash index sort */
elog(ERROR, "reversedirection_index_hash is not implemented");
}
/* /*
* Routines specialized for DatumTuple case * Routines specialized for DatumTuple case
*/ */
...@@ -3512,13 +3464,6 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup, ...@@ -3512,13 +3464,6 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
&tuplen, sizeof(tuplen)); &tuplen, sizeof(tuplen));
} }
static void
reversedirection_datum(Tuplesortstate *state)
{
state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse;
state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first;
}
/* /*
* Convenience routine to free a tuple previously loaded into sort memory * Convenience routine to free a tuple previously loaded into sort memory
*/ */
......
...@@ -48,6 +48,7 @@ ...@@ -48,6 +48,7 @@
#define SORTSUPPORT_H #define SORTSUPPORT_H
#include "access/attnum.h" #include "access/attnum.h"
#include "utils/relcache.h"
typedef struct SortSupportData *SortSupport; typedef struct SortSupportData *SortSupport;
...@@ -152,5 +153,7 @@ ApplySortComparator(Datum datum1, bool isNull1, ...@@ -152,5 +153,7 @@ ApplySortComparator(Datum datum1, bool isNull1,
/* Other functions in utils/sort/sortsupport.c */ /* Other functions in utils/sort/sortsupport.c */
extern void PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup); extern void PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup);
extern void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup); extern void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup);
extern void PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy,
SortSupport ssup);
#endif /* SORTSUPPORT_H */ #endif /* SORTSUPPORT_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment