Commit 6f38d4da authored by Thomas Munro's avatar Thomas Munro

Remove dependency on HeapTuple from predicate locking functions.

The following changes make the predicate locking functions more
generic and suitable for use by future access methods:

- PredicateLockTuple() is renamed to PredicateLockTID().  It takes
  ItemPointer and inserting transaction ID instead of HeapTuple.

- CheckForSerializableConflictIn() takes blocknum instead of buffer.

- CheckForSerializableConflictOut() no longer takes HeapTuple or buffer.

Author: Ashwin Agrawal
Reviewed-by: Andres Freund, Kuntal Ghosh, Thomas Munro
Discussion: https://postgr.es/m/CALfoeiv0k3hkEb3Oqk%3DziWqtyk2Jys1UOK5hwRBNeANT_yX%2Bng%40mail.gmail.com
parent 4589c6a2
...@@ -89,7 +89,7 @@ ginFindLeafPage(GinBtree btree, bool searchMode, ...@@ -89,7 +89,7 @@ ginFindLeafPage(GinBtree btree, bool searchMode,
stack->predictNumber = 1; stack->predictNumber = 1;
if (rootConflictCheck) if (rootConflictCheck)
CheckForSerializableConflictIn(btree->index, NULL, stack->buffer); CheckForSerializableConflictIn(btree->index, NULL, btree->rootBlkno);
for (;;) for (;;)
{ {
......
...@@ -246,7 +246,7 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) ...@@ -246,7 +246,7 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
* tree, so it conflicts with all serializable scans. All scans acquire a * tree, so it conflicts with all serializable scans. All scans acquire a
* predicate lock on the metabuffer to represent that. * predicate lock on the metabuffer to represent that.
*/ */
CheckForSerializableConflictIn(index, NULL, metabuffer); CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);
if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize) if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
{ {
......
...@@ -216,7 +216,8 @@ ginEntryInsert(GinState *ginstate, ...@@ -216,7 +216,8 @@ ginEntryInsert(GinState *ginstate,
return; return;
} }
CheckForSerializableConflictIn(ginstate->index, NULL, stack->buffer); CheckForSerializableConflictIn(ginstate->index, NULL,
BufferGetBlockNumber(stack->buffer));
/* modify an existing leaf entry */ /* modify an existing leaf entry */
itup = addItemPointersToLeafTuple(ginstate, itup, itup = addItemPointersToLeafTuple(ginstate, itup,
items, nitem, buildStats, stack->buffer); items, nitem, buildStats, stack->buffer);
...@@ -225,7 +226,8 @@ ginEntryInsert(GinState *ginstate, ...@@ -225,7 +226,8 @@ ginEntryInsert(GinState *ginstate,
} }
else else
{ {
CheckForSerializableConflictIn(ginstate->index, NULL, stack->buffer); CheckForSerializableConflictIn(ginstate->index, NULL,
BufferGetBlockNumber(stack->buffer));
/* no match, so construct a new leaf entry */ /* no match, so construct a new leaf entry */
itup = buildFreshLeafTuple(ginstate, attnum, key, category, itup = buildFreshLeafTuple(ginstate, attnum, key, category,
items, nitem, buildStats, stack->buffer); items, nitem, buildStats, stack->buffer);
......
...@@ -1264,7 +1264,7 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack, ...@@ -1264,7 +1264,7 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
* Check for any rw conflicts (in serializable isolation level) just * Check for any rw conflicts (in serializable isolation level) just
* before we intend to modify the page * before we intend to modify the page
*/ */
CheckForSerializableConflictIn(state->r, NULL, stack->buffer); CheckForSerializableConflictIn(state->r, NULL, BufferGetBlockNumber(stack->buffer));
/* Insert the tuple(s) to the page, splitting the page if necessary */ /* Insert the tuple(s) to the page, splitting the page if necessary */
is_split = gistplacetopage(state->r, state->freespace, giststate, is_split = gistplacetopage(state->r, state->freespace, giststate,
......
...@@ -88,7 +88,7 @@ restart_insert: ...@@ -88,7 +88,7 @@ restart_insert:
&usedmetap); &usedmetap);
Assert(usedmetap != NULL); Assert(usedmetap != NULL);
CheckForSerializableConflictIn(rel, NULL, buf); CheckForSerializableConflictIn(rel, NULL, BufferGetBlockNumber(buf));
/* remember the primary bucket buffer to release the pin on it at end. */ /* remember the primary bucket buffer to release the pin on it at end. */
bucket_buf = buf; bucket_buf = buf;
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "access/multixact.h" #include "access/multixact.h"
#include "access/parallel.h" #include "access/parallel.h"
#include "access/relscan.h" #include "access/relscan.h"
#include "access/subtrans.h"
#include "access/sysattr.h" #include "access/sysattr.h"
#include "access/tableam.h" #include "access/tableam.h"
#include "access/transam.h" #include "access/transam.h"
...@@ -446,7 +447,7 @@ heapgetpage(TableScanDesc sscan, BlockNumber page) ...@@ -446,7 +447,7 @@ heapgetpage(TableScanDesc sscan, BlockNumber page)
else else
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
&loctup, buffer, snapshot); &loctup, buffer, snapshot);
if (valid) if (valid)
...@@ -668,7 +669,7 @@ heapgettup(HeapScanDesc scan, ...@@ -668,7 +669,7 @@ heapgettup(HeapScanDesc scan,
snapshot, snapshot,
scan->rs_cbuf); scan->rs_cbuf);
CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
tuple, scan->rs_cbuf, tuple, scan->rs_cbuf,
snapshot); snapshot);
...@@ -1477,9 +1478,10 @@ heap_fetch(Relation relation, ...@@ -1477,9 +1478,10 @@ heap_fetch(Relation relation,
valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer); valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
if (valid) if (valid)
PredicateLockTuple(relation, tuple, snapshot); PredicateLockTID(relation, &(tuple->t_self), snapshot,
HeapTupleHeaderGetXmin(tuple->t_data));
CheckForSerializableConflictOut(valid, relation, tuple, buffer, snapshot); HeapCheckForSerializableConflictOut(valid, relation, tuple, buffer, snapshot);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
...@@ -1610,13 +1612,14 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, ...@@ -1610,13 +1612,14 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
{ {
/* If it's visible per the snapshot, we must return it */ /* If it's visible per the snapshot, we must return it */
valid = HeapTupleSatisfiesVisibility(heapTuple, snapshot, buffer); valid = HeapTupleSatisfiesVisibility(heapTuple, snapshot, buffer);
CheckForSerializableConflictOut(valid, relation, heapTuple, HeapCheckForSerializableConflictOut(valid, relation, heapTuple,
buffer, snapshot); buffer, snapshot);
if (valid) if (valid)
{ {
ItemPointerSetOffsetNumber(tid, offnum); ItemPointerSetOffsetNumber(tid, offnum);
PredicateLockTuple(relation, heapTuple, snapshot); PredicateLockTID(relation, &heapTuple->t_self, snapshot,
HeapTupleHeaderGetXmin(heapTuple->t_data));
if (all_dead) if (all_dead)
*all_dead = false; *all_dead = false;
return true; return true;
...@@ -1750,7 +1753,7 @@ heap_get_latest_tid(TableScanDesc sscan, ...@@ -1750,7 +1753,7 @@ heap_get_latest_tid(TableScanDesc sscan,
* candidate. * candidate.
*/ */
valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer); valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
CheckForSerializableConflictOut(valid, relation, &tp, buffer, snapshot); HeapCheckForSerializableConflictOut(valid, relation, &tp, buffer, snapshot);
if (valid) if (valid)
*tid = ctid; *tid = ctid;
...@@ -1905,7 +1908,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, ...@@ -1905,7 +1908,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
* lock "gaps" as index page locks do. So we don't need to specify a * lock "gaps" as index page locks do. So we don't need to specify a
* buffer when making the call, which makes for a faster check. * buffer when making the call, which makes for a faster check.
*/ */
CheckForSerializableConflictIn(relation, NULL, InvalidBuffer); CheckForSerializableConflictIn(relation, NULL, InvalidBlockNumber);
/* NO EREPORT(ERROR) from here till changes are logged */ /* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION(); START_CRIT_SECTION();
...@@ -2159,7 +2162,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, ...@@ -2159,7 +2162,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
* lock "gaps" as index page locks do. So we don't need to specify a * lock "gaps" as index page locks do. So we don't need to specify a
* buffer when making the call, which makes for a faster check. * buffer when making the call, which makes for a faster check.
*/ */
CheckForSerializableConflictIn(relation, NULL, InvalidBuffer); CheckForSerializableConflictIn(relation, NULL, InvalidBlockNumber);
ndone = 0; ndone = 0;
while (ndone < ntuples) while (ndone < ntuples)
...@@ -2350,7 +2353,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, ...@@ -2350,7 +2353,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
* lock "gaps" as index page locks do. So we don't need to specify a * lock "gaps" as index page locks do. So we don't need to specify a
* buffer when making the call. * buffer when making the call.
*/ */
CheckForSerializableConflictIn(relation, NULL, InvalidBuffer); CheckForSerializableConflictIn(relation, NULL, InvalidBlockNumber);
/* /*
* If tuples are cachable, mark them for invalidation from the caches in * If tuples are cachable, mark them for invalidation from the caches in
...@@ -2664,7 +2667,7 @@ l1: ...@@ -2664,7 +2667,7 @@ l1:
* being visible to the scan (i.e., an exclusive buffer content lock is * being visible to the scan (i.e., an exclusive buffer content lock is
* continuously held from this point until the tuple delete is visible). * continuously held from this point until the tuple delete is visible).
*/ */
CheckForSerializableConflictIn(relation, &tp, buffer); CheckForSerializableConflictIn(relation, tid, BufferGetBlockNumber(buffer));
/* replace cid with a combo cid if necessary */ /* replace cid with a combo cid if necessary */
HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo); HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
...@@ -3580,7 +3583,7 @@ l2: ...@@ -3580,7 +3583,7 @@ l2:
* will include checking the relation level, there is no benefit to a * will include checking the relation level, there is no benefit to a
* separate check for the new tuple. * separate check for the new tuple.
*/ */
CheckForSerializableConflictIn(relation, &oldtup, buffer); CheckForSerializableConflictIn(relation, otid, BufferGetBlockNumber(buffer));
/* /*
* At this point newbuf and buffer are both pinned and locked, and newbuf * At this point newbuf and buffer are both pinned and locked, and newbuf
...@@ -9043,3 +9046,93 @@ heap_mask(char *pagedata, BlockNumber blkno) ...@@ -9043,3 +9046,93 @@ heap_mask(char *pagedata, BlockNumber blkno)
} }
} }
} }
/*
* HeapCheckForSerializableConflictOut
* We are reading a tuple which has been modified. If it is visible to
* us but has been deleted, that indicates a rw-conflict out. If it's
* not visible and was created by a concurrent (overlapping)
* serializable transaction, that is also a rw-conflict out,
*
* We will determine the top level xid of the writing transaction with which
* we may be in conflict, and check for overlap with our own transaction.
* If the transactions overlap (i.e., they cannot see each other's writes),
* then we have a conflict out.
*
* This function should be called just about anywhere in heapam.c where a
* tuple has been read. The caller must hold at least a shared lock on the
* buffer, because this function might set hint bits on the tuple. There is
* currently no known reason to call this function from an index AM.
*/
void
HeapCheckForSerializableConflictOut(bool visible, Relation relation,
HeapTuple tuple, Buffer buffer,
Snapshot snapshot)
{
TransactionId xid;
HTSV_Result htsvResult;
if (!CheckForSerializableConflictOutNeeded(relation, snapshot))
return;
/*
* Check to see whether the tuple has been written to by a concurrent
* transaction, either to create it not visible to us, or to delete it
* while it is visible to us. The "visible" bool indicates whether the
* tuple is visible to us, while HeapTupleSatisfiesVacuum checks what else
* is going on with it.
*/
htsvResult = HeapTupleSatisfiesVacuum(tuple, TransactionXmin, buffer);
switch (htsvResult)
{
case HEAPTUPLE_LIVE:
if (visible)
return;
xid = HeapTupleHeaderGetXmin(tuple->t_data);
break;
case HEAPTUPLE_RECENTLY_DEAD:
if (!visible)
return;
xid = HeapTupleHeaderGetUpdateXid(tuple->t_data);
break;
case HEAPTUPLE_DELETE_IN_PROGRESS:
xid = HeapTupleHeaderGetUpdateXid(tuple->t_data);
break;
case HEAPTUPLE_INSERT_IN_PROGRESS:
xid = HeapTupleHeaderGetXmin(tuple->t_data);
break;
case HEAPTUPLE_DEAD:
return;
default:
/*
* The only way to get to this default clause is if a new value is
* added to the enum type without adding it to this switch
* statement. That's a bug, so elog.
*/
elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);
/*
* In spite of having all enum values covered and calling elog on
* this default, some compilers think this is a code path which
* allows xid to be used below without initialization. Silence
* that warning.
*/
xid = InvalidTransactionId;
}
Assert(TransactionIdIsValid(xid));
Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
/*
* Find top level xid. Bail out if xid is too early to be a conflict, or
* if it's our own xid.
*/
if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
return;
xid = SubTransGetTopmostTransaction(xid);
if (TransactionIdPrecedes(xid, TransactionXmin))
return;
return CheckForSerializableConflictOut(relation, xid, snapshot);
}
...@@ -2171,9 +2171,10 @@ heapam_scan_bitmap_next_block(TableScanDesc scan, ...@@ -2171,9 +2171,10 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
if (valid) if (valid)
{ {
hscan->rs_vistuples[ntup++] = offnum; hscan->rs_vistuples[ntup++] = offnum;
PredicateLockTuple(scan->rs_rd, &loctup, snapshot); PredicateLockTID(scan->rs_rd, &loctup.t_self, snapshot,
HeapTupleHeaderGetXmin(loctup.t_data));
} }
CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, HeapCheckForSerializableConflictOut(valid, scan->rs_rd, &loctup,
buffer, snapshot); buffer, snapshot);
} }
} }
...@@ -2361,7 +2362,7 @@ heapam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, ...@@ -2361,7 +2362,7 @@ heapam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
/* in pagemode, heapgetpage did this for us */ /* in pagemode, heapgetpage did this for us */
if (!pagemode) if (!pagemode)
CheckForSerializableConflictOut(visible, scan->rs_rd, tuple, HeapCheckForSerializableConflictOut(visible, scan->rs_rd, tuple,
hscan->rs_cbuf, scan->rs_snapshot); hscan->rs_cbuf, scan->rs_snapshot);
/* Try next tuple from same page. */ /* Try next tuple from same page. */
......
...@@ -180,8 +180,8 @@ index_insert(Relation indexRelation, ...@@ -180,8 +180,8 @@ index_insert(Relation indexRelation,
if (!(indexRelation->rd_indam->ampredlocks)) if (!(indexRelation->rd_indam->ampredlocks))
CheckForSerializableConflictIn(indexRelation, CheckForSerializableConflictIn(indexRelation,
(HeapTuple) NULL, (ItemPointer) NULL,
InvalidBuffer); InvalidBlockNumber);
return indexRelation->rd_indam->aminsert(indexRelation, values, isnull, return indexRelation->rd_indam->aminsert(indexRelation, values, isnull,
heap_t_ctid, heapRelation, heap_t_ctid, heapRelation,
......
...@@ -285,7 +285,7 @@ top: ...@@ -285,7 +285,7 @@ top:
* checkingunique and !heapkeyspace cases, but it's okay to use the * checkingunique and !heapkeyspace cases, but it's okay to use the
* first page the value could be on (with scantid omitted) instead. * first page the value could be on (with scantid omitted) instead.
*/ */
CheckForSerializableConflictIn(rel, NULL, insertstate.buf); CheckForSerializableConflictIn(rel, NULL, BufferGetBlockNumber(insertstate.buf));
/* /*
* Do the insertion. Note that insertstate contains cached binary * Do the insertion. Note that insertstate contains cached binary
...@@ -528,7 +528,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel, ...@@ -528,7 +528,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
* otherwise be masked by this unique constraint * otherwise be masked by this unique constraint
* violation. * violation.
*/ */
CheckForSerializableConflictIn(rel, NULL, insertstate->buf); CheckForSerializableConflictIn(rel, NULL, BufferGetBlockNumber(insertstate->buf));
/* /*
* This is a definite conflict. Break the tuple down into * This is a definite conflict. Break the tuple down into
......
...@@ -163,8 +163,8 @@ ...@@ -163,8 +163,8 @@
* PredicateLockRelation(Relation relation, Snapshot snapshot) * PredicateLockRelation(Relation relation, Snapshot snapshot)
* PredicateLockPage(Relation relation, BlockNumber blkno, * PredicateLockPage(Relation relation, BlockNumber blkno,
* Snapshot snapshot) * Snapshot snapshot)
* PredicateLockTuple(Relation relation, HeapTuple tuple, * PredicateLockTID(Relation relation, ItemPointer tid, Snapshot snapshot,
* Snapshot snapshot) * TransactionId insert_xid)
* PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, * PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
* BlockNumber newblkno) * BlockNumber newblkno)
* PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, * PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
...@@ -173,11 +173,10 @@ ...@@ -173,11 +173,10 @@
* ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
* *
* conflict detection (may also trigger rollback) * conflict detection (may also trigger rollback)
* CheckForSerializableConflictOut(bool visible, Relation relation, * CheckForSerializableConflictOut(Relation relation, TransactionId xid,
* HeapTupleData *tup, Buffer buffer,
* Snapshot snapshot) * Snapshot snapshot)
* CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup, * CheckForSerializableConflictIn(Relation relation, ItemPointer tid,
* Buffer buffer) * BlockNumber blkno)
* CheckTableForSerializableConflictIn(Relation relation) * CheckTableForSerializableConflictIn(Relation relation)
* *
* final rollback checking * final rollback checking
...@@ -193,8 +192,6 @@ ...@@ -193,8 +192,6 @@
#include "postgres.h" #include "postgres.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/parallel.h" #include "access/parallel.h"
#include "access/slru.h" #include "access/slru.h"
#include "access/subtrans.h" #include "access/subtrans.h"
...@@ -2538,28 +2535,28 @@ PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot) ...@@ -2538,28 +2535,28 @@ PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot)
} }
/* /*
* PredicateLockTuple * PredicateLockTID
* *
* Gets a predicate lock at the tuple level. * Gets a predicate lock at the tuple level.
* Skip if not in full serializable transaction isolation level. * Skip if not in full serializable transaction isolation level.
* Skip if this is a temporary table. * Skip if this is a temporary table.
*/ */
void void
PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot) PredicateLockTID(Relation relation, ItemPointer tid, Snapshot snapshot,
TransactionId tuple_xid)
{ {
PREDICATELOCKTARGETTAG tag; PREDICATELOCKTARGETTAG tag;
ItemPointer tid;
if (!SerializationNeededForRead(relation, snapshot)) if (!SerializationNeededForRead(relation, snapshot))
return; return;
/* /*
* If it's a heap tuple, return if this xact wrote it. * Return if this xact wrote it.
*/ */
if (relation->rd_index == NULL) if (relation->rd_index == NULL)
{ {
/* If we wrote it; we already have a write lock. */ /* If we wrote it; we already have a write lock. */
if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data))) if (TransactionIdIsCurrentTransactionId(tuple_xid))
return; return;
} }
...@@ -2575,7 +2572,6 @@ PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot) ...@@ -2575,7 +2572,6 @@ PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot)
if (PredicateLockExists(&tag)) if (PredicateLockExists(&tag))
return; return;
tid = &(tuple->t_self);
SET_PREDICATELOCKTARGETTAG_TUPLE(tag, SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
relation->rd_node.dbNode, relation->rd_node.dbNode,
relation->rd_id, relation->rd_id,
...@@ -4020,33 +4016,41 @@ XidIsConcurrent(TransactionId xid) ...@@ -4020,33 +4016,41 @@ XidIsConcurrent(TransactionId xid)
return false; return false;
} }
bool
CheckForSerializableConflictOutNeeded(Relation relation, Snapshot snapshot)
{
if (!SerializationNeededForRead(relation, snapshot))
return false;
/* Check if someone else has already decided that we need to die */
if (SxactIsDoomed(MySerializableXact))
{
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to read/write dependencies among transactions"),
errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict out checking."),
errhint("The transaction might succeed if retried.")));
}
return true;
}
/* /*
* CheckForSerializableConflictOut * CheckForSerializableConflictOut
* We are reading a tuple which has been modified. If it is visible to * A table AM is reading a tuple that has been modified. After determining
* us but has been deleted, that indicates a rw-conflict out. If it's * that it is visible to us, it should call this function with the top
* not visible and was created by a concurrent (overlapping) * level xid of the writing transaction.
* serializable transaction, that is also a rw-conflict out,
*
* We will determine the top level xid of the writing transaction with which
* we may be in conflict, and check for overlap with our own transaction.
* If the transactions overlap (i.e., they cannot see each other's writes),
* then we have a conflict out.
* *
* This function should be called just about anywhere in heapam.c where a * This function will check for overlap with our own transaction. If the
* tuple has been read. The caller must hold at least a shared lock on the * transactions overlap (i.e., they cannot see each other's writes), then we
* buffer, because this function might set hint bits on the tuple. There is * have a conflict out.
* currently no known reason to call this function from an index AM.
*/ */
void void
CheckForSerializableConflictOut(bool visible, Relation relation, CheckForSerializableConflictOut(Relation relation, TransactionId xid, Snapshot snapshot)
HeapTuple tuple, Buffer buffer,
Snapshot snapshot)
{ {
TransactionId xid;
SERIALIZABLEXIDTAG sxidtag; SERIALIZABLEXIDTAG sxidtag;
SERIALIZABLEXID *sxid; SERIALIZABLEXID *sxid;
SERIALIZABLEXACT *sxact; SERIALIZABLEXACT *sxact;
HTSV_Result htsvResult;
if (!SerializationNeededForRead(relation, snapshot)) if (!SerializationNeededForRead(relation, snapshot))
return; return;
...@@ -4060,64 +4064,8 @@ CheckForSerializableConflictOut(bool visible, Relation relation, ...@@ -4060,64 +4064,8 @@ CheckForSerializableConflictOut(bool visible, Relation relation,
errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict out checking."), errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict out checking."),
errhint("The transaction might succeed if retried."))); errhint("The transaction might succeed if retried.")));
} }
/*
* Check to see whether the tuple has been written to by a concurrent
* transaction, either to create it not visible to us, or to delete it
* while it is visible to us. The "visible" bool indicates whether the
* tuple is visible to us, while HeapTupleSatisfiesVacuum checks what else
* is going on with it.
*/
htsvResult = HeapTupleSatisfiesVacuum(tuple, TransactionXmin, buffer);
switch (htsvResult)
{
case HEAPTUPLE_LIVE:
if (visible)
return;
xid = HeapTupleHeaderGetXmin(tuple->t_data);
break;
case HEAPTUPLE_RECENTLY_DEAD:
if (!visible)
return;
xid = HeapTupleHeaderGetUpdateXid(tuple->t_data);
break;
case HEAPTUPLE_DELETE_IN_PROGRESS:
xid = HeapTupleHeaderGetUpdateXid(tuple->t_data);
break;
case HEAPTUPLE_INSERT_IN_PROGRESS:
xid = HeapTupleHeaderGetXmin(tuple->t_data);
break;
case HEAPTUPLE_DEAD:
return;
default:
/*
* The only way to get to this default clause is if a new value is
* added to the enum type without adding it to this switch
* statement. That's a bug, so elog.
*/
elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);
/*
* In spite of having all enum values covered and calling elog on
* this default, some compilers think this is a code path which
* allows xid to be used below without initialization. Silence
* that warning.
*/
xid = InvalidTransactionId;
}
Assert(TransactionIdIsValid(xid)); Assert(TransactionIdIsValid(xid));
Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
/*
* Find top level xid. Bail out if xid is too early to be a conflict, or
* if it's our own xid.
*/
if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
return;
xid = SubTransGetTopmostTransaction(xid);
if (TransactionIdPrecedes(xid, TransactionXmin))
return;
if (TransactionIdEquals(xid, GetTopTransactionIdIfAny())) if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
return; return;
...@@ -4423,8 +4371,7 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) ...@@ -4423,8 +4371,7 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
* tuple itself. * tuple itself.
*/ */
void void
CheckForSerializableConflictIn(Relation relation, HeapTuple tuple, CheckForSerializableConflictIn(Relation relation, ItemPointer tid, BlockNumber blkno)
Buffer buffer)
{ {
PREDICATELOCKTARGETTAG targettag; PREDICATELOCKTARGETTAG targettag;
...@@ -4454,22 +4401,22 @@ CheckForSerializableConflictIn(Relation relation, HeapTuple tuple, ...@@ -4454,22 +4401,22 @@ CheckForSerializableConflictIn(Relation relation, HeapTuple tuple,
* It is not possible to take and hold a lock across the checks for all * It is not possible to take and hold a lock across the checks for all
* granularities because each target could be in a separate partition. * granularities because each target could be in a separate partition.
*/ */
if (tuple != NULL) if (tid != NULL)
{ {
SET_PREDICATELOCKTARGETTAG_TUPLE(targettag, SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
relation->rd_node.dbNode, relation->rd_node.dbNode,
relation->rd_id, relation->rd_id,
ItemPointerGetBlockNumber(&(tuple->t_self)), ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(&(tuple->t_self))); ItemPointerGetOffsetNumber(tid));
CheckTargetForConflictsIn(&targettag); CheckTargetForConflictsIn(&targettag);
} }
if (BufferIsValid(buffer)) if (blkno != InvalidBlockNumber)
{ {
SET_PREDICATELOCKTARGETTAG_PAGE(targettag, SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
relation->rd_node.dbNode, relation->rd_node.dbNode,
relation->rd_id, relation->rd_id,
BufferGetBlockNumber(buffer)); blkno);
CheckTargetForConflictsIn(&targettag); CheckTargetForConflictsIn(&targettag);
} }
......
...@@ -220,5 +220,7 @@ extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data, ...@@ -220,5 +220,7 @@ extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data,
HeapTuple htup, HeapTuple htup,
Buffer buffer, Buffer buffer,
CommandId *cmin, CommandId *cmax); CommandId *cmin, CommandId *cmax);
extern void HeapCheckForSerializableConflictOut(bool valid, Relation relation, HeapTuple tuple,
Buffer buffer, Snapshot snapshot);
#endif /* HEAPAM_H */ #endif /* HEAPAM_H */
...@@ -57,16 +57,17 @@ extern void SetSerializableTransactionSnapshot(Snapshot snapshot, ...@@ -57,16 +57,17 @@ extern void SetSerializableTransactionSnapshot(Snapshot snapshot,
extern void RegisterPredicateLockingXid(TransactionId xid); extern void RegisterPredicateLockingXid(TransactionId xid);
extern void PredicateLockRelation(Relation relation, Snapshot snapshot); extern void PredicateLockRelation(Relation relation, Snapshot snapshot);
extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot); extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot);
extern void PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot); extern void PredicateLockTID(Relation relation, ItemPointer tid, Snapshot snapshot,
TransactionId insert_xid);
extern void PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, BlockNumber newblkno); extern void PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, BlockNumber newblkno);
extern void PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, BlockNumber newblkno); extern void PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, BlockNumber newblkno);
extern void TransferPredicateLocksToHeapRelation(Relation relation); extern void TransferPredicateLocksToHeapRelation(Relation relation);
extern void ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe); extern void ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe);
/* conflict detection (may also trigger rollback) */ /* conflict detection (may also trigger rollback) */
extern void CheckForSerializableConflictOut(bool valid, Relation relation, HeapTuple tuple, extern bool CheckForSerializableConflictOutNeeded(Relation relation, Snapshot snapshot);
Buffer buffer, Snapshot snapshot); extern void CheckForSerializableConflictOut(Relation relation, TransactionId xid, Snapshot snapshot);
extern void CheckForSerializableConflictIn(Relation relation, HeapTuple tuple, Buffer buffer); extern void CheckForSerializableConflictIn(Relation relation, ItemPointer tid, BlockNumber blkno);
extern void CheckTableForSerializableConflictIn(Relation relation); extern void CheckTableForSerializableConflictIn(Relation relation);
/* final rollback checking */ /* final rollback checking */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment