Commit 0a0e2b52 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Make non-MVCC snapshots exempt from predicate locking. Scans with non-MVCC

snapshots, like in REINDEX, are basically non-transactional operations. The
DDL operation itself might participate in SSI, but there's separate
functions for that.

Kevin Grittner and Dan Ports, with some changes by me.
parent 707195c8
...@@ -274,7 +274,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) ...@@ -274,7 +274,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
else else
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, buffer); CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup,
buffer, snapshot);
if (valid) if (valid)
scan->rs_vistuples[ntup++] = lineoff; scan->rs_vistuples[ntup++] = lineoff;
...@@ -469,7 +470,8 @@ heapgettup(HeapScanDesc scan, ...@@ -469,7 +470,8 @@ heapgettup(HeapScanDesc scan,
snapshot, snapshot,
scan->rs_cbuf); scan->rs_cbuf);
CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, scan->rs_cbuf); CheckForSerializableConflictOut(valid, scan->rs_rd, tuple,
scan->rs_cbuf, snapshot);
if (valid && key != NULL) if (valid && key != NULL)
HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
...@@ -478,7 +480,7 @@ heapgettup(HeapScanDesc scan, ...@@ -478,7 +480,7 @@ heapgettup(HeapScanDesc scan,
if (valid) if (valid)
{ {
if (!scan->rs_relpredicatelocked) if (!scan->rs_relpredicatelocked)
PredicateLockTuple(scan->rs_rd, tuple); PredicateLockTuple(scan->rs_rd, tuple, snapshot);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
return; return;
} }
...@@ -747,7 +749,7 @@ heapgettup_pagemode(HeapScanDesc scan, ...@@ -747,7 +749,7 @@ heapgettup_pagemode(HeapScanDesc scan,
if (valid) if (valid)
{ {
if (!scan->rs_relpredicatelocked) if (!scan->rs_relpredicatelocked)
PredicateLockTuple(scan->rs_rd, tuple); PredicateLockTuple(scan->rs_rd, tuple, scan->rs_snapshot);
scan->rs_cindex = lineindex; scan->rs_cindex = lineindex;
return; return;
} }
...@@ -755,7 +757,7 @@ heapgettup_pagemode(HeapScanDesc scan, ...@@ -755,7 +757,7 @@ heapgettup_pagemode(HeapScanDesc scan,
else else
{ {
if (!scan->rs_relpredicatelocked) if (!scan->rs_relpredicatelocked)
PredicateLockTuple(scan->rs_rd, tuple); PredicateLockTuple(scan->rs_rd, tuple, scan->rs_snapshot);
scan->rs_cindex = lineindex; scan->rs_cindex = lineindex;
return; return;
} }
...@@ -1470,9 +1472,9 @@ heap_fetch(Relation relation, ...@@ -1470,9 +1472,9 @@ heap_fetch(Relation relation,
valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer); valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
if (valid) if (valid)
PredicateLockTuple(relation, tuple); PredicateLockTuple(relation, tuple, snapshot);
CheckForSerializableConflictOut(valid, relation, tuple, buffer); CheckForSerializableConflictOut(valid, relation, tuple, buffer, snapshot);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
...@@ -1588,11 +1590,12 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, ...@@ -1588,11 +1590,12 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
/* If it's visible per the snapshot, we must return it */ /* If it's visible per the snapshot, we must return it */
valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer); valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer);
CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer); CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer,
snapshot);
if (valid) if (valid)
{ {
ItemPointerSetOffsetNumber(tid, offnum); ItemPointerSetOffsetNumber(tid, offnum);
PredicateLockTuple(relation, &heapTuple); PredicateLockTuple(relation, &heapTuple, snapshot);
if (all_dead) if (all_dead)
*all_dead = false; *all_dead = false;
return true; return true;
...@@ -1750,7 +1753,7 @@ heap_get_latest_tid(Relation relation, ...@@ -1750,7 +1753,7 @@ heap_get_latest_tid(Relation relation,
* result candidate. * result candidate.
*/ */
valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer); valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
CheckForSerializableConflictOut(valid, relation, &tp, buffer); CheckForSerializableConflictOut(valid, relation, &tp, buffer, snapshot);
if (valid) if (valid)
*tid = ctid; *tid = ctid;
......
...@@ -126,7 +126,7 @@ do { \ ...@@ -126,7 +126,7 @@ do { \
} while(0) } while(0)
static IndexScanDesc index_beginscan_internal(Relation indexRelation, static IndexScanDesc index_beginscan_internal(Relation indexRelation,
int nkeys, int norderbys); int nkeys, int norderbys, Snapshot snapshot);
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
...@@ -234,7 +234,7 @@ index_beginscan(Relation heapRelation, ...@@ -234,7 +234,7 @@ index_beginscan(Relation heapRelation,
{ {
IndexScanDesc scan; IndexScanDesc scan;
scan = index_beginscan_internal(indexRelation, nkeys, norderbys); scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot);
/* /*
* Save additional parameters into the scandesc. Everything else was set * Save additional parameters into the scandesc. Everything else was set
...@@ -259,7 +259,7 @@ index_beginscan_bitmap(Relation indexRelation, ...@@ -259,7 +259,7 @@ index_beginscan_bitmap(Relation indexRelation,
{ {
IndexScanDesc scan; IndexScanDesc scan;
scan = index_beginscan_internal(indexRelation, nkeys, 0); scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot);
/* /*
* Save additional parameters into the scandesc. Everything else was set * Save additional parameters into the scandesc. Everything else was set
...@@ -275,7 +275,7 @@ index_beginscan_bitmap(Relation indexRelation, ...@@ -275,7 +275,7 @@ index_beginscan_bitmap(Relation indexRelation,
*/ */
static IndexScanDesc static IndexScanDesc
index_beginscan_internal(Relation indexRelation, index_beginscan_internal(Relation indexRelation,
int nkeys, int norderbys) int nkeys, int norderbys, Snapshot snapshot)
{ {
IndexScanDesc scan; IndexScanDesc scan;
FmgrInfo *procedure; FmgrInfo *procedure;
...@@ -284,7 +284,7 @@ index_beginscan_internal(Relation indexRelation, ...@@ -284,7 +284,7 @@ index_beginscan_internal(Relation indexRelation,
GET_REL_PROCEDURE(ambeginscan); GET_REL_PROCEDURE(ambeginscan);
if (!(indexRelation->rd_am->ampredlocks)) if (!(indexRelation->rd_am->ampredlocks))
PredicateLockRelation(indexRelation); PredicateLockRelation(indexRelation, snapshot);
/* /*
* We hold a reference count to the relcache entry throughout the scan. * We hold a reference count to the relcache entry throughout the scan.
...@@ -602,7 +602,8 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) ...@@ -602,7 +602,8 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
scan->xs_cbuf); scan->xs_cbuf);
CheckForSerializableConflictOut(valid, scan->heapRelation, CheckForSerializableConflictOut(valid, scan->heapRelation,
heapTuple, scan->xs_cbuf); heapTuple, scan->xs_cbuf,
scan->xs_snapshot);
if (valid) if (valid)
{ {
...@@ -624,7 +625,7 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) ...@@ -624,7 +625,7 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
else else
scan->xs_next_hot = InvalidOffsetNumber; scan->xs_next_hot = InvalidOffsetNumber;
PredicateLockTuple(scan->heapRelation, heapTuple); PredicateLockTuple(scan->heapRelation, heapTuple, scan->xs_snapshot);
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK); LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
......
...@@ -64,10 +64,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, ...@@ -64,10 +64,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
/* If index is empty and access = BT_READ, no root page is created. */ /* If index is empty and access = BT_READ, no root page is created. */
if (!BufferIsValid(*bufP)) if (!BufferIsValid(*bufP))
{
PredicateLockRelation(rel); /* Nothing finer to lock exists. */
return (BTStack) NULL; return (BTStack) NULL;
}
/* Loop iterates once per level descended in the tree */ /* Loop iterates once per level descended in the tree */
for (;;) for (;;)
...@@ -92,11 +89,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, ...@@ -92,11 +89,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
page = BufferGetPage(*bufP); page = BufferGetPage(*bufP);
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_ISLEAF(opaque)) if (P_ISLEAF(opaque))
{
if (access == BT_READ)
PredicateLockPage(rel, BufferGetBlockNumber(*bufP));
break; break;
}
/* /*
* Find the appropriate item on the internal page, and get the child * Find the appropriate item on the internal page, and get the child
...@@ -855,9 +848,16 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ...@@ -855,9 +848,16 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
if (!BufferIsValid(buf)) if (!BufferIsValid(buf))
{ {
/* Only get here if index is completely empty */ /*
* We only get here if the index is completely empty.
* Lock relation because nothing finer to lock exists.
*/
PredicateLockRelation(rel, scan->xs_snapshot);
return false; return false;
} }
else
PredicateLockPage(rel, BufferGetBlockNumber(buf),
scan->xs_snapshot);
/* initialize moreLeft/moreRight appropriately for scan direction */ /* initialize moreLeft/moreRight appropriately for scan direction */
if (ScanDirectionIsForward(dir)) if (ScanDirectionIsForward(dir))
...@@ -1153,7 +1153,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) ...@@ -1153,7 +1153,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque)) if (!P_IGNORE(opaque))
{ {
PredicateLockPage(rel, blkno); PredicateLockPage(rel, blkno, scan->xs_snapshot);
/* see if there are any matches on this page */ /* see if there are any matches on this page */
/* note that this will clear moreRight if we can stop */ /* note that this will clear moreRight if we can stop */
if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque))) if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque)))
...@@ -1201,7 +1201,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) ...@@ -1201,7 +1201,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque)) if (!P_IGNORE(opaque))
{ {
PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf)); PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf), scan->xs_snapshot);
/* see if there are any matches on this page */ /* see if there are any matches on this page */
/* note that this will clear moreLeft if we can stop */ /* note that this will clear moreLeft if we can stop */
if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page))) if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
...@@ -1363,11 +1363,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost) ...@@ -1363,11 +1363,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
buf = _bt_gettrueroot(rel); buf = _bt_gettrueroot(rel);
if (!BufferIsValid(buf)) if (!BufferIsValid(buf))
{
/* empty index... */
PredicateLockRelation(rel); /* Nothing finer to lock exists. */
return InvalidBuffer; return InvalidBuffer;
}
page = BufferGetPage(buf); page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
...@@ -1444,13 +1440,16 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) ...@@ -1444,13 +1440,16 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
if (!BufferIsValid(buf)) if (!BufferIsValid(buf))
{ {
/* empty index... */ /*
PredicateLockRelation(rel); /* Nothing finer to lock exists. */ * Empty index. Lock the whole relation, as nothing finer to lock
* exists.
*/
PredicateLockRelation(rel, scan->xs_snapshot);
so->currPos.buf = InvalidBuffer; so->currPos.buf = InvalidBuffer;
return false; return false;
} }
PredicateLockPage(rel, BufferGetBlockNumber(buf)); PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot);
page = BufferGetPage(buf); page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Assert(P_ISLEAF(opaque)); Assert(P_ISLEAF(opaque));
......
...@@ -113,7 +113,8 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot) ...@@ -113,7 +113,8 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
TupleTableSlot * TupleTableSlot *
ExecSeqScan(SeqScanState *node) ExecSeqScan(SeqScanState *node)
{ {
PredicateLockRelation(node->ss_currentRelation); PredicateLockRelation(node->ss_currentRelation,
node->ss_currentScanDesc->rs_snapshot);
node->ss_currentScanDesc->rs_relpredicatelocked = true; node->ss_currentScanDesc->rs_relpredicatelocked = true;
return ExecScan((ScanState *) node, return ExecScan((ScanState *) node,
(ExecScanAccessMtd) SeqNext, (ExecScanAccessMtd) SeqNext,
......
...@@ -148,9 +148,11 @@ ...@@ -148,9 +148,11 @@
* predicate lock maintenance * predicate lock maintenance
* RegisterSerializableTransaction(Snapshot snapshot) * RegisterSerializableTransaction(Snapshot snapshot)
* RegisterPredicateLockingXid(void) * RegisterPredicateLockingXid(void)
* PredicateLockRelation(Relation relation) * PredicateLockRelation(Relation relation, Snapshot snapshot)
* PredicateLockPage(Relation relation, BlockNumber blkno) * PredicateLockPage(Relation relation, BlockNumber blkno,
* PredicateLockTuple(Relation relation, HeapTuple tuple) * Snapshot snapshot)
* PredicateLockTuple(Relation relation, HeapTuple tuple,
* Snapshot snapshot)
* PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, * PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
* BlockNumber newblkno); * BlockNumber newblkno);
* PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, * PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
...@@ -160,7 +162,8 @@ ...@@ -160,7 +162,8 @@
* *
* conflict detection (may also trigger rollback) * conflict detection (may also trigger rollback)
* CheckForSerializableConflictOut(bool visible, Relation relation, * CheckForSerializableConflictOut(bool visible, Relation relation,
* HeapTupleData *tup, Buffer buffer) * HeapTupleData *tup, Buffer buffer,
* Snapshot snapshot)
* CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup, * CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
* Buffer buffer) * Buffer buffer)
* CheckTableForSerializableConflictIn(Relation relation) * CheckTableForSerializableConflictIn(Relation relation)
...@@ -258,26 +261,6 @@ ...@@ -258,26 +261,6 @@
#define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0) #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
#define SxactIsMarkedForDeath(sxact) (((sxact)->flags & SXACT_FLAG_MARKED_FOR_DEATH) != 0) #define SxactIsMarkedForDeath(sxact) (((sxact)->flags & SXACT_FLAG_MARKED_FOR_DEATH) != 0)
/*
* Is this relation exempt from predicate locking? We don't do predicate
* locking on system or temporary relations.
*/
#define SkipPredicateLocksForRelation(relation) \
(((relation)->rd_id < FirstBootstrapObjectId) \
|| RelationUsesLocalBuffers(relation))
/*
* When a public interface method is called for serializing a relation within
* the current transaction, this is the test to see if we should do a quick
* return.
*/
#define SkipSerialization(relation) \
((!IsolationIsSerializable()) \
|| ((MySerializableXact == InvalidSerializableXact)) \
|| ReleasePredicateLocksIfROSafe() \
|| SkipPredicateLocksForRelation(relation))
/* /*
* Compute the hash code associated with a PREDICATELOCKTARGETTAG. * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
* *
...@@ -444,7 +427,6 @@ static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag); ...@@ -444,7 +427,6 @@ static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
static void DropAllPredicateLocksFromTable(const Relation relation, static void DropAllPredicateLocksFromTable(const Relation relation,
bool transfer); bool transfer);
static void SetNewSxactGlobalXmin(void); static void SetNewSxactGlobalXmin(void);
static bool ReleasePredicateLocksIfROSafe(void);
static void ClearOldPredicateLocks(void); static void ClearOldPredicateLocks(void);
static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
bool summarize); bool summarize);
...@@ -454,6 +436,91 @@ static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); ...@@ -454,6 +436,91 @@ static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
SERIALIZABLEXACT *writer); SERIALIZABLEXACT *writer);
/*------------------------------------------------------------------------*/
/*
* Does this relation participate in predicate locking? Temporary and system
* relations are exempt.
*/
static inline bool
PredicateLockingNeededForRelation(Relation relation)
{
return !(relation->rd_id < FirstBootstrapObjectId ||
RelationUsesLocalBuffers(relation));
}
/*
* When a public interface method is called for a read, this is the test to
* see if we should do a quick return.
*
* Note: this function has side-effects! If this transaction has been flagged
* as RO-safe since the last call, we release all predicate locks and reset
* MySerializableXact. That makes subsequent calls to return quickly.
*
* This is marked as 'inline' to make to eliminate the function call overhead
* in the common case that serialization is not needed.
*/
static inline bool
SerializationNeededForRead(Relation relation, Snapshot snapshot)
{
/* Nothing to do if this is not a serializable transaction */
if (MySerializableXact == InvalidSerializableXact)
return false;
/*
* Don't acquire locks or conflict when scanning with a special snapshot.
* This excludes things like CLUSTER and REINDEX. They use the wholesale
* functions TransferPredicateLocksToHeapRelation() and
* CheckTableForSerializableConflictIn() to participate serialization, but
* the scans involved don't need serialization.
*/
if (!IsMVCCSnapshot(snapshot))
return false;
/*
* Check if we have just become "RO-safe". If we have, immediately release
* all locks as they're not needed anymore. This also resets
* MySerializableXact, so that subsequent calls to this function can exit
* quickly.
*
* A transaction is flagged as RO_SAFE if all concurrent R/W
* transactions commit without having conflicts out to an earlier
* snapshot, thus ensuring that no conflicts are possible for this
* transaction.
*/
if (SxactIsROSafe(MySerializableXact))
{
ReleasePredicateLocks(false);
return false;
}
/* Check if the relation doesn't participate in predicate locking */
if (!PredicateLockingNeededForRelation(relation))
return false;
return true; /* no excuse to skip predicate locking */
}
/*
* Like SerializationNeededForRead(), but called on writes.
* The logic is the same, but there is no snapshot and we can't be RO-safe.
*/
static inline bool
SerializationNeededForWrite(Relation relation)
{
/* Nothing to do if this is not a serializable transaction */
if (MySerializableXact == InvalidSerializableXact)
return false;
/* Check if the relation doesn't participate in predicate locking */
if (!PredicateLockingNeededForRelation(relation))
return false;
return true; /* no excuse to skip predicate locking */
}
/*------------------------------------------------------------------------*/ /*------------------------------------------------------------------------*/
/* /*
...@@ -2244,11 +2311,11 @@ PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag) ...@@ -2244,11 +2311,11 @@ PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
* Clear any finer-grained predicate locks this session has on the relation. * Clear any finer-grained predicate locks this session has on the relation.
*/ */
void void
PredicateLockRelation(const Relation relation) PredicateLockRelation(const Relation relation, const Snapshot snapshot)
{ {
PREDICATELOCKTARGETTAG tag; PREDICATELOCKTARGETTAG tag;
if (SkipSerialization(relation)) if (!SerializationNeededForRead(relation, snapshot))
return; return;
SET_PREDICATELOCKTARGETTAG_RELATION(tag, SET_PREDICATELOCKTARGETTAG_RELATION(tag,
...@@ -2267,11 +2334,12 @@ PredicateLockRelation(const Relation relation) ...@@ -2267,11 +2334,12 @@ PredicateLockRelation(const Relation relation)
* Clear any finer-grained predicate locks this session has on the relation. * Clear any finer-grained predicate locks this session has on the relation.
*/ */
void void
PredicateLockPage(const Relation relation, const BlockNumber blkno) PredicateLockPage(const Relation relation, const BlockNumber blkno,
const Snapshot snapshot)
{ {
PREDICATELOCKTARGETTAG tag; PREDICATELOCKTARGETTAG tag;
if (SkipSerialization(relation)) if (!SerializationNeededForRead(relation, snapshot))
return; return;
SET_PREDICATELOCKTARGETTAG_PAGE(tag, SET_PREDICATELOCKTARGETTAG_PAGE(tag,
...@@ -2289,13 +2357,14 @@ PredicateLockPage(const Relation relation, const BlockNumber blkno) ...@@ -2289,13 +2357,14 @@ PredicateLockPage(const Relation relation, const BlockNumber blkno)
* Skip if this is a temporary table. * Skip if this is a temporary table.
*/ */
void void
PredicateLockTuple(const Relation relation, const HeapTuple tuple) PredicateLockTuple(const Relation relation, const HeapTuple tuple,
const Snapshot snapshot)
{ {
PREDICATELOCKTARGETTAG tag; PREDICATELOCKTARGETTAG tag;
ItemPointer tid; ItemPointer tid;
TransactionId targetxmin; TransactionId targetxmin;
if (SkipSerialization(relation)) if (!SerializationNeededForRead(relation, snapshot))
return; return;
/* /*
...@@ -2666,7 +2735,7 @@ DropAllPredicateLocksFromTable(const Relation relation, bool transfer) ...@@ -2666,7 +2735,7 @@ DropAllPredicateLocksFromTable(const Relation relation, bool transfer)
if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)) if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
return; return;
if (SkipPredicateLocksForRelation(relation)) if (!PredicateLockingNeededForRelation(relation))
return; return;
dbId = relation->rd_node.dbNode; dbId = relation->rd_node.dbNode;
...@@ -2881,7 +2950,7 @@ PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno, ...@@ -2881,7 +2950,7 @@ PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno,
if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)) if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
return; return;
if (SkipPredicateLocksForRelation(relation)) if (!PredicateLockingNeededForRelation(relation))
return; return;
Assert(oldblkno != newblkno); Assert(oldblkno != newblkno);
...@@ -3310,30 +3379,6 @@ ReleasePredicateLocks(const bool isCommit) ...@@ -3310,30 +3379,6 @@ ReleasePredicateLocks(const bool isCommit)
} }
} }
/*
* ReleasePredicateLocksIfROSafe
* Check if the current transaction is read only and operating on
* a safe snapshot. If so, release predicate locks and return
* true.
*
* A transaction is flagged as RO_SAFE if all concurrent R/W
* transactions commit without having conflicts out to an earlier
* snapshot, thus ensuring that no conflicts are possible for this
* transaction. Thus, we call this function as part of the
* SkipSerialization check on all public interface methods.
*/
static bool
ReleasePredicateLocksIfROSafe(void)
{
if (SxactIsROSafe(MySerializableXact))
{
ReleasePredicateLocks(false);
return true;
}
else
return false;
}
/* /*
* Clear old predicate locks, belonging to committed transactions that are no * Clear old predicate locks, belonging to committed transactions that are no
* longer interesting to any in-progress transaction. * longer interesting to any in-progress transaction.
...@@ -3679,7 +3724,8 @@ XidIsConcurrent(TransactionId xid) ...@@ -3679,7 +3724,8 @@ XidIsConcurrent(TransactionId xid)
*/ */
void void
CheckForSerializableConflictOut(const bool visible, const Relation relation, CheckForSerializableConflictOut(const bool visible, const Relation relation,
const HeapTuple tuple, const Buffer buffer) const HeapTuple tuple, const Buffer buffer,
const Snapshot snapshot)
{ {
TransactionId xid; TransactionId xid;
SERIALIZABLEXIDTAG sxidtag; SERIALIZABLEXIDTAG sxidtag;
...@@ -3687,7 +3733,7 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation, ...@@ -3687,7 +3733,7 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation,
SERIALIZABLEXACT *sxact; SERIALIZABLEXACT *sxact;
HTSV_Result htsvResult; HTSV_Result htsvResult;
if (SkipSerialization(relation)) if (!SerializationNeededForRead(relation, snapshot))
return; return;
if (SxactIsMarkedForDeath(MySerializableXact)) if (SxactIsMarkedForDeath(MySerializableXact))
...@@ -4064,7 +4110,7 @@ CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple, ...@@ -4064,7 +4110,7 @@ CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
{ {
PREDICATELOCKTARGETTAG targettag; PREDICATELOCKTARGETTAG targettag;
if (SkipSerialization(relation)) if (!SerializationNeededForWrite(relation))
return; return;
if (SxactIsMarkedForDeath(MySerializableXact)) if (SxactIsMarkedForDeath(MySerializableXact))
...@@ -4160,7 +4206,7 @@ CheckTableForSerializableConflictIn(const Relation relation) ...@@ -4160,7 +4206,7 @@ CheckTableForSerializableConflictIn(const Relation relation)
if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)) if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
return; return;
if (SkipSerialization(relation)) if (!SerializationNeededForWrite(relation))
return; return;
/* /*
......
...@@ -44,16 +44,17 @@ extern bool PageIsPredicateLocked(const Relation relation, const BlockNumber blk ...@@ -44,16 +44,17 @@ extern bool PageIsPredicateLocked(const Relation relation, const BlockNumber blk
/* predicate lock maintenance */ /* predicate lock maintenance */
extern Snapshot RegisterSerializableTransaction(Snapshot snapshot); extern Snapshot RegisterSerializableTransaction(Snapshot snapshot);
extern void RegisterPredicateLockingXid(const TransactionId xid); extern void RegisterPredicateLockingXid(const TransactionId xid);
extern void PredicateLockRelation(const Relation relation); extern void PredicateLockRelation(const Relation relation, const Snapshot snapshot);
extern void PredicateLockPage(const Relation relation, const BlockNumber blkno); extern void PredicateLockPage(const Relation relation, const BlockNumber blkno, const Snapshot snapshot);
extern void PredicateLockTuple(const Relation relation, const HeapTuple tuple); extern void PredicateLockTuple(const Relation relation, const HeapTuple tuple, const Snapshot snapshot);
extern void PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno, const BlockNumber newblkno); extern void PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno, const BlockNumber newblkno);
extern void PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno, const BlockNumber newblkno); extern void PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno, const BlockNumber newblkno);
extern void TransferPredicateLocksToHeapRelation(const Relation relation); extern void TransferPredicateLocksToHeapRelation(const Relation relation);
extern void ReleasePredicateLocks(const bool isCommit); extern void ReleasePredicateLocks(const bool isCommit);
/* conflict detection (may also trigger rollback) */ /* conflict detection (may also trigger rollback) */
extern void CheckForSerializableConflictOut(const bool valid, const Relation relation, const HeapTuple tuple, const Buffer buffer); extern void CheckForSerializableConflictOut(const bool valid, const Relation relation, const HeapTuple tuple,
const Buffer buffer, const Snapshot snapshot);
extern void CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple, const Buffer buffer); extern void CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple, const Buffer buffer);
extern void CheckTableForSerializableConflictIn(const Relation relation); extern void CheckTableForSerializableConflictIn(const Relation relation);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment