Commit c87cb5f7 authored by Tom Lane's avatar Tom Lane

Allow btree comparison functions to return INT_MIN.

Historically we forbade datatype-specific comparison functions from
returning INT_MIN, so that it would be safe to invert the sort order
just by negating the comparison result.  However, this was never
really safe for comparison functions that directly return the result
of memcmp(), strcmp(), etc, as POSIX doesn't place any such restriction
on those library functions.  Buildfarm results show that at least on
recent Linux on s390x, memcmp() actually does return INT_MIN sometimes,
causing sort failures.

The agreed-on answer is to remove this restriction and fix relevant
call sites to not make such an assumption; code such as "res = -res"
should be replaced by "INVERT_COMPARE_RESULT(res)".  The same is needed
in a few places that just directly negated the result of memcmp or
strcmp.

To help find places having this problem, I've also added a compile option
to nbtcompare.c that causes some of the commonly used comparators to
return INT_MIN/INT_MAX instead of their usual -1/+1.  It'd likely be
a good idea to have at least one buildfarm member running with
"-DSTRESS_SORT_INT_MIN".  That's far from a complete test of course,
but it should help to prevent fresh introductions of such bugs.

This is a longstanding portability hazard, so back-patch to all supported
branches.

Discussion: https://postgr.es/m/20180928185215.ffoq2xrq5d3pafna@alap3.anarazel.de
parent 113a6599
...@@ -45,17 +45,24 @@ ltree_compare(const ltree *a, const ltree *b) ...@@ -45,17 +45,24 @@ ltree_compare(const ltree *a, const ltree *b)
ltree_level *bl = LTREE_FIRST(b); ltree_level *bl = LTREE_FIRST(b);
int an = a->numlevel; int an = a->numlevel;
int bn = b->numlevel; int bn = b->numlevel;
int res = 0;
while (an > 0 && bn > 0) while (an > 0 && bn > 0)
{ {
int res;
if ((res = memcmp(al->name, bl->name, Min(al->len, bl->len))) == 0) if ((res = memcmp(al->name, bl->name, Min(al->len, bl->len))) == 0)
{ {
if (al->len != bl->len) if (al->len != bl->len)
return (al->len - bl->len) * 10 * (an + 1); return (al->len - bl->len) * 10 * (an + 1);
} }
else else
{
if (res < 0)
res = -1;
else
res = 1;
return res * 10 * (an + 1); return res * 10 * (an + 1);
}
an--; an--;
bn--; bn--;
...@@ -146,7 +153,7 @@ inner_isparent(const ltree *c, const ltree *p) ...@@ -146,7 +153,7 @@ inner_isparent(const ltree *c, const ltree *p)
{ {
if (cl->len != pl->len) if (cl->len != pl->len)
return false; return false;
if (memcmp(cl->name, pl->name, cl->len)) if (memcmp(cl->name, pl->name, cl->len) != 0)
return false; return false;
pn--; pn--;
......
...@@ -1254,11 +1254,9 @@ mp_int_compare(mp_int a, mp_int b) ...@@ -1254,11 +1254,9 @@ mp_int_compare(mp_int a, mp_int b)
* If they're both zero or positive, the normal comparison applies; if * If they're both zero or positive, the normal comparison applies; if
* both negative, the sense is reversed. * both negative, the sense is reversed.
*/ */
if (sa == MP_ZPOS) if (sa != MP_ZPOS)
return cmp; INVERT_COMPARE_RESULT(cmp);
else return cmp;
return -cmp;
} }
else else
{ {
...@@ -1314,10 +1312,9 @@ mp_int_compare_value(mp_int z, int value) ...@@ -1314,10 +1312,9 @@ mp_int_compare_value(mp_int z, int value)
{ {
cmp = s_vcmp(z, value); cmp = s_vcmp(z, value);
if (vsign == MP_ZPOS) if (vsign != MP_ZPOS)
return cmp; INVERT_COMPARE_RESULT(cmp);
else return cmp;
return -cmp;
} }
else else
{ {
......
...@@ -228,11 +228,8 @@ ...@@ -228,11 +228,8 @@
<replaceable>B</replaceable>, <replaceable>A</replaceable> <replaceable>B</replaceable>, <replaceable>A</replaceable>
<literal>=</literal> <replaceable>B</replaceable>, <literal>=</literal> <replaceable>B</replaceable>,
or <replaceable>A</replaceable> <literal>&gt;</literal> or <replaceable>A</replaceable> <literal>&gt;</literal>
<replaceable>B</replaceable>, respectively. The function must not <replaceable>B</replaceable>, respectively.
return <literal>INT_MIN</literal> for the <replaceable>A</replaceable> A null result is disallowed: all values of the data type must be comparable.
<literal>&lt;</literal> <replaceable>B</replaceable> case,
since the value may be negated before being tested for sign. A null
result is disallowed, too.
See <filename>src/backend/access/nbtree/nbtcompare.c</filename> for See <filename>src/backend/access/nbtree/nbtcompare.c</filename> for
examples. examples.
</para> </para>
......
...@@ -22,11 +22,10 @@ ...@@ -22,11 +22,10 @@
* *
* The result is always an int32 regardless of the input datatype. * The result is always an int32 regardless of the input datatype.
* *
* Although any negative int32 (except INT_MIN) is acceptable for reporting * Although any negative int32 is acceptable for reporting "<",
* "<", and any positive int32 is acceptable for reporting ">", routines * and any positive int32 is acceptable for reporting ">", routines
* that work on 32-bit or wider datatypes can't just return "a - b". * that work on 32-bit or wider datatypes can't just return "a - b".
* That could overflow and give the wrong answer. Also, one must not * That could overflow and give the wrong answer.
* return INT_MIN to report "<", since some callers will negate the result.
* *
* NOTE: it is critical that the comparison function impose a total order * NOTE: it is critical that the comparison function impose a total order
* on all non-NULL values of the data type, and that the datatype's * on all non-NULL values of the data type, and that the datatype's
...@@ -44,13 +43,31 @@ ...@@ -44,13 +43,31 @@
* during an index access won't be recovered till end of query. This * during an index access won't be recovered till end of query. This
* primarily affects comparison routines for toastable datatypes; * primarily affects comparison routines for toastable datatypes;
* they have to be careful to free any detoasted copy of an input datum. * they have to be careful to free any detoasted copy of an input datum.
*
* NOTE: we used to forbid comparison functions from returning INT_MIN,
* but that proves to be too error-prone because some platforms' versions
* of memcmp() etc can return INT_MIN. As a means of stress-testing
* callers, this file can be compiled with STRESS_SORT_INT_MIN defined
* to cause many of these functions to return INT_MIN or INT_MAX instead of
* their customary -1/+1. For production, though, that's not a good idea
* since users or third-party code might expect the traditional results.
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
#include <limits.h>
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/sortsupport.h" #include "utils/sortsupport.h"
#ifdef STRESS_SORT_INT_MIN
#define A_LESS_THAN_B INT_MIN
#define A_GREATER_THAN_B INT_MAX
#else
#define A_LESS_THAN_B (-1)
#define A_GREATER_THAN_B 1
#endif
Datum Datum
btboolcmp(PG_FUNCTION_ARGS) btboolcmp(PG_FUNCTION_ARGS)
...@@ -95,11 +112,11 @@ btint4cmp(PG_FUNCTION_ARGS) ...@@ -95,11 +112,11 @@ btint4cmp(PG_FUNCTION_ARGS)
int32 b = PG_GETARG_INT32(1); int32 b = PG_GETARG_INT32(1);
if (a > b) if (a > b)
PG_RETURN_INT32(1); PG_RETURN_INT32(A_GREATER_THAN_B);
else if (a == b) else if (a == b)
PG_RETURN_INT32(0); PG_RETURN_INT32(0);
else else
PG_RETURN_INT32(-1); PG_RETURN_INT32(A_LESS_THAN_B);
} }
static int static int
...@@ -109,11 +126,11 @@ btint4fastcmp(Datum x, Datum y, SortSupport ssup) ...@@ -109,11 +126,11 @@ btint4fastcmp(Datum x, Datum y, SortSupport ssup)
int32 b = DatumGetInt32(y); int32 b = DatumGetInt32(y);
if (a > b) if (a > b)
return 1; return A_GREATER_THAN_B;
else if (a == b) else if (a == b)
return 0; return 0;
else else
return -1; return A_LESS_THAN_B;
} }
Datum Datum
...@@ -132,11 +149,11 @@ btint8cmp(PG_FUNCTION_ARGS) ...@@ -132,11 +149,11 @@ btint8cmp(PG_FUNCTION_ARGS)
int64 b = PG_GETARG_INT64(1); int64 b = PG_GETARG_INT64(1);
if (a > b) if (a > b)
PG_RETURN_INT32(1); PG_RETURN_INT32(A_GREATER_THAN_B);
else if (a == b) else if (a == b)
PG_RETURN_INT32(0); PG_RETURN_INT32(0);
else else
PG_RETURN_INT32(-1); PG_RETURN_INT32(A_LESS_THAN_B);
} }
static int static int
...@@ -146,11 +163,11 @@ btint8fastcmp(Datum x, Datum y, SortSupport ssup) ...@@ -146,11 +163,11 @@ btint8fastcmp(Datum x, Datum y, SortSupport ssup)
int64 b = DatumGetInt64(y); int64 b = DatumGetInt64(y);
if (a > b) if (a > b)
return 1; return A_GREATER_THAN_B;
else if (a == b) else if (a == b)
return 0; return 0;
else else
return -1; return A_LESS_THAN_B;
} }
Datum Datum
...@@ -169,11 +186,11 @@ btint48cmp(PG_FUNCTION_ARGS) ...@@ -169,11 +186,11 @@ btint48cmp(PG_FUNCTION_ARGS)
int64 b = PG_GETARG_INT64(1); int64 b = PG_GETARG_INT64(1);
if (a > b) if (a > b)
PG_RETURN_INT32(1); PG_RETURN_INT32(A_GREATER_THAN_B);
else if (a == b) else if (a == b)
PG_RETURN_INT32(0); PG_RETURN_INT32(0);
else else
PG_RETURN_INT32(-1); PG_RETURN_INT32(A_LESS_THAN_B);
} }
Datum Datum
...@@ -183,11 +200,11 @@ btint84cmp(PG_FUNCTION_ARGS) ...@@ -183,11 +200,11 @@ btint84cmp(PG_FUNCTION_ARGS)
int32 b = PG_GETARG_INT32(1); int32 b = PG_GETARG_INT32(1);
if (a > b) if (a > b)
PG_RETURN_INT32(1); PG_RETURN_INT32(A_GREATER_THAN_B);
else if (a == b) else if (a == b)
PG_RETURN_INT32(0); PG_RETURN_INT32(0);
else else
PG_RETURN_INT32(-1); PG_RETURN_INT32(A_LESS_THAN_B);
} }
Datum Datum
...@@ -197,11 +214,11 @@ btint24cmp(PG_FUNCTION_ARGS) ...@@ -197,11 +214,11 @@ btint24cmp(PG_FUNCTION_ARGS)
int32 b = PG_GETARG_INT32(1); int32 b = PG_GETARG_INT32(1);
if (a > b) if (a > b)
PG_RETURN_INT32(1); PG_RETURN_INT32(A_GREATER_THAN_B);
else if (a == b) else if (a == b)
PG_RETURN_INT32(0); PG_RETURN_INT32(0);
else else
PG_RETURN_INT32(-1); PG_RETURN_INT32(A_LESS_THAN_B);
} }
Datum Datum
...@@ -211,11 +228,11 @@ btint42cmp(PG_FUNCTION_ARGS) ...@@ -211,11 +228,11 @@ btint42cmp(PG_FUNCTION_ARGS)
int16 b = PG_GETARG_INT16(1); int16 b = PG_GETARG_INT16(1);
if (a > b) if (a > b)
PG_RETURN_INT32(1); PG_RETURN_INT32(A_GREATER_THAN_B);
else if (a == b) else if (a == b)
PG_RETURN_INT32(0); PG_RETURN_INT32(0);
else else
PG_RETURN_INT32(-1); PG_RETURN_INT32(A_LESS_THAN_B);
} }
Datum Datum
...@@ -225,11 +242,11 @@ btint28cmp(PG_FUNCTION_ARGS) ...@@ -225,11 +242,11 @@ btint28cmp(PG_FUNCTION_ARGS)
int64 b = PG_GETARG_INT64(1); int64 b = PG_GETARG_INT64(1);
if (a > b) if (a > b)
PG_RETURN_INT32(1); PG_RETURN_INT32(A_GREATER_THAN_B);
else if (a == b) else if (a == b)
PG_RETURN_INT32(0); PG_RETURN_INT32(0);
else else
PG_RETURN_INT32(-1); PG_RETURN_INT32(A_LESS_THAN_B);
} }
Datum Datum
...@@ -239,11 +256,11 @@ btint82cmp(PG_FUNCTION_ARGS) ...@@ -239,11 +256,11 @@ btint82cmp(PG_FUNCTION_ARGS)
int16 b = PG_GETARG_INT16(1); int16 b = PG_GETARG_INT16(1);
if (a > b) if (a > b)
PG_RETURN_INT32(1); PG_RETURN_INT32(A_GREATER_THAN_B);
else if (a == b) else if (a == b)
PG_RETURN_INT32(0); PG_RETURN_INT32(0);
else else
PG_RETURN_INT32(-1); PG_RETURN_INT32(A_LESS_THAN_B);
} }
Datum Datum
...@@ -253,11 +270,11 @@ btoidcmp(PG_FUNCTION_ARGS) ...@@ -253,11 +270,11 @@ btoidcmp(PG_FUNCTION_ARGS)
Oid b = PG_GETARG_OID(1); Oid b = PG_GETARG_OID(1);
if (a > b) if (a > b)
PG_RETURN_INT32(1); PG_RETURN_INT32(A_GREATER_THAN_B);
else if (a == b) else if (a == b)
PG_RETURN_INT32(0); PG_RETURN_INT32(0);
else else
PG_RETURN_INT32(-1); PG_RETURN_INT32(A_LESS_THAN_B);
} }
static int static int
...@@ -267,11 +284,11 @@ btoidfastcmp(Datum x, Datum y, SortSupport ssup) ...@@ -267,11 +284,11 @@ btoidfastcmp(Datum x, Datum y, SortSupport ssup)
Oid b = DatumGetObjectId(y); Oid b = DatumGetObjectId(y);
if (a > b) if (a > b)
return 1; return A_GREATER_THAN_B;
else if (a == b) else if (a == b)
return 0; return 0;
else else
return -1; return A_LESS_THAN_B;
} }
Datum Datum
...@@ -299,9 +316,9 @@ btoidvectorcmp(PG_FUNCTION_ARGS) ...@@ -299,9 +316,9 @@ btoidvectorcmp(PG_FUNCTION_ARGS)
if (a->values[i] != b->values[i]) if (a->values[i] != b->values[i])
{ {
if (a->values[i] > b->values[i]) if (a->values[i] > b->values[i])
PG_RETURN_INT32(1); PG_RETURN_INT32(A_GREATER_THAN_B);
else else
PG_RETURN_INT32(-1); PG_RETURN_INT32(A_LESS_THAN_B);
} }
} }
PG_RETURN_INT32(0); PG_RETURN_INT32(0);
......
...@@ -530,7 +530,7 @@ _bt_compare(Relation rel, ...@@ -530,7 +530,7 @@ _bt_compare(Relation rel,
scankey->sk_argument)); scankey->sk_argument));
if (!(scankey->sk_flags & SK_BT_DESC)) if (!(scankey->sk_flags & SK_BT_DESC))
result = -result; INVERT_COMPARE_RESULT(result);
} }
/* if the keys are unequal, return the difference */ /* if the keys are unequal, return the difference */
......
...@@ -518,7 +518,7 @@ _bt_compare_array_elements(const void *a, const void *b, void *arg) ...@@ -518,7 +518,7 @@ _bt_compare_array_elements(const void *a, const void *b, void *arg)
cxt->collation, cxt->collation,
da, db)); da, db));
if (cxt->reverse) if (cxt->reverse)
compare = -compare; INVERT_COMPARE_RESULT(compare);
return compare; return compare;
} }
...@@ -1652,7 +1652,7 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc, ...@@ -1652,7 +1652,7 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc,
subkey->sk_argument)); subkey->sk_argument));
if (subkey->sk_flags & SK_BT_DESC) if (subkey->sk_flags & SK_BT_DESC)
cmpresult = -cmpresult; INVERT_COMPARE_RESULT(cmpresult);
/* Done comparing if unequal, else advance to next column */ /* Done comparing if unequal, else advance to next column */
if (cmpresult != 0) if (cmpresult != 0)
......
...@@ -755,7 +755,10 @@ heap_compare_slots(Datum a, Datum b, void *arg) ...@@ -755,7 +755,10 @@ heap_compare_slots(Datum a, Datum b, void *arg)
datum2, isNull2, datum2, isNull2,
sortKey); sortKey);
if (compare != 0) if (compare != 0)
return -compare; {
INVERT_COMPARE_RESULT(compare);
return compare;
}
} }
return 0; return 0;
} }
...@@ -467,9 +467,10 @@ reorderqueue_cmp(const pairingheap_node *a, const pairingheap_node *b, ...@@ -467,9 +467,10 @@ reorderqueue_cmp(const pairingheap_node *a, const pairingheap_node *b,
ReorderTuple *rtb = (ReorderTuple *) b; ReorderTuple *rtb = (ReorderTuple *) b;
IndexScanState *node = (IndexScanState *) arg; IndexScanState *node = (IndexScanState *) arg;
return -cmp_orderbyvals(rta->orderbyvals, rta->orderbynulls, /* exchange argument order to invert the sort order */
rtb->orderbyvals, rtb->orderbynulls, return cmp_orderbyvals(rtb->orderbyvals, rtb->orderbynulls,
node); rta->orderbyvals, rta->orderbynulls,
node);
} }
/* /*
......
...@@ -338,7 +338,10 @@ heap_compare_slots(Datum a, Datum b, void *arg) ...@@ -338,7 +338,10 @@ heap_compare_slots(Datum a, Datum b, void *arg)
datum2, isNull2, datum2, isNull2,
sortKey); sortKey);
if (compare != 0) if (compare != 0)
return -compare; {
INVERT_COMPARE_RESULT(compare);
return compare;
}
} }
return 0; return 0;
} }
......
...@@ -810,7 +810,7 @@ final_filemap_cmp(const void *a, const void *b) ...@@ -810,7 +810,7 @@ final_filemap_cmp(const void *a, const void *b)
return -1; return -1;
if (fa->action == FILE_ACTION_REMOVE) if (fa->action == FILE_ACTION_REMOVE)
return -strcmp(fa->path, fb->path); return strcmp(fb->path, fa->path);
else else
return strcmp(fa->path, fb->path); return strcmp(fa->path, fb->path);
} }
...@@ -274,8 +274,7 @@ typedef struct BTMetaPageData ...@@ -274,8 +274,7 @@ typedef struct BTMetaPageData
* When a new operator class is declared, we require that the user * When a new operator class is declared, we require that the user
* supply us with an amproc procedure (BTORDER_PROC) for determining * supply us with an amproc procedure (BTORDER_PROC) for determining
* whether, for two keys a and b, a < b, a = b, or a > b. This routine * whether, for two keys a and b, a < b, a = b, or a > b. This routine
* must return < 0, 0, > 0, respectively, in these three cases. (It must * must return < 0, 0, > 0, respectively, in these three cases.
* not return INT_MIN, since we may negate the result before using it.)
* *
* To facilitate accelerated sorting, an operator class may choose to * To facilitate accelerated sorting, an operator class may choose to
* offer a second procedure (BTSORTSUPPORT_PROC). For full details, see * offer a second procedure (BTSORTSUPPORT_PROC). For full details, see
......
...@@ -1022,6 +1022,14 @@ extern void ExceptionalCondition(const char *conditionName, ...@@ -1022,6 +1022,14 @@ extern void ExceptionalCondition(const char *conditionName,
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
/*
* Invert the sign of a qsort-style comparison result, ie, exchange negative
* and positive integer values, being careful not to get the wrong answer
* for INT_MIN. The argument should be an integral variable.
*/
#define INVERT_COMPARE_RESULT(var) \
((var) = ((var) < 0) ? 1 : -(var))
/* /*
* Use this, not "char buf[BLCKSZ]", to declare a field or local variable * Use this, not "char buf[BLCKSZ]", to declare a field or local variable
* holding a page buffer, if that page might be accessed as a page and not * holding a page buffer, if that page might be accessed as a page and not
......
...@@ -96,8 +96,7 @@ typedef struct SortSupportData ...@@ -96,8 +96,7 @@ typedef struct SortSupportData
* Comparator function has the same API as the traditional btree * Comparator function has the same API as the traditional btree
* comparison function, ie, return <0, 0, or >0 according as x is less * comparison function, ie, return <0, 0, or >0 according as x is less
* than, equal to, or greater than y. Note that x and y are guaranteed * than, equal to, or greater than y. Note that x and y are guaranteed
* not null, and there is no way to return null either. Do not return * not null, and there is no way to return null either.
* INT_MIN, as callers are allowed to negate the result before using it.
* *
* This may be either the authoritative comparator, or the abbreviated * This may be either the authoritative comparator, or the abbreviated
* comparator. Core code may switch this over the initial preference of * comparator. Core code may switch this over the initial preference of
...@@ -224,7 +223,7 @@ ApplySortComparator(Datum datum1, bool isNull1, ...@@ -224,7 +223,7 @@ ApplySortComparator(Datum datum1, bool isNull1,
{ {
compare = ssup->comparator(datum1, datum2, ssup); compare = ssup->comparator(datum1, datum2, ssup);
if (ssup->ssup_reverse) if (ssup->ssup_reverse)
compare = -compare; INVERT_COMPARE_RESULT(compare);
} }
return compare; return compare;
...@@ -262,7 +261,7 @@ ApplySortAbbrevFullComparator(Datum datum1, bool isNull1, ...@@ -262,7 +261,7 @@ ApplySortAbbrevFullComparator(Datum datum1, bool isNull1,
{ {
compare = ssup->abbrev_full_comparator(datum1, datum2, ssup); compare = ssup->abbrev_full_comparator(datum1, datum2, ssup);
if (ssup->ssup_reverse) if (ssup->ssup_reverse)
compare = -compare; INVERT_COMPARE_RESULT(compare);
} }
return compare; return compare;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment