Commit 36a35c55 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Compress GIN posting lists, for smaller index size.

GIN posting lists are now encoded using varbyte-encoding, which allows them
to fit in much smaller space than the straight ItemPointer array format used
before. The new encoding is used for both the lists stored in-line in entry
tree items, and in posting tree leaf pages.

To maintain backwards-compatibility and keep pg_upgrade working, the code
can still read old-style pages and tuples. Posting tree leaf pages in the
new format are flagged with GIN_COMPRESSED flag, to distinguish old and new
format pages. Likewise, entry tree tuples in the new format have a
GIN_ITUP_COMPRESSED flag set in a bit that was previously unused.

This patch bumps GIN_CURRENT_VERSION from 1 to 2. New indexes created with
version 9.4 will therefore have version number 2 in the metapage, while old
pg_upgraded indexes will have version 1. The code treats them the same, but
it might be come handy in the future, if we want to drop support for the
uncompressed format.

Alexander Korotkov and me. Reviewed by Tomas Vondra and Amit Langote.
parent 243ee266
......@@ -123,6 +123,6 @@ create index test_ginidx on test using gin (b);
select * from pgstatginindex('test_ginidx');
version | pending_pages | pending_tuples
---------+---------------+----------------
1 | 0 | 0
2 | 0 | 0
(1 row)
......@@ -135,15 +135,15 @@ same category of null entry are merged into one index entry just as happens
with ordinary key entries.
* In a key entry at the btree leaf level, at the next SHORTALIGN boundary,
there is an array of zero or more ItemPointers, which store the heap tuple
TIDs for which the indexable items contain this key. This is called the
"posting list". The TIDs in a posting list must appear in sorted order.
If the list would be too big for the index tuple to fit on an index page,
the ItemPointers are pushed out to a separate posting page or pages, and
none appear in the key entry itself. The separate pages are called a
"posting tree"; they are organized as a btree of ItemPointer values.
Note that in either case, the ItemPointers associated with a key can
easily be read out in sorted order; this is relied on by the scan
there is a list of item pointers, in compressed format (see Posting List
Compression section), pointing to the heap tuples for which the indexable
items contain this key. This is called the "posting list".
If the list would be too big for the index tuple to fit on an index page, the
ItemPointers are pushed out to a separate posting page or pages, and none
appear in the key entry itself. The separate pages are called a "posting
tree" (see below); Note that in either case, the ItemPointers associated with
a key can easily be read out in sorted order; this is relied on by the scan
algorithms.
* The index tuple header fields of a leaf key entry are abused as follows:
......@@ -163,6 +163,11 @@ algorithms.
* The posting list can be accessed with GinGetPosting(itup)
* If GinITupIsCompressed(itup), the posting list is stored in compressed
format. Otherwise it is just an array of ItemPointers. New tuples are always
stored in compressed format, uncompressed items can be present if the
database was migrated from 9.3 or earlier version.
2) Posting tree case:
* ItemPointerGetBlockNumber(&itup->t_tid) contains the index block number
......@@ -210,6 +215,76 @@ fit on one pending-list page must have those pages to itself, even if this
results in wasting much of the space on the preceding page and the last
page for the tuple.)
Posting tree
------------
If a posting list is too large to store in-line in a key entry, a posting tree
is created. A posting tree is a B-tree structure, where the ItemPointer is
used as the key.
Internal posting tree pages use the standard PageHeader and the same "opaque"
struct as other GIN page, but do not contain regular index tuples. Instead,
the contents of the page is an array of PostingItem structs. Each PostingItem
consists of the block number of the child page, and the right bound of that
child page, as an ItemPointer. The right bound of the page is stored right
after the page header, before the PostingItem array.
Posting tree leaf pages also use the standard PageHeader and opaque struct,
and the right bound of the page is stored right after the page header,
but the page content comprises of 0-32 compressed posting lists, and an
additional array of regular uncompressed item pointers. The compressed posting
lists are stored one after each other, between page header and pd_lower. The
uncompressed array is stored between pd_upper and pd_special. The space
between pd_lower and pd_upper is unused, which allows full-page images of
posting tree leaf pages to skip the unused space in middle (buffer_std = true
in XLogRecData). For historical reasons, this does not apply to internal
pages, or uncompressed leaf pages migrated from earlier versions.
The item pointers are stored in a number of independent compressed posting
lists (also called segments), instead of one big one, to make random access
to a given item pointer faster: to find an item in a compressed list, you
have to read the list from the beginning, but when the items are split into
multiple lists, you can first skip over to the list containing the item you're
looking for, and read only that segment. Also, an update only needs to
re-encode the affected segment.
The uncompressed items array is used for insertions, to avoid re-encoding
a compressed list on every update. If there is room on a page, an insertion
simply inserts the new item to the right place in the uncompressed array.
When a page becomes full, it is rewritten, merging all the uncompressed items
are into the compressed lists. When reading, the uncompressed array and the
compressed lists are read in tandem, and merged into one stream of sorted
item pointers.
Posting List Compression
------------------------
To fit as many item pointers on a page as possible, posting tree leaf pages
and posting lists stored inline in entry tree leaf tuples use a lightweight
form of compression. We take advantage of the fact that the item pointers
are stored in sorted order. Instead of storing the block and offset number of
each item pointer separately, we store the difference from the previous item.
That in itself doesn't do much, but it allows us to use so-called varbyte
encoding to compress them.
Varbyte encoding is a method to encode integers, allowing smaller numbers to
take less space at the cost of larger numbers. Each integer is represented by
variable number of bytes. High bit of each byte in varbyte encoding determines
whether the next byte is still part of this number. Therefore, to read a single
varbyte encoded number, you have to read bytes until you find a byte with the
high bit not set.
When encoding, the block and offset number forming the item pointer are
combined into a single integer. The offset number is stored in the 11 low
bits (see MaxHeapTuplesPerPageBits in ginpostinglist.c), and the block number
is stored in the higher bits. That requires 43 bits in total, which
conveniently fits in at most 6 bytes.
A compressed posting list is passed around and stored on disk in a
PackedPostingList struct. The first item in the list is stored uncompressed
as a regular ItemPointerData, followed by the length of the list in bytes,
followed by the packed items.
Concurrency
-----------
......@@ -260,6 +335,36 @@ page-deletions safe; it stamps the deleted pages with an XID and keeps the
deleted pages around with the right-link intact until all concurrent scans
have finished.)
Compatibility
-------------
Compression of TIDs was introduced in 9.4. Some GIN indexes could remain in
uncompressed format because of pg_upgrade from 9.3 or earlier versions.
For compatibility, old uncompressed format is also supported. Following
rules are used to handle it:
* GIN_ITUP_COMPRESSED flag marks index tuples that contain a posting list.
This flag is stored in high bit of ItemPointerGetBlockNumber(&itup->t_tid).
Use GinItupIsCompressed(itup) to check the flag.
* Posting tree pages in the new format are marked with the GIN_COMPRESSED flag.
Macros GinPageIsCompressed(page) and GinPageSetCompressed(page) are used to
check and set this flag.
* All scan operations check format of posting list add use corresponding code
to read its content.
* When updating an index tuple containing an uncompressed posting list, it
will be replaced with new index tuple containing a compressed list.
* When updating an uncompressed posting tree leaf page, it's compressed.
* If vacuum finds some dead TIDs in uncompressed posting lists, they are
converted into compressed posting lists. This assumes that the compressed
posting list fits in the space occupied by the uncompressed list. IOW, we
assume that the compressed version of the page, with the dead items removed,
takes less space than the old uncompressed version.
Limitations
-----------
......
......@@ -325,9 +325,10 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
{
Page page = BufferGetPage(stack->buffer);
XLogRecData *payloadrdata;
bool fit;
GinPlaceToPageRC rc;
uint16 xlflags = 0;
Page childpage = NULL;
Page newlpage = NULL, newrpage = NULL;
if (GinPageIsData(page))
xlflags |= GIN_INSERT_ISDATA;
......@@ -345,16 +346,17 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
}
/*
* Try to put the incoming tuple on the page. If it doesn't fit,
* placeToPage method will return false and leave the page unmodified, and
* we'll have to split the page.
* Try to put the incoming tuple on the page. placeToPage will decide
* if the page needs to be split.
*/
START_CRIT_SECTION();
fit = btree->placeToPage(btree, stack->buffer, stack->off,
insertdata, updateblkno,
&payloadrdata);
if (fit)
rc = btree->placeToPage(btree, stack->buffer, stack,
insertdata, updateblkno,
&payloadrdata, &newlpage, &newrpage);
if (rc == UNMODIFIED)
return true;
else if (rc == INSERTED)
{
/* placeToPage did START_CRIT_SECTION() */
MarkBufferDirty(stack->buffer);
/* An insert to an internal page finishes the split of the child. */
......@@ -373,7 +375,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
xlrec.node = btree->index->rd_node;
xlrec.blkno = BufferGetBlockNumber(stack->buffer);
xlrec.offset = stack->off;
xlrec.flags = xlflags;
rdata[0].buffer = InvalidBuffer;
......@@ -415,20 +416,16 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
return true;
}
else
else if (rc == SPLIT)
{
/* Didn't fit, have to split */
Buffer rbuffer;
Page newlpage;
BlockNumber savedRightLink;
Page rpage;
XLogRecData rdata[2];
ginxlogSplit data;
Buffer lbuffer = InvalidBuffer;
Page newrootpg = NULL;
END_CRIT_SECTION();
rbuffer = GinNewBuffer(btree->index);
/* During index build, count the new page */
......@@ -443,12 +440,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
savedRightLink = GinPageGetOpaque(page)->rightlink;
/*
* newlpage is a pointer to memory page, it is not associated with a
* buffer. stack->buffer is not touched yet.
* newlpage and newrpage are pointers to memory pages, not associated
* with buffers. stack->buffer is not touched yet.
*/
newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off,
insertdata, updateblkno,
&payloadrdata);
data.node = btree->index->rd_node;
data.rblkno = BufferGetBlockNumber(rbuffer);
......@@ -481,8 +475,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
else
rdata[0].next = payloadrdata;
rpage = BufferGetPage(rbuffer);
if (stack->parent == NULL)
{
/*
......@@ -508,7 +500,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
data.lblkno = BufferGetBlockNumber(lbuffer);
data.flags |= GIN_SPLIT_ROOT;
GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
/*
......@@ -517,12 +509,12 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
* than overwriting the original page directly, so that we can still
* abort gracefully if this fails.)
*/
newrootpg = PageGetTempPage(rpage);
GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF, BLCKSZ);
newrootpg = PageGetTempPage(newrpage);
GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
btree->fillRoot(btree, newrootpg,
BufferGetBlockNumber(lbuffer), newlpage,
BufferGetBlockNumber(rbuffer), rpage);
BufferGetBlockNumber(rbuffer), newrpage);
}
else
{
......@@ -530,7 +522,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
data.rrlink = savedRightLink;
data.lblkno = BufferGetBlockNumber(stack->buffer);
GinPageGetOpaque(rpage)->rightlink = savedRightLink;
GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
}
......@@ -550,16 +542,24 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
START_CRIT_SECTION();
MarkBufferDirty(rbuffer);
MarkBufferDirty(stack->buffer);
/*
* Restore the temporary copies over the real buffers. But don't free
* the temporary copies yet, WAL record data points to them.
*/
if (stack->parent == NULL)
{
PageRestoreTempPage(newlpage, BufferGetPage(lbuffer));
MarkBufferDirty(lbuffer);
newlpage = newrootpg;
memcpy(BufferGetPage(stack->buffer), newrootpg, BLCKSZ);
memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
}
else
{
memcpy(BufferGetPage(stack->buffer), newlpage, BLCKSZ);
memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
}
PageRestoreTempPage(newlpage, BufferGetPage(stack->buffer));
MarkBufferDirty(stack->buffer);
/* write WAL record */
if (RelationNeedsWAL(btree->index))
......@@ -568,7 +568,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
PageSetLSN(BufferGetPage(stack->buffer), recptr);
PageSetLSN(rpage, recptr);
PageSetLSN(BufferGetPage(rbuffer), recptr);
if (stack->parent == NULL)
PageSetLSN(BufferGetPage(lbuffer), recptr);
}
......@@ -582,6 +582,11 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
if (stack->parent == NULL)
UnlockReleaseBuffer(lbuffer);
pfree(newlpage);
pfree(newrpage);
if (newrootpg)
pfree(newrootpg);
/*
* If we split the root, we're done. Otherwise the split is not
* complete until the downlink for the new page has been inserted to
......@@ -592,6 +597,8 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
else
return false;
}
else
elog(ERROR, "unknown return code from GIN placeToPage method: %d", rc);
}
/*
......
This diff is collapsed.
/*-------------------------------------------------------------------------
*
* ginentrypage.c
* page utilities routines for the postgres inverted index access method.
* routines for handling GIN entry tree pages.
*
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
......@@ -15,8 +15,15 @@
#include "postgres.h"
#include "access/gin_private.h"
#include "miscadmin.h"
#include "utils/rel.h"
static void entrySplitPage(GinBtree btree, Buffer origbuf,
GinBtreeStack *stack,
void *insertPayload,
BlockNumber updateblkno, XLogRecData **prdata,
Page *newlpage, Page *newrpage);
/*
* Form a tuple for entry tree.
*
......@@ -27,15 +34,15 @@
* format that is being built here. We build on the assumption that we
* are making a leaf-level key entry containing a posting list of nipd items.
* If the caller is actually trying to make a posting-tree entry, non-leaf
* entry, or pending-list entry, it should pass nipd = 0 and then overwrite
* the t_tid fields as necessary. In any case, ipd can be NULL to skip
* copying any itempointers into the posting list; the caller is responsible
* for filling the posting list afterwards, if ipd = NULL and nipd > 0.
* entry, or pending-list entry, it should pass dataSize = 0 and then overwrite
* the t_tid fields as necessary. In any case, 'data' can be NULL to skip
* filling in the posting list; the caller is responsible for filling it
* afterwards if data = NULL and nipd > 0.
*/
IndexTuple
GinFormTuple(GinState *ginstate,
OffsetNumber attnum, Datum key, GinNullCategory category,
ItemPointerData *ipd, uint32 nipd,
Pointer data, Size dataSize, int nipd,
bool errorTooBig)
{
Datum datums[2];
......@@ -80,27 +87,25 @@ GinFormTuple(GinState *ginstate,
newsize = Max(newsize, minsize);
}
newsize = SHORTALIGN(newsize);
GinSetPostingOffset(itup, newsize);
GinSetNPosting(itup, nipd);
/*
* Add space needed for posting list, if any. Then check that the tuple
* won't be too big to store.
*/
newsize += sizeof(ItemPointerData) * nipd;
newsize += dataSize;
newsize = MAXALIGN(newsize);
if (newsize > Min(INDEX_SIZE_MASK, GinMaxItemSize))
if (newsize > GinMaxItemSize)
{
if (errorTooBig)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
(unsigned long) newsize,
(unsigned long) Min(INDEX_SIZE_MASK,
GinMaxItemSize),
(unsigned long) GinMaxItemSize,
RelationGetRelationName(ginstate->index))));
pfree(itup);
return NULL;
......@@ -119,12 +124,20 @@ GinFormTuple(GinState *ginstate,
*/
memset((char *) itup + IndexTupleSize(itup),
0, newsize - IndexTupleSize(itup));
/* set new size in tuple header */
itup->t_info &= ~INDEX_SIZE_MASK;
itup->t_info |= newsize;
}
/*
* Copy in the posting list, if provided
*/
if (data)
{
char *ptr = GinGetPosting(itup);
memcpy(ptr, data, dataSize);
}
/*
* Insert category byte, if needed
*/
......@@ -133,37 +146,45 @@ GinFormTuple(GinState *ginstate,
Assert(IndexTupleHasNulls(itup));
GinSetNullCategory(itup, ginstate, category);
}
/*
* Copy in the posting list, if provided
*/
if (ipd)
memcpy(GinGetPosting(itup), ipd, sizeof(ItemPointerData) * nipd);
return itup;
}
/*
* Sometimes we reduce the number of posting list items in a tuple after
* having built it with GinFormTuple. This function adjusts the size
* fields to match.
* Read item pointers from leaf entry tuple.
*
* Returns a palloc'd array of ItemPointers. The number of items is returned
* in *nitems.
*/
void
GinShortenTuple(IndexTuple itup, uint32 nipd)
ItemPointer
ginReadTuple(GinState *ginstate, OffsetNumber attnum, IndexTuple itup,
int *nitems)
{
uint32 newsize;
Assert(nipd <= GinGetNPosting(itup));
Pointer ptr = GinGetPosting(itup);
int nipd = GinGetNPosting(itup);
ItemPointer ipd;
int ndecoded;
newsize = GinGetPostingOffset(itup) + sizeof(ItemPointerData) * nipd;
newsize = MAXALIGN(newsize);
Assert(newsize <= (itup->t_info & INDEX_SIZE_MASK));
itup->t_info &= ~INDEX_SIZE_MASK;
itup->t_info |= newsize;
GinSetNPosting(itup, nipd);
if (GinItupIsCompressed(itup))
{
if (nipd > 0)
{
ipd = ginPostingListDecode((GinPostingList *) ptr, &ndecoded);
if (nipd != ndecoded)
elog(ERROR, "number of items mismatch in GIN entry tuple, %d in tuple header, %d decoded",
nipd, ndecoded);
}
else
{
ipd = palloc(0);
}
}
else
{
ipd = (ItemPointer) palloc(sizeof(ItemPointerData) * nipd);
memcpy(ipd, ptr, sizeof(ItemPointerData) * nipd);
}
*nitems = nipd;
return ipd;
}
/*
......@@ -492,13 +513,14 @@ entryPreparePage(GinBtree btree, Page page, OffsetNumber off,
* the downlink of the existing item at 'off' is updated to point to
* 'updateblkno'.
*/
static bool
entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
static GinPlaceToPageRC
entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
void *insertPayload, BlockNumber updateblkno,
XLogRecData **prdata)
XLogRecData **prdata, Page *newlpage, Page *newrpage)
{
GinBtreeEntryInsertData *insertData = insertPayload;
Page page = BufferGetPage(buf);
OffsetNumber off = stack->off;
OffsetNumber placed;
int cnt = 0;
......@@ -508,7 +530,13 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
/* quick exit if it doesn't fit */
if (!entryIsEnoughSpace(btree, buf, off, insertData))
return false;
{
entrySplitPage(btree, buf, stack, insertPayload, updateblkno,
prdata, newlpage, newrpage);
return SPLIT;
}
START_CRIT_SECTION();
*prdata = rdata;
entryPreparePage(btree, page, off, insertData, updateblkno);
......@@ -522,6 +550,7 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
RelationGetRelationName(btree->index));
data.isDelete = insertData->isDelete;
data.offset = off;
rdata[cnt].buffer = buf;
rdata[cnt].buffer_std = false;
......@@ -536,21 +565,24 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
rdata[cnt].len = IndexTupleSize(insertData->entry);
rdata[cnt].next = NULL;
return true;
return INSERTED;
}
/*
* Place tuple and split page, original buffer(lbuf) leaves untouched,
* returns shadow page of lbuf filled new data.
* returns shadow pages filled with new data.
* Tuples are distributed between pages by equal size on its, not
* an equal number!
*/
static Page
entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
static void
entrySplitPage(GinBtree btree, Buffer origbuf,
GinBtreeStack *stack,
void *insertPayload,
BlockNumber updateblkno, XLogRecData **prdata)
BlockNumber updateblkno, XLogRecData **prdata,
Page *newlpage, Page *newrpage)
{
GinBtreeEntryInsertData *insertData = insertPayload;
OffsetNumber off = stack->off;
OffsetNumber i,
maxoff,
separator = InvalidOffsetNumber;
......@@ -561,8 +593,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
char *ptr;
IndexTuple itup;
Page page;
Page lpage = PageGetTempPageCopy(BufferGetPage(lbuf));
Page rpage = BufferGetPage(rbuf);
Page lpage = PageGetTempPageCopy(BufferGetPage(origbuf));
Page rpage = PageGetTempPageCopy(BufferGetPage(origbuf));
Size pageSize = PageGetPageSize(lpage);
/* these must be static so they can be returned to caller */
......@@ -651,7 +683,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
rdata[1].len = tupstoresize;
rdata[1].next = NULL;
return lpage;
*newlpage = lpage;
*newrpage = rpage;
}
/*
......@@ -719,7 +752,6 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
btree->findItem = entryLocateLeafEntry;
btree->findChildPtr = entryFindChildPtr;
btree->placeToPage = entryPlaceToPage;
btree->splitPage = entrySplitPage;
btree->fillRoot = ginEntryFillRoot;
btree->prepareDownlink = entryPrepareDownlink;
......
......@@ -487,7 +487,7 @@ ginHeapTupleFastCollect(GinState *ginstate,
IndexTuple itup;
itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
NULL, 0, true);
NULL, 0, 0, true);
itup->t_tid = *ht_ctid;
collector->tuples[collector->ntuples++] = itup;
collector->sumsize += IndexTupleSize(itup);
......
......@@ -71,24 +71,20 @@ callConsistentFn(GinState *ginstate, GinScanKey key)
* Tries to refind previously taken ItemPointer on a posting page.
*/
static bool
findItemInPostingPage(Page page, ItemPointer item, OffsetNumber *off)
needToStepRight(Page page, ItemPointer item)
{
OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
int res;
if (GinPageGetOpaque(page)->flags & GIN_DELETED)
/* page was deleted by concurrent vacuum */
return false;
return true;
/*
* scan page to find equal or first greater value
*/
for (*off = FirstOffsetNumber; *off <= maxoff; (*off)++)
if (ginCompareItemPointers(item, GinDataPageGetRightBound(page)) > 0
&& !GinPageRightMost(page))
{
res = ginCompareItemPointers(item, GinDataPageGetItemPointer(page, *off));
if (res <= 0)
return true;
/*
* the item we're looking is > the right bound of the page, so it
* can't be on this page.
*/
return true;
}
return false;
......@@ -143,14 +139,10 @@ scanPostingTree(Relation index, GinScanEntry scanEntry,
for (;;)
{
page = BufferGetPage(buffer);
if ((GinPageGetOpaque(page)->flags & GIN_DELETED) == 0 &&
GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber)
if ((GinPageGetOpaque(page)->flags & GIN_DELETED) == 0)
{
tbm_add_tuples(scanEntry->matchBitmap,
GinDataPageGetItemPointer(page, FirstOffsetNumber),
GinPageGetOpaque(page)->maxoff, false);
scanEntry->predictNumberResult += GinPageGetOpaque(page)->maxoff;
int n = GinDataLeafPageGetItemsToTbm(page, scanEntry->matchBitmap);
scanEntry->predictNumberResult += n;
}
if (GinPageRightMost(page))
......@@ -335,8 +327,11 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
}
else
{
tbm_add_tuples(scanEntry->matchBitmap,
GinGetPosting(itup), GinGetNPosting(itup), false);
ItemPointer ipd;
int nipd;
ipd = ginReadTuple(btree->ginstate, scanEntry->attnum, itup, &nipd);
tbm_add_tuples(scanEntry->matchBitmap, ipd, nipd, false);
scanEntry->predictNumberResult += GinGetNPosting(itup);
}
......@@ -450,16 +445,14 @@ restartScanEntry:
IncrBufferRefCount(entry->buffer);
page = BufferGetPage(entry->buffer);
entry->predictNumberResult = stack->predictNumber * GinPageGetOpaque(page)->maxoff;
/*
* Keep page content in memory to prevent durable page locking
* Copy page content to memory to avoid keeping it locked for
* a long time.
*/
entry->list = (ItemPointerData *) palloc(BLCKSZ);
entry->nlist = GinPageGetOpaque(page)->maxoff;
memcpy(entry->list,
GinDataPageGetItemPointer(page, FirstOffsetNumber),
GinPageGetOpaque(page)->maxoff * sizeof(ItemPointerData));
entry->list = GinDataLeafPageGetItems(page, &entry->nlist);
entry->predictNumberResult = stack->predictNumber * entry->nlist;
LockBuffer(entry->buffer, GIN_UNLOCK);
freeGinBtreeStack(stack);
......@@ -467,9 +460,10 @@ restartScanEntry:
}
else if (GinGetNPosting(itup) > 0)
{
entry->nlist = GinGetNPosting(itup);
entry->list = (ItemPointerData *) palloc(sizeof(ItemPointerData) * entry->nlist);
memcpy(entry->list, GinGetPosting(itup), sizeof(ItemPointerData) * entry->nlist);
entry->list = ginReadTuple(ginstate, entry->attnum, itup,
&entry->nlist);
entry->predictNumberResult = entry->nlist;
entry->isFinished = FALSE;
}
}
......@@ -532,6 +526,7 @@ static void
entryGetNextItem(GinState *ginstate, GinScanEntry entry)
{
Page page;
int i;
for (;;)
{
......@@ -564,35 +559,47 @@ entryGetNextItem(GinState *ginstate, GinScanEntry entry)
page = BufferGetPage(entry->buffer);
entry->offset = InvalidOffsetNumber;
if (!ItemPointerIsValid(&entry->curItem) ||
findItemInPostingPage(page, &entry->curItem, &entry->offset))
if (entry->list)
{
/*
* Found position equal to or greater than stored
*/
entry->nlist = GinPageGetOpaque(page)->maxoff;
memcpy(entry->list,
GinDataPageGetItemPointer(page, FirstOffsetNumber),
GinPageGetOpaque(page)->maxoff * sizeof(ItemPointerData));
pfree(entry->list);
entry->list = NULL;
}
LockBuffer(entry->buffer, GIN_UNLOCK);
/*
* If the page was concurrently split, we have to re-find the
* item we were stopped on. If the page was split more than once,
* the item might not be on this page, but somewhere to the right.
* Keep following the right-links until we re-find the correct
* page.
*/
if (ItemPointerIsValid(&entry->curItem) &&
needToStepRight(page, &entry->curItem))
{
continue;
}
entry->list = GinDataLeafPageGetItems(page, &entry->nlist);
if (!ItemPointerIsValid(&entry->curItem) ||
ginCompareItemPointers(&entry->curItem,
entry->list + entry->offset - 1) == 0)
/* re-find the item we were stopped on. */
if (ItemPointerIsValid(&entry->curItem))
{
for (i = 0; i < entry->nlist; i++)
{
/*
* First pages are deleted or empty, or we found exact
* position, so break inner loop and continue outer one.
*/
break;
if (ginCompareItemPointers(&entry->curItem,
&entry->list[i]) < 0)
{
LockBuffer(entry->buffer, GIN_UNLOCK);
entry->offset = i + 1;
entry->curItem = entry->list[entry->offset - 1];
return;
}
}
/*
* Find greater than entry->curItem position, store it.
*/
}
else
{
LockBuffer(entry->buffer, GIN_UNLOCK);
entry->offset = 1; /* scan all items on the page. */
entry->curItem = entry->list[entry->offset - 1];
return;
}
}
......
......@@ -53,31 +53,42 @@ addItemPointersToLeafTuple(GinState *ginstate,
Datum key;
GinNullCategory category;
IndexTuple res;
ItemPointerData *newItems,
*oldItems;
int oldNPosting,
newNPosting;
GinPostingList *compressedList;
Assert(!GinIsPostingTree(old));
attnum = gintuple_get_attrnum(ginstate, old);
key = gintuple_get_key(ginstate, old, &category);
/* try to build tuple with room for all the items */
res = GinFormTuple(ginstate, attnum, key, category,
NULL, nitem + GinGetNPosting(old),
false);
/* merge the old and new posting lists */
oldItems = ginReadTuple(ginstate, attnum, old, &oldNPosting);
if (res)
newNPosting = oldNPosting + nitem;
newItems = (ItemPointerData *) palloc(sizeof(ItemPointerData) * newNPosting);
newNPosting = ginMergeItemPointers(newItems,
items, nitem,
oldItems, oldNPosting);
/* Compress the posting list, and try to a build tuple with room for it */
res = NULL;
compressedList = ginCompressPostingList(newItems, newNPosting, GinMaxItemSize,
NULL);
pfree(newItems);
if (compressedList)
{
/* good, small enough */
uint32 newnitem;
/* fill in the posting list with union of old and new TIDs */
newnitem = ginMergeItemPointers(GinGetPosting(res),
GinGetPosting(old),
GinGetNPosting(old),
items, nitem);
/* merge might have eliminated some duplicate items */
GinShortenTuple(res, newnitem);
res = GinFormTuple(ginstate, attnum, key, category,
(char *) compressedList,
SizeOfGinPostingList(compressedList),
newNPosting,
false);
pfree(compressedList);
}
else
if (!res)
{
/* posting list would be too big, convert to posting tree */
BlockNumber postingRoot;
......@@ -88,8 +99,8 @@ addItemPointersToLeafTuple(GinState *ginstate,
* already be in order with no duplicates.
*/
postingRoot = createPostingTree(ginstate->index,
GinGetPosting(old),
GinGetNPosting(old),
oldItems,
oldNPosting,
buildStats);
/* Now insert the TIDs-to-be-added into the posting tree */
......@@ -98,9 +109,10 @@ addItemPointersToLeafTuple(GinState *ginstate,
buildStats);
/* And build a new posting-tree-only result tuple */
res = GinFormTuple(ginstate, attnum, key, category, NULL, 0, true);
res = GinFormTuple(ginstate, attnum, key, category, NULL, 0, 0, true);
GinSetPostingTree(res, postingRoot);
}
pfree(oldItems);
return res;
}
......@@ -119,12 +131,19 @@ buildFreshLeafTuple(GinState *ginstate,
ItemPointerData *items, uint32 nitem,
GinStatsData *buildStats)
{
IndexTuple res;
IndexTuple res = NULL;
GinPostingList *compressedList;
/* try to build a posting list tuple with all the items */
res = GinFormTuple(ginstate, attnum, key, category,
items, nitem, false);
compressedList = ginCompressPostingList(items, nitem, GinMaxItemSize, NULL);
if (compressedList)
{
res = GinFormTuple(ginstate, attnum, key, category,
(char *) compressedList,
SizeOfGinPostingList(compressedList),
nitem, false);
pfree(compressedList);
}
if (!res)
{
/* posting list would be too big, build posting tree */
......@@ -134,7 +153,7 @@ buildFreshLeafTuple(GinState *ginstate,
* Build posting-tree-only result tuple. We do this first so as to
* fail quickly if the key is too big.
*/
res = GinFormTuple(ginstate, attnum, key, category, NULL, 0, true);
res = GinFormTuple(ginstate, attnum, key, category, NULL, 0, 0, true);
/*
* Initialize a new posting tree with the TIDs.
......
This diff is collapsed.
This diff is collapsed.
......@@ -78,7 +78,7 @@ static void
ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
char *ptr;
Buffer buffer;
Page page;
......@@ -89,9 +89,14 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
GinInitBuffer(buffer, GIN_DATA | GIN_LEAF);
memcpy(GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem);
GinPageGetOpaque(page)->maxoff = data->nitem;
GinInitBuffer(buffer, GIN_DATA | GIN_LEAF | GIN_COMPRESSED);
ptr = XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree);
/* Place page data */
memcpy(GinDataLeafPageGetPostingList(page), ptr, data->size);
GinDataLeafPageSetPostingListSize(page, data->size);
PageSetLSN(page, lsn);
......@@ -100,11 +105,11 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
}
static void
ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
void *rdata)
ginRedoInsertEntry(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
{
Page page = BufferGetPage(buffer);
ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
OffsetNumber offset = data->offset;
IndexTuple itup;
if (rightblkno != InvalidBlockNumber)
......@@ -138,30 +143,43 @@ ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
}
static void
ginRedoInsertData(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
void *rdata)
ginRedoRecompress(Page page, ginxlogRecompressDataLeaf *data)
{
Pointer segment;
/* Copy the new data to the right place */
segment = ((Pointer) GinDataLeafPageGetPostingList(page))
+ data->unmodifiedsize;
memcpy(segment, data->newdata, data->length - data->unmodifiedsize);
GinDataLeafPageSetPostingListSize(page, data->length);
GinPageSetCompressed(page);
}
static void
ginRedoInsertData(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
{
Page page = BufferGetPage(buffer);
if (GinPageIsLeaf(page))
if (isLeaf)
{
ginxlogInsertDataLeaf *data = (ginxlogInsertDataLeaf *) rdata;
ItemPointerData *items = data->items;
OffsetNumber i;
ginxlogRecompressDataLeaf *data = (ginxlogRecompressDataLeaf *) rdata;
for (i = 0; i < data->nitem; i++)
GinDataPageAddItemPointer(page, &items[i], offset + i);
Assert(GinPageIsLeaf(page));
ginRedoRecompress(page, data);
}
else
{
PostingItem *pitem = (PostingItem *) rdata;
ginxlogInsertDataInternal *data = (ginxlogInsertDataInternal *) rdata;
PostingItem *oldpitem;
Assert(!GinPageIsLeaf(page));
/* update link to right page after split */
oldpitem = GinDataPageGetPostingItem(page, offset);
oldpitem = GinDataPageGetPostingItem(page, data->offset);
PostingItemSetBlockNumber(oldpitem, rightblkno);
GinDataPageAddPostingItem(page, pitem, offset);
GinDataPageAddPostingItem(page, &data->newitem, data->offset);
}
}
......@@ -213,12 +231,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
if (data->flags & GIN_INSERT_ISDATA)
{
Assert(GinPageIsData(page));
ginRedoInsertData(buffer, data->offset, rightChildBlkno, payload);
ginRedoInsertData(buffer, isLeaf, rightChildBlkno, payload);
}
else
{
Assert(!GinPageIsData(page));
ginRedoInsertEntry(buffer, data->offset, rightChildBlkno, payload);
ginRedoInsertEntry(buffer, isLeaf, rightChildBlkno, payload);
}
PageSetLSN(page, lsn);
......@@ -253,38 +271,42 @@ ginRedoSplitEntry(Page lpage, Page rpage, void *rdata)
static void
ginRedoSplitData(Page lpage, Page rpage, void *rdata)
{
ginxlogSplitData *data = (ginxlogSplitData *) rdata;
bool isleaf = GinPageIsLeaf(lpage);
char *ptr = (char *) rdata + sizeof(ginxlogSplitData);
OffsetNumber i;
ItemPointer bound;
if (isleaf)
{
ItemPointer items = (ItemPointer) ptr;
for (i = 0; i < data->separator; i++)
GinDataPageAddItemPointer(lpage, &items[i], InvalidOffsetNumber);
for (i = data->separator; i < data->nitem; i++)
GinDataPageAddItemPointer(rpage, &items[i], InvalidOffsetNumber);
ginxlogSplitDataLeaf *data = (ginxlogSplitDataLeaf *) rdata;
Pointer lptr = (Pointer) rdata + sizeof(ginxlogSplitDataLeaf);
Pointer rptr = lptr + data->lsize;
Assert(data->lsize > 0 && data->lsize <= GinDataLeafMaxContentSize);
Assert(data->rsize > 0 && data->rsize <= GinDataLeafMaxContentSize);
memcpy(GinDataLeafPageGetPostingList(lpage), lptr, data->lsize);
memcpy(GinDataLeafPageGetPostingList(rpage), rptr, data->rsize);
GinDataLeafPageSetPostingListSize(lpage, data->lsize);
GinDataLeafPageSetPostingListSize(rpage, data->rsize);
*GinDataPageGetRightBound(lpage) = data->lrightbound;
*GinDataPageGetRightBound(rpage) = data->rrightbound;
}
else
{
PostingItem *items = (PostingItem *) ptr;
ginxlogSplitDataInternal *data = (ginxlogSplitDataInternal *) rdata;
PostingItem *items = (PostingItem *) ((char *) rdata + sizeof(ginxlogSplitDataInternal));
OffsetNumber i;
OffsetNumber maxoff;
for (i = 0; i < data->separator; i++)
GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber);
for (i = data->separator; i < data->nitem; i++)
GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber);
}
/* set up right key */
bound = GinDataPageGetRightBound(lpage);
if (isleaf)
*bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
else
*bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
bound = GinDataPageGetRightBound(rpage);
*bound = data->rightbound;
/* set up right key */
maxoff = GinPageGetOpaque(lpage)->maxoff;
*GinDataPageGetRightBound(lpage) = GinDataPageGetPostingItem(lpage, maxoff)->key;
*GinDataPageGetRightBound(rpage) = data->rightbound;
}
}
static void
......@@ -317,9 +339,10 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
if (isLeaf)
flags |= GIN_LEAF;
if (isData)
flags |= GIN_DATA;
if (isLeaf && isData)
flags |= GIN_COMPRESSED;
lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
Assert(BufferIsValid(lbuffer));
......@@ -352,7 +375,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
Buffer rootBuf = XLogReadBuffer(data->node, rootBlkno, true);
Page rootPage = BufferGetPage(rootBuf);
GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
GinInitBuffer(rootBuf, flags & ~GIN_LEAF & ~GIN_COMPRESSED);
if (isData)
{
......@@ -383,13 +406,20 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(lbuffer);
}
/*
* This is functionally the same as heap_xlog_newpage.
*/
static void
ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
ginxlogVacuumPage *xlrec = (ginxlogVacuumPage *) XLogRecGetData(record);
char *blk = ((char *) xlrec) + sizeof(ginxlogVacuumPage);
Buffer buffer;
Page page;
Assert(xlrec->hole_offset < BLCKSZ);
Assert(xlrec->hole_length < BLCKSZ);
/* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK(0))
{
......@@ -397,41 +427,56 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
return;
}
buffer = XLogReadBuffer(data->node, data->blkno, false);
buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
if (lsn > PageGetLSN(page))
if (xlrec->hole_length == 0)
{
if (GinPageIsData(page))
{
memcpy(GinDataPageGetData(page),
XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
data->nitem * GinSizeOfDataPageItem(page));
GinPageGetOpaque(page)->maxoff = data->nitem;
}
else
{
OffsetNumber i,
*tod;
IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
memcpy((char *) page, blk, BLCKSZ);
}
else
{
memcpy((char *) page, blk, xlrec->hole_offset);
/* must zero-fill the hole */
MemSet((char *) page + xlrec->hole_offset, 0, xlrec->hole_length);
memcpy((char *) page + (xlrec->hole_offset + xlrec->hole_length),
blk + xlrec->hole_offset,
BLCKSZ - (xlrec->hole_offset + xlrec->hole_length));
}
tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
tod[i - 1] = i;
PageSetLSN(page, lsn);
PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
for (i = 0; i < data->nitem; i++)
{
if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index page in %u/%u/%u",
data->node.spcNode, data->node.dbNode, data->node.relNode);
itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
}
}
static void
ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetData(record);
Buffer buffer;
Page page;
/* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
}
buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
Assert(GinPageIsLeaf(page));
Assert(GinPageIsData(page));
if (lsn > PageGetLSN(page))
{
ginRedoRecompress(page, &xlrec->data);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
......@@ -747,6 +792,9 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
case XLOG_GIN_VACUUM_PAGE:
ginRedoVacuumPage(lsn, record);
break;
case XLOG_GIN_VACUUM_DATA_LEAF_PAGE:
ginRedoVacuumDataLeafPage(lsn, record);
break;
case XLOG_GIN_DELETE_PAGE:
ginRedoDeletePage(lsn, record);
break;
......
......@@ -47,8 +47,7 @@ gin_desc(StringInfo buf, uint8 xl_info, char *rec)
appendStringInfoString(buf, "Insert item, ");
desc_node(buf, xlrec->node, xlrec->blkno);
appendStringInfo(buf, " offset: %u isdata: %c isleaf: %c",
xlrec->offset,
appendStringInfo(buf, " isdata: %c isleaf: %c",
(xlrec->flags & GIN_INSERT_ISDATA) ? 'T' : 'F',
(xlrec->flags & GIN_INSERT_ISLEAF) ? 'T' : 'F');
if (!(xlrec->flags & GIN_INSERT_ISLEAF))
......@@ -67,24 +66,50 @@ gin_desc(StringInfo buf, uint8 xl_info, char *rec)
appendStringInfo(buf, " isdelete: %c",
(((ginxlogInsertEntry *) payload)->isDelete) ? 'T' : 'F');
else if (xlrec->flags & GIN_INSERT_ISLEAF)
appendStringInfo(buf, " nitem: %u",
(((ginxlogInsertDataLeaf *) payload)->nitem));
{
ginxlogRecompressDataLeaf *insertData =
(ginxlogRecompressDataLeaf *) payload;
appendStringInfo(buf, " unmodified: %u length: %u (compressed)",
insertData->unmodifiedsize,
insertData->length);
}
else
{
ginxlogInsertDataInternal *insertData = (ginxlogInsertDataInternal *) payload;
appendStringInfo(buf, " pitem: %u-%u/%u",
PostingItemGetBlockNumber((PostingItem *) payload),
ItemPointerGetBlockNumber(&((PostingItem *) payload)->key),
ItemPointerGetOffsetNumber(&((PostingItem *) payload)->key));
PostingItemGetBlockNumber(&insertData->newitem),
ItemPointerGetBlockNumber(&insertData->newitem.key),
ItemPointerGetOffsetNumber(&insertData->newitem.key));
}
}
break;
case XLOG_GIN_SPLIT:
appendStringInfoString(buf, "Page split, ");
desc_node(buf, ((ginxlogSplit *) rec)->node, ((ginxlogSplit *) rec)->lblkno);
appendStringInfo(buf, " isrootsplit: %c", (((ginxlogSplit *) rec)->flags & GIN_SPLIT_ROOT) ? 'T' : 'F');
{
ginxlogSplit *xlrec = (ginxlogSplit *) rec;
appendStringInfoString(buf, "Page split, ");
desc_node(buf, ((ginxlogSplit *) rec)->node, ((ginxlogSplit *) rec)->lblkno);
appendStringInfo(buf, " isrootsplit: %c", (((ginxlogSplit *) rec)->flags & GIN_SPLIT_ROOT) ? 'T' : 'F');
appendStringInfo(buf, " isdata: %c isleaf: %c",
(xlrec->flags & GIN_INSERT_ISDATA) ? 'T' : 'F',
(xlrec->flags & GIN_INSERT_ISLEAF) ? 'T' : 'F');
}
break;
case XLOG_GIN_VACUUM_PAGE:
appendStringInfoString(buf, "Vacuum page, ");
desc_node(buf, ((ginxlogVacuumPage *) rec)->node, ((ginxlogVacuumPage *) rec)->blkno);
break;
case XLOG_GIN_VACUUM_DATA_LEAF_PAGE:
{
ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) rec;
appendStringInfoString(buf, "Vacuum data leaf page, ");
desc_node(buf, xlrec->node, xlrec->blkno);
appendStringInfo(buf, " unmodified: %u length: %u",
xlrec->data.unmodifiedsize,
xlrec->data.length);
}
break;
case XLOG_GIN_DELETE_PAGE:
appendStringInfoString(buf, "Delete page, ");
desc_node(buf, ((ginxlogDeletePage *) rec)->node, ((ginxlogDeletePage *) rec)->blkno);
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment