Commit 174b552e authored by Bruce Momjian's avatar Bruce Momjian

There are some bugs about backward scanning using

indexes.

1. Index Scan using plural indexids never scan backward
   as to the order of indexids.
2. The cursor using Index scan is not usable after moving
   past the end.

This patch solves above bugs.
Moreover the change of _bt_first() would be useful to extend
ORDER BY patch by Jan Wieck for all descending order cases.

Hiroshi Inoue
parent df6e5044
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.42 1999/03/28 20:31:58 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.43 1999/04/13 17:18:28 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -733,7 +733,8 @@ _bt_next(IndexScanDesc scan, ScanDirection dir) ...@@ -733,7 +733,8 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
return res; return res;
} }
} while (keysok >= so->numberOfFirstKeys); } while (keysok >= so->numberOfFirstKeys ||
(keysok == -1 && ScanDirectionIsBackward(dir)));
ItemPointerSetInvalid(current); ItemPointerSetInvalid(current);
so->btso_curbuf = InvalidBuffer; so->btso_curbuf = InvalidBuffer;
...@@ -775,6 +776,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ...@@ -775,6 +776,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
BTScanOpaque so; BTScanOpaque so;
ScanKeyData skdata; ScanKeyData skdata;
Size keysok; Size keysok;
int i;
int nKeyIndex = -1;
rel = scan->relation; rel = scan->relation;
so = (BTScanOpaque) scan->opaque; so = (BTScanOpaque) scan->opaque;
...@@ -790,11 +793,34 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ...@@ -790,11 +793,34 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
{ {
_bt_orderkeys(rel, so); _bt_orderkeys(rel, so);
if (ScanDirectionIsBackward(dir))
{
for (i=0; i<so->numberOfKeys; i++)
{
if (so->keyData[i].sk_attno != 1)
break;
strat = _bt_getstrat(rel, so->keyData[i].sk_attno,
so->keyData[i].sk_procedure);
if (strat == BTLessStrategyNumber ||
strat == BTLessEqualStrategyNumber||
strat == BTEqualStrategyNumber)
{
nKeyIndex = i;
break;
}
}
}
else
{
strat = _bt_getstrat(rel, 1, so->keyData[0].sk_procedure); strat = _bt_getstrat(rel, 1, so->keyData[0].sk_procedure);
/* NOTE: it assumes ForwardScanDirection */
if (strat == BTLessStrategyNumber || if (strat == BTLessStrategyNumber ||
strat == BTLessEqualStrategyNumber) strat == BTLessEqualStrategyNumber)
;
else
nKeyIndex = 0;
}
if (nKeyIndex < 0)
scan->scanFromEnd = true; scan->scanFromEnd = true;
} }
else else
...@@ -823,8 +849,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ...@@ -823,8 +849,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
return (RetrieveIndexResult) NULL; return (RetrieveIndexResult) NULL;
} }
proc = index_getprocid(rel, 1, BTORDER_PROC); proc = index_getprocid(rel, 1, BTORDER_PROC);
ScanKeyEntryInitialize(&skdata, so->keyData[0].sk_flags, 1, proc, ScanKeyEntryInitialize(&skdata, so->keyData[nKeyIndex].sk_flags,
so->keyData[0].sk_argument); 1, proc, so->keyData[nKeyIndex].sk_argument);
stack = _bt_search(rel, 1, &skdata, &buf); stack = _bt_search(rel, 1, &skdata, &buf);
_bt_freestack(stack); _bt_freestack(stack);
...@@ -897,7 +923,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ...@@ -897,7 +923,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
/* it's yet other place to add some code latter for is(not)null */ /* it's yet other place to add some code latter for is(not)null */
strat = _bt_getstrat(rel, 1, so->keyData[0].sk_procedure); strat = _bt_getstrat(rel, 1, so->keyData[nKeyIndex].sk_procedure);
switch (strat) switch (strat)
{ {
...@@ -914,9 +940,6 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ...@@ -914,9 +940,6 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
result = _bt_compare(rel, itupdesc, page, 1, &skdata, offnum); result = _bt_compare(rel, itupdesc, page, 1, &skdata, offnum);
} while (result <= 0); } while (result <= 0);
/* if this is true, the key we just looked at is gone */
if (result > 0)
_bt_twostep(scan, &buf, ForwardScanDirection);
} }
break; break;
...@@ -946,6 +969,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ...@@ -946,6 +969,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
ItemPointerSetInvalid(&(scan->currentItemData)); ItemPointerSetInvalid(&(scan->currentItemData));
return (RetrieveIndexResult) NULL; return (RetrieveIndexResult) NULL;
} }
else if (ScanDirectionIsBackward(dir))
{
do
{
if (!_bt_twostep(scan, &buf, ForwardScanDirection))
break;
offnum = ItemPointerGetOffsetNumber(current);
page = BufferGetPage(buf);
result = _bt_compare(rel, itupdesc, page, 1, &skdata, offnum);
} while (result == 0);
if (result < 0)
_bt_twostep(scan, &buf, BackwardScanDirection);
}
break; break;
case BTGreaterEqualStrategyNumber: case BTGreaterEqualStrategyNumber:
...@@ -1026,6 +1064,11 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ...@@ -1026,6 +1064,11 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
so->btso_curbuf = buf; so->btso_curbuf = buf;
return _bt_next(scan, dir); return _bt_next(scan, dir);
} }
else if (keysok == -1 && ScanDirectionIsBackward(dir))
{
so->btso_curbuf = buf;
return _bt_next(scan, dir);
}
else else
{ {
ItemPointerSetInvalid(current); ItemPointerSetInvalid(current);
...@@ -1495,6 +1538,11 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) ...@@ -1495,6 +1538,11 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
so->btso_curbuf = buf; so->btso_curbuf = buf;
return _bt_next(scan, dir); return _bt_next(scan, dir);
} }
else if (keysok == -1 && ScanDirectionIsBackward(dir))
{
so->btso_curbuf = buf;
return _bt_next(scan, dir);
}
else else
{ {
ItemPointerSetInvalid(current); ItemPointerSetInvalid(current);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.25 1999/02/13 23:14:37 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.26 1999/04/13 17:18:29 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -367,8 +367,14 @@ _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, Size *keysok) ...@@ -367,8 +367,14 @@ _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, Size *keysok)
&isNull); &isNull);
/* btree doesn't support 'A is null' clauses, yet */ /* btree doesn't support 'A is null' clauses, yet */
if (isNull || key[0].sk_flags & SK_ISNULL) if (key[0].sk_flags & SK_ISNULL)
return false; return false;
if (isNull)
{
if (*keysok < so->numberOfFirstKeys)
*keysok = -1;
return false;
}
if (key[0].sk_flags & SK_COMMUTE) if (key[0].sk_flags & SK_COMMUTE)
{ {
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeIndexscan.c,v 1.33 1999/02/21 03:48:40 scrappy Exp $ * $Header: /cvsroot/pgsql/src/backend/executor/nodeIndexscan.c,v 1.34 1999/04/13 17:18:29 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -96,6 +96,8 @@ IndexNext(IndexScan *node) ...@@ -96,6 +96,8 @@ IndexNext(IndexScan *node)
Buffer buffer = InvalidBuffer; Buffer buffer = InvalidBuffer;
int numIndices; int numIndices;
bool bBackward;
int indexNumber;
/* ---------------- /* ----------------
* extract necessary information from index scan node * extract necessary information from index scan node
* ---------------- * ----------------
...@@ -151,7 +153,25 @@ IndexNext(IndexScan *node) ...@@ -151,7 +153,25 @@ IndexNext(IndexScan *node)
* appropriate heap tuple.. else return NULL. * appropriate heap tuple.. else return NULL.
* ---------------- * ----------------
*/ */
while (indexstate->iss_IndexPtr < numIndices) bBackward = ScanDirectionIsBackward(direction);
if (bBackward)
{
indexNumber = numIndices - indexstate->iss_IndexPtr - 1;
if (indexNumber < 0)
{
indexNumber = 0;
indexstate->iss_IndexPtr = numIndices - 1;
}
}
else
{
if ((indexNumber = indexstate->iss_IndexPtr) < 0)
{
indexNumber = 0;
indexstate->iss_IndexPtr = 0;
}
}
while (indexNumber < numIndices)
{ {
scandesc = scanDescs[indexstate->iss_IndexPtr]; scandesc = scanDescs[indexstate->iss_IndexPtr];
while ((result = index_getnext(scandesc, direction)) != NULL) while ((result = index_getnext(scandesc, direction)) != NULL)
...@@ -204,9 +224,15 @@ IndexNext(IndexScan *node) ...@@ -204,9 +224,15 @@ IndexNext(IndexScan *node)
if (BufferIsValid(buffer)) if (BufferIsValid(buffer))
ReleaseBuffer(buffer); ReleaseBuffer(buffer);
} }
if (indexstate->iss_IndexPtr < numIndices) if (indexNumber < numIndices)
{
indexNumber++;
if (bBackward)
indexstate->iss_IndexPtr--;
else
indexstate->iss_IndexPtr++; indexstate->iss_IndexPtr++;
} }
}
/* ---------------- /* ----------------
* if we get here it means the index scan failed so we * if we get here it means the index scan failed so we
* are at the end of the scan.. * are at the end of the scan..
...@@ -294,7 +320,7 @@ ExecIndexReScan(IndexScan *node, ExprContext *exprCtxt, Plan *parent) ...@@ -294,7 +320,7 @@ ExecIndexReScan(IndexScan *node, ExprContext *exprCtxt, Plan *parent)
runtimeKeyInfo = (Pointer *) indexstate->iss_RuntimeKeyInfo; runtimeKeyInfo = (Pointer *) indexstate->iss_RuntimeKeyInfo;
indxqual = node->indxqual; indxqual = node->indxqual;
numScanKeys = indexstate->iss_NumScanKeys; numScanKeys = indexstate->iss_NumScanKeys;
indexstate->iss_IndexPtr = 0; indexstate->iss_IndexPtr = -1;
/* If this is re-scanning of PlanQual ... */ /* If this is re-scanning of PlanQual ... */
if (estate->es_evTuple != NULL && if (estate->es_evTuple != NULL &&
...@@ -611,7 +637,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent) ...@@ -611,7 +637,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
*/ */
indexstate = makeNode(IndexScanState); indexstate = makeNode(IndexScanState);
indexstate->iss_NumIndices = 0; indexstate->iss_NumIndices = 0;
indexstate->iss_IndexPtr = 0; indexstate->iss_IndexPtr = -1;
indexstate->iss_ScanKeys = NULL; indexstate->iss_ScanKeys = NULL;
indexstate->iss_NumScanKeys = NULL; indexstate->iss_NumScanKeys = NULL;
indexstate->iss_RuntimeKeyInfo = NULL; indexstate->iss_RuntimeKeyInfo = NULL;
...@@ -635,7 +661,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent) ...@@ -635,7 +661,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
indxid = node->indxid; indxid = node->indxid;
indxqual = node->indxqual; indxqual = node->indxqual;
numIndices = length(indxid); numIndices = length(indxid);
indexPtr = 0; indexPtr = -1;
CXT1_printf("ExecInitIndexScan: context is %d\n", CurrentMemoryContext); CXT1_printf("ExecInitIndexScan: context is %d\n", CurrentMemoryContext);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment