Commit 848ef42b authored by Kevin Grittner's avatar Kevin Grittner

Add the "snapshot too old" feature

This feature is controlled by a new old_snapshot_threshold GUC.  A
value of -1 disables the feature, and that is the default.  The
value of 0 is just intended for testing.  Above that it is the
number of minutes a snapshot can reach before pruning and vacuum
are allowed to remove dead tuples which the snapshot would
otherwise protect.  The xmin associated with a transaction ID does
still protect dead tuples.  A connection which is using an "old"
snapshot does not get an error unless it accesses a page modified
recently enough that it might not be able to produce accurate
results.

This is similar to the Oracle feature, and we use the same SQLSTATE
and error message for compatibility.
parent 8b65cf4c
......@@ -138,7 +138,8 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
blkno, RBM_NORMAL, bas);
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buffer, scan->xs_snapshot, scan->indexRelation,
BGP_TEST_FOR_OLD_SNAPSHOT);
if (!BloomPageIsDeleted(page))
{
......
......@@ -2041,6 +2041,42 @@ include_dir 'conf.d'
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-old-snapshot-threshold" xreflabel="old_snapshot_threshold">
<term><varname>old_snapshot_threshold</varname> (<type>integer</type>)
<indexterm>
<primary><varname>old_snapshot_threshold</> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Sets the minimum time that a snapshot can be used without risk of a
<literal>snapshot too old</> error occurring when using the snapshot.
This parameter can only be set at server start.
</para>
<para>
Beyond the threshold, old data may be vacuumed away. This can help
prevent bloat in the face of snapshots which remain in use for a
long time. To prevent incorrect results due to cleanup of data which
would otherwise be visible to the snapshot, an error is generated
when the snapshot is older than this threshold and the snapshot is
used to read a page which has been modified since the snapshot was
built.
</para>
<para>
A value of <literal>-1</> disables this feature, and is the default.
Useful values for production work probably range from a small number
of hours to a few days. The setting will be coerced to a granularity
of minutes, and small numbers (such as <literal>0</> or
<literal>1min</>) are only allowed because they may sometimes be
useful for testing. While a setting as high as <literal>60d</> is
allowed, please note that in many workloads extreme bloat or
transaction ID wraparound may occur in much shorter time frames.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>
</sect1>
......@@ -3051,6 +3087,10 @@ include_dir 'conf.d'
You should also consider setting <varname>hot_standby_feedback</>
on standby server(s) as an alternative to using this parameter.
</para>
<para>
This does not prevent cleanup of dead rows which have reached the age
specified by <varname>old_snapshot_threshold</>.
</para>
</listitem>
</varlistentry>
......@@ -3198,6 +3238,16 @@ include_dir 'conf.d'
until it eventually reaches the primary. Standbys make no other use
of feedback they receive other than to pass upstream.
</para>
<para>
This setting does not override the behavior of
<varname>old_snapshot_threshold</> on the primary; a snapshot on the
standby which exceeds the primary's age threshold can become invalid,
resulting in cancellation of transactions on the standby. This is
because <varname>old_snapshot_threshold</> is intended to provide an
absolute limit on the time which dead rows can contribute to bloat,
which would otherwise be violated because of the configuration of a
standby.
</para>
</listitem>
</varlistentry>
......
......@@ -135,7 +135,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
MemoryContext tupcxt = NULL;
MemoryContext oldcxt = NULL;
revmap = brinRevmapInitialize(idxRel, &pagesPerRange);
revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
for (;;)
{
......@@ -152,7 +152,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
/* normalize the block number to be the first block in the range */
heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
BUFFER_LOCK_SHARE);
BUFFER_LOCK_SHARE, NULL);
/* if range is unsummarized, there's nothing to do */
if (!brtup)
......@@ -285,7 +285,8 @@ brinbeginscan(Relation r, int nkeys, int norderbys)
scan = RelationGetIndexScan(r, nkeys, norderbys);
opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
scan->xs_snapshot);
opaque->bo_bdesc = brin_build_desc(r);
scan->opaque = opaque;
......@@ -368,7 +369,8 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
MemoryContextResetAndDeleteChildren(perRangeCxt);
tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
&off, &size, BUFFER_LOCK_SHARE);
&off, &size, BUFFER_LOCK_SHARE,
scan->xs_snapshot);
if (tup)
{
tup = brin_copy_tuple(tup, size);
......@@ -647,7 +649,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
/*
* Initialize our state, including the deformed tuple state.
*/
revmap = brinRevmapInitialize(index, &pagesPerRange);
revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
state = initialize_brin_buildstate(index, revmap, pagesPerRange);
/*
......@@ -1045,7 +1047,8 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
* the same.)
*/
phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
&offset, &phsz, BUFFER_LOCK_SHARE);
&offset, &phsz, BUFFER_LOCK_SHARE,
NULL);
/* the placeholder tuple must exist */
if (phtup == NULL)
elog(ERROR, "missing placeholder tuple");
......@@ -1080,7 +1083,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
BlockNumber pagesPerRange;
Buffer buf;
revmap = brinRevmapInitialize(index, &pagesPerRange);
revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
/*
* Scan the revmap to find unsummarized items.
......@@ -1095,7 +1098,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
CHECK_FOR_INTERRUPTS();
tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
BUFFER_LOCK_SHARE);
BUFFER_LOCK_SHARE, NULL);
if (tup == NULL)
{
/* no revmap entry for this heap range. Summarize it. */
......
......@@ -68,7 +68,8 @@ static void revmap_physical_extend(BrinRevmap *revmap);
* brinRevmapTerminate when caller is done with it.
*/
BrinRevmap *
brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange,
Snapshot snapshot)
{
BrinRevmap *revmap;
Buffer meta;
......@@ -77,7 +78,7 @@ brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
meta = ReadBuffer(idxrel, BRIN_METAPAGE_BLKNO);
LockBuffer(meta, BUFFER_LOCK_SHARE);
page = BufferGetPage(meta, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(meta, snapshot, idxrel, BGP_TEST_FOR_OLD_SNAPSHOT);
metadata = (BrinMetaPageData *) PageGetContents(page);
revmap = palloc(sizeof(BrinRevmap));
......@@ -187,7 +188,8 @@ brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange,
*/
BrinTuple *
brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
Buffer *buf, OffsetNumber *off, Size *size, int mode)
Buffer *buf, OffsetNumber *off, Size *size, int mode,
Snapshot snapshot)
{
Relation idxRel = revmap->rm_irel;
BlockNumber mapBlk;
......@@ -264,7 +266,8 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
*buf = ReadBuffer(idxRel, blk);
}
LockBuffer(*buf, mode);
page = BufferGetPage(*buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(*buf, snapshot, idxRel,
BGP_TEST_FOR_OLD_SNAPSHOT);
/* If we land on a revmap page, start over */
if (BRIN_IS_REGULAR_PAGE(page))
......
......@@ -71,7 +71,7 @@ ginTraverseLock(Buffer buffer, bool searchMode)
* is share-locked, and stack->parent is NULL.
*/
GinBtreeStack *
ginFindLeafPage(GinBtree btree, bool searchMode)
ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot)
{
GinBtreeStack *stack;
......@@ -89,7 +89,8 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
stack->off = InvalidOffsetNumber;
page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(stack->buffer, snapshot, btree->index,
BGP_TEST_FOR_OLD_SNAPSHOT);
access = ginTraverseLock(stack->buffer, searchMode);
......@@ -115,8 +116,8 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
stack->buffer = ginStepRight(stack->buffer, btree->index, access);
stack->blkno = rightlink;
page = BufferGetPage(stack->buffer, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(stack->buffer, snapshot, btree->index,
BGP_TEST_FOR_OLD_SNAPSHOT);
if (!searchMode && GinPageIsIncompleteSplit(page))
ginFinishSplit(btree, stack, false, NULL);
......
......@@ -1820,7 +1820,7 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
{
/* search for the leaf page where the first item should go to */
btree.itemptr = insertdata.items[insertdata.curitem];
stack = ginFindLeafPage(&btree, false);
stack = ginFindLeafPage(&btree, false, NULL);
ginInsertValue(&btree, stack, &insertdata, buildStats);
}
......@@ -1830,7 +1830,8 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
* Starts a new scan on a posting tree.
*/
GinBtreeStack *
ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno,
Snapshot snapshot)
{
GinBtreeStack *stack;
......@@ -1838,7 +1839,7 @@ ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
btree->fullScan = TRUE;
stack = ginFindLeafPage(btree, TRUE);
stack = ginFindLeafPage(btree, TRUE, snapshot);
return stack;
}
......@@ -73,7 +73,7 @@ scanPostingTree(Relation index, GinScanEntry scanEntry,
Page page;
/* Descend to the leftmost leaf page */
stack = ginScanBeginPostingTree(&btree, index, rootPostingTree);
stack = ginScanBeginPostingTree(&btree, index, rootPostingTree, snapshot);
buffer = stack->buffer;
IncrBufferRefCount(buffer); /* prevent unpin in freeGinBtreeStack */
......@@ -146,7 +146,8 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
if (moveRightIfItNeeded(btree, stack) == false)
return true;
page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(stack->buffer, snapshot, btree->index,
BGP_TEST_FOR_OLD_SNAPSHOT);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off));
/*
......@@ -320,7 +321,7 @@ restartScanEntry:
ginPrepareEntryScan(&btreeEntry, entry->attnum,
entry->queryKey, entry->queryCategory,
ginstate);
stackEntry = ginFindLeafPage(&btreeEntry, true);
stackEntry = ginFindLeafPage(&btreeEntry, true, snapshot);
page = BufferGetPage(stackEntry->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
needUnlock = TRUE;
......@@ -385,7 +386,7 @@ restartScanEntry:
needUnlock = FALSE;
stack = ginScanBeginPostingTree(&entry->btree, ginstate->index,
rootPostingTree);
rootPostingTree, snapshot);
entry->buffer = stack->buffer;
/*
......@@ -627,7 +628,7 @@ entryLoadMoreItems(GinState *ginstate, GinScanEntry entry,
entry->btree.itemptr.ip_posid++;
}
entry->btree.fullScan = false;
stack = ginFindLeafPage(&entry->btree, true);
stack = ginFindLeafPage(&entry->btree, true, snapshot);
/* we don't need the stack, just the buffer. */
entry->buffer = stack->buffer;
......@@ -1335,8 +1336,8 @@ scanGetCandidate(IndexScanDesc scan, pendingPosition *pos)
ItemPointerSetInvalid(&pos->item);
for (;;)
{
page = BufferGetPage(pos->pendingBuffer, NULL,
NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot,
scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT);
maxoff = PageGetMaxOffsetNumber(page);
if (pos->firstOffset > maxoff)
......@@ -1516,8 +1517,8 @@ collectMatchesForHeapRow(IndexScanDesc scan, pendingPosition *pos)
memset(datumExtracted + pos->firstOffset - 1, 0,
sizeof(bool) * (pos->lastOffset - pos->firstOffset));
page = BufferGetPage(pos->pendingBuffer, NULL,
NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot,
scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT);
for (i = 0; i < so->nkeys; i++)
{
......@@ -1710,7 +1711,8 @@ scanPendingInsert(IndexScanDesc scan, TIDBitmap *tbm, int64 *ntids)
*ntids = 0;
LockBuffer(metabuffer, GIN_SHARE);
page = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(metabuffer, scan->xs_snapshot, scan->indexRelation,
BGP_TEST_FOR_OLD_SNAPSHOT);
blkno = GinPageGetMeta(page)->head;
/*
......
......@@ -192,7 +192,7 @@ ginEntryInsert(GinState *ginstate,
ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
stack = ginFindLeafPage(&btree, false);
stack = ginFindLeafPage(&btree, false, NULL);
page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
if (btree.findItem(&btree, stack))
......
......@@ -336,7 +336,7 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
buffer = ReadBuffer(scan->indexRelation, pageItem->blkno);
LockBuffer(buffer, GIST_SHARE);
gistcheckpage(scan->indexRelation, buffer);
page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buffer, scan->xs_snapshot, r, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = GistPageGetOpaque(page);
/*
......
......@@ -278,7 +278,8 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
buf = so->hashso_curbuf;
Assert(BufferIsValid(buf));
page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, scan->xs_snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
maxoffnum = PageGetMaxOffsetNumber(page);
for (offnum = ItemPointerGetOffsetNumber(current);
offnum <= maxoffnum;
......
......@@ -188,8 +188,8 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
/* Read the metapage */
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
page = BufferGetPage(metabuf, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(metabuf, scan->xs_snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
metap = HashPageGetMeta(page);
/*
......@@ -242,8 +242,8 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
/* Fetch the primary bucket page for the bucket */
buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
page = BufferGetPage(buf, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, scan->xs_snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->hasho_bucket == bucket);
......@@ -350,6 +350,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
_hash_readnext(rel, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
TestForOldSnapshot(scan->xs_snapshot, rel, page);
maxoff = PageGetMaxOffsetNumber(page);
offnum = _hash_binsearch(page, so->hashso_sk_hash);
}
......@@ -391,6 +392,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
_hash_readprev(rel, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
TestForOldSnapshot(scan->xs_snapshot, rel, page);
maxoff = PageGetMaxOffsetNumber(page);
offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
}
......
......@@ -394,7 +394,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
*/
LockBuffer(buffer, BUFFER_LOCK_SHARE);
dp = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
dp = BufferGetPage(buffer, snapshot, scan->rs_rd,
BGP_TEST_FOR_OLD_SNAPSHOT);
lines = PageGetMaxOffsetNumber(dp);
ntup = 0;
......@@ -537,7 +538,7 @@ heapgettup(HeapScanDesc scan,
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd, BGP_TEST_FOR_OLD_SNAPSHOT);
lines = PageGetMaxOffsetNumber(dp);
/* page and lineoff now reference the physically next tid */
......@@ -582,7 +583,8 @@ heapgettup(HeapScanDesc scan,
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
BGP_TEST_FOR_OLD_SNAPSHOT);
lines = PageGetMaxOffsetNumber(dp);
if (!scan->rs_inited)
......@@ -616,7 +618,8 @@ heapgettup(HeapScanDesc scan,
heapgetpage(scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
BGP_TEST_FOR_OLD_SNAPSHOT);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsNormal(lpp));
......@@ -745,7 +748,8 @@ heapgettup(HeapScanDesc scan,
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
BGP_TEST_FOR_OLD_SNAPSHOT);
lines = PageGetMaxOffsetNumber((Page) dp);
linesleft = lines;
if (backward)
......@@ -832,7 +836,8 @@ heapgettup_pagemode(HeapScanDesc scan,
lineindex = scan->rs_cindex + 1;
}
dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
BGP_TEST_FOR_OLD_SNAPSHOT);
lines = scan->rs_ntuples;
/* page and lineindex now reference the next visible tid */
......@@ -875,7 +880,8 @@ heapgettup_pagemode(HeapScanDesc scan,
page = scan->rs_cblock; /* current page */
}
dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
BGP_TEST_FOR_OLD_SNAPSHOT);
lines = scan->rs_ntuples;
if (!scan->rs_inited)
......@@ -908,7 +914,8 @@ heapgettup_pagemode(HeapScanDesc scan,
heapgetpage(scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
BGP_TEST_FOR_OLD_SNAPSHOT);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsNormal(lpp));
......@@ -1027,7 +1034,8 @@ heapgettup_pagemode(HeapScanDesc scan,
heapgetpage(scan, page);
dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
BGP_TEST_FOR_OLD_SNAPSHOT);
lines = scan->rs_ntuples;
linesleft = lines;
if (backward)
......@@ -1871,7 +1879,7 @@ heap_fetch(Relation relation,
* Need share lock on buffer to examine tuple commit status.
*/
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buffer, snapshot, relation, BGP_TEST_FOR_OLD_SNAPSHOT);
/*
* We'd better check for out-of-range offnum in case of VACUUM since the
......@@ -2200,7 +2208,8 @@ heap_get_latest_tid(Relation relation,
*/
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buffer, snapshot, relation,
BGP_TEST_FOR_OLD_SNAPSHOT);
/*
* Check for bogus item number. This is not treated as an error
......
......@@ -92,12 +92,21 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
* need to use the horizon that includes slots, otherwise the data-only
* horizon can be used. Note that the toast relation of user defined
* relations are *not* considered catalog relations.
*
* It is OK to apply the old snapshot limit before acquiring the cleanup
* lock because the worst that can happen is that we are not quite as
* aggressive about the cleanup (by however many transaction IDs are
* consumed between this point and acquiring the lock). This allows us to
* save significant overhead in the case where the page is found not to be
* prunable.
*/
if (IsCatalogRelation(relation) ||
RelationIsAccessibleInLogicalDecoding(relation))
OldestXmin = RecentGlobalXmin;
else
OldestXmin = RecentGlobalDataXmin;
OldestXmin =
TransactionIdLimitedForOldSnapshots(RecentGlobalDataXmin,
relation);
Assert(TransactionIdIsValid(OldestXmin));
......
......@@ -119,7 +119,7 @@ _bt_doinsert(Relation rel, IndexTuple itup,
top:
/* find the first page containing this key */
stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
offset = InvalidOffsetNumber;
......@@ -135,7 +135,7 @@ top:
* precise description.
*/
buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
true, stack, BT_WRITE);
true, stack, BT_WRITE, NULL);
/*
* If we're not allowing duplicates, make sure the key isn't already in
......@@ -1682,7 +1682,8 @@ _bt_insert_parent(Relation rel,
elog(DEBUG2, "concurrent ROOT page split");
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
/* Find the leftmost page at the next level up */
pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
NULL);
/* Set up a phony stack entry pointing there */
stack = &fakestack;
stack->bts_blkno = BufferGetBlockNumber(pbuf);
......
......@@ -1255,7 +1255,7 @@ _bt_pagedel(Relation rel, Buffer buf)
itup_scankey = _bt_mkscankey(rel, targetkey);
/* find the leftmost leaf page containing this key */
stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
false, &lbuf, BT_READ);
false, &lbuf, BT_READ, NULL);
/* don't need a pin on the page */
_bt_relbuf(rel, lbuf);
......
......@@ -79,6 +79,10 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
* address of the leaf-page buffer, which is read-locked and pinned.
* No locks are held on the parent pages, however!
*
* If the snapshot parameter is not NULL, "old snapshot" checking will take
* place during the descent through the tree. This is not needed when
* positioning for an insert or delete, so NULL is used for those cases.
*
* NOTE that the returned buffer is read-locked regardless of the access
* parameter. However, access = BT_WRITE will allow an empty root page
* to be created and returned. When access = BT_READ, an empty index
......@@ -87,7 +91,7 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
*/
BTStack
_bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
Buffer *bufP, int access)
Buffer *bufP, int access, Snapshot snapshot)
{
BTStack stack_in = NULL;
......@@ -126,7 +130,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
*/
*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
(access == BT_WRITE), stack_in,
BT_READ);
BT_READ, snapshot);
/* if this is a leaf page, we're done */
page = BufferGetPage(*bufP, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
......@@ -199,6 +203,10 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
* On entry, we have the buffer pinned and a lock of the type specified by
* 'access'. If we move right, we release the buffer and lock and acquire
* the same on the right sibling. Return value is the buffer we stop at.
*
* If the snapshot parameter is not NULL, "old snapshot" checking will take
* place during the descent through the tree. This is not needed when
* positioning for an insert or delete, so NULL is used for those cases.
*/
Buffer
_bt_moveright(Relation rel,
......@@ -208,7 +216,8 @@ _bt_moveright(Relation rel,
bool nextkey,
bool forupdate,
BTStack stack,
int access)
int access,
Snapshot snapshot)
{
Page page;
BTPageOpaque opaque;
......@@ -233,7 +242,7 @@ _bt_moveright(Relation rel,
for (;;)
{
page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_RIGHTMOST(opaque))
......@@ -972,7 +981,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
* Use the manufactured insertion scan key to descend the tree and
* position ourselves on the target leaf page.
*/
stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ);
stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ,
scan->xs_snapshot);
/* don't need to keep the stack around... */
_bt_freestack(stack);
......@@ -1337,8 +1347,8 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
/* step right one page */
so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
/* check for deleted page */
page = BufferGetPage(so->currPos.buf, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(so->currPos.buf, scan->xs_snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
......@@ -1412,8 +1422,8 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
* it's not half-dead and contains matching tuples. Else loop back
* and do it all again.
*/
page = BufferGetPage(so->currPos.buf, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(so->currPos.buf, scan->xs_snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
......@@ -1476,7 +1486,7 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
/* check for interrupts while we're not holding any buffer lock */
CHECK_FOR_INTERRUPTS();
buf = _bt_getbuf(rel, blkno, BT_READ);
page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/*
......@@ -1502,14 +1512,14 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
break;
blkno = opaque->btpo_next;
buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
page = BufferGetPage(buf, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
}
/* Return to the original page to see what's up */
buf = _bt_relandgetbuf(rel, buf, obknum, BT_READ);
page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_ISDELETED(opaque))
{
......@@ -1526,8 +1536,8 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
RelationGetRelationName(rel));
blkno = opaque->btpo_next;
buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
page = BufferGetPage(buf, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_ISDELETED(opaque))
break;
......@@ -1564,7 +1574,8 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
* The returned buffer is pinned and read-locked.
*/
Buffer
_bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
_bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
Snapshot snapshot)
{
Buffer buf;
Page page;
......@@ -1586,7 +1597,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
if (!BufferIsValid(buf))
return InvalidBuffer;
page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
for (;;)
......@@ -1605,8 +1616,8 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
elog(ERROR, "fell off the end of index \"%s\"",
RelationGetRelationName(rel));
buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
page = BufferGetPage(buf, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
}
......@@ -1659,7 +1670,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
* version of _bt_search(). We don't maintain a stack since we know we
* won't need it.
*/
buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir));
buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir), scan->xs_snapshot);
if (!BufferIsValid(buf))
{
......
......@@ -341,7 +341,7 @@ redirect:
}
/* else new pointer points to the same page, no work needed */
page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buffer, snapshot, index, BGP_TEST_FOR_OLD_SNAPSHOT);
isnull = SpGistPageStoresNulls(page) ? true : false;
......
......@@ -489,7 +489,8 @@ vacuum_set_xid_limits(Relation rel,
* working on a particular table at any time, and that each vacuum is
* always an independent transaction.
*/
*oldestXmin = GetOldestXmin(rel, true);
*oldestXmin =
TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
Assert(TransactionIdIsNormal(*oldestXmin));
......
......@@ -1660,7 +1660,8 @@ should_attempt_truncation(LVRelStats *vacrelstats)
possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
if (possibly_freeable > 0 &&
(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION))
possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION) &&
old_snapshot_threshold < 0)
return true;
else
return false;
......
......@@ -4114,3 +4114,43 @@ IssuePendingWritebacks(WritebackContext *context)
context->nr_pending = 0;
}
/*
* Check whether the given snapshot is too old to have safely read the given
* page from the given table. If so, throw a "snapshot too old" error.
*
* This test generally needs to be performed after every BufferGetPage() call
* that is executed as part of a scan. It is not needed for calls made for
* modifying the page (for example, to position to the right place to insert a
* new index tuple or for vacuuming). To minimize errors of omission, the
* BufferGetPage() macro accepts parameters to specify whether the test should
* be run, and supply the necessary snapshot and relation parameters. See the
* declaration of BufferGetPage() for more details.
*
* Note that a NULL snapshot argument is allowed and causes a fast return
* without error; this is to support call sites which can be called from
* either scans or index modification areas.
*
* For best performance, keep the tests that are fastest and/or most likely to
* exclude a page from old snapshot testing near the front.
*/
extern Page
TestForOldSnapshot(Snapshot snapshot, Relation relation, Page page)
{
Assert(relation != NULL);
if (old_snapshot_threshold >= 0
&& (snapshot) != NULL
&& (snapshot)->satisfies == HeapTupleSatisfiesMVCC
&& !XLogRecPtrIsInvalid((snapshot)->lsn)
&& PageGetLSN(page) > (snapshot)->lsn
&& !IsCatalogRelation(relation)
&& !RelationIsAccessibleInLogicalDecoding(relation)
&& (snapshot)->whenTaken < GetOldSnapshotThresholdTimestamp())
ereport(ERROR,
(errcode(ERRCODE_SNAPSHOT_TOO_OLD),
errmsg("snapshot too old")));
return page;
}
......@@ -43,6 +43,7 @@
#include "storage/procsignal.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "utils/snapmgr.h"
shmem_startup_hook_type shmem_startup_hook = NULL;
......@@ -136,6 +137,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, ReplicationOriginShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, SnapMgrShmemSize());
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
......@@ -247,6 +249,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
/*
* Set up other modules that need some shared memory space
*/
SnapMgrInit();
BTreeShmemInit();
SyncScanShmemInit();
AsyncShmemInit();
......
......@@ -1759,6 +1759,15 @@ GetSnapshotData(Snapshot snapshot)
snapshot->regd_count = 0;
snapshot->copied = false;
/*
* Capture the current time and WAL stream location in case this snapshot
* becomes old enough to need to fall back on the special "old snapshot"
* logic.
*/
snapshot->lsn = GetXLogInsertRecPtr();
snapshot->whenTaken = GetSnapshotCurrentTimestamp();
MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
return snapshot;
}
......
......@@ -46,3 +46,4 @@ CommitTsControlLock 38
CommitTsLock 39
ReplicationOriginLock 40
MultiXactTruncationLock 41
OldSnapshotTimeMapLock 42
......@@ -417,6 +417,10 @@ Section: Class 58 - System Error (errors external to PostgreSQL itself)
58P01 E ERRCODE_UNDEFINED_FILE undefined_file
58P02 E ERRCODE_DUPLICATE_FILE duplicate_file
Section: Class 72 - Snapshot Failure
# (class borrowed from Oracle)
72000 E ERRCODE_SNAPSHOT_TOO_OLD snapshot_too_old
Section: Class F0 - Configuration File Error
# (PostgreSQL-specific error class)
......
......@@ -2677,6 +2677,17 @@ static struct config_int ConfigureNamesInt[] =
check_autovacuum_work_mem, NULL, NULL
},
{
{"old_snapshot_threshold", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS,
gettext_noop("Time before a snapshot is too old to read pages changed after the snapshot was taken."),
gettext_noop("A value of -1 disables this feature."),
GUC_UNIT_MIN
},
&old_snapshot_threshold,
-1, -1, MINS_PER_HOUR * HOURS_PER_DAY * 60,
NULL, NULL, NULL
},
{
{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
gettext_noop("Time between issuing TCP keepalives."),
......
......@@ -166,6 +166,8 @@
#effective_io_concurrency = 1 # 1-1000; 0 disables prefetching
#max_worker_processes = 8
#max_parallel_degree = 0 # max number of worker processes per node
#old_snapshot_threshold = -1 # 1min-60d; -1 disables; 0 is immediate
# (change requires restart)
#------------------------------------------------------------------------------
......
......@@ -46,20 +46,82 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "lib/pairingheap.h"
#include "miscadmin.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/resowner_private.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
/*
* GUC parameters
*/
int old_snapshot_threshold; /* number of minutes, -1 disables */
/*
* Structure for dealing with old_snapshot_threshold implementation.
*/
typedef struct OldSnapshotControlData
{
/*
* Variables for old snapshot handling are shared among processes and are
* only allowed to move forward.
*/
slock_t mutex_current; /* protect current timestamp */
int64 current_timestamp; /* latest snapshot timestamp */
slock_t mutex_latest_xmin; /* protect latest snapshot xmin */
TransactionId latest_xmin; /* latest snapshot xmin */
slock_t mutex_threshold; /* protect threshold fields */
int64 threshold_timestamp; /* earlier snapshot is old */
TransactionId threshold_xid; /* earlier xid may be gone */
/*
* Keep one xid per minute for old snapshot error handling.
*
* Use a circular buffer with a head offset, a count of entries currently
* used, and a timestamp corresponding to the xid at the head offset. A
* count_used value of zero means that there are no times stored; a
* count_used value of old_snapshot_threshold means that the buffer is
* full and the head must be advanced to add new entries. Use timestamps
* aligned to minute boundaries, since that seems less surprising than
* aligning based on the first usage timestamp.
*
* It is OK if the xid for a given time slot is from earlier than
* calculated by adding the number of minutes corresponding to the
* (possibly wrapped) distance from the head offset to the time of the
* head entry, since that just results in the vacuuming of old tuples
* being slightly less aggressive. It would not be OK for it to be off in
* the other direction, since it might result in vacuuming tuples that are
* still expected to be there.
*
* Use of an SLRU was considered but not chosen because it is more
* heavyweight than is needed for this, and would probably not be any less
* code to implement.
*
* Persistence is not needed.
*/
int head_offset; /* subscript of oldest tracked time */
int64 head_timestamp; /* time corresponding to head xid */
int count_used; /* how many slots are in use */
TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
} OldSnapshotControlData;
typedef struct OldSnapshotControlData *OldSnapshotControl;
static volatile OldSnapshotControl oldSnapshotControl;
/*
* CurrentSnapshot points to the only snapshot taken in transaction-snapshot
* mode, and to the latest one taken in a read-committed transaction.
......@@ -153,6 +215,7 @@ static Snapshot FirstXactSnapshot = NULL;
static List *exportedSnapshots = NIL;
/* Prototypes for local functions */
static int64 AlignTimestampToMinuteBoundary(int64 ts);
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
......@@ -174,6 +237,49 @@ typedef struct SerializedSnapshotData
CommandId curcid;
} SerializedSnapshotData;
Size
SnapMgrShmemSize(void)
{
Size size;
size = offsetof(OldSnapshotControlData, xid_by_minute);
if (old_snapshot_threshold > 0)
size = add_size(size, mul_size(sizeof(TransactionId),
old_snapshot_threshold));
return size;
}
/*
* Initialize for managing old snapshot detection.
*/
void
SnapMgrInit(void)
{
bool found;
/*
* Create or attach to the OldSnapshotControl structure.
*/
oldSnapshotControl = (OldSnapshotControl)
ShmemInitStruct("OldSnapshotControlData",
SnapMgrShmemSize(), &found);
if (!found)
{
SpinLockInit(&oldSnapshotControl->mutex_current);
oldSnapshotControl->current_timestamp = 0;
SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
oldSnapshotControl->latest_xmin = InvalidTransactionId;
SpinLockInit(&oldSnapshotControl->mutex_threshold);
oldSnapshotControl->threshold_timestamp = 0;
oldSnapshotControl->threshold_xid = InvalidTransactionId;
oldSnapshotControl->head_offset = 0;
oldSnapshotControl->head_timestamp = 0;
oldSnapshotControl->count_used = 0;
}
}
/*
* GetTransactionSnapshot
* Get the appropriate snapshot for a new query in a transaction.
......@@ -1405,6 +1511,304 @@ ThereAreNoPriorRegisteredSnapshots(void)
return false;
}
/*
* Return an int64 timestamp which is exactly on a minute boundary.
*
* If the argument is already aligned, return that value, otherwise move to
* the next minute boundary following the given time.
*/
static int64
AlignTimestampToMinuteBoundary(int64 ts)
{
int64 retval = ts + (USECS_PER_MINUTE - 1);
return retval - (retval % USECS_PER_MINUTE);
}
/*
* Get current timestamp for snapshots as int64 that never moves backward.
*/
int64
GetSnapshotCurrentTimestamp(void)
{
int64 now = GetCurrentIntegerTimestamp();
/*
* Don't let time move backward; if it hasn't advanced, use the old value.
*/
SpinLockAcquire(&oldSnapshotControl->mutex_current);
if (now <= oldSnapshotControl->current_timestamp)
now = oldSnapshotControl->current_timestamp;
else
oldSnapshotControl->current_timestamp = now;
SpinLockRelease(&oldSnapshotControl->mutex_current);
return now;
}
/*
* Get timestamp through which vacuum may have processed based on last stored
* value for threshold_timestamp.
*
* XXX: So far, we never trust that a 64-bit value can be read atomically; if
* that ever changes, we could get rid of the spinlock here.
*/
int64
GetOldSnapshotThresholdTimestamp(void)
{
int64 threshold_timestamp;
SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
threshold_timestamp = oldSnapshotControl->threshold_timestamp;
SpinLockRelease(&oldSnapshotControl->mutex_threshold);
return threshold_timestamp;
}
static void
SetOldSnapshotThresholdTimestamp(int64 ts, TransactionId xlimit)
{
SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
oldSnapshotControl->threshold_timestamp = ts;
oldSnapshotControl->threshold_xid = xlimit;
SpinLockRelease(&oldSnapshotControl->mutex_threshold);
}
/*
* TransactionIdLimitedForOldSnapshots
*
* Apply old snapshot limit, if any. This is intended to be called for page
* pruning and table vacuuming, to allow old_snapshot_threshold to override
* the normal global xmin value. Actual testing for snapshot too old will be
* based on whether a snapshot timestamp is prior to the threshold timestamp
* set in this function.
*/
TransactionId
TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
Relation relation)
{
if (TransactionIdIsNormal(recentXmin)
&& old_snapshot_threshold >= 0
&& RelationNeedsWAL(relation)
&& !IsCatalogRelation(relation)
&& !RelationIsAccessibleInLogicalDecoding(relation))
{
int64 ts = GetSnapshotCurrentTimestamp();
TransactionId xlimit = recentXmin;
TransactionId latest_xmin = oldSnapshotControl->latest_xmin;
bool same_ts_as_threshold = false;
/*
* Zero threshold always overrides to latest xmin, if valid. Without
* some heuristic it will find its own snapshot too old on, for
* example, a simple UPDATE -- which would make it useless for most
* testing, but there is no principled way to ensure that it doesn't
* fail in this way. Use a five-second delay to try to get useful
* testing behavior, but this may need adjustment.
*/
if (old_snapshot_threshold == 0)
{
if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
&& TransactionIdFollows(latest_xmin, xlimit))
xlimit = latest_xmin;
ts -= 5 * USECS_PER_SEC;
SetOldSnapshotThresholdTimestamp(ts, xlimit);
return xlimit;
}
ts = AlignTimestampToMinuteBoundary(ts)
- (old_snapshot_threshold * USECS_PER_MINUTE);
/* Check for fast exit without LW locking. */
SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
if (ts == oldSnapshotControl->threshold_timestamp)
{
xlimit = oldSnapshotControl->threshold_xid;
same_ts_as_threshold = true;
}
SpinLockRelease(&oldSnapshotControl->mutex_threshold);
if (!same_ts_as_threshold)
{
LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
if (oldSnapshotControl->count_used > 0
&& ts >= oldSnapshotControl->head_timestamp)
{
int offset;
offset = ((ts - oldSnapshotControl->head_timestamp)
/ USECS_PER_MINUTE);
if (offset > oldSnapshotControl->count_used - 1)
offset = oldSnapshotControl->count_used - 1;
offset = (oldSnapshotControl->head_offset + offset)
% old_snapshot_threshold;
xlimit = oldSnapshotControl->xid_by_minute[offset];
if (NormalTransactionIdFollows(xlimit, recentXmin))
SetOldSnapshotThresholdTimestamp(ts, xlimit);
}
LWLockRelease(OldSnapshotTimeMapLock);
}
/*
* Failsafe protection against vacuuming work of active transaction.
*
* This is not an assertion because we avoid the spinlock for
* performance, leaving open the possibility that xlimit could advance
* and be more current; but it seems prudent to apply this limit. It
* might make pruning a tiny bit less agressive than it could be, but
* protects against data loss bugs.
*/
if (TransactionIdIsNormal(latest_xmin)
&& TransactionIdPrecedes(latest_xmin, xlimit))
xlimit = latest_xmin;
if (NormalTransactionIdFollows(xlimit, recentXmin))
return xlimit;
}
return recentXmin;
}
/*
* Take care of the circular buffer that maps time to xid.
*/
void
MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
{
int64 ts;
/* Fast exit when old_snapshot_threshold is not used. */
if (old_snapshot_threshold < 0)
return;
/* Keep track of the latest xmin seen by any process. */
SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
if (TransactionIdFollows(xmin, oldSnapshotControl->latest_xmin))
oldSnapshotControl->latest_xmin = xmin;
SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
/* No further tracking needed for 0 (used for testing). */
if (old_snapshot_threshold == 0)
return;
/*
* We don't want to do something stupid with unusual values, but we don't
* want to litter the log with warnings or break otherwise normal
* processing for this feature; so if something seems unreasonable, just
* log at DEBUG level and return without doing anything.
*/
if (whenTaken < 0)
{
elog(DEBUG1,
"MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
(long) whenTaken);
return;
}
if (!TransactionIdIsNormal(xmin))
{
elog(DEBUG1,
"MaintainOldSnapshotTimeMapping called with xmin = %lu",
(unsigned long) xmin);
return;
}
ts = AlignTimestampToMinuteBoundary(whenTaken);
LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
Assert(oldSnapshotControl->head_offset >= 0);
Assert(oldSnapshotControl->head_offset < old_snapshot_threshold);
Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
Assert(oldSnapshotControl->count_used >= 0);
Assert(oldSnapshotControl->count_used <= old_snapshot_threshold);
if (oldSnapshotControl->count_used == 0)
{
/* set up first entry for empty mapping */
oldSnapshotControl->head_offset = 0;
oldSnapshotControl->head_timestamp = ts;
oldSnapshotControl->count_used = 1;
oldSnapshotControl->xid_by_minute[0] = xmin;
}
else if (ts < oldSnapshotControl->head_timestamp)
{
/* old ts; log it at DEBUG */
LWLockRelease(OldSnapshotTimeMapLock);
elog(DEBUG1,
"MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
(long) whenTaken);
return;
}
else if (ts <= (oldSnapshotControl->head_timestamp +
((oldSnapshotControl->count_used - 1)
* USECS_PER_MINUTE)))
{
/* existing mapping; advance xid if possible */
int bucket = (oldSnapshotControl->head_offset
+ ((ts - oldSnapshotControl->head_timestamp)
/ USECS_PER_MINUTE))
% old_snapshot_threshold;
if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
oldSnapshotControl->xid_by_minute[bucket] = xmin;
}
else
{
/* We need a new bucket, but it might not be the very next one. */
int advance = ((ts - oldSnapshotControl->head_timestamp)
/ USECS_PER_MINUTE);
oldSnapshotControl->head_timestamp = ts;
if (advance >= old_snapshot_threshold)
{
/* Advance is so far that all old data is junk; start over. */
oldSnapshotControl->head_offset = 0;
oldSnapshotControl->count_used = 1;
oldSnapshotControl->xid_by_minute[0] = xmin;
}
else
{
/* Store the new value in one or more buckets. */
int i;
for (i = 0; i < advance; i++)
{
if (oldSnapshotControl->count_used == old_snapshot_threshold)
{
/* Map full and new value replaces old head. */
int old_head = oldSnapshotControl->head_offset;
if (old_head == (old_snapshot_threshold - 1))
oldSnapshotControl->head_offset = 0;
else
oldSnapshotControl->head_offset = old_head + 1;
oldSnapshotControl->xid_by_minute[old_head] = xmin;
}
else
{
/* Extend map to unused entry. */
int new_tail = (oldSnapshotControl->head_offset
+ oldSnapshotControl->count_used)
% old_snapshot_threshold;
oldSnapshotControl->count_used++;
oldSnapshotControl->xid_by_minute[new_tail] = xmin;
}
}
}
}
LWLockRelease(OldSnapshotTimeMapLock);
}
/*
* Setup a snapshot that replaces normal catalog snapshots that allows catalog
* access to behave just like it did at a certain point in the past.
......
......@@ -18,12 +18,13 @@
#include "storage/itemptr.h"
#include "storage/off.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
/* struct definition lives in brin_revmap.c */
typedef struct BrinRevmap BrinRevmap;
extern BrinRevmap *brinRevmapInitialize(Relation idxrel,
BlockNumber *pagesPerRange);
BlockNumber *pagesPerRange, Snapshot snapshot);
extern void brinRevmapTerminate(BrinRevmap *revmap);
extern void brinRevmapExtend(BrinRevmap *revmap,
......@@ -34,6 +35,6 @@ extern void brinSetHeapBlockItemptr(Buffer rmbuf, BlockNumber pagesPerRange,
BlockNumber heapBlk, ItemPointerData tid);
extern BrinTuple *brinGetTupleForHeapBlock(BrinRevmap *revmap,
BlockNumber heapBlk, Buffer *buf, OffsetNumber *off,
Size *size, int mode);
Size *size, int mode, Snapshot snapshot);
#endif /* BRIN_REVMAP_H */
......@@ -703,7 +703,7 @@ typedef struct
* PostingItem
*/
extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode);
extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot);
extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode);
extern void freeGinBtreeStack(GinBtreeStack *stack);
extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
......@@ -731,7 +731,7 @@ extern void GinPageDeletePostingItem(Page page, OffsetNumber offset);
extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
ItemPointerData *items, uint32 nitem,
GinStatsData *buildStats);
extern GinBtreeStack *ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno);
extern GinBtreeStack *ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno, Snapshot snapshot);
extern void ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
......
......@@ -710,17 +710,18 @@ extern int _bt_pagedel(Relation rel, Buffer buf);
*/
extern BTStack _bt_search(Relation rel,
int keysz, ScanKey scankey, bool nextkey,
Buffer *bufP, int access);
Buffer *bufP, int access, Snapshot snapshot);
extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
int access);
int access, Snapshot snapshot);
extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
ScanKey scankey, bool nextkey);
extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
Page page, OffsetNumber offnum);
extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost);
extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
Snapshot snapshot);
/*
* prototypes for functions in nbtutils.c
......
......@@ -180,11 +180,26 @@ extern PGDLLIMPORT int32 *LocalRefCount;
/*
* BufferGetPage
* Returns the page associated with a buffer.
*
* agetest will normally be a literal, so use a macro at the outer level to
* give the compiler a chance to optimize away the runtime code to check it.
*
* TestForOldSnapshot(), if it doesn't throw an error, will return the page
* argument it is passed, so the same result will go back to this macro's
* caller for either agetest value; it is a matter of whether to call the
* function to perform the test. For call sites where the check is not needed
* (which is the vast majority of them), the snapshot and relation parameters
* can, and generally should, be NULL.
*/
#define BufferGetPage(buffer, snapshot, relation, agetest) \
( \
AssertMacro((agetest) == BGP_NO_SNAPSHOT_TEST), \
((Page)BufferGetBlock(buffer)) \
( \
AssertMacro((agetest) == BGP_NO_SNAPSHOT_TEST || (agetest) == BGP_TEST_FOR_OLD_SNAPSHOT), \
((agetest) == BGP_NO_SNAPSHOT_TEST) \
) ? \
((Page)BufferGetBlock(buffer)) \
: \
(TestForOldSnapshot(snapshot, relation, (Page)BufferGetBlock(buffer))) \
)
/*
......
......@@ -15,6 +15,7 @@
#define REL_H
#include "access/tupdesc.h"
#include "access/xlog.h"
#include "catalog/pg_class.h"
#include "catalog/pg_index.h"
#include "fmgr.h"
......
......@@ -14,10 +14,20 @@
#define SNAPMGR_H
#include "fmgr.h"
#include "utils/relcache.h"
#include "utils/resowner.h"
#include "utils/snapshot.h"
/* GUC variables */
extern int old_snapshot_threshold;
extern Size SnapMgrShmemSize(void);
extern void SnapMgrInit(void);
extern int64 GetSnapshotCurrentTimestamp(void);
extern int64 GetOldSnapshotThresholdTimestamp(void);
extern bool FirstSnapshotSet;
extern TransactionId TransactionXmin;
......@@ -54,6 +64,9 @@ extern void ImportSnapshot(const char *idstr);
extern bool XactHasExportedSnapshots(void);
extern void DeleteAllExportedSnapshotFiles(void);
extern bool ThereAreNoPriorRegisteredSnapshots(void);
extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
Relation relation);
extern void MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin);
extern char *ExportSnapshot(Snapshot snapshot);
......
......@@ -14,6 +14,7 @@
#define SNAPSHOT_H
#include "access/htup.h"
#include "access/xlogdefs.h"
#include "lib/pairingheap.h"
#include "storage/buf.h"
......@@ -105,6 +106,9 @@ typedef struct SnapshotData
uint32 active_count; /* refcount on ActiveSnapshot stack */
uint32 regd_count; /* refcount on RegisteredSnapshots */
pairingheap_node ph_node; /* link in the RegisteredSnapshots heap */
int64 whenTaken; /* timestamp when snapshot was taken */
XLogRecPtr lsn; /* position in the WAL stream when taken */
} SnapshotData;
/*
......
......@@ -8,6 +8,7 @@ SUBDIRS = \
brin \
commit_ts \
dummy_seclabel \
snapshot_too_old \
test_ddl_deparse \
test_extensions \
test_parser \
......
# src/test/modules/snapshot_too_old/Makefile
EXTRA_CLEAN = ./isolation_output
ISOLATIONCHECKS=sto_using_cursor sto_using_select
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = src/test/modules/snapshot_too_old
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
# Disabled because these tests require "old_snapshot_threshold" >= 0, which
# typical installcheck users do not have (e.g. buildfarm clients).
installcheck:;
# But it can nonetheless be very helpful to run tests on preexisting
# installation, allow to do so, but only if requested explicitly.
installcheck-force: isolationcheck-install-force
check: isolationcheck
submake-isolation:
$(MAKE) -C $(top_builddir)/src/test/isolation all
submake-test_snapshot_too_old:
$(MAKE) -C $(top_builddir)/src/test/modules/snapshot_too_old
isolationcheck: | submake-isolation submake-test_snapshot_too_old temp-install
$(MKDIR_P) isolation_output
$(pg_isolation_regress_check) \
--temp-config $(top_srcdir)/src/test/modules/snapshot_too_old/sto.conf \
--outputdir=./isolation_output \
$(ISOLATIONCHECKS)
isolationcheck-install-force: all | submake-isolation submake-test_snapshot_too_old temp-install
$(pg_isolation_regress_installcheck) \
$(ISOLATIONCHECKS)
.PHONY: check submake-test_snapshot_too_old isolationcheck isolationcheck-install-force
temp-install: EXTRA_INSTALL=src/test/modules/snapshot_too_old
Parsed test spec with 2 sessions
starting permutation: s1decl s1f1 s1sleep s1f2 s2u
step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
step s1f1: FETCH FIRST FROM cursor1;
c
1
step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
setting pg_sleep
0
step s1f2: FETCH FIRST FROM cursor1;
c
1
step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
starting permutation: s1decl s1f1 s1sleep s2u s1f2
step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
step s1f1: FETCH FIRST FROM cursor1;
c
1
step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
setting pg_sleep
0
step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
step s1f2: FETCH FIRST FROM cursor1;
ERROR: snapshot too old
starting permutation: s1decl s1f1 s2u s1sleep s1f2
step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
step s1f1: FETCH FIRST FROM cursor1;
c
1
step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
setting pg_sleep
0
step s1f2: FETCH FIRST FROM cursor1;
ERROR: snapshot too old
starting permutation: s1decl s2u s1f1 s1sleep s1f2
step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
step s1f1: FETCH FIRST FROM cursor1;
c
1
step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
setting pg_sleep
0
step s1f2: FETCH FIRST FROM cursor1;
ERROR: snapshot too old
starting permutation: s2u s1decl s1f1 s1sleep s1f2
step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
step s1f1: FETCH FIRST FROM cursor1;
c
2
step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
setting pg_sleep
0
step s1f2: FETCH FIRST FROM cursor1;
ERROR: snapshot too old
Parsed test spec with 2 sessions
starting permutation: s1f1 s1sleep s1f2 s2u
step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
c
1
step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
setting pg_sleep
0
step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
c
1
step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
starting permutation: s1f1 s1sleep s2u s1f2
step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
c
1
step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
setting pg_sleep
0
step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
ERROR: snapshot too old
starting permutation: s1f1 s2u s1sleep s1f2
step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
c
1
step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
setting pg_sleep
0
step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
ERROR: snapshot too old
starting permutation: s2u s1f1 s1sleep s1f2
step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
c
2
step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
setting pg_sleep
0
step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
ERROR: snapshot too old
# This test provokes a "snapshot too old" error using a cursor.
#
# The sleep is needed because with a threshold of zero a statement could error
# on changes it made. With more normal settings no external delay is needed,
# but we don't want these tests to run long enough to see that, since
# granularity is in minutes.
#
# Since results depend on the value of old_snapshot_threshold, sneak that into
# the line generated by the sleep, so that a surprising values isn't so hard
# to identify.
setup
{
CREATE TABLE sto1 (c int NOT NULL);
INSERT INTO sto1 SELECT generate_series(1, 1000);
CREATE TABLE sto2 (c int NOT NULL);
}
setup
{
VACUUM ANALYZE sto1;
}
teardown
{
DROP TABLE sto1, sto2;
}
session "s1"
setup { BEGIN ISOLATION LEVEL REPEATABLE READ; }
step "s1decl" { DECLARE cursor1 CURSOR FOR SELECT c FROM sto1; }
step "s1f1" { FETCH FIRST FROM cursor1; }
step "s1sleep" { SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold'; }
step "s1f2" { FETCH FIRST FROM cursor1; }
teardown { COMMIT; }
session "s2"
step "s2u" { UPDATE sto1 SET c = 1001 WHERE c = 1; }
# This test provokes a "snapshot too old" error using SELECT statements.
#
# The sleep is needed because with a threshold of zero a statement could error
# on changes it made. With more normal settings no external delay is needed,
# but we don't want these tests to run long enough to see that, since
# granularity is in minutes.
#
# Since results depend on the value of old_snapshot_threshold, sneak that into
# the line generated by the sleep, so that a surprising values isn't so hard
# to identify.
setup
{
CREATE TABLE sto1 (c int NOT NULL);
INSERT INTO sto1 SELECT generate_series(1, 1000);
CREATE TABLE sto2 (c int NOT NULL);
}
setup
{
VACUUM ANALYZE sto1;
}
teardown
{
DROP TABLE sto1, sto2;
}
session "s1"
setup { BEGIN ISOLATION LEVEL REPEATABLE READ; }
step "s1f1" { SELECT c FROM sto1 ORDER BY c LIMIT 1; }
step "s1sleep" { SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold'; }
step "s1f2" { SELECT c FROM sto1 ORDER BY c LIMIT 1; }
teardown { COMMIT; }
session "s2"
step "s2u" { UPDATE sto1 SET c = 1001 WHERE c = 1; }
autovacuum = off
old_snapshot_threshold = 0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment