Commit 4da99ea4 authored by Robert Haas's avatar Robert Haas

Avoid having two copies of the HOT-chain search logic.

It's been like this since HOT was originally introduced, but the logic
is complex enough that this is a recipe for bugs, as we've already
found out with SSI.  So refactor heap_hot_search_buffer() so that it
can satisfy the needs of index_getnext(), and make index_getnext() use
that rather than duplicating the logic.

This change was originally proposed by Heikki Linnakangas as part of a
larger refactoring oriented towards allowing index-only scans.  I
extracted and adjusted this part, since it seems to have independent
merit.  Review by Jeff Davis.
parent 8c8745b2
...@@ -1514,6 +1514,10 @@ heap_fetch(Relation relation, ...@@ -1514,6 +1514,10 @@ heap_fetch(Relation relation,
* found, we update *tid to reference that tuple's offset number, and * found, we update *tid to reference that tuple's offset number, and
* return TRUE. If no match, return FALSE without modifying *tid. * return TRUE. If no match, return FALSE without modifying *tid.
* *
* heapTuple is a caller-supplied buffer. When a match is found, we return
* the tuple here, in addition to updating *tid. If no match is found, the
* contents of this buffer on return are undefined.
*
* If all_dead is not NULL, we check non-visible tuples to see if they are * If all_dead is not NULL, we check non-visible tuples to see if they are
* globally dead; *all_dead is set TRUE if all members of the HOT chain * globally dead; *all_dead is set TRUE if all members of the HOT chain
* are vacuumable, FALSE if not. * are vacuumable, FALSE if not.
...@@ -1524,28 +1528,31 @@ heap_fetch(Relation relation, ...@@ -1524,28 +1528,31 @@ heap_fetch(Relation relation,
*/ */
bool bool
heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
Snapshot snapshot, bool *all_dead) Snapshot snapshot, HeapTuple heapTuple,
bool *all_dead, bool first_call)
{ {
Page dp = (Page) BufferGetPage(buffer); Page dp = (Page) BufferGetPage(buffer);
TransactionId prev_xmax = InvalidTransactionId; TransactionId prev_xmax = InvalidTransactionId;
OffsetNumber offnum; OffsetNumber offnum;
bool at_chain_start; bool at_chain_start;
bool valid; bool valid;
bool skip;
/* If this is not the first call, previous call returned a (live!) tuple */
if (all_dead) if (all_dead)
*all_dead = true; *all_dead = first_call;
Assert(TransactionIdIsValid(RecentGlobalXmin)); Assert(TransactionIdIsValid(RecentGlobalXmin));
Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer)); Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer));
offnum = ItemPointerGetOffsetNumber(tid); offnum = ItemPointerGetOffsetNumber(tid);
at_chain_start = true; at_chain_start = first_call;
skip = !first_call;
/* Scan through possible multiple members of HOT-chain */ /* Scan through possible multiple members of HOT-chain */
for (;;) for (;;)
{ {
ItemId lp; ItemId lp;
HeapTupleData heapTuple;
/* check for bogus TID */ /* check for bogus TID */
if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp)) if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp))
...@@ -1568,15 +1575,15 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, ...@@ -1568,15 +1575,15 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
break; break;
} }
heapTuple.t_data = (HeapTupleHeader) PageGetItem(dp, lp); heapTuple->t_data = (HeapTupleHeader) PageGetItem(dp, lp);
heapTuple.t_len = ItemIdGetLength(lp); heapTuple->t_len = ItemIdGetLength(lp);
heapTuple.t_tableOid = relation->rd_id; heapTuple->t_tableOid = relation->rd_id;
heapTuple.t_self = *tid; heapTuple->t_self = *tid;
/* /*
* Shouldn't see a HEAP_ONLY tuple at chain start. * Shouldn't see a HEAP_ONLY tuple at chain start.
*/ */
if (at_chain_start && HeapTupleIsHeapOnly(&heapTuple)) if (at_chain_start && HeapTupleIsHeapOnly(heapTuple))
break; break;
/* /*
...@@ -1585,21 +1592,32 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, ...@@ -1585,21 +1592,32 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
*/ */
if (TransactionIdIsValid(prev_xmax) && if (TransactionIdIsValid(prev_xmax) &&
!TransactionIdEquals(prev_xmax, !TransactionIdEquals(prev_xmax,
HeapTupleHeaderGetXmin(heapTuple.t_data))) HeapTupleHeaderGetXmin(heapTuple->t_data)))
break; break;
/*
* When first_call is true (and thus, skip is initally false) we'll
* return the first tuple we find. But on later passes, heapTuple
* will initially be pointing to the tuple we returned last time.
* Returning it again would be incorrect (and would loop forever),
* so we skip it and return the next match we find.
*/
if (!skip)
{
/* If it's visible per the snapshot, we must return it */ /* If it's visible per the snapshot, we must return it */
valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer); valid = HeapTupleSatisfiesVisibility(heapTuple, snapshot, buffer);
CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer, CheckForSerializableConflictOut(valid, relation, heapTuple,
snapshot); buffer, snapshot);
if (valid) if (valid)
{ {
ItemPointerSetOffsetNumber(tid, offnum); ItemPointerSetOffsetNumber(tid, offnum);
PredicateLockTuple(relation, &heapTuple, snapshot); PredicateLockTuple(relation, heapTuple, snapshot);
if (all_dead) if (all_dead)
*all_dead = false; *all_dead = false;
return true; return true;
} }
}
skip = false;
/* /*
* If we can't see it, maybe no one else can either. At caller * If we can't see it, maybe no one else can either. At caller
...@@ -1607,7 +1625,7 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, ...@@ -1607,7 +1625,7 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
* transactions. * transactions.
*/ */
if (all_dead && *all_dead && if (all_dead && *all_dead &&
HeapTupleSatisfiesVacuum(heapTuple.t_data, RecentGlobalXmin, HeapTupleSatisfiesVacuum(heapTuple->t_data, RecentGlobalXmin,
buffer) != HEAPTUPLE_DEAD) buffer) != HEAPTUPLE_DEAD)
*all_dead = false; *all_dead = false;
...@@ -1615,13 +1633,13 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, ...@@ -1615,13 +1633,13 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
* Check to see if HOT chain continues past this tuple; if so fetch * Check to see if HOT chain continues past this tuple; if so fetch
* the next offnum and loop around. * the next offnum and loop around.
*/ */
if (HeapTupleIsHotUpdated(&heapTuple)) if (HeapTupleIsHotUpdated(heapTuple))
{ {
Assert(ItemPointerGetBlockNumber(&heapTuple.t_data->t_ctid) == Assert(ItemPointerGetBlockNumber(&heapTuple->t_data->t_ctid) ==
ItemPointerGetBlockNumber(tid)); ItemPointerGetBlockNumber(tid));
offnum = ItemPointerGetOffsetNumber(&heapTuple.t_data->t_ctid); offnum = ItemPointerGetOffsetNumber(&heapTuple->t_data->t_ctid);
at_chain_start = false; at_chain_start = false;
prev_xmax = HeapTupleHeaderGetXmax(heapTuple.t_data); prev_xmax = HeapTupleHeaderGetXmax(heapTuple->t_data);
} }
else else
break; /* end of chain */ break; /* end of chain */
...@@ -1643,10 +1661,12 @@ heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot, ...@@ -1643,10 +1661,12 @@ heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot,
{ {
bool result; bool result;
Buffer buffer; Buffer buffer;
HeapTupleData heapTuple;
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
LockBuffer(buffer, BUFFER_LOCK_SHARE); LockBuffer(buffer, BUFFER_LOCK_SHARE);
result = heap_hot_search_buffer(tid, relation, buffer, snapshot, all_dead); result = heap_hot_search_buffer(tid, relation, buffer, snapshot,
&heapTuple, all_dead, true);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer); ReleaseBuffer(buffer);
return result; return result;
......
...@@ -113,9 +113,7 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys) ...@@ -113,9 +113,7 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys)
ItemPointerSetInvalid(&scan->xs_ctup.t_self); ItemPointerSetInvalid(&scan->xs_ctup.t_self);
scan->xs_ctup.t_data = NULL; scan->xs_ctup.t_data = NULL;
scan->xs_cbuf = InvalidBuffer; scan->xs_cbuf = InvalidBuffer;
scan->xs_hot_dead = false; scan->xs_continue_hot = false;
scan->xs_next_hot = InvalidOffsetNumber;
scan->xs_prev_xmax = InvalidTransactionId;
return scan; return scan;
} }
......
...@@ -335,7 +335,7 @@ index_rescan(IndexScanDesc scan, ...@@ -335,7 +335,7 @@ index_rescan(IndexScanDesc scan,
scan->xs_cbuf = InvalidBuffer; scan->xs_cbuf = InvalidBuffer;
} }
scan->xs_next_hot = InvalidOffsetNumber; scan->xs_continue_hot = false;
scan->kill_prior_tuple = false; /* for safety */ scan->kill_prior_tuple = false; /* for safety */
...@@ -417,7 +417,7 @@ index_restrpos(IndexScanDesc scan) ...@@ -417,7 +417,7 @@ index_restrpos(IndexScanDesc scan)
SCAN_CHECKS; SCAN_CHECKS;
GET_SCAN_PROCEDURE(amrestrpos); GET_SCAN_PROCEDURE(amrestrpos);
scan->xs_next_hot = InvalidOffsetNumber; scan->xs_continue_hot = false;
scan->kill_prior_tuple = false; /* for safety */ scan->kill_prior_tuple = false; /* for safety */
...@@ -443,26 +443,18 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) ...@@ -443,26 +443,18 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
HeapTuple heapTuple = &scan->xs_ctup; HeapTuple heapTuple = &scan->xs_ctup;
ItemPointer tid = &heapTuple->t_self; ItemPointer tid = &heapTuple->t_self;
FmgrInfo *procedure; FmgrInfo *procedure;
bool all_dead = false;
SCAN_CHECKS; SCAN_CHECKS;
GET_SCAN_PROCEDURE(amgettuple); GET_SCAN_PROCEDURE(amgettuple);
Assert(TransactionIdIsValid(RecentGlobalXmin)); Assert(TransactionIdIsValid(RecentGlobalXmin));
/*
* We always reset xs_hot_dead; if we are here then either we are just
* starting the scan, or we previously returned a visible tuple, and in
* either case it's inappropriate to kill the prior index entry.
*/
scan->xs_hot_dead = false;
for (;;) for (;;)
{ {
OffsetNumber offnum; bool got_heap_tuple;
bool at_chain_start;
Page dp;
if (scan->xs_next_hot != InvalidOffsetNumber) if (scan->xs_continue_hot)
{ {
/* /*
* We are resuming scan of a HOT chain after having returned an * We are resuming scan of a HOT chain after having returned an
...@@ -471,10 +463,6 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) ...@@ -471,10 +463,6 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
Assert(BufferIsValid(scan->xs_cbuf)); Assert(BufferIsValid(scan->xs_cbuf));
Assert(ItemPointerGetBlockNumber(tid) == Assert(ItemPointerGetBlockNumber(tid) ==
BufferGetBlockNumber(scan->xs_cbuf)); BufferGetBlockNumber(scan->xs_cbuf));
Assert(TransactionIdIsValid(scan->xs_prev_xmax));
offnum = scan->xs_next_hot;
at_chain_start = false;
scan->xs_next_hot = InvalidOffsetNumber;
} }
else else
{ {
...@@ -488,7 +476,7 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) ...@@ -488,7 +476,7 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
* comments in RelationGetIndexScan(). * comments in RelationGetIndexScan().
*/ */
if (!scan->xactStartedInRecovery) if (!scan->xactStartedInRecovery)
scan->kill_prior_tuple = scan->xs_hot_dead; scan->kill_prior_tuple = all_dead;
/* /*
* The AM's gettuple proc finds the next index entry matching the * The AM's gettuple proc finds the next index entry matching the
...@@ -521,151 +509,31 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) ...@@ -521,151 +509,31 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
if (prev_buf != scan->xs_cbuf) if (prev_buf != scan->xs_cbuf)
heap_page_prune_opt(scan->heapRelation, scan->xs_cbuf, heap_page_prune_opt(scan->heapRelation, scan->xs_cbuf,
RecentGlobalXmin); RecentGlobalXmin);
/* Prepare to scan HOT chain starting at index-referenced offnum */
offnum = ItemPointerGetOffsetNumber(tid);
at_chain_start = true;
/* We don't know what the first tuple's xmin should be */
scan->xs_prev_xmax = InvalidTransactionId;
/* Initialize flag to detect if all entries are dead */
scan->xs_hot_dead = true;
} }
/* Obtain share-lock on the buffer so we can examine visibility */ /* Obtain share-lock on the buffer so we can examine visibility */
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE); LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
got_heap_tuple = heap_hot_search_buffer(tid, scan->heapRelation,
scan->xs_cbuf,
scan->xs_snapshot,
&scan->xs_ctup,
&all_dead,
!scan->xs_continue_hot);
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
dp = (Page) BufferGetPage(scan->xs_cbuf); if (got_heap_tuple)
/* Scan through possible multiple members of HOT-chain */
for (;;)
{
ItemId lp;
ItemPointer ctid;
bool valid;
/* check for bogus TID */
if (offnum < FirstOffsetNumber ||
offnum > PageGetMaxOffsetNumber(dp))
break;
lp = PageGetItemId(dp, offnum);
/* check for unused, dead, or redirected items */
if (!ItemIdIsNormal(lp))
{
/* We should only see a redirect at start of chain */
if (ItemIdIsRedirected(lp) && at_chain_start)
{
/* Follow the redirect */
offnum = ItemIdGetRedirect(lp);
at_chain_start = false;
continue;
}
/* else must be end of chain */
break;
}
/*
* We must initialize all of *heapTuple (ie, scan->xs_ctup) since
* it is returned to the executor on success.
*/
heapTuple->t_data = (HeapTupleHeader) PageGetItem(dp, lp);
heapTuple->t_len = ItemIdGetLength(lp);
ItemPointerSetOffsetNumber(tid, offnum);
heapTuple->t_tableOid = RelationGetRelid(scan->heapRelation);
ctid = &heapTuple->t_data->t_ctid;
/*
* Shouldn't see a HEAP_ONLY tuple at chain start. (This test
* should be unnecessary, since the chain root can't be removed
* while we have pin on the index entry, but let's make it
* anyway.)
*/
if (at_chain_start && HeapTupleIsHeapOnly(heapTuple))
break;
/*
* The xmin should match the previous xmax value, else chain is
* broken. (Note: this test is not optional because it protects
* us against the case where the prior chain member's xmax aborted
* since we looked at it.)
*/
if (TransactionIdIsValid(scan->xs_prev_xmax) &&
!TransactionIdEquals(scan->xs_prev_xmax,
HeapTupleHeaderGetXmin(heapTuple->t_data)))
break;
/* If it's visible per the snapshot, we must return it */
valid = HeapTupleSatisfiesVisibility(heapTuple, scan->xs_snapshot,
scan->xs_cbuf);
CheckForSerializableConflictOut(valid, scan->heapRelation,
heapTuple, scan->xs_cbuf,
scan->xs_snapshot);
if (valid)
{ {
/* /*
* If the snapshot is MVCC, we know that it could accept at * Only in a non-MVCC snapshot can more than one member of the
* most one member of the HOT chain, so we can skip examining * HOT chain be visible.
* any more members. Otherwise, check for continuation of the
* HOT-chain, and set state for next time.
*/ */
if (IsMVCCSnapshot(scan->xs_snapshot)) scan->xs_continue_hot = !IsMVCCSnapshot(scan->xs_snapshot);
scan->xs_next_hot = InvalidOffsetNumber;
else if (HeapTupleIsHotUpdated(heapTuple))
{
Assert(ItemPointerGetBlockNumber(ctid) ==
ItemPointerGetBlockNumber(tid));
scan->xs_next_hot = ItemPointerGetOffsetNumber(ctid);
scan->xs_prev_xmax = HeapTupleHeaderGetXmax(heapTuple->t_data);
}
else
scan->xs_next_hot = InvalidOffsetNumber;
PredicateLockTuple(scan->heapRelation, heapTuple, scan->xs_snapshot);
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
pgstat_count_heap_fetch(scan->indexRelation); pgstat_count_heap_fetch(scan->indexRelation);
return heapTuple; return heapTuple;
} }
/*
* If we can't see it, maybe no one else can either. Check to see
* if the tuple is dead to all transactions. If we find that all
* the tuples in the HOT chain are dead, we'll signal the index AM
* to not return that TID on future indexscans.
*/
if (scan->xs_hot_dead &&
HeapTupleSatisfiesVacuum(heapTuple->t_data, RecentGlobalXmin,
scan->xs_cbuf) != HEAPTUPLE_DEAD)
scan->xs_hot_dead = false;
/*
* Check to see if HOT chain continues past this tuple; if so
* fetch the next offnum (we don't bother storing it into
* xs_next_hot, but must store xs_prev_xmax), and loop around.
*/
if (HeapTupleIsHotUpdated(heapTuple))
{
Assert(ItemPointerGetBlockNumber(ctid) ==
ItemPointerGetBlockNumber(tid));
offnum = ItemPointerGetOffsetNumber(ctid);
at_chain_start = false;
scan->xs_prev_xmax = HeapTupleHeaderGetXmax(heapTuple->t_data);
}
else
break; /* end of chain */
} /* loop over a single HOT chain */
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
/* Loop around to ask index AM for another TID */ /* Loop around to ask index AM for another TID */
scan->xs_next_hot = InvalidOffsetNumber; scan->xs_continue_hot = false;
} }
/* Release any held pin on a heap page */ /* Release any held pin on a heap page */
......
...@@ -349,9 +349,11 @@ bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres) ...@@ -349,9 +349,11 @@ bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres)
{ {
OffsetNumber offnum = tbmres->offsets[curslot]; OffsetNumber offnum = tbmres->offsets[curslot];
ItemPointerData tid; ItemPointerData tid;
HeapTupleData heapTuple;
ItemPointerSet(&tid, page, offnum); ItemPointerSet(&tid, page, offnum);
if (heap_hot_search_buffer(&tid, scan->rs_rd, buffer, snapshot, NULL)) if (heap_hot_search_buffer(&tid, scan->rs_rd, buffer, snapshot,
&heapTuple, NULL, true))
scan->rs_vistuples[ntup++] = ItemPointerGetOffsetNumber(&tid); scan->rs_vistuples[ntup++] = ItemPointerGetOffsetNumber(&tid);
} }
} }
......
...@@ -83,7 +83,8 @@ extern bool heap_fetch(Relation relation, Snapshot snapshot, ...@@ -83,7 +83,8 @@ extern bool heap_fetch(Relation relation, Snapshot snapshot,
HeapTuple tuple, Buffer *userbuf, bool keep_buf, HeapTuple tuple, Buffer *userbuf, bool keep_buf,
Relation stats_relation); Relation stats_relation);
extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation, extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation,
Buffer buffer, Snapshot snapshot, bool *all_dead); Buffer buffer, Snapshot snapshot, HeapTuple heapTuple,
bool *all_dead, bool first_call);
extern bool heap_hot_search(ItemPointer tid, Relation relation, extern bool heap_hot_search(ItemPointer tid, Relation relation,
Snapshot snapshot, bool *all_dead); Snapshot snapshot, bool *all_dead);
......
...@@ -84,9 +84,7 @@ typedef struct IndexScanDescData ...@@ -84,9 +84,7 @@ typedef struct IndexScanDescData
bool xs_recheck; /* T means scan keys must be rechecked */ bool xs_recheck; /* T means scan keys must be rechecked */
/* state data for traversing HOT chains in index_getnext */ /* state data for traversing HOT chains in index_getnext */
bool xs_hot_dead; /* T if all members of HOT chain are dead */ bool xs_continue_hot; /* T if must keep walking HOT chain */
OffsetNumber xs_next_hot; /* next member of HOT chain, if any */
TransactionId xs_prev_xmax; /* previous HOT chain member's XMAX, if any */
} IndexScanDescData; } IndexScanDescData;
/* Struct for heap-or-index scans of system tables */ /* Struct for heap-or-index scans of system tables */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment