Commit 23f10b64 authored by Heikki Linnakangas's avatar Heikki Linnakangas

SP-GiST support of the range adjacent operator -|-

Alexander Korotkov, reviewed by Jeff Davis.
parent 2443a26b
......@@ -709,6 +709,64 @@ range_after(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(range_after_internal(typcache, r1, r2));
}
/*
* Check if two bounds A and B are "adjacent", where A is an upper bound and B
* is a lower bound. For the bounds to be adjacent, each subtype value must
* satisfy strictly one of the bounds: there are no values which satisfy both
* bounds (i.e. less than A and greater than B); and there are no values which
* satisfy neither bound (i.e. greater than A and less than B).
*
* For discrete ranges, we rely on the canonicalization function to see if A..B
* normalizes to empty. (If there is no canonicalization function, it's
* impossible for such a range to normalize to empty, so we needn't bother to
* try.)
*
* If A == B, the ranges are adjacent only if the bounds have different
* inclusive flags (i.e., exactly one of the ranges includes the common
* boundary point).
*
* And if A > B then the ranges are not adjacent in this order.
*/
bool
bounds_adjacent(TypeCacheEntry *typcache, RangeBound boundA, RangeBound boundB)
{
int cmp;
Assert(!boundA.lower && boundB.lower);
cmp = range_cmp_bound_values(typcache, &boundA, &boundB);
if (cmp < 0)
{
RangeType *r;
/*
* Bounds do not overlap; see if there are points in between.
*/
/* in a continuous subtype, there are assumed to be points between */
if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
return false;
/*
* The bounds are of a discrete range type; so make a range A..B and
* see if it's empty.
*/
/* flip the inclusion flags */
boundA.inclusive = !boundA.inclusive;
boundB.inclusive = !boundB.inclusive;
/* change upper/lower labels to avoid Assert failures */
boundA.lower = true;
boundB.lower = false;
r = make_range(typcache, &boundA, &boundB, false);
return RangeIsEmpty(r);
}
else if (cmp == 0)
return boundA.inclusive != boundB.inclusive;
else
return false; /* bounds overlap */
}
/* adjacent to (but not overlapping)? (internal version) */
bool
range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
......@@ -719,8 +777,6 @@ range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
upper2;
bool empty1,
empty2;
RangeType *r3;
int cmp;
/* Different types should be prevented by ANYRANGE matching rules */
if (RangeTypeGetOid(r1) != RangeTypeGetOid(r2))
......@@ -734,62 +790,11 @@ range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
return false;
/*
* Given two ranges A..B and C..D, where B < C, the ranges are adjacent if
* and only if the range B..C is empty, where inclusivity of these two
* bounds is inverted compared to the original bounds. For discrete
* ranges, we have to rely on the canonicalization function to normalize
* B..C to empty if it contains no elements of the subtype. (If there is
* no canonicalization function, it's impossible for such a range to
* normalize to empty, so we needn't bother to try.)
*
* If B == C, the ranges are adjacent only if these bounds have different
* inclusive flags (i.e., exactly one of the ranges includes the common
* boundary point).
*
* And if B > C then the ranges cannot be adjacent in this order, but we
* must consider the other order (i.e., check D <= A).
* Given two ranges A..B and C..D, the ranges are adjacent if and only if
* B is adjacent to C, or D is adjacent to A.
*/
cmp = range_cmp_bound_values(typcache, &upper1, &lower2);
if (cmp < 0)
{
/* in a continuous subtype, there are assumed to be points between */
if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
return (false);
/* flip the inclusion flags */
upper1.inclusive = !upper1.inclusive;
lower2.inclusive = !lower2.inclusive;
/* change upper/lower labels to avoid Assert failures */
upper1.lower = true;
lower2.lower = false;
r3 = make_range(typcache, &upper1, &lower2, false);
return RangeIsEmpty(r3);
}
if (cmp == 0)
{
return (upper1.inclusive != lower2.inclusive);
}
cmp = range_cmp_bound_values(typcache, &upper2, &lower1);
if (cmp < 0)
{
/* in a continuous subtype, there are assumed to be points between */
if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
return (false);
/* flip the inclusion flags */
upper2.inclusive = !upper2.inclusive;
lower1.inclusive = !lower1.inclusive;
/* change upper/lower labels to avoid Assert failures */
upper2.lower = true;
lower1.lower = false;
r3 = make_range(typcache, &upper2, &lower1, false);
return RangeIsEmpty(r3);
}
if (cmp == 0)
{
return (upper2.inclusive != lower1.inclusive);
}
return false;
return (bounds_adjacent(typcache, upper1, lower2) ||
bounds_adjacent(typcache, upper2, lower1));
}
/* adjacent to (but not overlapping)? */
......
......@@ -305,6 +305,16 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
int which;
int i;
/*
* For adjacent search we need also previous centroid (if any) to improve
* the precision of the consistent check. In this case needPrevious flag is
* set and centroid is passed into reconstructedValues. This is not the
* intended purpose of reconstructedValues (because we already have the
* full value available at the leaf), but it's a convenient place to store
* state while traversing the tree.
*/
bool needPrevious = false;
if (in->allTheSame)
{
/* Report that all nodes should be visited */
......@@ -351,6 +361,7 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
case RANGESTRAT_OVERLAPS:
case RANGESTRAT_OVERRIGHT:
case RANGESTRAT_AFTER:
case RANGESTRAT_ADJACENT:
/* These strategies return false if any argument is empty */
if (empty)
which = 0;
......@@ -435,6 +446,9 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
/* Are the restrictions on range bounds inclusive? */
bool inclusive = true;
bool strictEmpty = true;
int cmp,
which1,
which2;
strategy = in->scankeys[i].sk_strategy;
......@@ -522,6 +536,118 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
inclusive = false;
break;
case RANGESTRAT_ADJACENT:
if (empty)
break; /* Skip to strictEmpty check. */
/*
* which1 is bitmask for possibility to be adjacent with
* lower bound of argument. which2 is bitmask for
* possibility to be adjacent with upper bound of argument.
*/
which1 = which2 = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
/*
* Previously selected quadrant could exclude possibility
* for lower or upper bounds to be adjacent. Deserialize
* previous centroid range if present for checking this.
*/
if (in->reconstructedValue != (Datum) 0)
{
RangeType *prevCentroid;
RangeBound prevLower,
prevUpper;
bool prevEmpty;
int cmp1,
cmp2;
prevCentroid = DatumGetRangeType(in->reconstructedValue);
range_deserialize(typcache, prevCentroid,
&prevLower, &prevUpper, &prevEmpty);
/*
* Check if lower bound of argument is not in a
* quadrant we visited in the previous step.
*/
cmp1 = range_cmp_bounds(typcache, &lower, &prevUpper);
cmp2 = range_cmp_bounds(typcache, &centroidUpper,
&prevUpper);
if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
which1 = 0;
/*
* Check if upper bound of argument is not in a
* quadrant we visited in the previous step.
*/
cmp1 = range_cmp_bounds(typcache, &upper, &prevLower);
cmp2 = range_cmp_bounds(typcache, &centroidLower,
&prevLower);
if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
which2 = 0;
}
if (which1)
{
/*
* For a range's upper bound to be adjacent to the
* argument's lower bound, it will be found along the
* line adjacent to (and just below) Y=lower.
* Therefore, if the argument's lower bound is less
* than the centroid's upper bound, the line falls in
* quadrants 2 and 3; if greater, the line falls in
* quadrants 1 and 4.
*
* The above is true even when the argument's lower
* bound is greater and adjacent to the centroid's
* upper bound. If the argument's lower bound is
* greater than the centroid's upper bound, then the
* lowest value that an adjacent range could have is
* that of the centroid's upper bound, which still
* falls in quadrants 1 and 4.
*
* In the edge case, where the argument's lower bound
* is equal to the cetroid's upper bound, there may be
* adjacent ranges in any quadrant.
*/
cmp = range_cmp_bounds(typcache, &lower,
&centroidUpper);
if (cmp < 0)
which1 &= (1 << 2) | (1 << 3);
else if (cmp > 0)
which1 &= (1 << 1) | (1 << 4);
}
if (which2)
{
/*
* For a range's lower bound to be adjacent to the
* argument's upper bound, it will be found along the
* line adjacent to (and just right of)
* X=upper. Therefore, if the argument's upper bound is
* less than (and not adjacent to) the centroid's upper
* bound, the line falls in quadrants 3 and 4; if
* greater or equal to, the line falls in quadrants 1
* and 2.
*
* The edge case is when the argument's upper bound is
* less than and adjacent to the centroid's lower
* bound. In that case, adjacent ranges may be in any
* quadrant.
*/
cmp = range_cmp_bounds(typcache, &lower,
&centroidUpper);
if (cmp < 0 &&
!bounds_adjacent(typcache, upper, centroidLower))
which1 &= (1 << 3) | (1 << 4);
else if (cmp > 0)
which1 &= (1 << 1) | (1 << 2);
}
which &= which1 | which2;
needPrevious = true;
break;
case RANGESTRAT_CONTAINS:
/*
* Non-empty range A contains non-empty range B if lower
......@@ -652,11 +778,18 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
/* We must descend into the quadrant(s) identified by 'which' */
out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
if (needPrevious)
out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
out->nNodes = 0;
for (i = 1; i <= in->nNodes; i++)
{
if (which & (1 << i))
{
/* Save previous prefix if needed */
if (needPrevious)
out->reconstructedValues[out->nNodes] = in->prefixDatum;
out->nodeNumbers[out->nNodes++] = i - 1;
}
}
PG_RETURN_VOID();
......@@ -713,6 +846,10 @@ spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)
res = range_after_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
case RANGESTRAT_ADJACENT:
res = range_adjacent_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
case RANGESTRAT_CONTAINS:
res = range_contains_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
......
......@@ -775,6 +775,7 @@ DATA(insert ( 3474 3831 3831 2 s 3895 4000 0 ));
DATA(insert ( 3474 3831 3831 3 s 3888 4000 0 ));
DATA(insert ( 3474 3831 3831 4 s 3896 4000 0 ));
DATA(insert ( 3474 3831 3831 5 s 3894 4000 0 ));
DATA(insert ( 3474 3831 3831 6 s 3897 4000 0 ));
DATA(insert ( 3474 3831 3831 7 s 3890 4000 0 ));
DATA(insert ( 3474 3831 3831 8 s 3892 4000 0 ));
DATA(insert ( 3474 3831 2283 16 s 3889 4000 0 ));
......
......@@ -201,6 +201,8 @@ extern int range_cmp_bounds(TypeCacheEntry *typcache, RangeBound *b1,
RangeBound *b2);
extern int range_cmp_bound_values(TypeCacheEntry *typcache, RangeBound *b1,
RangeBound *b2);
extern bool bounds_adjacent(TypeCacheEntry *typcache, RangeBound bound1,
RangeBound bound2);
extern RangeType *make_empty_range(TypeCacheEntry *typcache);
/* GiST support (in rangetypes_gist.c) */
......
......@@ -1076,6 +1076,7 @@ ORDER BY 1, 2, 3;
4000 | 4 | ~>=~
4000 | 5 | >>
4000 | 5 | ~>~
4000 | 6 | -|-
4000 | 6 | ~=
4000 | 7 | @>
4000 | 8 | <@
......@@ -1087,7 +1088,7 @@ ORDER BY 1, 2, 3;
4000 | 15 | >
4000 | 16 | @>
4000 | 18 | =
(61 rows)
(62 rows)
-- Check that all opclass search operators have selectivity estimators.
-- This is not absolutely required, but it seems a reasonable thing
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment