Commit 8776faa8 authored by Heikki Linnakangas

Adjust SP-GiST WAL record formats to reduce alignment padding.

The way the code was written, the padding was copied from uninitialized
memory areas. Because the structs are local variables in the code where
the WAL records are constructed, making them larger and zeroing the padding
bytes would not make the code very pretty, so rather than fixing this
directly by zeroing out the padding bytes, it seems clearer not to try to
align the tuples in the WAL records. The redo functions are taught to copy
the tuple header to a local variable to avoid unaligned access.

Stable branches have the same problem, but we can't change the WAL format
there, so fix in master only. Reading a few random extra bytes from the stack
is harmless in practice, so it's not worth crafting a different
back-patchable fix.

Per reports from Kevin Grittner and Andres Freund, using clang static
analyzer and Valgrind, respectively.
parent d4d48a5e
......@@ -217,7 +217,6 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
xlrec.nodeI = 0;
ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
/* we assume sizeof(xlrec) is at least int-aligned */
ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1);
ACCEPT_RDATA_BUFFER(current->buffer, 2);
......@@ -533,9 +532,9 @@ moveLeafs(Relation index, SpGistState *state,
{
XLogRecPtr recptr;
ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
ACCEPT_RDATA_DATA(toDelete, MAXALIGN(sizeof(OffsetNumber) * nDelete), 1);
ACCEPT_RDATA_DATA(toInsert, MAXALIGN(sizeof(OffsetNumber) * nInsert), 2);
ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogMoveLeafs, 0);
ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * nDelete, 1);
ACCEPT_RDATA_DATA(toInsert, sizeof(OffsetNumber) * nInsert, 2);
ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3);
ACCEPT_RDATA_BUFFER(current->buffer, 4);
ACCEPT_RDATA_BUFFER(nbuf, 5);
......@@ -1116,9 +1115,8 @@ doPickSplit(Relation index, SpGistState *state,
leafdata = leafptr = (char *) palloc(totalLeafSizes);
ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, 1);
nRdata = 2;
ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogPickSplit, 0);
nRdata = 1;
/* Here we begin making the changes to the target pages */
START_CRIT_SECTION();
......@@ -1152,7 +1150,7 @@ doPickSplit(Relation index, SpGistState *state,
{
xlrec.nDelete = nToDelete;
ACCEPT_RDATA_DATA(toDelete,
MAXALIGN(sizeof(OffsetNumber) * nToDelete),
sizeof(OffsetNumber) * nToDelete,
nRdata);
nRdata++;
ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
......@@ -1251,13 +1249,11 @@ doPickSplit(Relation index, SpGistState *state,
}
xlrec.nInsert = nToInsert;
ACCEPT_RDATA_DATA(toInsert,
MAXALIGN(sizeof(OffsetNumber) * nToInsert),
nRdata);
ACCEPT_RDATA_DATA(toInsert, sizeof(OffsetNumber) * nToInsert, nRdata);
nRdata++;
ACCEPT_RDATA_DATA(leafPageSelect,
MAXALIGN(sizeof(uint8) * nToInsert),
nRdata);
ACCEPT_RDATA_DATA(leafPageSelect, sizeof(uint8) * nToInsert, nRdata);
nRdata++;
ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, nRdata);
nRdata++;
ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata);
nRdata++;
......@@ -1518,7 +1514,6 @@ spgAddNodeAction(Relation index, SpGistState *state,
xlrec.newPage = false;
ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
/* we assume sizeof(xlrec) is at least int-aligned */
ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1);
ACCEPT_RDATA_BUFFER(current->buffer, 2);
......@@ -1733,7 +1728,6 @@ spgSplitNodeAction(Relation index, SpGistState *state,
xlrec.newPage = false;
ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
/* we assume sizeof(xlrec) is at least int-aligned */
ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1);
ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2);
ACCEPT_RDATA_BUFFER(current->buffer, 3);
......
......@@ -327,8 +327,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
xlrec.blkno = BufferGetBlockNumber(buffer);
STORE_STATE(&bds->spgstate, xlrec.stateSrc);
ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
/* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogVacuumLeaf, 0);
ACCEPT_RDATA_DATA(toDead, sizeof(OffsetNumber) * xlrec.nDead, 1);
ACCEPT_RDATA_DATA(toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder, 2);
ACCEPT_RDATA_DATA(moveSrc, sizeof(OffsetNumber) * xlrec.nMove, 3);
......
......@@ -109,13 +109,15 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
{
char *ptr = XLogRecGetData(record);
spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
SpGistLeafTuple leafTuple;
char *leafTuple;
SpGistLeafTupleData leafTupleHdr;
Buffer buffer;
Page page;
/* we assume this is adequately aligned */
ptr += sizeof(spgxlogAddLeaf);
leafTuple = (SpGistLeafTuple) ptr;
leafTuple = ptr;
/* the leaf tuple is unaligned, so make a copy to access its header */
memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
/*
* In normal operation we would have both current and parent pages locked
......@@ -142,7 +144,7 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
{
/* normal cases, tuple was added by SpGistPageAddNewItem */
addOrReplaceTuple(page, (Item) leafTuple, leafTuple->size,
addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
xldata->offnumLeaf);
/* update head tuple's chain link if needed */
......@@ -152,7 +154,7 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
head = (SpGistLeafTuple) PageGetItem(page,
PageGetItemId(page, xldata->offnumHeadLeaf));
Assert(head->nextOffset == leafTuple->nextOffset);
Assert(head->nextOffset == leafTupleHdr.nextOffset);
head->nextOffset = xldata->offnumLeaf;
}
}
......@@ -161,10 +163,10 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
/* replacing a DEAD tuple */
PageIndexTupleDelete(page, xldata->offnumLeaf);
if (PageAddItem(page,
(Item) leafTuple, leafTuple->size,
(Item) leafTuple, leafTupleHdr.size,
xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
leafTuple->size);
leafTupleHdr.size);
}
PageSetLSN(page, lsn);
......@@ -217,11 +219,11 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
ptr += MAXALIGN(sizeof(spgxlogMoveLeafs));
ptr += SizeOfSpgxlogMoveLeafs;
toDelete = (OffsetNumber *) ptr;
ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nMoves);
ptr += sizeof(OffsetNumber) * xldata->nMoves;
toInsert = (OffsetNumber *) ptr;
ptr += MAXALIGN(sizeof(OffsetNumber) * nInsert);
ptr += sizeof(OffsetNumber) * nInsert;
/* now ptr points to the list of leaf tuples */
......@@ -252,10 +254,20 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
for (i = 0; i < nInsert; i++)
{
SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
char *leafTuple;
SpGistLeafTupleData leafTupleHdr;
addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
ptr += lt->size;
/*
* the tuples are not aligned, so must copy to access
* the size field.
*/
leafTuple = ptr;
memcpy(&leafTupleHdr, leafTuple,
sizeof(SpGistLeafTupleData));
addOrReplaceTuple(page, (Item) leafTuple,
leafTupleHdr.size, toInsert[i]);
ptr += leafTupleHdr.size;
}
PageSetLSN(page, lsn);
......@@ -321,15 +333,17 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
{
char *ptr = XLogRecGetData(record);
spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
SpGistInnerTuple innerTuple;
char *innerTuple;
SpGistInnerTupleData innerTupleHdr;
SpGistState state;
Buffer buffer;
Page page;
int bbi;
/* we assume this is adequately aligned */
ptr += sizeof(spgxlogAddNode);
innerTuple = (SpGistInnerTuple) ptr;
innerTuple = ptr;
/* the tuple is unaligned, so make a copy to access its header */
memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
fillFakeState(&state, xldata->stateSrc);
......@@ -348,11 +362,11 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
if (lsn > PageGetLSN(page))
{
PageIndexTupleDelete(page, xldata->offnum);
if (PageAddItem(page, (Item) innerTuple, innerTuple->size,
if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
xldata->offnum,
false, false) != xldata->offnum)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
innerTuple->size);
innerTupleHdr.size);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
......@@ -393,7 +407,7 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
if (lsn > PageGetLSN(page))
{
addOrReplaceTuple(page, (Item) innerTuple,
innerTuple->size, xldata->offnumNew);
innerTupleHdr.size, xldata->offnumNew);
/*
* If parent is in this same page, don't advance LSN;
......@@ -508,16 +522,21 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
{
char *ptr = XLogRecGetData(record);
spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
SpGistInnerTuple prefixTuple;
SpGistInnerTuple postfixTuple;
char *prefixTuple;
SpGistInnerTupleData prefixTupleHdr;
char *postfixTuple;
SpGistInnerTupleData postfixTupleHdr;
Buffer buffer;
Page page;
/* we assume this is adequately aligned */
ptr += sizeof(spgxlogSplitTuple);
prefixTuple = (SpGistInnerTuple) ptr;
ptr += prefixTuple->size;
postfixTuple = (SpGistInnerTuple) ptr;
prefixTuple = ptr;
/* the prefix tuple is unaligned, so make a copy to access its header */
memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
ptr += prefixTupleHdr.size;
postfixTuple = ptr;
/* postfix tuple is also unaligned */
memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
/*
* In normal operation we would have both pages locked simultaneously; but
......@@ -543,7 +562,7 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
if (lsn > PageGetLSN(page))
{
addOrReplaceTuple(page, (Item) postfixTuple,
postfixTuple->size, xldata->offnumPostfix);
postfixTupleHdr.size, xldata->offnumPostfix);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
......@@ -564,14 +583,14 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
if (lsn > PageGetLSN(page))
{
PageIndexTupleDelete(page, xldata->offnumPrefix);
if (PageAddItem(page, (Item) prefixTuple, prefixTuple->size,
if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
prefixTuple->size);
prefixTupleHdr.size);
if (xldata->blknoPostfix == xldata->blknoPrefix)
addOrReplaceTuple(page, (Item) postfixTuple,
postfixTuple->size,
postfixTupleHdr.size,
xldata->offnumPostfix);
PageSetLSN(page, lsn);
......@@ -587,7 +606,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
{
char *ptr = XLogRecGetData(record);
spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
SpGistInnerTuple innerTuple;
char *innerTuple;
SpGistInnerTupleData innerTupleHdr;
SpGistState state;
OffsetNumber *toDelete;
OffsetNumber *toInsert;
......@@ -602,15 +622,18 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
fillFakeState(&state, xldata->stateSrc);
ptr += MAXALIGN(sizeof(spgxlogPickSplit));
innerTuple = (SpGistInnerTuple) ptr;
ptr += innerTuple->size;
ptr += SizeOfSpgxlogPickSplit;
toDelete = (OffsetNumber *) ptr;
ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nDelete);
ptr += sizeof(OffsetNumber) * xldata->nDelete;
toInsert = (OffsetNumber *) ptr;
ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nInsert);
ptr += sizeof(OffsetNumber) * xldata->nInsert;
leafPageSelect = (uint8 *) ptr;
ptr += MAXALIGN(sizeof(uint8) * xldata->nInsert);
ptr += sizeof(uint8) * xldata->nInsert;
innerTuple = ptr;
/* the inner tuple is unaligned, so make a copy to access its header */
memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
ptr += innerTupleHdr.size;
/* now ptr points to the list of leaf tuples */
......@@ -735,15 +758,20 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
/* restore leaf tuples to src and/or dest page */
for (i = 0; i < xldata->nInsert; i++)
{
SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
char *leafTuple;
SpGistLeafTupleData leafTupleHdr;
ptr += lt->size;
/* the tuples are not aligned, so must copy to access the size field. */
leafTuple = ptr;
memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
ptr += leafTupleHdr.size;
page = leafPageSelect[i] ? destPage : srcPage;
if (page == NULL)
continue; /* no need to touch this page */
addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
toInsert[i]);
}
/* Now update src and dest page LSNs if needed */
......@@ -776,7 +804,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
if (lsn > PageGetLSN(page))
{
addOrReplaceTuple(page, (Item) innerTuple, innerTuple->size,
addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
xldata->offnumInner);
/* if inner is also parent, update link while we're here */
......@@ -861,7 +889,7 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record)
fillFakeState(&state, xldata->stateSrc);
ptr += sizeof(spgxlogVacuumLeaf);
ptr += SizeOfSpgxlogVacuumLeaf;
toDead = (OffsetNumber *) ptr;
ptr += sizeof(OffsetNumber) * xldata->nDead;
toPlaceholder = (OffsetNumber *) ptr;
......@@ -941,8 +969,7 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
ptr += sizeof(spgxlogVacuumRoot);
toDelete = (OffsetNumber *) ptr;
toDelete = xldata->offsets;
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
......@@ -974,8 +1001,7 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
ptr += sizeof(spgxlogVacuumRedirect);
itemToPlaceholder = (OffsetNumber *) ptr;
itemToPlaceholder = xldata->offsets;
/*
* If any redirection tuples are being removed, make sure there are no
......
......@@ -422,10 +422,7 @@ typedef struct spgxlogAddLeaf
OffsetNumber offnumParent;
uint16 nodeI;
/*
* new leaf tuple follows, on an intalign boundary (replay only needs to
* fetch its size field, so that should be enough alignment)
*/
/* new leaf tuple follows (unaligned!) */
} spgxlogAddLeaf;
typedef struct spgxlogMoveLeafs
......@@ -449,9 +446,7 @@ typedef struct spgxlogMoveLeafs
* data follows:
* array of deleted tuple numbers, length nMoves
* array of inserted tuple numbers, length nMoves + 1 or 1
* list of leaf tuples, length nMoves + 1 or 1 (must be maxaligned)
* the tuple number arrays are padded to maxalign boundaries so that the
* leaf tuples will be suitably aligned
* list of leaf tuples, length nMoves + 1 or 1 (unaligned!)
*
* Note: if replaceDead is true then there is only one inserted tuple
* number and only one leaf tuple in the data, because we are not copying
......@@ -463,8 +458,11 @@ typedef struct spgxlogMoveLeafs
* Parent page
*----------
*/
OffsetNumber offsets[1];
} spgxlogMoveLeafs;
#define SizeOfSpgxlogMoveLeafs offsetof(spgxlogMoveLeafs, offsets)
typedef struct spgxlogAddNode
{
RelFileNode node;
......@@ -483,8 +481,7 @@ typedef struct spgxlogAddNode
spgxlogState stateSrc;
/*
* updated inner tuple follows, on an intalign boundary (replay only needs
* to fetch its size field, so that should be enough alignment)
* updated inner tuple follows (unaligned!)
*/
} spgxlogAddNode;
......@@ -500,9 +497,8 @@ typedef struct spgxlogSplitTuple
bool newPage; /* need to init that page? */
/*
* new prefix inner tuple follows, then new postfix inner tuple, on
* intalign boundaries (replay only needs to fetch size fields, so that
* should be enough alignment)
* new prefix inner tuple follows, then new postfix inner tuple
* (both are unaligned!)
*/
} spgxlogSplitTuple;
......@@ -531,13 +527,11 @@ typedef struct spgxlogPickSplit
/*----------
* data follows:
* new inner tuple (assumed to have a maxaligned length)
* array of deleted tuple numbers, length nDelete
* array of inserted tuple numbers, length nInsert
* array of page selector bytes for inserted tuples, length nInsert
* list of leaf tuples, length nInsert (must be maxaligned)
* the tuple number and page selector arrays are padded to maxalign
* boundaries so that the leaf tuples will be suitably aligned
* new inner tuple (unaligned!)
* list of leaf tuples, length nInsert (unaligned!)
*
* Buffer references in the rdata array are:
* Src page (only if not root and not being init'd)
......@@ -546,8 +540,11 @@ typedef struct spgxlogPickSplit
* Parent page (if any; could be same as Inner)
*----------
*/
OffsetNumber offsets[1];
} spgxlogPickSplit;
#define SizeOfSpgxlogPickSplit offsetof(spgxlogPickSplit, offsets)
typedef struct spgxlogVacuumLeaf
{
RelFileNode node;
......@@ -570,8 +567,11 @@ typedef struct spgxlogVacuumLeaf
* tuple numbers to insert in nextOffset links
*----------
*/
OffsetNumber offsets[1];
} spgxlogVacuumLeaf;
#define SizeOfSpgxlogVacuumLeaf offsetof(spgxlogVacuumLeaf, offsets)
typedef struct spgxlogVacuumRoot
{
/* vacuum a root page when it is also a leaf */
......@@ -583,8 +583,11 @@ typedef struct spgxlogVacuumRoot
spgxlogState stateSrc;
/* offsets of tuples to delete follow */
OffsetNumber offsets[1];
} spgxlogVacuumRoot;
#define SizeOfSpgxlogVacuumRoot offsetof(spgxlogVacuumRoot, offsets)
typedef struct spgxlogVacuumRedirect
{
RelFileNode node;
......@@ -595,8 +598,11 @@ typedef struct spgxlogVacuumRedirect
TransactionId newestRedirectXid; /* newest XID of removed redirects */
/* offsets of redirect tuples to make placeholders follow */
OffsetNumber offsets[1];
} spgxlogVacuumRedirect;
#define SizeOfSpgxlogVacuumRedirect offsetof(spgxlogVacuumRedirect, offsets)
/*
* The "flags" argument for SpGistGetBuffer should be either GBUF_LEAF to
* get a leaf page, or GBUF_INNER_PARITY(blockNumber) to get an inner
......
......@@ -55,7 +55,7 @@ typedef struct BkpBlock
/*
* Each page of XLOG file has a header like this:
*/
#define XLOG_PAGE_MAGIC 0xD07D /* can be used as WAL version indicator */
#define XLOG_PAGE_MAGIC 0xD07E /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment