Commit 73912e7f authored by Tom Lane's avatar Tom Lane

Fix GIN to support null keys, empty and null items, and full index scans.

Per my recent proposal(s).  Null key datums can now be returned by
extractValue and extractQuery functions, and will be stored in the index.
Also, placeholder entries are made for indexable items that are NULL or
contain no keys according to extractValue.  This means that the index is
now always complete, having at least one entry for every indexed heap TID,
and so we can get rid of the prohibition on full-index scans.  A full-index
scan is implemented much the same way as partial-match scans were already:
we build a bitmap representing all the TIDs found in the index, and then
drive the results off that.

Also, introduce a concept of a "search mode" that can be requested by
extractQuery when the operator requires matching to empty items (this is
just as cheap as matching to a single key) or requires a full index scan
(which is not so cheap, but it sure beats failing or giving wrong answers).
The behavior remains backward compatible for opclasses that don't return
any null keys or request a non-default search mode.

Using these features, we can now make the GIN index opclass for anyarray
behave in a way that matches the actual anyarray operators for &&, <@, @>,
and = ... which it failed to do before in assorted corner cases.

This commit fixes the core GIN code and ginarrayprocs.c, updates the
documentation, and adds some simple regression test cases for the new
behaviors using the array operators.  The tsearch and contrib GIN opclass
support functions still need to be looked over and probably fixed.

Another thing I intend to fix separately is that this is pretty inefficient
for cases where more than one scan condition needs a full-index search:
we'll run duplicate GinScanEntrys, each one of which builds a large bitmap.
There is some existing logic to merge duplicate GinScanEntrys but it needs
refactoring to make it work for entries belonging to different scan keys.

Note that most of gin.h has been split out into a new file gin_private.h,
so that gin.h doesn't export anything that's not supposed to be used by GIN
opclasses or the rest of the backend.  I did quite a bit of other code
beautification work as well, mostly fixing comments and choosing more
appropriate names for things.
parent 9b4271de
......@@ -4,6 +4,7 @@
#include "postgres.h"
#include "access/gin.h"
#include "access/skey.h"
#include "catalog/pg_type.h"
#include "hstore.h"
......
This diff is collapsed.
This diff is collapsed.
......@@ -14,7 +14,9 @@
#include "postgres.h"
#include "access/gin.h"
#include "access/skey.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
......@@ -23,34 +25,23 @@
#define GinContainedStrategy 3
#define GinEqualStrategy 4
#define ARRAYCHECK(x) do { \
if ( ARR_HASNULL(x) ) \
ereport(ERROR, \
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), \
errmsg("array must not contain null values"))); \
} while(0)
/*
* Function used as extractValue and extractQuery both
* extractValue support function
*/
Datum
ginarrayextract(PG_FUNCTION_ARGS)
{
ArrayType *array;
int32 *nentries = (int32 *) PG_GETARG_POINTER(1);
Datum *entries = NULL;
/* Make copy of array input to ensure it doesn't disappear while in use */
ArrayType *array = PG_GETARG_ARRAYTYPE_P_COPY(0);
int32 *nkeys = (int32 *) PG_GETARG_POINTER(1);
bool **nullFlags = (bool **) PG_GETARG_POINTER(2);
int16 elmlen;
bool elmbyval;
char elmalign;
/*
* we should guarantee that array will not be destroyed during all
* operation
*/
array = PG_GETARG_ARRAYTYPE_P_COPY(0);
ARRAYCHECK(array);
Datum *elems;
bool *nulls;
int nelems;
get_typlenbyvalalign(ARR_ELEMTYPE(array),
&elmlen, &elmbyval, &elmalign);
......@@ -58,89 +49,140 @@ ginarrayextract(PG_FUNCTION_ARGS)
deconstruct_array(array,
ARR_ELEMTYPE(array),
elmlen, elmbyval, elmalign,
&entries, NULL, (int *) nentries);
&elems, &nulls, &nelems);
if (*nentries == 0 && PG_NARGS() == 3)
{
switch (PG_GETARG_UINT16(2)) /* StrategyNumber */
{
case GinOverlapStrategy:
*nentries = -1; /* nobody can be found */
break;
case GinContainsStrategy:
case GinContainedStrategy:
case GinEqualStrategy:
default: /* require fullscan: GIN can't find void
* arrays */
break;
}
}
*nkeys = nelems;
*nullFlags = nulls;
/* we should not free array, entries[i] points into it */
PG_RETURN_POINTER(entries);
/* we should not free array, elems[i] points into it */
PG_RETURN_POINTER(elems);
}
/*
* extractQuery support function
*/
Datum
ginqueryarrayextract(PG_FUNCTION_ARGS)
{
PG_RETURN_DATUM(DirectFunctionCall3(ginarrayextract,
PG_GETARG_DATUM(0),
PG_GETARG_DATUM(1),
PG_GETARG_DATUM(2)));
/* Make copy of array input to ensure it doesn't disappear while in use */
ArrayType *array = PG_GETARG_ARRAYTYPE_P_COPY(0);
int32 *nkeys = (int32 *) PG_GETARG_POINTER(1);
StrategyNumber strategy = PG_GETARG_UINT16(2);
/* bool **pmatch = (bool **) PG_GETARG_POINTER(3); */
/* Pointer *extra_data = (Pointer *) PG_GETARG_POINTER(4); */
bool **nullFlags = (bool **) PG_GETARG_POINTER(5);
int32 *searchMode = (int32 *) PG_GETARG_POINTER(6);
int16 elmlen;
bool elmbyval;
char elmalign;
Datum *elems;
bool *nulls;
int nelems;
get_typlenbyvalalign(ARR_ELEMTYPE(array),
&elmlen, &elmbyval, &elmalign);
deconstruct_array(array,
ARR_ELEMTYPE(array),
elmlen, elmbyval, elmalign,
&elems, &nulls, &nelems);
*nkeys = nelems;
*nullFlags = nulls;
switch (strategy)
{
case GinOverlapStrategy:
*searchMode = GIN_SEARCH_MODE_DEFAULT;
break;
case GinContainsStrategy:
if (nelems > 0)
*searchMode = GIN_SEARCH_MODE_DEFAULT;
else /* everything contains the empty set */
*searchMode = GIN_SEARCH_MODE_ALL;
break;
case GinContainedStrategy:
/* empty set is contained in everything */
*searchMode = GIN_SEARCH_MODE_INCLUDE_EMPTY;
break;
case GinEqualStrategy:
if (nelems > 0)
*searchMode = GIN_SEARCH_MODE_DEFAULT;
else
*searchMode = GIN_SEARCH_MODE_INCLUDE_EMPTY;
break;
default:
elog(ERROR, "ginqueryarrayextract: unknown strategy number: %d",
strategy);
}
/* we should not free array, elems[i] points into it */
PG_RETURN_POINTER(elems);
}
/*
* consistent support function
*/
Datum
ginarrayconsistent(PG_FUNCTION_ARGS)
{
bool *check = (bool *) PG_GETARG_POINTER(0);
StrategyNumber strategy = PG_GETARG_UINT16(1);
ArrayType *query = PG_GETARG_ARRAYTYPE_P(2);
/* int32 nkeys = PG_GETARG_INT32(3); */
/* ArrayType *query = PG_GETARG_ARRAYTYPE_P(2); */
int32 nkeys = PG_GETARG_INT32(3);
/* Pointer *extra_data = (Pointer *) PG_GETARG_POINTER(4); */
bool *recheck = (bool *) PG_GETARG_POINTER(5);
/* Datum *queryKeys = (Datum *) PG_GETARG_POINTER(6); */
bool *nullFlags = (bool *) PG_GETARG_POINTER(7);
bool res;
int i,
nentries;
/* ARRAYCHECK was already done by previous ginarrayextract call */
int32 i;
switch (strategy)
{
case GinOverlapStrategy:
/* result is not lossy */
*recheck = false;
/* at least one element in check[] is true, so result = true */
res = true;
break;
case GinContainedStrategy:
/* we will need recheck */
*recheck = true;
/* at least one element in check[] is true, so result = true */
res = true;
/* must have a match for at least one non-null element */
res = false;
for (i = 0; i < nkeys; i++)
{
if (check[i] && !nullFlags[i])
{
res = true;
break;
}
}
break;
case GinContainsStrategy:
/* result is not lossy */
*recheck = false;
/* must have all elements in check[] true */
nentries = ArrayGetNItems(ARR_NDIM(query), ARR_DIMS(query));
/* must have all elements in check[] true, and no nulls */
res = true;
for (i = 0; i < nentries; i++)
for (i = 0; i < nkeys; i++)
{
if (!check[i])
if (!check[i] || nullFlags[i])
{
res = false;
break;
}
}
break;
case GinContainedStrategy:
/* we will need recheck */
*recheck = true;
/* can't do anything else useful here */
res = true;
break;
case GinEqualStrategy:
/* we will need recheck */
*recheck = true;
/* must have all elements in check[] true */
nentries = ArrayGetNItems(ARR_NDIM(query), ARR_DIMS(query));
/*
* Must have all elements in check[] true; no discrimination
* against nulls here. This is because array_contain_compare
* and array_eq handle nulls differently ...
*/
res = true;
for (i = 0; i < nentries; i++)
for (i = 0; i < nkeys; i++)
{
if (!check[i])
{
......
......@@ -14,7 +14,7 @@
#include "postgres.h"
#include "access/gin.h"
#include "access/gin_private.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"
......@@ -104,7 +104,8 @@ ginFindLeafPage(GinBtree btree, GinBtreeStack *stack)
* ok, page is correctly locked, we should check to move right ..,
* root never has a right link, so small optimization
*/
while (btree->fullScan == FALSE && stack->blkno != rootBlkno && btree->isMoveRight(btree, page))
while (btree->fullScan == FALSE && stack->blkno != rootBlkno &&
btree->isMoveRight(btree, page))
{
BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
......@@ -226,7 +227,6 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
LockBuffer(root->buffer, GIN_UNLOCK);
Assert(blkno != InvalidBlockNumber);
for (;;)
{
buffer = ReadBuffer(btree->index, blkno);
......
......@@ -14,12 +14,12 @@
#include "postgres.h"
#include "access/gin.h"
#include "access/gin_private.h"
#include "utils/datum.h"
#include "utils/memutils.h"
#define DEF_NENTRY 2048 /* EntryAccumulator allocation quantum */
#define DEF_NENTRY 2048 /* GinEntryAccumulator allocation quantum */
#define DEF_NPTR 5 /* ItemPointer initial allocation quantum */
......@@ -27,48 +27,49 @@
static void
ginCombineData(RBNode *existing, const RBNode *newdata, void *arg)
{
EntryAccumulator *eo = (EntryAccumulator *) existing;
const EntryAccumulator *en = (const EntryAccumulator *) newdata;
GinEntryAccumulator *eo = (GinEntryAccumulator *) existing;
const GinEntryAccumulator *en = (const GinEntryAccumulator *) newdata;
BuildAccumulator *accum = (BuildAccumulator *) arg;
/*
* Note this code assumes that newdata contains only one itempointer.
*/
if (eo->number >= eo->length)
if (eo->count >= eo->maxcount)
{
accum->allocatedMemory -= GetMemoryChunkSpace(eo->list);
eo->length *= 2;
eo->list = (ItemPointerData *) repalloc(eo->list,
sizeof(ItemPointerData) * eo->length);
eo->maxcount *= 2;
eo->list = (ItemPointerData *)
repalloc(eo->list, sizeof(ItemPointerData) * eo->maxcount);
accum->allocatedMemory += GetMemoryChunkSpace(eo->list);
}
/* If item pointers are not ordered, they will need to be sorted. */
/* If item pointers are not ordered, they will need to be sorted later */
if (eo->shouldSort == FALSE)
{
int res;
res = ginCompareItemPointers(eo->list + eo->number - 1, en->list);
res = ginCompareItemPointers(eo->list + eo->count - 1, en->list);
Assert(res != 0);
if (res > 0)
eo->shouldSort = TRUE;
}
eo->list[eo->number] = en->list[0];
eo->number++;
eo->list[eo->count] = en->list[0];
eo->count++;
}
/* Comparator function for rbtree.c */
static int
cmpEntryAccumulator(const RBNode *a, const RBNode *b, void *arg)
{
const EntryAccumulator *ea = (const EntryAccumulator *) a;
const EntryAccumulator *eb = (const EntryAccumulator *) b;
const GinEntryAccumulator *ea = (const GinEntryAccumulator *) a;
const GinEntryAccumulator *eb = (const GinEntryAccumulator *) b;
BuildAccumulator *accum = (BuildAccumulator *) arg;
return ginCompareAttEntries(accum->ginstate, ea->attnum, ea->value,
eb->attnum, eb->value);
return ginCompareAttEntries(accum->ginstate,
ea->attnum, ea->key, ea->category,
eb->attnum, eb->key, eb->category);
}
/* Allocator function for rbtree.c */
......@@ -76,22 +77,22 @@ static RBNode *
ginAllocEntryAccumulator(void *arg)
{
BuildAccumulator *accum = (BuildAccumulator *) arg;
EntryAccumulator *ea;
GinEntryAccumulator *ea;
/*
* Allocate memory by rather big chunks to decrease overhead. We have
* no need to reclaim RBNodes individually, so this costs nothing.
*/
if (accum->entryallocator == NULL || accum->length >= DEF_NENTRY)
if (accum->entryallocator == NULL || accum->eas_used >= DEF_NENTRY)
{
accum->entryallocator = palloc(sizeof(EntryAccumulator) * DEF_NENTRY);
accum->entryallocator = palloc(sizeof(GinEntryAccumulator) * DEF_NENTRY);
accum->allocatedMemory += GetMemoryChunkSpace(accum->entryallocator);
accum->length = 0;
accum->eas_used = 0;
}
/* Allocate new RBNode from current chunk */
ea = accum->entryallocator + accum->length;
accum->length++;
ea = accum->entryallocator + accum->eas_used;
accum->eas_used++;
return (RBNode *) ea;
}
......@@ -99,10 +100,11 @@ ginAllocEntryAccumulator(void *arg)
void
ginInitBA(BuildAccumulator *accum)
{
/* accum->ginstate is intentionally not set here */
accum->allocatedMemory = 0;
accum->length = 0;
accum->entryallocator = NULL;
accum->tree = rb_create(sizeof(EntryAccumulator),
accum->eas_used = 0;
accum->tree = rb_create(sizeof(GinEntryAccumulator),
cmpEntryAccumulator,
ginCombineData,
ginAllocEntryAccumulator,
......@@ -111,8 +113,8 @@ ginInitBA(BuildAccumulator *accum)
}
/*
* This is basically the same as datumCopy(), but modified to count
* palloc'd space in accum.
* This is basically the same as datumCopy(), but extended to count
* palloc'd space in accum->allocatedMemory.
*/
static Datum
getDatumCopy(BuildAccumulator *accum, OffsetNumber attnum, Datum value)
......@@ -134,32 +136,37 @@ getDatumCopy(BuildAccumulator *accum, OffsetNumber attnum, Datum value)
* Find/store one entry from indexed value.
*/
static void
ginInsertEntry(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum, Datum entry)
ginInsertBAEntry(BuildAccumulator *accum,
ItemPointer heapptr, OffsetNumber attnum,
Datum key, GinNullCategory category)
{
EntryAccumulator key;
EntryAccumulator *ea;
GinEntryAccumulator eatmp;
GinEntryAccumulator *ea;
bool isNew;
/*
* For the moment, fill only the fields of key that will be looked at
* For the moment, fill only the fields of eatmp that will be looked at
* by cmpEntryAccumulator or ginCombineData.
*/
key.attnum = attnum;
key.value = entry;
eatmp.attnum = attnum;
eatmp.key = key;
eatmp.category = category;
/* temporarily set up single-entry itempointer list */
key.list = heapptr;
eatmp.list = heapptr;
ea = (EntryAccumulator *) rb_insert(accum->tree, (RBNode *) &key, &isNew);
ea = (GinEntryAccumulator *) rb_insert(accum->tree, (RBNode *) &eatmp,
&isNew);
if (isNew)
{
/*
* Finish initializing new tree entry, including making permanent
* copies of the datum and itempointer.
* copies of the datum (if it's not null) and itempointer.
*/
ea->value = getDatumCopy(accum, attnum, entry);
ea->length = DEF_NPTR;
ea->number = 1;
if (category == GIN_CAT_NORM_KEY)
ea->key = getDatumCopy(accum, attnum, key);
ea->maxcount = DEF_NPTR;
ea->count = 1;
ea->shouldSort = FALSE;
ea->list =
(ItemPointerData *) palloc(sizeof(ItemPointerData) * DEF_NPTR);
......@@ -175,7 +182,7 @@ ginInsertEntry(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum
}
/*
* Insert one heap pointer.
* Insert the entries for one heap pointer.
*
* Since the entries are being inserted into a balanced binary tree, you
* might think that the order of insertion wouldn't be critical, but it turns
......@@ -187,22 +194,24 @@ ginInsertEntry(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum
* We do this as follows. First, we imagine that we have an array whose size
* is the smallest power of two greater than or equal to the actual array
* size. Second, we insert the middle entry of our virtual array into the
* tree; then, we insert the middles of each half of out virtual array, then
* tree; then, we insert the middles of each half of our virtual array, then
* middles of quarters, etc.
*/
void
ginInsertRecordBA(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum,
Datum *entries, int32 nentry)
ginInsertBAEntries(BuildAccumulator *accum,
ItemPointer heapptr, OffsetNumber attnum,
Datum *entries, GinNullCategory *categories,
int32 nentries)
{
uint32 step = nentry;
uint32 step = nentries;
if (nentry <= 0)
if (nentries <= 0)
return;
Assert(ItemPointerIsValid(heapptr) && attnum >= FirstOffsetNumber);
/*
* step will contain largest power of 2 and <= nentry
* step will contain largest power of 2 and <= nentries
*/
step |= (step >> 1);
step |= (step >> 2);
......@@ -216,8 +225,9 @@ ginInsertRecordBA(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber att
{
int i;
for (i = step - 1; i < nentry && i >= 0; i += step << 1 /* *2 */ )
ginInsertEntry(accum, heapptr, attnum, entries[i]);
for (i = step - 1; i < nentries && i >= 0; i += step << 1 /* *2 */ )
ginInsertBAEntry(accum, heapptr, attnum,
entries[i], categories[i]);
step >>= 1; /* /2 */
}
......@@ -228,37 +238,47 @@ qsortCompareItemPointers(const void *a, const void *b)
{
int res = ginCompareItemPointers((ItemPointer) a, (ItemPointer) b);
/* Assert that there are no equal item pointers being sorted */
Assert(res != 0);
return res;
}
/* Prepare to read out the rbtree contents using ginGetEntry */
/* Prepare to read out the rbtree contents using ginGetBAEntry */
void
ginBeginBAScan(BuildAccumulator *accum)
{
rb_begin_iterate(accum->tree, LeftRightWalk);
}
/*
* Get the next entry in sequence from the BuildAccumulator's rbtree.
* This consists of a single key datum and a list (array) of one or more
* heap TIDs in which that key is found. The list is guaranteed sorted.
*/
ItemPointerData *
ginGetEntry(BuildAccumulator *accum, OffsetNumber *attnum, Datum *value, uint32 *n)
ginGetBAEntry(BuildAccumulator *accum,
OffsetNumber *attnum, Datum *key, GinNullCategory *category,
uint32 *n)
{
EntryAccumulator *entry;
GinEntryAccumulator *entry;
ItemPointerData *list;
entry = (EntryAccumulator *) rb_iterate(accum->tree);
entry = (GinEntryAccumulator *) rb_iterate(accum->tree);
if (entry == NULL)
return NULL;
return NULL; /* no more entries */
*n = entry->number;
*attnum = entry->attnum;
*value = entry->value;
*key = entry->key;
*category = entry->category;
list = entry->list;
*n = entry->count;
Assert(list != NULL);
Assert(list != NULL && entry->count > 0);
if (entry->shouldSort && entry->number > 1)
qsort(list, *n, sizeof(ItemPointerData), qsortCompareItemPointers);
if (entry->shouldSort && entry->count > 1)
qsort(list, entry->count, sizeof(ItemPointerData),
qsortCompareItemPointers);
return list;
}
......@@ -14,21 +14,27 @@
#include "postgres.h"
#include "access/gin.h"
#include "access/gin_private.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"
int
ginCompareItemPointers(ItemPointer a, ItemPointer b)
{
if (GinItemPointerGetBlockNumber(a) == GinItemPointerGetBlockNumber(b))
BlockNumber ba = GinItemPointerGetBlockNumber(a);
BlockNumber bb = GinItemPointerGetBlockNumber(b);
if (ba == bb)
{
if (GinItemPointerGetOffsetNumber(a) == GinItemPointerGetOffsetNumber(b))
OffsetNumber oa = GinItemPointerGetOffsetNumber(a);
OffsetNumber ob = GinItemPointerGetOffsetNumber(b);
if (oa == ob)
return 0;
return (GinItemPointerGetOffsetNumber(a) > GinItemPointerGetOffsetNumber(b)) ? 1 : -1;
return (oa > ob) ? 1 : -1;
}
return (GinItemPointerGetBlockNumber(a) > GinItemPointerGetBlockNumber(b)) ? 1 : -1;
return (ba > bb) ? 1 : -1;
}
/*
......@@ -122,12 +128,13 @@ dataLocateItem(GinBtree btree, GinBtreeStack *stack)
pitem = (PostingItem *) GinDataPageGetItem(page, mid);
if (mid == maxoff)
{
/*
* Right infinity, page already correctly chosen with a help of
* dataIsMoveRight
*/
result = -1;
}
else
{
pitem = (PostingItem *) GinDataPageGetItem(page, mid);
......@@ -220,7 +227,7 @@ dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber stor
Assert(!GinPageIsLeaf(page));
Assert(GinPageIsData(page));
/* if page isn't changed, we returns storedOff */
/* if page isn't changed, we return storedOff */
if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
{
pitem = (PostingItem *) GinDataPageGetItem(page, storedOff);
......@@ -286,9 +293,11 @@ GinDataPageAddItem(Page page, void *data, OffsetNumber offset)
{
ptr = GinDataPageGetItem(page, offset);
if (maxoff + 1 - offset != 0)
memmove(ptr + GinSizeOfItem(page), ptr, (maxoff - offset + 1) * GinSizeOfItem(page));
memmove(ptr + GinSizeOfDataPageItem(page),
ptr,
(maxoff - offset + 1) * GinSizeOfDataPageItem(page));
}
memcpy(ptr, data, GinSizeOfItem(page));
memcpy(ptr, data, GinSizeOfDataPageItem(page));
GinPageGetOpaque(page)->maxoff++;
}
......@@ -372,10 +381,11 @@ static void
dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
{
Page page = BufferGetPage(buf);
int sizeofitem = GinSizeOfDataPageItem(page);
int cnt = 0;
/* these must be static so they can be returned to caller */
static XLogRecData rdata[3];
int sizeofitem = GinSizeOfItem(page);
static ginxlogInsert data;
int cnt = 0;
*prdata = rdata;
Assert(GinPageIsData(page));
......@@ -453,20 +463,21 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prda
static Page
dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
{
static ginxlogSplit data;
static XLogRecData rdata[4];
static char vector[2 * BLCKSZ];
char *ptr;
OffsetNumber separator;
ItemPointer bound;
Page lpage = PageGetTempPageCopy(BufferGetPage(lbuf));
ItemPointerData oldbound = *GinDataPageGetRightBound(lpage);
int sizeofitem = GinSizeOfItem(lpage);
int sizeofitem = GinSizeOfDataPageItem(lpage);
OffsetNumber maxoff = GinPageGetOpaque(lpage)->maxoff;
Page rpage = BufferGetPage(rbuf);
Size pageSize = PageGetPageSize(lpage);
Size freeSpace;
uint32 nCopied = 1;
/* these must be static so they can be returned to caller */
static ginxlogSplit data;
static XLogRecData rdata[4];
static char vector[2 * BLCKSZ];
GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
freeSpace = GinDataPageGetFreeSpace(rpage);
......@@ -482,9 +493,11 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
if (GinPageIsLeaf(lpage) && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
{
nCopied = 0;
while (btree->curitem < btree->nitem && maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData)))
while (btree->curitem < btree->nitem &&
maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData)))
{
memcpy(vector + maxoff * sizeof(ItemPointerData), btree->items + btree->curitem,
memcpy(vector + maxoff * sizeof(ItemPointerData),
btree->items + btree->curitem,
sizeof(ItemPointerData));
maxoff++;
nCopied++;
......@@ -631,9 +644,9 @@ ginPrepareScanPostingTree(Relation index, BlockNumber rootBlkno, bool searchMode
* Inserts array of item pointers, may execute several tree scan (very rare)
*/
void
ginInsertItemPointer(GinPostingTreeScan *gdi,
ItemPointerData *items, uint32 nitem,
GinStatsData *buildStats)
ginInsertItemPointers(GinPostingTreeScan *gdi,
ItemPointerData *items, uint32 nitem,
GinStatsData *buildStats)
{
BlockNumber rootBlkno = gdi->stack->blkno;
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -14,8 +14,7 @@
#include "postgres.h"
#include "access/genam.h"
#include "access/gin.h"
#include "access/gin_private.h"
#include "catalog/storage.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
......@@ -190,7 +189,6 @@ ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno, bool isRoot,
/* saves changes about deleted tuple ... */
if (oldMaxOff != newMaxOff)
{
START_CRIT_SECTION();
if (newMaxOff > 0)
......@@ -519,7 +517,7 @@ ginVacuumEntryPage(GinVacuumState *gvs, Buffer buffer, BlockNumber *roots, uint3
* store posting tree's roots for further processing, we can't
* vacuum it just now due to risk of deadlocks with scans/inserts
*/
roots[*nroot] = GinItemPointerGetBlockNumber(&itup->t_tid);
roots[*nroot] = GinGetDownlink(itup);
(*nroot)++;
}
else if (GinGetNPosting(itup) > 0)
......@@ -533,8 +531,9 @@ ginVacuumEntryPage(GinVacuumState *gvs, Buffer buffer, BlockNumber *roots, uint3
if (GinGetNPosting(itup) != newN)
{
Datum value;
OffsetNumber attnum;
Datum key;
GinNullCategory category;
/*
* Some ItemPointers was deleted, so we should remake our
......@@ -562,9 +561,9 @@ ginVacuumEntryPage(GinVacuumState *gvs, Buffer buffer, BlockNumber *roots, uint3
itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
}
value = gin_index_getattr(&gvs->ginstate, itup);
attnum = gintuple_get_attrnum(&gvs->ginstate, itup);
itup = GinFormTuple(gvs->index, &gvs->ginstate, attnum, value,
key = gintuple_get_key(&gvs->ginstate, itup, &category);
itup = GinFormTuple(&gvs->ginstate, attnum, key, category,
GinGetPosting(itup), newN, true);
PageIndexTupleDelete(tmppage, i);
......@@ -606,7 +605,7 @@ ginbulkdelete(PG_FUNCTION_ARGS)
/* Yes, so initialize stats to zeroes */
stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
/* and cleanup any pending inserts */
ginInsertCleanup(index, &gvs.ginstate, true, stats);
ginInsertCleanup(&gvs.ginstate, true, stats);
}
/* we'll re-count the tuples each time */
......@@ -642,7 +641,7 @@ ginbulkdelete(PG_FUNCTION_ARGS)
Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
blkno = GinItemPointerGetBlockNumber(&(itup)->t_tid);
blkno = GinGetDownlink(itup);
Assert(blkno != InvalidBlockNumber);
UnlockReleaseBuffer(buffer);
......@@ -719,7 +718,7 @@ ginvacuumcleanup(PG_FUNCTION_ARGS)
if (IsAutoVacuumWorkerProcess())
{
initGinState(&ginstate, index);
ginInsertCleanup(index, &ginstate, true, stats);
ginInsertCleanup(&ginstate, true, stats);
}
PG_RETURN_POINTER(stats);
}
......@@ -732,7 +731,7 @@ ginvacuumcleanup(PG_FUNCTION_ARGS)
{
stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
initGinState(&ginstate, index);
ginInsertCleanup(index, &ginstate, true, stats);
ginInsertCleanup(&ginstate, true, stats);
}
memset(&idxStat, 0, sizeof(idxStat));
......
......@@ -13,7 +13,7 @@
*/
#include "postgres.h"
#include "access/gin.h"
#include "access/gin_private.h"
#include "access/xlogutils.h"
#include "storage/bufmgr.h"
#include "utils/memutils.h"
......@@ -152,7 +152,7 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
forgetIncompleteSplit(data->node,
GinItemPointerGetBlockNumber(&itup->t_tid),
GinGetDownlink(itup),
data->updateBlkno);
}
}
......@@ -213,7 +213,7 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
Assert(!GinPageIsLeaf(page));
Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber);
GinSetDownlink(itup, data->updateBlkno);
}
if (data->isDelete)
......@@ -270,7 +270,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
if (data->isData)
{
char *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
Size sizeofitem = GinSizeOfItem(lpage);
Size sizeofitem = GinSizeOfDataPageItem(lpage);
OffsetNumber i;
ItemPointer bound;
......@@ -380,8 +380,9 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
{
if (GinPageIsData(page))
{
memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
GinSizeOfItem(page) *data->nitem);
memcpy(GinDataPageGetData(page),
XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
data->nitem * GinSizeOfDataPageItem(page));
GinPageGetOpaque(page)->maxoff = data->nitem;
}
else
......@@ -792,6 +793,7 @@ static void
ginContinueSplit(ginIncompleteSplit *split)
{
GinBtreeData btree;
GinState ginstate;
Relation reln;
Buffer buffer;
GinBtreeStack stack;
......@@ -813,7 +815,12 @@ ginContinueSplit(ginIncompleteSplit *split)
if (split->rootBlkno == GIN_ROOT_BLKNO)
{
ginPrepareEntryScan(&btree, reln, InvalidOffsetNumber, (Datum) 0, NULL);
MemSet(&ginstate, 0, sizeof(ginstate));
ginstate.index = reln;
ginPrepareEntryScan(&btree,
InvalidOffsetNumber, (Datum) 0, GIN_CAT_NULL_KEY,
&ginstate);
btree.entry = ginPageGetLinkItup(buffer);
}
else
......
This diff is collapsed.
This diff is collapsed.
......@@ -1047,6 +1047,11 @@ extern Datum window_first_value(PG_FUNCTION_ARGS);
extern Datum window_last_value(PG_FUNCTION_ARGS);
extern Datum window_nth_value(PG_FUNCTION_ARGS);
/* access/gin/ginarrayproc.c */
extern Datum ginarrayextract(PG_FUNCTION_ARGS);
extern Datum ginqueryarrayextract(PG_FUNCTION_ARGS);
extern Datum ginarrayconsistent(PG_FUNCTION_ARGS);
/* access/transam/twophase.c */
extern Datum pg_prepared_xact(PG_FUNCTION_ARGS);
......
......@@ -98,3 +98,6 @@
98 {38,34,32,89} {AAAAAAAAAAAAAAAAAA71621,AAAA8857,AAAAAAAAAAAAAAAAAAA65037,AAAAAAAAAAAAAAAA31334,AAAAAAAAAA48845}
99 {37,86} {AAAAAAAAAAAAAAAAAA32918,AAAAA70514,AAAAAAAAA10012,AAAAAAAAAAAAAAAAA59387,AAAAAAAAAA64777,AAAAAAAAAAAAAAAAAAA15356}
100 {85,32,57,39,49,84,32,3,30} {AAAAAAA80240,AAAAAAAAAAAAAAAA1729,AAAAA60038,AAAAAAAAAAA92631,AAAAAAAA9523}
101 {} {}
102 {NULL} {NULL}
103 \N \N
This diff is collapsed.
This diff is collapsed.
......@@ -203,6 +203,14 @@ SELECT * FROM array_op_test WHERE i && '{17}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i @> '{32,17}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i && '{32,17}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i <@ '{38,34,32,89}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i = '{}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i @> '{}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i && '{}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i <@ '{}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i = '{NULL}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i @> '{NULL}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i && '{NULL}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE i <@ '{NULL}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE t @> '{AAAAAAAA72908}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE t && '{AAAAAAAA72908}' ORDER BY seqno;
......@@ -211,6 +219,10 @@ SELECT * FROM array_op_test WHERE t && '{AAAAAAAAAA646}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE t @> '{AAAAAAAA72908,AAAAAAAAAA646}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE t && '{AAAAAAAA72908,AAAAAAAAAA646}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE t <@ '{AAAAAAAA72908,AAAAAAAAAAAAAAAAAAA17075,AA88409,AAAAAAAAAAAAAAAAAA36842,AAAAAAA48038,AAAAAAAAAAAAAA10611}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE t = '{}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE t @> '{}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE t && '{}' ORDER BY seqno;
SELECT * FROM array_op_test WHERE t <@ '{}' ORDER BY seqno;
-- array casts
SELECT ARRAY[1,2,3]::text[]::int[]::float8[] AS "{1,2,3}";
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment