Commit 9a8ee1dc authored by Andres Freund

tableam: Add and use table_fetch_row_version().

This is essentially the tableam version of heap_fetch(), i.e. fetching a
tuple identified by a tid and performing visibility checks.

Note that this is different from table_index_fetch_tuple(), which is for
index lookups and therefore has to handle a tid pointing to an earlier
version of a tuple if the AM uses an optimization like heap's HOT. Add
comments to that end.

This commit removes the stats_relation argument from heap_fetch, as
it's been unused for a long time.

Author: Andres Freund
Reviewed-By: Haribabu Kommi
Discussion: https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
parent c77e1220
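For orientation, here is an illustrative caller sketch of the new API; it is not part of this commit. It leans on the table_slot_create() and ExecDropSingleTupleTableSlot() helpers from the surrounding tree, and fetch_known_row() is a hypothetical wrapper that exists only for this example; `rel`, `tid` and `snapshot` stand for whatever the caller already has.

#include "postgres.h"

#include "access/tableam.h"
#include "executor/tuptable.h"

/*
 * Illustrative sketch only: fetch the exact row version at a tid this
 * backend already knows about (e.g. from a prior DML step or a stored
 * ctid), applying `snapshot`'s visibility rules.
 */
static bool
fetch_known_row(Relation rel, ItemPointer tid, Snapshot snapshot)
{
	/* create a slot of the table AM's preferred type (a buffer tuple slot for heap) */
	TupleTableSlot *slot = table_slot_create(rel, NULL);
	bool		found;

	/* true iff a tuple exists at `tid` and passes the visibility test */
	found = table_fetch_row_version(rel, tid, snapshot, slot);

	ExecDropSingleTupleTableSlot(slot);
	return found;
}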
@@ -1388,8 +1388,7 @@ bool
 heap_fetch(Relation relation,
 		   Snapshot snapshot,
 		   HeapTuple tuple,
-		   Buffer *userbuf,
-		   Relation stats_relation)
+		   Buffer *userbuf)
 {
 	ItemPointer tid = &(tuple->t_self);
 	ItemId		lp;
@@ -1468,10 +1467,6 @@ heap_fetch(Relation relation,
 	 */
 	*userbuf = buffer;
 
-	/* Count the successful fetch against appropriate rel, if any */
-	if (stats_relation != NULL)
-		pgstat_count_heap_fetch(stats_relation);
-
 	return true;
 }
@@ -5097,7 +5092,7 @@ heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid,
 		block = ItemPointerGetBlockNumber(&tupid);
 		ItemPointerCopy(&tupid, &(mytup.t_self));
 
-		if (!heap_fetch(rel, SnapshotAny, &mytup, &buf, NULL))
+		if (!heap_fetch(rel, SnapshotAny, &mytup, &buf))
 		{
 			/*
 			 * if we fail to find the updated version of the tuple, it's
...
@@ -148,6 +148,30 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
  * ------------------------------------------------------------------------
  */
 
+static bool
+heapam_fetch_row_version(Relation relation,
+						 ItemPointer tid,
+						 Snapshot snapshot,
+						 TupleTableSlot *slot)
+{
+	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+	Buffer		buffer;
+
+	Assert(TTS_IS_BUFFERTUPLE(slot));
+
+	bslot->base.tupdata.t_self = *tid;
+	if (heap_fetch(relation, snapshot, &bslot->base.tupdata, &buffer))
+	{
+		/* store in slot, transferring existing pin */
+		ExecStorePinnedBufferHeapTuple(&bslot->base.tupdata, slot, buffer);
+		slot->tts_tableOid = RelationGetRelid(relation);
+
+		return true;
+	}
+
+	return false;
+}
+
 static bool
 heapam_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
 								Snapshot snapshot)
@@ -338,7 +362,7 @@ tuple_lock_retry:
 							 errmsg("tuple to be locked was already moved to another partition due to concurrent update")));
 
 			tuple->t_self = *tid;
-			if (heap_fetch(relation, &SnapshotDirty, tuple, &buffer, NULL))
+			if (heap_fetch(relation, &SnapshotDirty, tuple, &buffer))
 			{
 				/*
 				 * If xmin isn't what we're expecting, the slot must have
@@ -517,6 +541,7 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
 
+	.tuple_fetch_row_version = heapam_fetch_row_version,
 	.tuple_satisfies_snapshot = heapam_tuple_satisfies_snapshot,
 };
...
@@ -62,6 +62,7 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->index_fetch_end != NULL);
 	Assert(routine->index_fetch_tuple != NULL);
 
+	Assert(routine->tuple_fetch_row_version != NULL);
 	Assert(routine->tuple_satisfies_snapshot != NULL);
 
 	Assert(routine->tuple_insert != NULL);
...
@@ -14,10 +14,11 @@
 #include "postgres.h"
 
 #include "access/genam.h"
-#include "access/heapam.h"
-#include "access/tableam.h"
-#include "access/sysattr.h"
 #include "access/htup_details.h"
+#include "access/relation.h"
+#include "access/sysattr.h"
+#include "access/table.h"
+#include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
@@ -3379,42 +3380,12 @@ GetTupleForTrigger(EState *estate,
 	}
 	else
 	{
-		Page		page;
-		ItemId		lp;
-		Buffer		buffer;
-		BufferHeapTupleTableSlot *boldslot;
-		HeapTuple	tuple;
-
-		Assert(TTS_IS_BUFFERTUPLE(oldslot));
-		ExecClearTuple(oldslot);
-		boldslot = (BufferHeapTupleTableSlot *) oldslot;
-		tuple = &boldslot->base.tupdata;
-
-		buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
-
 		/*
-		 * Although we already know this tuple is valid, we must lock the
-		 * buffer to ensure that no one has a buffer cleanup lock; otherwise
-		 * they might move the tuple while we try to copy it.  But we can
-		 * release the lock before actually doing the heap_copytuple call,
-		 * since holding pin is sufficient to prevent anyone from getting a
-		 * cleanup lock they don't already hold.
+		 * We expect the tuple to be present, thus very simple error handling
+		 * suffices.
 		 */
-		LockBuffer(buffer, BUFFER_LOCK_SHARE);
-
-		page = BufferGetPage(buffer);
-		lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
-
-		Assert(ItemIdIsNormal(lp));
-
-		tuple->t_data = (HeapTupleHeader) PageGetItem(page, lp);
-		tuple->t_len = ItemIdGetLength(lp);
-		tuple->t_self = *tid;
-		tuple->t_tableOid = RelationGetRelid(relation);
-		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-
-		ExecStorePinnedBufferHeapTuple(tuple, oldslot, buffer);
+		if (!table_fetch_row_version(relation, tid, SnapshotAny, oldslot))
+			elog(ERROR, "failed to fetch tuple for trigger");
 	}
 
 	return true;
@@ -4193,8 +4164,6 @@ AfterTriggerExecute(EState *estate,
 	AfterTriggerShared evtshared = GetTriggerSharedData(event);
 	Oid			tgoid = evtshared->ats_tgoid;
 	TriggerData LocTriggerData;
-	HeapTupleData tuple1;
-	HeapTupleData tuple2;
 	HeapTuple	rettuple;
 	int			tgindx;
 	bool		should_free_trig = false;
@@ -4271,19 +4240,12 @@ AfterTriggerExecute(EState *estate,
 		default:
 			if (ItemPointerIsValid(&(event->ate_ctid1)))
 			{
-				Buffer		buffer;
-
 				LocTriggerData.tg_trigslot = ExecGetTriggerOldSlot(estate, relInfo);
 
-				ItemPointerCopy(&(event->ate_ctid1), &(tuple1.t_self));
-				if (!heap_fetch(rel, SnapshotAny, &tuple1, &buffer, NULL))
+				if (!table_fetch_row_version(rel, &(event->ate_ctid1), SnapshotAny, LocTriggerData.tg_trigslot))
 					elog(ERROR, "failed to fetch tuple1 for AFTER trigger");
-				ExecStorePinnedBufferHeapTuple(&tuple1,
-											   LocTriggerData.tg_trigslot,
-											   buffer);
 				LocTriggerData.tg_trigtuple =
-					ExecFetchSlotHeapTuple(LocTriggerData.tg_trigslot, false,
-										   &should_free_trig);
+					ExecFetchSlotHeapTuple(LocTriggerData.tg_trigslot, false, &should_free_trig);
 			}
 			else
 			{
@@ -4295,19 +4257,12 @@ AfterTriggerExecute(EState *estate,
 				AFTER_TRIGGER_2CTID &&
 				ItemPointerIsValid(&(event->ate_ctid2)))
 			{
-				Buffer		buffer;
-
 				LocTriggerData.tg_newslot = ExecGetTriggerNewSlot(estate, relInfo);
 
-				ItemPointerCopy(&(event->ate_ctid2), &(tuple2.t_self));
-				if (!heap_fetch(rel, SnapshotAny, &tuple2, &buffer, NULL))
+				if (!table_fetch_row_version(rel, &(event->ate_ctid2), SnapshotAny, LocTriggerData.tg_newslot))
 					elog(ERROR, "failed to fetch tuple2 for AFTER trigger");
-				ExecStorePinnedBufferHeapTuple(&tuple2,
-											   LocTriggerData.tg_newslot,
-											   buffer);
 				LocTriggerData.tg_newtuple =
-					ExecFetchSlotHeapTuple(LocTriggerData.tg_newslot, false,
-										   &should_free_new);
+					ExecFetchSlotHeapTuple(LocTriggerData.tg_newslot, false, &should_free_new);
 			}
 			else
 			{
...
@@ -2649,17 +2649,10 @@ EvalPlanQualFetchRowMarks(EPQState *epqstate)
 			else
 			{
 				/* ordinary table, fetch the tuple */
-				HeapTupleData tuple;
-				Buffer		buffer;
-
-				tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
-				if (!heap_fetch(erm->relation, SnapshotAny, &tuple, &buffer,
-								NULL))
+				if (!table_fetch_row_version(erm->relation,
+											 (ItemPointer) DatumGetPointer(datum),
+											 SnapshotAny, slot))
 					elog(ERROR, "failed to fetch tuple for EvalPlanQual recheck");
-
-				/* successful, store tuple */
-				ExecStorePinnedBufferHeapTuple(&tuple, slot, buffer);
-				ExecMaterializeSlot(slot);
 			}
 		}
 		else
...
@@ -229,17 +229,13 @@ ExecCheckTIDVisible(EState *estate,
 					TupleTableSlot *tempSlot)
 {
 	Relation	rel = relinfo->ri_RelationDesc;
-	Buffer		buffer;
-	HeapTupleData tuple;
 
 	/* Redundantly check isolation level */
 	if (!IsolationUsesXactSnapshot())
 		return;
 
-	tuple.t_self = *tid;
-	if (!heap_fetch(rel, SnapshotAny, &tuple, &buffer, NULL))
+	if (!table_fetch_row_version(rel, tid, SnapshotAny, tempSlot))
 		elog(ERROR, "failed to fetch conflicting tuple for ON CONFLICT");
-	ExecStorePinnedBufferHeapTuple(&tuple, tempSlot, buffer);
 	ExecCheckTupleVisible(estate, rel, tempSlot);
 	ExecClearTuple(tempSlot);
 }
@@ -874,21 +870,9 @@ ldelete:;
 			}
 			else
 			{
-				BufferHeapTupleTableSlot *bslot;
-				HeapTuple	deltuple;
-				Buffer		buffer;
-
-				Assert(TTS_IS_BUFFERTUPLE(slot));
-				ExecClearTuple(slot);
-
-				bslot = (BufferHeapTupleTableSlot *) slot;
-				deltuple = &bslot->base.tupdata;
-
-				deltuple->t_self = *tupleid;
-				if (!heap_fetch(resultRelationDesc, SnapshotAny,
-								deltuple, &buffer, NULL))
+				if (!table_fetch_row_version(resultRelationDesc, tupleid,
+											 SnapshotAny, slot))
 					elog(ERROR, "failed to fetch deleted tuple for DELETE RETURNING");
-
-				ExecStorePinnedBufferHeapTuple(deltuple, slot, buffer);
 			}
 		}
...
@@ -310,7 +310,6 @@ TidNext(TidScanState *node)
 	Relation	heapRelation;
 	HeapTuple	tuple;
 	TupleTableSlot *slot;
-	Buffer		buffer = InvalidBuffer;
 	ItemPointerData *tidList;
 	int			numTids;
 	bool		bBackward;
@@ -376,19 +375,10 @@ TidNext(TidScanState *node)
 		if (node->tss_isCurrentOf)
 			heap_get_latest_tid(heapRelation, snapshot, &tuple->t_self);
 
-		if (heap_fetch(heapRelation, snapshot, tuple, &buffer, NULL))
-		{
-			/*
-			 * Store the scanned tuple in the scan tuple slot of the scan
-			 * state, transferring the pin to the slot.
-			 */
-			ExecStorePinnedBufferHeapTuple(tuple,	/* tuple to store */
-										   slot,	/* slot to store in */
-										   buffer); /* buffer associated with
-													 * tuple */
+		if (table_fetch_row_version(heapRelation, &tuple->t_self, snapshot,
+									slot))
 			return slot;
-		}
 
 		/* Bad TID or failed snapshot qual; try next */
 		if (bBackward)
			node->tss_TidPtr--;
...
@@ -128,7 +128,7 @@ extern bool heap_getnextslot(TableScanDesc sscan,
 							 ScanDirection direction, struct TupleTableSlot *slot);
 extern bool heap_fetch(Relation relation, Snapshot snapshot,
-					   HeapTuple tuple, Buffer *userbuf, Relation stats_relation);
+					   HeapTuple tuple, Buffer *userbuf);
 extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation,
 								   Buffer buffer, Snapshot snapshot, HeapTuple heapTuple,
 								   bool *all_dead, bool first_call);
...
@@ -271,6 +271,17 @@ typedef struct TableAmRoutine
 	 * ------------------------------------------------------------------------
 	 */
 
+	/*
+	 * Fetch tuple at `tid` into `slot`, after doing a visibility test
+	 * according to `snapshot`. If a tuple was found and passed the visibility
+	 * test, returns true, false otherwise.
+	 */
+	bool		(*tuple_fetch_row_version) (Relation rel,
+											ItemPointer tid,
+											Snapshot snapshot,
+											TupleTableSlot *slot);
+
 	/*
 	 * Does the tuple in `slot` satisfy `snapshot`? The slot needs to be of
 	 * the appropriate type for the AM.
@@ -574,9 +585,9 @@ table_index_fetch_end(struct IndexFetchTableData *scan)
 }
 
 /*
- * Fetches tuple at `tid` into `slot`, after doing a visibility test according
- * to `snapshot`. If a tuple was found and passed the visibility test, returns
- * true, false otherwise.
+ * Fetches, as part of an index scan, tuple at `tid` into `slot`, after doing
+ * a visibility test according to `snapshot`. If a tuple was found and passed
+ * the visibility test, returns true, false otherwise.
  *
 * *call_again needs to be false on the first call to table_index_fetch_tuple() for
 * a tid. If there potentially is another tuple matching the tid, *call_again
@@ -586,6 +597,13 @@ table_index_fetch_end(struct IndexFetchTableData *scan)
 * *all_dead will be set to true by table_index_fetch_tuple() iff it is guaranteed
 * that no backend needs to see that tuple. Index AMs can use that to avoid
 * returning that tid in future searches.
+ *
+ * The difference between this function and table_fetch_row_version is that
+ * this function returns the currently visible version of a row if the AM
+ * supports storing multiple row versions reachable via a single index entry
+ * (like heap's HOT). Whereas table_fetch_row_version only evaluates the
+ * tuple exactly at `tid`. Outside of index entry -> table tuple lookups,
+ * table_fetch_row_version is what's usually needed.
  */
 static inline bool
 table_index_fetch_tuple(struct IndexFetchTableData *scan,
@@ -606,6 +624,25 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 * ------------------------------------------------------------------------
 */
 
+/*
+ * Fetch tuple at `tid` into `slot`, after doing a visibility test according to
+ * `snapshot`. If a tuple was found and passed the visibility test, returns
+ * true, false otherwise.
+ *
+ * See table_index_fetch_tuple's comment about what the difference between
+ * these functions is. This function is the correct one to use outside of
+ * index entry -> table tuple lookups.
+ */
+static inline bool
+table_fetch_row_version(Relation rel,
+						ItemPointer tid,
+						Snapshot snapshot,
+						TupleTableSlot *slot)
+{
+	return rel->rd_tableam->tuple_fetch_row_version(rel, tid, snapshot, slot);
+}
+
 /*
  * Return true iff tuple in slot satisfies the snapshot.
  *
...
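As a closing illustration (again not part of the diff), the distinction drawn in the new tableam.h comments can be sketched as follows. fetch_both_ways() is a hypothetical helper written only for this example; it assumes the same includes as the earlier sketch and a slot of the AM's expected type supplied by the caller.

/* Sketch only: the two fetch paths applied to the same tid. */
static void
fetch_both_ways(Relation rel, ItemPointer tid, Snapshot snapshot,
				TupleTableSlot *slot)
{
	IndexFetchTableData *fetch = table_index_fetch_begin(rel);
	bool		call_again = false;
	bool		all_dead = false;
	bool		found;

	/* tid taken from an index entry: may resolve to a newer HOT tuple */
	found = table_index_fetch_tuple(fetch, tid, snapshot, slot,
									&call_again, &all_dead);
	table_index_fetch_end(fetch);

	/* tid taken at face value: what triggers, EPQ rechecks and TID scans want */
	found = table_fetch_row_version(rel, tid, snapshot, slot);

	(void) found;				/* example only */
}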