Commit 1acf7572 authored by Tom Lane's avatar Tom Lane

Fix GiST index build for NaN values in geometric types.

GiST index build could go into an infinite loop when presented with boxes
(or points, circles or polygons) containing NaN component values.  This
happened essentially because the code assumed that x == x is true for any
"double" value x; but it's not true for NaNs.  The looping behavior was not
the only problem though: we also attempted to sort the items using simple
double comparisons.  Since NaNs violate the trichotomy law, qsort could
(in principle at least) get arbitrarily confused and mess up the sorting of
ordinary values as well as NaNs.  And we based splitting choices on box size
calculations that could produce NaNs, again resulting in undesirable
behavior.

To fix, replace all comparisons of doubles in this logic with
float8_cmp_internal, which is NaN-aware and is careful to sort NaNs
consistently, higher than any non-NaN.  Also rearrange the box size
calculation to not produce NaNs; instead it should produce an infinity
for a box with NaN on one side and not-NaN on the other.

I don't by any means claim that this solves all problems with NaNs in
geometric values, but it should at least make GiST index insertion work
reliably with such data.  It's likely that the index search side of things
still needs some work, and probably regular geometric operations too.
But with this patch we're laying down a convention for how such cases
ought to behave.

Per bug #14238 from Guang-Dih Lei.  Back-patch to 9.2; the code used before
commit 7f3bd868 is quite different and doesn't lock up on my simple
test case, nor on the submitter's dataset.

Report: <20160708151747.1426.60150@wrigleys.postgresql.org>
Discussion: <28685.1468246504@sss.pgh.pa.us>
parent 00e0b67a
......@@ -17,20 +17,31 @@
*/
#include "postgres.h"
#include <math.h>
#include "access/gist.h"
#include "access/stratnum.h"
#include "utils/builtins.h"
#include "utils/geo_decls.h"
static bool gist_box_leaf_consistent(BOX *key, BOX *query,
StrategyNumber strategy);
static double size_box(BOX *box);
static bool rtree_internal_consistent(BOX *key, BOX *query,
StrategyNumber strategy);
/* Minimum accepted ratio of split */
#define LIMIT_RATIO 0.3
/* Convenience macros for NaN-aware comparisons */
#define FLOAT8_EQ(a,b) (float8_cmp_internal(a, b) == 0)
#define FLOAT8_LT(a,b) (float8_cmp_internal(a, b) < 0)
#define FLOAT8_LE(a,b) (float8_cmp_internal(a, b) <= 0)
#define FLOAT8_GT(a,b) (float8_cmp_internal(a, b) > 0)
#define FLOAT8_GE(a,b) (float8_cmp_internal(a, b) >= 0)
#define FLOAT8_MAX(a,b) (FLOAT8_GT(a, b) ? (a) : (b))
#define FLOAT8_MIN(a,b) (FLOAT8_LT(a, b) ? (a) : (b))
/**************************************************
* Box ops
......@@ -40,12 +51,53 @@ static bool rtree_internal_consistent(BOX *key, BOX *query,
* Calculates union of two boxes, a and b. The result is stored in *n.
*/
static void
rt_box_union(BOX *n, BOX *a, BOX *b)
rt_box_union(BOX *n, const BOX *a, const BOX *b)
{
n->high.x = FLOAT8_MAX(a->high.x, b->high.x);
n->high.y = FLOAT8_MAX(a->high.y, b->high.y);
n->low.x = FLOAT8_MIN(a->low.x, b->low.x);
n->low.y = FLOAT8_MIN(a->low.y, b->low.y);
}
/*
* Size of a BOX for penalty-calculation purposes.
* The result can be +Infinity, but not NaN.
*/
static double
size_box(const BOX *box)
{
/*
* Check for zero-width cases. Note that we define the size of a zero-
* by-infinity box as zero. It's important to special-case this somehow,
* as naively multiplying infinity by zero will produce NaN.
*
* The less-than cases should not happen, but if they do, say "zero".
*/
if (FLOAT8_LE(box->high.x, box->low.x) ||
FLOAT8_LE(box->high.y, box->low.y))
return 0.0;
/*
* We treat NaN as larger than +Infinity, so any distance involving a NaN
* and a non-NaN is infinite. Note the previous check eliminated the
* possibility that the low fields are NaNs.
*/
if (isnan(box->high.x) || isnan(box->high.y))
return get_float8_infinity();
return (box->high.x - box->low.x) * (box->high.y - box->low.y);
}
/*
* Return amount by which the union of the two boxes is larger than
* the original BOX's area. The result can be +Infinity, but not NaN.
*/
static double
box_penalty(const BOX *original, const BOX *new)
{
n->high.x = Max(a->high.x, b->high.x);
n->high.y = Max(a->high.y, b->high.y);
n->low.x = Min(a->low.x, b->low.x);
n->low.y = Min(a->low.y, b->low.y);
BOX unionbox;
rt_box_union(&unionbox, original, new);
return size_box(&unionbox) - size_box(original);
}
/*
......@@ -85,16 +137,19 @@ gist_box_consistent(PG_FUNCTION_ARGS)
strategy));
}
/*
* Increase BOX b to include addon.
*/
static void
adjustBox(BOX *b, BOX *addon)
adjustBox(BOX *b, const BOX *addon)
{
if (b->high.x < addon->high.x)
if (FLOAT8_LT(b->high.x, addon->high.x))
b->high.x = addon->high.x;
if (b->low.x > addon->low.x)
if (FLOAT8_GT(b->low.x, addon->low.x))
b->low.x = addon->low.x;
if (b->high.y < addon->high.y)
if (FLOAT8_LT(b->high.y, addon->high.y))
b->high.y = addon->high.y;
if (b->low.y > addon->low.y)
if (FLOAT8_GT(b->low.y, addon->low.y))
b->low.y = addon->low.y;
}
......@@ -174,10 +229,8 @@ gist_box_penalty(PG_FUNCTION_ARGS)
float *result = (float *) PG_GETARG_POINTER(2);
BOX *origbox = DatumGetBoxP(origentry->key);
BOX *newbox = DatumGetBoxP(newentry->key);
BOX unionbox;
rt_box_union(&unionbox, origbox, newbox);
*result = (float) (size_box(&unionbox) - size_box(origbox));
*result = (float) box_penalty(origbox, newbox);
PG_RETURN_POINTER(result);
}
......@@ -290,12 +343,7 @@ interval_cmp_lower(const void *i1, const void *i2)
double lower1 = ((const SplitInterval *) i1)->lower,
lower2 = ((const SplitInterval *) i2)->lower;
if (lower1 < lower2)
return -1;
else if (lower1 > lower2)
return 1;
else
return 0;
return float8_cmp_internal(lower1, lower2);
}
/*
......@@ -307,16 +355,11 @@ interval_cmp_upper(const void *i1, const void *i2)
double upper1 = ((const SplitInterval *) i1)->upper,
upper2 = ((const SplitInterval *) i2)->upper;
if (upper1 < upper2)
return -1;
else if (upper1 > upper2)
return 1;
else
return 0;
return float8_cmp_internal(upper1, upper2);
}
/*
* Replace negative value with zero.
* Replace negative (or NaN) value with zero.
*/
static inline float
non_negative(float val)
......@@ -435,25 +478,9 @@ g_box_consider_split(ConsiderSplitContext *context, int dimNum,
}
}
/*
* Return increase of original BOX area by new BOX area insertion.
*/
static double
box_penalty(BOX *original, BOX *new)
{
double union_width,
union_height;
union_width = Max(original->high.x, new->high.x) -
Min(original->low.x, new->low.x);
union_height = Max(original->high.y, new->high.y) -
Min(original->low.y, new->low.y);
return union_width * union_height - (original->high.x - original->low.x) *
(original->high.y - original->low.y);
}
/*
* Compare common entries by their deltas.
* (We assume the deltas can't be NaN.)
*/
static int
common_entry_cmp(const void *i1, const void *i2)
......@@ -615,9 +642,11 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
/*
* Find next lower bound of right group.
*/
while (i1 < nentries && rightLower == intervalsLower[i1].lower)
while (i1 < nentries &&
FLOAT8_EQ(rightLower, intervalsLower[i1].lower))
{
leftUpper = Max(leftUpper, intervalsLower[i1].upper);
if (FLOAT8_LT(leftUpper, intervalsLower[i1].upper))
leftUpper = intervalsLower[i1].upper;
i1++;
}
if (i1 >= nentries)
......@@ -628,7 +657,8 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
* Find count of intervals which anyway should be placed to the
* left group.
*/
while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
while (i2 < nentries &&
FLOAT8_LE(intervalsUpper[i2].upper, leftUpper))
i2++;
/*
......@@ -650,9 +680,10 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
/*
* Find next upper bound of left group.
*/
while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
while (i2 >= 0 && FLOAT8_EQ(leftUpper, intervalsUpper[i2].upper))
{
rightLower = Min(rightLower, intervalsUpper[i2].lower);
if (FLOAT8_GT(rightLower, intervalsUpper[i2].lower))
rightLower = intervalsUpper[i2].lower;
i2--;
}
if (i2 < 0)
......@@ -663,7 +694,7 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
* Find count of intervals which anyway should be placed to the
* right group.
*/
while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
while (i1 >= 0 && FLOAT8_GE(intervalsLower[i1].lower, rightLower))
i1--;
/*
......@@ -751,10 +782,10 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
upper = box->high.y;
}
if (upper <= context.leftUpper)
if (FLOAT8_LE(upper, context.leftUpper))
{
/* Fits to the left group */
if (lower >= context.rightLower)
if (FLOAT8_GE(lower, context.rightLower))
{
/* Fits also to the right group, so "common entry" */
commonEntries[commonEntriesCount++].index = i;
......@@ -772,7 +803,7 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
* entry didn't fit on the left group, it better fit in the right
* group.
*/
Assert(lower >= context.rightLower);
Assert(FLOAT8_GE(lower, context.rightLower));
/* Doesn't fit to the left group, so join to the right group */
PLACE_RIGHT(box, i);
......@@ -856,8 +887,10 @@ gist_box_same(PG_FUNCTION_ARGS)
bool *result = (bool *) PG_GETARG_POINTER(2);
if (b1 && b2)
*result = (b1->low.x == b2->low.x && b1->low.y == b2->low.y &&
b1->high.x == b2->high.x && b1->high.y == b2->high.y);
*result = (FLOAT8_EQ(b1->low.x, b2->low.x) &&
FLOAT8_EQ(b1->low.y, b2->low.y) &&
FLOAT8_EQ(b1->high.x, b2->high.x) &&
FLOAT8_EQ(b1->high.y, b2->high.y));
else
*result = (b1 == NULL && b2 == NULL);
PG_RETURN_POINTER(result);
......@@ -943,14 +976,6 @@ gist_box_leaf_consistent(BOX *key, BOX *query, StrategyNumber strategy)
return retval;
}
static double
size_box(BOX *box)
{
if (box->high.x <= box->low.x || box->high.y <= box->low.y)
return 0.0;
return (box->high.x - box->low.x) * (box->high.y - box->low.y);
}
/*****************************************
* Common rtree functions (for boxes, polygons, and circles)
*****************************************/
......
......@@ -90,8 +90,6 @@ float8 degree_c_one_half = 0.5;
float8 degree_c_one = 1.0;
/* Local function prototypes */
static int float4_cmp_internal(float4 a, float4 b);
static int float8_cmp_internal(float8 a, float8 b);
static double sind_q1(double x);
static double cosd_q1(double x);
static void init_degree_constants(void);
......@@ -936,7 +934,7 @@ float8div(PG_FUNCTION_ARGS)
/*
* float4{eq,ne,lt,le,gt,ge} - float4/float4 comparison operations
*/
static int
int
float4_cmp_internal(float4 a, float4 b)
{
/*
......@@ -1050,7 +1048,7 @@ btfloat4sortsupport(PG_FUNCTION_ARGS)
/*
* float8{eq,ne,lt,le,gt,ge} - float8/float8 comparison operations
*/
static int
int
float8_cmp_internal(float8 a, float8 b)
{
/*
......
......@@ -346,6 +346,8 @@ extern int is_infinite(double val);
extern double float8in_internal(char *num, char **endptr_p,
const char *type_name, const char *orig_string);
extern char *float8out_internal(double num);
extern int float4_cmp_internal(float4 a, float4 b);
extern int float8_cmp_internal(float8 a, float8 b);
extern Datum float4in(PG_FUNCTION_ARGS);
extern Datum float4out(PG_FUNCTION_ARGS);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment