Commit 6655a729 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Use full 64-bit XID for checking if a deleted GiST page is old enough.

Otherwise, after a deleted page gets even older, it becomes unrecyclable
again. B-tree has the same problem, and has had since time immemorial,
but let's at least fix this in GiST, where this is new.

Backpatch to v12, where GiST page deletion was introduced.

Reviewed-by: Andrey Borodin
Discussion: https://www.postgresql.org/message-id/835A15A5-F1B4-4446-A711-BF48357EB602%40yandex-team.ru
parent 9eb5607e
......@@ -882,9 +882,27 @@ gistNewBuffer(Relation r)
bool
gistPageRecyclable(Page page)
{
return PageIsNew(page) ||
(GistPageIsDeleted(page) &&
TransactionIdPrecedes(GistPageGetDeleteXid(page), RecentGlobalXmin));
if (PageIsNew(page))
return true;
if (GistPageIsDeleted(page))
{
/*
* The page was deleted, but when? If it was just deleted, a scan
* might have seen the downlink to it, and will read the page later.
* As long as that can happen, we must keep the deleted page around as
* a tombstone.
*
* Compare the deletion XID with RecentGlobalXmin. If deleteXid <
* RecentGlobalXmin, then no scan that's still in progress could have
* seen its downlink, and we can recycle it.
*/
FullTransactionId deletexid_full = GistPageGetDeleteXid(page);
FullTransactionId recentxmin_full = GetFullRecentGlobalXmin();
if (FullTransactionIdPrecedes(deletexid_full, recentxmin_full))
return true;
}
return false;
}
bytea *
......
......@@ -595,7 +595,7 @@ gistdeletepage(IndexVacuumInfo *info, GistBulkDeleteResult *stats,
ItemId iid;
IndexTuple idxtuple;
XLogRecPtr recptr;
TransactionId txid;
FullTransactionId txid;
/*
* Check that the leaf is still empty and deletable.
......@@ -648,14 +648,13 @@ gistdeletepage(IndexVacuumInfo *info, GistBulkDeleteResult *stats,
* currently in progress must have ended. (That's much more conservative
* than needed, but let's keep it safe and simple.)
*/
txid = ReadNewTransactionId();
txid = ReadNextFullTransactionId();
START_CRIT_SECTION();
/* mark the page as deleted */
MarkBufferDirty(leafBuffer);
GistPageSetDeleteXid(leafPage, txid);
GistPageSetDeleted(leafPage);
GistPageSetDeleted(leafPage, txid);
stats->stats.pages_deleted++;
/* remove the downlink from the parent */
......
......@@ -356,8 +356,7 @@ gistRedoPageDelete(XLogReaderState *record)
{
Page page = (Page) BufferGetPage(leafBuffer);
GistPageSetDeleteXid(page, xldata->deleteXid);
GistPageSetDeleted(page);
GistPageSetDeleted(page, xldata->deleteXid);
PageSetLSN(page, lsn);
MarkBufferDirty(leafBuffer);
......@@ -396,8 +395,27 @@ gistRedoPageReuse(XLogReaderState *record)
*/
if (InHotStandby)
{
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
xlrec->node);
FullTransactionId latestRemovedFullXid = xlrec->latestRemovedFullXid;
FullTransactionId nextFullXid = ReadNextFullTransactionId();
uint64 diff;
/*
* ResolveRecoveryConflictWithSnapshot operates on 32-bit
* TransactionIds, so truncate the logged FullTransactionId. If the
* logged value is very old, so that XID wrap-around already happened
* on it, there can't be any snapshots that still see it.
*/
nextFullXid = ReadNextFullTransactionId();
diff = U64FromFullTransactionId(nextFullXid) -
U64FromFullTransactionId(latestRemovedFullXid);
if (diff < MaxTransactionId / 2)
{
TransactionId latestRemovedXid;
latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid);
ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
xlrec->node);
}
}
}
......@@ -554,7 +572,7 @@ gistXLogSplit(bool page_is_leaf,
* downlink from the parent page.
*/
XLogRecPtr
gistXLogPageDelete(Buffer buffer, TransactionId xid,
gistXLogPageDelete(Buffer buffer, FullTransactionId xid,
Buffer parentBuffer, OffsetNumber downlinkOffset)
{
gistxlogPageDelete xlrec;
......@@ -578,7 +596,7 @@ gistXLogPageDelete(Buffer buffer, TransactionId xid,
* Write XLOG record about reuse of a deleted page.
*/
void
gistXLogPageReuse(Relation rel, BlockNumber blkno, TransactionId latestRemovedXid)
gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemovedXid)
{
gistxlogPageReuse xlrec_reuse;
......@@ -591,7 +609,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, TransactionId latestRemovedXi
/* XLOG stuff */
xlrec_reuse.node = rel->rd_node;
xlrec_reuse.block = blkno;
xlrec_reuse.latestRemovedXid = latestRemovedXid;
xlrec_reuse.latestRemovedFullXid = latestRemovedXid;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec_reuse, SizeOfGistxlogPageReuse);
......
......@@ -26,10 +26,11 @@ out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
static void
out_gistxlogPageReuse(StringInfo buf, gistxlogPageReuse *xlrec)
{
appendStringInfo(buf, "rel %u/%u/%u; blk %u; latestRemovedXid %u",
appendStringInfo(buf, "rel %u/%u/%u; blk %u; latestRemovedXid %u:%u",
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode, xlrec->block,
xlrec->latestRemovedXid);
EpochFromFullTransactionId(xlrec->latestRemovedFullXid),
XidFromFullTransactionId(xlrec->latestRemovedFullXid));
}
static void
......@@ -50,8 +51,10 @@ out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
static void
out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
{
appendStringInfo(buf, "deleteXid %u; downlink %u",
xlrec->deleteXid, xlrec->downlinkOffset);
appendStringInfo(buf, "deleteXid %u:%u; downlink %u",
EpochFromFullTransactionId(xlrec->deleteXid),
XidFromFullTransactionId(xlrec->deleteXid),
xlrec->downlinkOffset);
}
void
......
......@@ -956,6 +956,36 @@ xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
return 0;
}
/*
* Get current RecentGlobalXmin value, as a FullTransactionId.
*/
FullTransactionId
GetFullRecentGlobalXmin(void)
{
FullTransactionId nextxid_full;
uint32 nextxid_epoch;
TransactionId nextxid_xid;
uint32 epoch;
Assert(TransactionIdIsNormal(RecentGlobalXmin));
/*
* Compute the epoch from the next XID's epoch. This relies on the fact
* that RecentGlobalXmin must be within the 2 billion XID horizon from the
* next XID.
*/
nextxid_full = ReadNextFullTransactionId();
nextxid_epoch = EpochFromFullTransactionId(nextxid_full);
nextxid_xid = XidFromFullTransactionId(nextxid_full);
if (RecentGlobalXmin > nextxid_xid)
epoch = nextxid_epoch - 1;
else
epoch = nextxid_epoch;
return FullTransactionIdFromEpochAndXid(epoch, RecentGlobalXmin);
}
/*
* SnapshotResetXmin
*
......
......@@ -16,6 +16,7 @@
#ifndef GIST_H
#define GIST_H
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "storage/block.h"
......@@ -140,8 +141,6 @@ typedef struct GISTENTRY
#define GIST_LEAF(entry) (GistPageIsLeaf((entry)->page))
#define GistPageIsDeleted(page) ( GistPageGetOpaque(page)->flags & F_DELETED)
#define GistPageSetDeleted(page) ( GistPageGetOpaque(page)->flags |= F_DELETED)
#define GistPageSetNonDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_DELETED)
#define GistTuplesDeleted(page) ( GistPageGetOpaque(page)->flags & F_TUPLES_DELETED)
#define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED)
......@@ -158,9 +157,45 @@ typedef struct GISTENTRY
#define GistPageGetNSN(page) ( PageXLogRecPtrGet(GistPageGetOpaque(page)->nsn))
#define GistPageSetNSN(page, val) ( PageXLogRecPtrSet(GistPageGetOpaque(page)->nsn, val))
/* For deleted pages we store last xid which could see the page in scan */
#define GistPageGetDeleteXid(page) ( ((PageHeader) (page))->pd_prune_xid )
#define GistPageSetDeleteXid(page, val) ( ((PageHeader) (page))->pd_prune_xid = val)
/*
* On a deleted page, we store this struct. A deleted page doesn't contain any
* tuples, so we don't use the normal page layout with line pointers. Instead,
* this struct is stored right after the standard page header. pd_lower points
* to the end of this struct. If we add fields to this struct in the future, we
* can distinguish the old and new formats by pd_lower.
*/
typedef struct GISTDeletedPageContents
{
/* last xid which could see the page in a scan */
FullTransactionId deleteXid;
} GISTDeletedPageContents;
static inline void
GistPageSetDeleted(Page page, FullTransactionId deletexid)
{
Assert(PageIsEmpty(page));
GistPageGetOpaque(page)->flags |= F_DELETED;
((PageHeader) page)->pd_lower = MAXALIGN(SizeOfPageHeaderData) + sizeof(GISTDeletedPageContents);
((GISTDeletedPageContents *) PageGetContents(page))->deleteXid = deletexid;
}
static inline FullTransactionId
GistPageGetDeleteXid(Page page)
{
Assert(GistPageIsDeleted(page));
/* Is the deleteXid field present? */
if (((PageHeader) page)->pd_lower >= MAXALIGN(SizeOfPageHeaderData) +
offsetof(GISTDeletedPageContents, deleteXid) + sizeof(FullTransactionId))
{
return ((GISTDeletedPageContents *) PageGetContents(page))->deleteXid;
}
else
return FullTransactionIdFromEpochAndXid(0, FirstNormalTransactionId);
}
/*
* Vector of GISTENTRY structs; user-defined methods union and picksplit
......
......@@ -426,11 +426,11 @@ extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
/* gistxlog.c */
extern XLogRecPtr gistXLogPageDelete(Buffer buffer,
TransactionId xid, Buffer parentBuffer,
FullTransactionId xid, Buffer parentBuffer,
OffsetNumber downlinkOffset);
extern void gistXLogPageReuse(Relation rel, BlockNumber blkno,
TransactionId latestRemovedXid);
FullTransactionId latestRemovedXid);
extern XLogRecPtr gistXLogUpdate(Buffer buffer,
OffsetNumber *todelete, int ntodelete,
......
......@@ -83,7 +83,7 @@ typedef struct gistxlogPageSplit
*/
typedef struct gistxlogPageDelete
{
TransactionId deleteXid; /* last Xid which could see page in scan */
FullTransactionId deleteXid; /* last Xid which could see page in scan */
OffsetNumber downlinkOffset; /* Offset of downlink referencing this
* page */
} gistxlogPageDelete;
......@@ -98,10 +98,10 @@ typedef struct gistxlogPageReuse
{
RelFileNode node;
BlockNumber block;
TransactionId latestRemovedXid;
FullTransactionId latestRemovedFullXid;
} gistxlogPageReuse;
#define SizeOfGistxlogPageReuse (offsetof(gistxlogPageReuse, latestRemovedXid) + sizeof(TransactionId))
#define SizeOfGistxlogPageReuse (offsetof(gistxlogPageReuse, latestRemovedFullXid) + sizeof(FullTransactionId))
extern void gist_redo(XLogReaderState *record);
extern void gist_desc(StringInfo buf, XLogReaderState *record);
......
......@@ -13,6 +13,7 @@
#ifndef SNAPMGR_H
#define SNAPMGR_H
#include "access/transam.h"
#include "fmgr.h"
#include "utils/relcache.h"
#include "utils/resowner.h"
......@@ -122,6 +123,8 @@ extern void UnregisterSnapshot(Snapshot snapshot);
extern Snapshot RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner);
extern void UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner);
extern FullTransactionId GetFullRecentGlobalXmin(void);
extern void AtSubCommit_Snapshot(int level);
extern void AtSubAbort_Snapshot(int level);
extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment