Commit 03e56f79 authored by Tom Lane's avatar Tom Lane

Restructure SPGiST opclass interface API to support whole-index scans.

The original API definition was incapable of supporting whole-index scans
because there was no way to invoke leaf-value reconstruction without
checking any qual conditions.  Also, it was inefficient for
multiple-qual-condition scans because value reconstruction got done over
again for each qual condition, and because other internal work in the
consistent functions likewise had to be done for each qual.  To fix these
issues, pass the whole scankey array to the opclass consistent functions,
instead of only letting them see one item at a time.  (Essentially, the
loop over scankey entries is now inside the consistent functions not
outside them.  This makes the consistent functions a bit more complicated,
but not unreasonably so.)

In itself this commit does nothing except save a few cycles in
multiple-qual-condition index scans, since we can't support whole-index
scans on SPGiST indexes until nulls are included in the index.  However,
I consider this a must-fix for 9.2 because once we release it will get
very much harder to change the opclass API definition.
parent 39d74e34
...@@ -439,8 +439,8 @@ CREATE FUNCTION my_inner_consistent(internal, internal) RETURNS void ... ...@@ -439,8 +439,8 @@ CREATE FUNCTION my_inner_consistent(internal, internal) RETURNS void ...
<programlisting> <programlisting>
typedef struct spgInnerConsistentIn typedef struct spgInnerConsistentIn
{ {
StrategyNumber strategy; /* operator strategy number */ ScanKey scankeys; /* array of operators and comparison values */
Datum query; /* operator's RHS value */ int nkeys; /* length of array */
Datum reconstructedValue; /* value reconstructed at parent */ Datum reconstructedValue; /* value reconstructed at parent */
int level; /* current level (counting from zero) */ int level; /* current level (counting from zero) */
...@@ -463,8 +463,17 @@ typedef struct spgInnerConsistentOut ...@@ -463,8 +463,17 @@ typedef struct spgInnerConsistentOut
} spgInnerConsistentOut; } spgInnerConsistentOut;
</programlisting> </programlisting>
<structfield>strategy</> and The array <structfield>scankeys</>, of length <structfield>nkeys</>,
<structfield>query</> describe the index search condition. describes the index search condition(s). These conditions are
combined with AND &mdash; only index entries that satisfy all of
them are interesting. (Note that <structfield>nkeys</> = 0 implies
that all index entries satisfy the query.) Usually the consistent
function only cares about the <structfield>sk_strategy</> and
<structfield>sk_argument</> fields of each array entry, which
respectively give the indexable operator and comparison value.
In particular it is not necessary to check <structfield>sk_flags</> to
see if the comparison value is NULL, because the SP-GiST core code
will filter out such conditions.
<structfield>reconstructedValue</> is the value reconstructed for the <structfield>reconstructedValue</> is the value reconstructed for the
parent tuple; it is <literal>(Datum) 0</> at the root level or if the parent tuple; it is <literal>(Datum) 0</> at the root level or if the
<function>inner_consistent</> function did not provide a value at the <function>inner_consistent</> function did not provide a value at the
...@@ -527,8 +536,8 @@ CREATE FUNCTION my_leaf_consistent(internal, internal) RETURNS bool ... ...@@ -527,8 +536,8 @@ CREATE FUNCTION my_leaf_consistent(internal, internal) RETURNS bool ...
<programlisting> <programlisting>
typedef struct spgLeafConsistentIn typedef struct spgLeafConsistentIn
{ {
StrategyNumber strategy; /* operator strategy number */ ScanKey scankeys; /* array of operators and comparison values */
Datum query; /* operator's RHS value */ int nkeys; /* length of array */
Datum reconstructedValue; /* value reconstructed at parent */ Datum reconstructedValue; /* value reconstructed at parent */
int level; /* current level (counting from zero) */ int level; /* current level (counting from zero) */
...@@ -544,8 +553,17 @@ typedef struct spgLeafConsistentOut ...@@ -544,8 +553,17 @@ typedef struct spgLeafConsistentOut
} spgLeafConsistentOut; } spgLeafConsistentOut;
</programlisting> </programlisting>
<structfield>strategy</> and The array <structfield>scankeys</>, of length <structfield>nkeys</>,
<structfield>query</> define the index search condition. describes the index search condition(s). These conditions are
combined with AND &mdash; only index entries that satisfy all of
them satisfy the query. (Note that <structfield>nkeys</> = 0 implies
that all index entries satisfy the query.) Usually the consistent
function only cares about the <structfield>sk_strategy</> and
<structfield>sk_argument</> fields of each array entry, which
respectively give the indexable operator and comparison value.
In particular it is not necessary to check <structfield>sk_flags</> to
see if the comparison value is NULL, because the SP-GiST core code
will filter out such conditions.
<structfield>reconstructedValue</> is the value reconstructed for the <structfield>reconstructedValue</> is the value reconstructed for the
parent tuple; it is <literal>(Datum) 0</> at the root level or if the parent tuple; it is <literal>(Datum) 0</> at the root level or if the
<function>inner_consistent</> function did not provide a value at the <function>inner_consistent</> function did not provide a value at the
...@@ -566,8 +584,8 @@ typedef struct spgLeafConsistentOut ...@@ -566,8 +584,8 @@ typedef struct spgLeafConsistentOut
<structfield>leafValue</> must be set to the value originally supplied <structfield>leafValue</> must be set to the value originally supplied
to be indexed for this leaf tuple. Also, to be indexed for this leaf tuple. Also,
<structfield>recheck</> may be set to <literal>true</> if the match <structfield>recheck</> may be set to <literal>true</> if the match
is uncertain and so the operator must be re-applied to the actual heap is uncertain and so the operator(s) must be re-applied to the actual
tuple to verify the match. heap tuple to verify the match.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
...@@ -159,11 +159,10 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS) ...@@ -159,11 +159,10 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS)
{ {
spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0); spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1); spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
Point *query;
BOX *boxQuery;
double coord; double coord;
int which;
int i;
query = DatumGetPointP(in->query);
Assert(in->hasPrefix); Assert(in->hasPrefix);
coord = DatumGetFloat8(in->prefixDatum); coord = DatumGetFloat8(in->prefixDatum);
...@@ -171,124 +170,97 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS) ...@@ -171,124 +170,97 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS)
elog(ERROR, "allTheSame should not occur for k-d trees"); elog(ERROR, "allTheSame should not occur for k-d trees");
Assert(in->nNodes == 2); Assert(in->nNodes == 2);
out->nodeNumbers = (int *) palloc(sizeof(int) * 2);
out->levelAdds = (int *) palloc(sizeof(int) * 2);
out->levelAdds[0] = 1;
out->levelAdds[1] = 1;
out->nNodes = 0;
switch (in->strategy) /* "which" is a bitmask of children that satisfy all constraints */
which = (1 << 1) | (1 << 2);
for (i = 0; i < in->nkeys; i++)
{ {
case RTLeftStrategyNumber: Point *query = DatumGetPointP(in->scankeys[i].sk_argument);
out->nNodes = 1; BOX *boxQuery;
out->nodeNumbers[0] = 0;
switch (in->scankeys[i].sk_strategy)
if ((in->level % 2) == 0 || FPge(query->x, coord)) {
{ case RTLeftStrategyNumber:
out->nodeNumbers[1] = 1; if ((in->level % 2) != 0 && FPlt(query->x, coord))
out->nNodes++; which &= (1 << 1);
} break;
break; case RTRightStrategyNumber:
case RTRightStrategyNumber: if ((in->level % 2) != 0 && FPgt(query->x, coord))
out->nNodes = 1; which &= (1 << 2);
out->nodeNumbers[0] = 1; break;
case RTSameStrategyNumber:
if ((in->level % 2) == 0 || FPle(query->x, coord)) if ((in->level % 2) != 0)
{
out->nodeNumbers[1] = 0;
out->nNodes++;
}
break;
case RTSameStrategyNumber:
if (in->level % 2)
{
if (FPle(query->x, coord))
{
out->nodeNumbers[out->nNodes] = 0;
out->nNodes++;
}
if (FPge(query->x, coord))
{
out->nodeNumbers[out->nNodes] = 1;
out->nNodes++;
}
}
else
{
if (FPle(query->y, coord))
{ {
out->nodeNumbers[out->nNodes] = 0; if (FPlt(query->x, coord))
out->nNodes++; which &= (1 << 1);
else if (FPgt(query->x, coord))
which &= (1 << 2);
} }
if (FPge(query->y, coord)) else
{ {
out->nodeNumbers[out->nNodes] = 1; if (FPlt(query->y, coord))
out->nNodes++; which &= (1 << 1);
else if (FPgt(query->y, coord))
which &= (1 << 2);
} }
} break;
break; case RTBelowStrategyNumber:
case RTBelowStrategyNumber: if ((in->level % 2) == 0 && FPlt(query->y, coord))
out->nNodes = 1; which &= (1 << 1);
out->nodeNumbers[0] = 0; break;
case RTAboveStrategyNumber:
if ((in->level % 2) == 1 || FPge(query->y, coord)) if ((in->level % 2) == 0 && FPgt(query->y, coord))
{ which &= (1 << 2);
out->nodeNumbers[1] = 1; break;
out->nNodes++; case RTContainedByStrategyNumber:
}
break; /*
case RTAboveStrategyNumber: * For this operator, the query is a box not a point. We
out->nNodes = 1; * cheat to the extent of assuming that DatumGetPointP won't
out->nodeNumbers[0] = 1; * do anything that would be bad for a pointer-to-box.
*/
if ((in->level % 2) == 1 || FPle(query->y, coord)) boxQuery = DatumGetBoxP(in->scankeys[i].sk_argument);
{
out->nodeNumbers[1] = 0; if ((in->level % 2) != 0)
out->nNodes++;
}
break;
case RTContainedByStrategyNumber:
/*
* For this operator, the query is a box not a point. We cheat to
* the extent of assuming that DatumGetPointP won't do anything
* that would be bad for a pointer-to-box.
*/
boxQuery = DatumGetBoxP(in->query);
out->nNodes = 1;
if (in->level % 2)
{
if (FPlt(boxQuery->high.x, coord))
out->nodeNumbers[0] = 0;
else if (FPgt(boxQuery->low.x, coord))
out->nodeNumbers[0] = 1;
else
{ {
out->nodeNumbers[0] = 0; if (FPlt(boxQuery->high.x, coord))
out->nodeNumbers[1] = 1; which &= (1 << 1);
out->nNodes = 2; else if (FPgt(boxQuery->low.x, coord))
which &= (1 << 2);
} }
}
else
{
if (FPlt(boxQuery->high.y, coord))
out->nodeNumbers[0] = 0;
else if (FPgt(boxQuery->low.y, coord))
out->nodeNumbers[0] = 1;
else else
{ {
out->nodeNumbers[0] = 0; if (FPlt(boxQuery->high.y, coord))
out->nodeNumbers[1] = 1; which &= (1 << 1);
out->nNodes = 2; else if (FPgt(boxQuery->low.y, coord))
which &= (1 << 2);
} }
} break;
break; default:
default: elog(ERROR, "unrecognized strategy number: %d",
elog(ERROR, "unrecognized strategy number: %d", in->strategy); in->scankeys[i].sk_strategy);
break; break;
}
if (which == 0)
break; /* no need to consider remaining conditions */
} }
/* We must descend into the children identified by which */
out->nodeNumbers = (int *) palloc(sizeof(int) * 2);
out->nNodes = 0;
for (i = 1; i <= 2; i++)
{
if (which & (1 << i))
out->nodeNumbers[out->nNodes++] = i - 1;
}
/* Set up level increments, too */
out->levelAdds = (int *) palloc(sizeof(int) * 2);
out->levelAdds[0] = 1;
out->levelAdds[1] = 1;
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
......
...@@ -190,45 +190,21 @@ spg_quad_picksplit(PG_FUNCTION_ARGS) ...@@ -190,45 +190,21 @@ spg_quad_picksplit(PG_FUNCTION_ARGS)
} }
/* Subroutine to fill out->nodeNumbers[] for spg_quad_inner_consistent */
static void
setNodes(spgInnerConsistentOut *out, bool isAll, int first, int second)
{
if (isAll)
{
out->nNodes = 4;
out->nodeNumbers[0] = 0;
out->nodeNumbers[1] = 1;
out->nodeNumbers[2] = 2;
out->nodeNumbers[3] = 3;
}
else
{
out->nNodes = 2;
out->nodeNumbers[0] = first - 1;
out->nodeNumbers[1] = second - 1;
}
}
Datum Datum
spg_quad_inner_consistent(PG_FUNCTION_ARGS) spg_quad_inner_consistent(PG_FUNCTION_ARGS)
{ {
spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0); spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1); spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
Point *query, Point *centroid;
*centroid; int which;
BOX *boxQuery; int i;
query = DatumGetPointP(in->query);
Assert(in->hasPrefix); Assert(in->hasPrefix);
centroid = DatumGetPointP(in->prefixDatum); centroid = DatumGetPointP(in->prefixDatum);
if (in->allTheSame) if (in->allTheSame)
{ {
/* Report that all nodes should be visited */ /* Report that all nodes should be visited */
int i;
out->nNodes = in->nNodes; out->nNodes = in->nNodes;
out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes); out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
for (i = 0; i < in->nNodes; i++) for (i = 0; i < in->nNodes; i++)
...@@ -237,76 +213,86 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS) ...@@ -237,76 +213,86 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
} }
Assert(in->nNodes == 4); Assert(in->nNodes == 4);
out->nodeNumbers = (int *) palloc(sizeof(int) * 4);
switch (in->strategy) /* "which" is a bitmask of quadrants that satisfy all constraints */
which = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
for (i = 0; i < in->nkeys; i++)
{ {
case RTLeftStrategyNumber: Point *query = DatumGetPointP(in->scankeys[i].sk_argument);
setNodes(out, SPTEST(point_left, centroid, query), 3, 4); BOX *boxQuery;
break;
case RTRightStrategyNumber: switch (in->scankeys[i].sk_strategy)
setNodes(out, SPTEST(point_right, centroid, query), 1, 2); {
break; case RTLeftStrategyNumber:
case RTSameStrategyNumber: if (SPTEST(point_right, centroid, query))
out->nNodes = 1; which &= (1 << 3) | (1 << 4);
out->nodeNumbers[0] = getQuadrant(centroid, query) - 1; break;
break; case RTRightStrategyNumber:
case RTBelowStrategyNumber: if (SPTEST(point_left, centroid, query))
setNodes(out, SPTEST(point_below, centroid, query), 2, 3); which &= (1 << 1) | (1 << 2);
break; break;
case RTAboveStrategyNumber: case RTSameStrategyNumber:
setNodes(out, SPTEST(point_above, centroid, query), 1, 4); which &= (1 << getQuadrant(centroid, query));
break; break;
case RTContainedByStrategyNumber: case RTBelowStrategyNumber:
if (SPTEST(point_above, centroid, query))
/* which &= (1 << 2) | (1 << 3);
* For this operator, the query is a box not a point. We cheat to break;
* the extent of assuming that DatumGetPointP won't do anything case RTAboveStrategyNumber:
* that would be bad for a pointer-to-box. if (SPTEST(point_below, centroid, query))
*/ which &= (1 << 1) | (1 << 4);
boxQuery = DatumGetBoxP(in->query); break;
case RTContainedByStrategyNumber:
if (DatumGetBool(DirectFunctionCall2(box_contain_pt,
PointerGetDatum(boxQuery), /*
PointerGetDatum(centroid)))) * For this operator, the query is a box not a point. We
{ * cheat to the extent of assuming that DatumGetPointP won't
/* centroid is in box, so descend to all quadrants */ * do anything that would be bad for a pointer-to-box.
setNodes(out, true, 0, 0); */
} boxQuery = DatumGetBoxP(in->scankeys[i].sk_argument);
else
{ if (DatumGetBool(DirectFunctionCall2(box_contain_pt,
/* identify quadrant(s) containing all corners of box */ PointerGetDatum(boxQuery),
Point p; PointerGetDatum(centroid))))
int i,
r = 0;
p = boxQuery->low;
r |= 1 << (getQuadrant(centroid, &p) - 1);
p.y = boxQuery->high.y;
r |= 1 << (getQuadrant(centroid, &p) - 1);
p = boxQuery->high;
r |= 1 << (getQuadrant(centroid, &p) - 1);
p.x = boxQuery->low.x;
r |= 1 << (getQuadrant(centroid, &p) - 1);
/* we must descend into those quadrant(s) */
out->nNodes = 0;
for (i = 0; i < 4; i++)
{ {
if (r & (1 << i)) /* centroid is in box, so all quadrants are OK */
{
out->nodeNumbers[out->nNodes] = i;
out->nNodes++;
}
} }
} else
break; {
default: /* identify quadrant(s) containing all corners of box */
elog(ERROR, "unrecognized strategy number: %d", in->strategy); Point p;
break; int r = 0;
p = boxQuery->low;
r |= 1 << getQuadrant(centroid, &p);
p.y = boxQuery->high.y;
r |= 1 << getQuadrant(centroid, &p);
p = boxQuery->high;
r |= 1 << getQuadrant(centroid, &p);
p.x = boxQuery->low.x;
r |= 1 << getQuadrant(centroid, &p);
which &= r;
}
break;
default:
elog(ERROR, "unrecognized strategy number: %d",
in->scankeys[i].sk_strategy);
break;
}
if (which == 0)
break; /* no need to consider remaining conditions */
}
/* We must descend into the quadrant(s) identified by which */
out->nodeNumbers = (int *) palloc(sizeof(int) * 4);
out->nNodes = 0;
for (i = 1; i <= 4; i++)
{
if (which & (1 << i))
out->nodeNumbers[out->nNodes++] = i - 1;
} }
PG_RETURN_VOID(); PG_RETURN_VOID();
...@@ -318,9 +304,9 @@ spg_quad_leaf_consistent(PG_FUNCTION_ARGS) ...@@ -318,9 +304,9 @@ spg_quad_leaf_consistent(PG_FUNCTION_ARGS)
{ {
spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0); spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1); spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
Point *query = DatumGetPointP(in->query);
Point *datum = DatumGetPointP(in->leafDatum); Point *datum = DatumGetPointP(in->leafDatum);
bool res; bool res;
int i;
/* all tests are exact */ /* all tests are exact */
out->recheck = false; out->recheck = false;
...@@ -328,35 +314,45 @@ spg_quad_leaf_consistent(PG_FUNCTION_ARGS) ...@@ -328,35 +314,45 @@ spg_quad_leaf_consistent(PG_FUNCTION_ARGS)
/* leafDatum is what it is... */ /* leafDatum is what it is... */
out->leafValue = in->leafDatum; out->leafValue = in->leafDatum;
switch (in->strategy) /* Perform the required comparison(s) */
res = true;
for (i = 0; i < in->nkeys; i++)
{ {
case RTLeftStrategyNumber: Point *query = DatumGetPointP(in->scankeys[i].sk_argument);
res = SPTEST(point_left, datum, query);
break; switch (in->scankeys[i].sk_strategy)
case RTRightStrategyNumber: {
res = SPTEST(point_right, datum, query); case RTLeftStrategyNumber:
break; res = SPTEST(point_left, datum, query);
case RTSameStrategyNumber: break;
res = SPTEST(point_eq, datum, query); case RTRightStrategyNumber:
break; res = SPTEST(point_right, datum, query);
case RTBelowStrategyNumber: break;
res = SPTEST(point_below, datum, query); case RTSameStrategyNumber:
break; res = SPTEST(point_eq, datum, query);
case RTAboveStrategyNumber: break;
res = SPTEST(point_above, datum, query); case RTBelowStrategyNumber:
break; res = SPTEST(point_below, datum, query);
case RTContainedByStrategyNumber: break;
case RTAboveStrategyNumber:
/* res = SPTEST(point_above, datum, query);
* For this operator, the query is a box not a point. We cheat to break;
* the extent of assuming that DatumGetPointP won't do anything case RTContainedByStrategyNumber:
* that would be bad for a pointer-to-box.
*/ /*
res = SPTEST(box_contain_pt, query, datum); * For this operator, the query is a box not a point. We
break; * cheat to the extent of assuming that DatumGetPointP won't
default: * do anything that would be bad for a pointer-to-box.
elog(ERROR, "unrecognized strategy number: %d", in->strategy); */
res = false; res = SPTEST(box_contain_pt, query, datum);
break;
default:
elog(ERROR, "unrecognized strategy number: %d",
in->scankeys[i].sk_strategy);
break;
}
if (!res)
break; break;
} }
......
This diff is collapsed.
...@@ -362,25 +362,12 @@ spg_text_inner_consistent(PG_FUNCTION_ARGS) ...@@ -362,25 +362,12 @@ spg_text_inner_consistent(PG_FUNCTION_ARGS)
{ {
spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0); spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1); spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
StrategyNumber strategy = in->strategy; bool collate_is_c = lc_collate_is_c(PG_GET_COLLATION());
text *inText;
int inSize;
int i;
text *reconstrText = NULL; text *reconstrText = NULL;
int maxReconstrLen = 0; int maxReconstrLen = 0;
text *prefixText = NULL; text *prefixText = NULL;
int prefixSize = 0; int prefixSize = 0;
int i;
/*
* If it's a collation-aware operator, but the collation is C, we can
* treat it as non-collation-aware.
*/
if (strategy > 10 &&
lc_collate_is_c(PG_GET_COLLATION()))
strategy -= 10;
inText = DatumGetTextPP(in->query);
inSize = VARSIZE_ANY_EXHDR(inText);
/* /*
* Reconstruct values represented at this tuple, including parent data, * Reconstruct values represented at this tuple, including parent data,
...@@ -431,8 +418,8 @@ spg_text_inner_consistent(PG_FUNCTION_ARGS) ...@@ -431,8 +418,8 @@ spg_text_inner_consistent(PG_FUNCTION_ARGS)
{ {
uint8 nodeChar = DatumGetUInt8(in->nodeLabels[i]); uint8 nodeChar = DatumGetUInt8(in->nodeLabels[i]);
int thisLen; int thisLen;
int r; bool res = true;
bool res = false; int j;
/* If nodeChar is zero, don't include it in data */ /* If nodeChar is zero, don't include it in data */
if (nodeChar == '\0') if (nodeChar == '\0')
...@@ -443,38 +430,57 @@ spg_text_inner_consistent(PG_FUNCTION_ARGS) ...@@ -443,38 +430,57 @@ spg_text_inner_consistent(PG_FUNCTION_ARGS)
thisLen = maxReconstrLen; thisLen = maxReconstrLen;
} }
r = memcmp(VARDATA(reconstrText), VARDATA_ANY(inText), for (j = 0; j < in->nkeys; j++)
Min(inSize, thisLen));
switch (strategy)
{ {
case BTLessStrategyNumber: StrategyNumber strategy = in->scankeys[j].sk_strategy;
case BTLessEqualStrategyNumber: text *inText;
if (r <= 0) int inSize;
res = true; int r;
break;
case BTEqualStrategyNumber: /*
if (r == 0 && inSize >= thisLen) * If it's a collation-aware operator, but the collation is C, we
res = true; * can treat it as non-collation-aware. With non-C collation we
break; * need to traverse whole tree :-( so there's no point in making
case BTGreaterEqualStrategyNumber: * any check here.
case BTGreaterStrategyNumber: */
if (r >= 0) if (strategy > 10)
res = true; {
break; if (collate_is_c)
case BTLessStrategyNumber + 10: strategy -= 10;
case BTLessEqualStrategyNumber + 10: else
case BTGreaterEqualStrategyNumber + 10: continue;
case BTGreaterStrategyNumber + 10: }
/*
* with non-C collation we need to traverse whole tree :-( inText = DatumGetTextPP(in->scankeys[j].sk_argument);
*/ inSize = VARSIZE_ANY_EXHDR(inText);
res = true;
break; r = memcmp(VARDATA(reconstrText), VARDATA_ANY(inText),
default: Min(inSize, thisLen));
elog(ERROR, "unrecognized strategy number: %d",
in->strategy); switch (strategy)
break; {
case BTLessStrategyNumber:
case BTLessEqualStrategyNumber:
if (r > 0)
res = false;
break;
case BTEqualStrategyNumber:
if (r != 0 || inSize < thisLen)
res = false;
break;
case BTGreaterEqualStrategyNumber:
case BTGreaterStrategyNumber:
if (r < 0)
res = false;
break;
default:
elog(ERROR, "unrecognized strategy number: %d",
in->scankeys[j].sk_strategy);
break;
}
if (!res)
break; /* no need to consider remaining conditions */
} }
if (res) if (res)
...@@ -496,16 +502,13 @@ spg_text_leaf_consistent(PG_FUNCTION_ARGS) ...@@ -496,16 +502,13 @@ spg_text_leaf_consistent(PG_FUNCTION_ARGS)
{ {
spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0); spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1); spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
StrategyNumber strategy = in->strategy;
text *query = DatumGetTextPP(in->query);
int level = in->level; int level = in->level;
text *leafValue, text *leafValue,
*reconstrValue = NULL; *reconstrValue = NULL;
char *fullValue; char *fullValue;
int fullLen; int fullLen;
int queryLen;
int r;
bool res; bool res;
int j;
/* all tests are exact */ /* all tests are exact */
out->recheck = false; out->recheck = false;
...@@ -518,18 +521,8 @@ spg_text_leaf_consistent(PG_FUNCTION_ARGS) ...@@ -518,18 +521,8 @@ spg_text_leaf_consistent(PG_FUNCTION_ARGS)
Assert(level == 0 ? reconstrValue == NULL : Assert(level == 0 ? reconstrValue == NULL :
VARSIZE_ANY_EXHDR(reconstrValue) == level); VARSIZE_ANY_EXHDR(reconstrValue) == level);
/* Reconstruct the full string represented by this leaf tuple */
fullLen = level + VARSIZE_ANY_EXHDR(leafValue); fullLen = level + VARSIZE_ANY_EXHDR(leafValue);
queryLen = VARSIZE_ANY_EXHDR(query);
/*
* For an equality check, we needn't reconstruct fullValue if not same
* length; it can't match
*/
if (strategy == BTEqualStrategyNumber && queryLen != fullLen)
PG_RETURN_BOOL(false);
/* Else, reconstruct the full string represented by this leaf tuple */
if (VARSIZE_ANY_EXHDR(leafValue) == 0 && level > 0) if (VARSIZE_ANY_EXHDR(leafValue) == 0 && level > 0)
{ {
fullValue = VARDATA(reconstrValue); fullValue = VARDATA(reconstrValue);
...@@ -549,54 +542,67 @@ spg_text_leaf_consistent(PG_FUNCTION_ARGS) ...@@ -549,54 +542,67 @@ spg_text_leaf_consistent(PG_FUNCTION_ARGS)
out->leafValue = PointerGetDatum(fullText); out->leafValue = PointerGetDatum(fullText);
} }
/* Run the appropriate type of comparison */ /* Perform the required comparison(s) */
if (strategy > 10) res = true;
for (j = 0; j < in->nkeys; j++)
{ {
/* Collation-aware comparison */ StrategyNumber strategy = in->scankeys[j].sk_strategy;
strategy -= 10; text *query = DatumGetTextPP(in->scankeys[j].sk_argument);
int queryLen = VARSIZE_ANY_EXHDR(query);
int r;
/* If asserts are enabled, verify encoding of reconstructed string */ if (strategy > 10)
Assert(pg_verifymbstr(fullValue, fullLen, false)); {
/* Collation-aware comparison */
strategy -= 10;
r = varstr_cmp(fullValue, Min(queryLen, fullLen), /* If asserts enabled, verify encoding of reconstructed string */
VARDATA_ANY(query), Min(queryLen, fullLen), Assert(pg_verifymbstr(fullValue, fullLen, false));
PG_GET_COLLATION());
}
else
{
/* Non-collation-aware comparison */
r = memcmp(fullValue, VARDATA_ANY(query), Min(queryLen, fullLen));
}
if (r == 0) r = varstr_cmp(fullValue, Min(queryLen, fullLen),
{ VARDATA_ANY(query), Min(queryLen, fullLen),
if (queryLen > fullLen) PG_GET_COLLATION());
r = -1; }
else if (queryLen < fullLen) else
r = 1; {
} /* Non-collation-aware comparison */
r = memcmp(fullValue, VARDATA_ANY(query), Min(queryLen, fullLen));
}
switch (strategy) if (r == 0)
{ {
case BTLessStrategyNumber: if (queryLen > fullLen)
res = (r < 0); r = -1;
break; else if (queryLen < fullLen)
case BTLessEqualStrategyNumber: r = 1;
res = (r <= 0); }
break;
case BTEqualStrategyNumber: switch (strategy)
res = (r == 0); {
break; case BTLessStrategyNumber:
case BTGreaterEqualStrategyNumber: res = (r < 0);
res = (r >= 0); break;
break; case BTLessEqualStrategyNumber:
case BTGreaterStrategyNumber: res = (r <= 0);
res = (r > 0); break;
break; case BTEqualStrategyNumber:
default: res = (r == 0);
elog(ERROR, "unrecognized strategy number: %d", in->strategy); break;
res = false; case BTGreaterEqualStrategyNumber:
break; res = (r >= 0);
break;
case BTGreaterStrategyNumber:
res = (r > 0);
break;
default:
elog(ERROR, "unrecognized strategy number: %d",
in->scankeys[j].sk_strategy);
res = false;
break;
}
if (!res)
break; /* no need to consider remaining conditions */
} }
PG_RETURN_BOOL(res); PG_RETURN_BOOL(res);
......
...@@ -128,8 +128,8 @@ typedef struct spgPickSplitOut ...@@ -128,8 +128,8 @@ typedef struct spgPickSplitOut
*/ */
typedef struct spgInnerConsistentIn typedef struct spgInnerConsistentIn
{ {
StrategyNumber strategy; /* operator strategy number */ ScanKey scankeys; /* array of operators and comparison values */
Datum query; /* operator's RHS value */ int nkeys; /* length of array */
Datum reconstructedValue; /* value reconstructed at parent */ Datum reconstructedValue; /* value reconstructed at parent */
int level; /* current level (counting from zero) */ int level; /* current level (counting from zero) */
...@@ -156,8 +156,8 @@ typedef struct spgInnerConsistentOut ...@@ -156,8 +156,8 @@ typedef struct spgInnerConsistentOut
*/ */
typedef struct spgLeafConsistentIn typedef struct spgLeafConsistentIn
{ {
StrategyNumber strategy; /* operator strategy number */ ScanKey scankeys; /* array of operators and comparison values */
Datum query; /* operator's RHS value */ int nkeys; /* length of array */
Datum reconstructedValue; /* value reconstructed at parent */ Datum reconstructedValue; /* value reconstructed at parent */
int level; /* current level (counting from zero) */ int level; /* current level (counting from zero) */
......
...@@ -126,7 +126,11 @@ typedef struct SpGistScanOpaqueData ...@@ -126,7 +126,11 @@ typedef struct SpGistScanOpaqueData
SpGistState state; /* see above */ SpGistState state; /* see above */
MemoryContext tempCxt; /* short-lived memory context */ MemoryContext tempCxt; /* short-lived memory context */
/* Index quals for scan (copied from IndexScanDesc for convenience) */ /* Control flags showing whether to search nulls and/or non-nulls */
bool searchNulls; /* scan matches (all) null entries */
bool searchNonNulls; /* scan matches (some) non-null entries */
/* Index quals to be passed to opclass (null-related quals removed) */
int numberOfKeys; /* number of index qualifier conditions */ int numberOfKeys; /* number of index qualifier conditions */
ScanKey keyData; /* array of index qualifier descriptors */ ScanKey keyData; /* array of index qualifier descriptors */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment