Commit 47ad7912 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Fix bugs in Serializable Snapshot Isolation.

Change the way UPDATEs are handled. Instead of maintaining a chain of
tuple-level locks in shared memory, copy any existing locks on the old
tuple to the new tuple at UPDATE. Any existing page-level lock needs to
be duplicated too, as a lock on the new tuple. That was neglected
previously.

Store xmin on tuple-level predicate locks, to distinguish a lock on an old
already-recycled tuple from a new tuple at the same physical location.
Failure to distinguish them caused loops in the tuple-lock chains, as
reported by YAMAMOTO Takashi. Although we don't use the chain representation
of UPDATEs anymore, it seems like a good idea to store the xmin to avoid
some false positives if no other reason.

CheckSingleTargetForConflictsIn now correctly handles the case where a lock
that's being held is not reflected in the local lock table. That happens
if another backend acquires a lock on our behalf due to an UPDATE or a page
split.

PredicateLockPageCombine now retains locks for the page that is being
removed, rather than removing them. This prevents a potentially dangerous
false-positive inconsistency where the local lock table believes that a lock
is held, but it is actually not.

Dan Ports and Kevin Grittner
parent 16143d64
...@@ -824,7 +824,6 @@ restart: ...@@ -824,7 +824,6 @@ restart:
if (_bt_page_recyclable(page)) if (_bt_page_recyclable(page))
{ {
/* Okay to recycle this page */ /* Okay to recycle this page */
Assert(!PageIsPredicateLocked(rel, blkno));
RecordFreeIndexPage(rel, blkno); RecordFreeIndexPage(rel, blkno);
vstate->totFreePages++; vstate->totFreePages++;
stats->pages_deleted++; stats->pages_deleted++;
......
...@@ -124,10 +124,6 @@ ...@@ -124,10 +124,6 @@
* SerializableXactHashLock * SerializableXactHashLock
* - Protects both PredXact and SerializableXidHash. * - Protects both PredXact and SerializableXidHash.
* *
* PredicateLockNextRowLinkLock
* - Protects the priorVersionOfRow and nextVersionOfRow fields of
* PREDICATELOCKTARGET when linkage is being created or destroyed.
*
* *
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
...@@ -444,8 +440,6 @@ static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, ...@@ -444,8 +440,6 @@ static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
bool summarize); bool summarize);
static bool XidIsConcurrent(TransactionId xid); static bool XidIsConcurrent(TransactionId xid);
static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag); static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
static bool CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
PREDICATELOCKTARGETTAG *nexttargettag);
static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
SERIALIZABLEXACT *writer); SERIALIZABLEXACT *writer);
...@@ -1044,7 +1038,6 @@ InitPredicateLocks(void) ...@@ -1044,7 +1038,6 @@ InitPredicateLocks(void)
PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1; PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1;
PredXact->CanPartialClearThrough = 0; PredXact->CanPartialClearThrough = 0;
PredXact->HavePartialClearedThrough = 0; PredXact->HavePartialClearedThrough = 0;
PredXact->NeedTargetLinkCleanup = false;
requestSize = mul_size((Size) max_table_size, requestSize = mul_size((Size) max_table_size,
PredXactListElementDataSize); PredXactListElementDataSize);
PredXact->element = ShmemAlloc(requestSize); PredXact->element = ShmemAlloc(requestSize);
...@@ -1651,9 +1644,10 @@ PageIsPredicateLocked(const Relation relation, const BlockNumber blkno) ...@@ -1651,9 +1644,10 @@ PageIsPredicateLocked(const Relation relation, const BlockNumber blkno)
* Important note: this function may return false even if the lock is * Important note: this function may return false even if the lock is
* being held, because it uses the local lock table which is not * being held, because it uses the local lock table which is not
* updated if another transaction modifies our lock list (e.g. to * updated if another transaction modifies our lock list (e.g. to
* split an index page). However, it will never return true if the * split an index page). It can also return true when a coarser
* lock is not held. We only use this function in circumstances where * granularity lock that covers this target is being held. Be careful
* such false negatives are acceptable. * to only use this function in circumstances where such errors are
* acceptable!
*/ */
static bool static bool
PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag) PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
...@@ -1717,6 +1711,9 @@ GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, ...@@ -1717,6 +1711,9 @@ GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
/* /*
* Check whether the lock we are considering is already covered by a * Check whether the lock we are considering is already covered by a
* coarser lock for our transaction. * coarser lock for our transaction.
*
* Like PredicateLockExists, this function might return a false
* negative, but it will never return a false positive.
*/ */
static bool static bool
CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag) CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
...@@ -1747,7 +1744,6 @@ static void ...@@ -1747,7 +1744,6 @@ static void
RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
{ {
PREDICATELOCKTARGET *rmtarget; PREDICATELOCKTARGET *rmtarget;
PREDICATELOCKTARGET *next;
Assert(LWLockHeldByMe(SerializablePredicateLockListLock)); Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
...@@ -1755,33 +1751,6 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) ...@@ -1755,33 +1751,6 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
if (!SHMQueueEmpty(&target->predicateLocks)) if (!SHMQueueEmpty(&target->predicateLocks))
return; return;
/* Can't remove it if there are locks for a prior row version. */
LWLockAcquire(PredicateLockNextRowLinkLock, LW_EXCLUSIVE);
if (target->priorVersionOfRow != NULL)
{
LWLockRelease(PredicateLockNextRowLinkLock);
return;
}
/*
* We are going to release this target, This requires that we let the
* next version of the row (if any) know that it's previous version is
* done.
*
* It might be that the link was all that was keeping the other target
* from cleanup, but we can't clean that up here -- LW locking is all
* wrong for that. We'll pass the HTAB in the general cleanup function to
* get rid of such "dead" targets.
*/
next = target->nextVersionOfRow;
if (next != NULL)
{
next->priorVersionOfRow = NULL;
if (SHMQueueEmpty(&next->predicateLocks))
PredXact->NeedTargetLinkCleanup = true;
}
LWLockRelease(PredicateLockNextRowLinkLock);
/* Actually remove the target. */ /* Actually remove the target. */
rmtarget = hash_search_with_hash_value(PredicateLockTargetHash, rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
&target->tag, &target->tag,
...@@ -2065,11 +2034,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, ...@@ -2065,11 +2034,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
errmsg("out of shared memory"), errmsg("out of shared memory"),
errhint("You might need to increase max_pred_locks_per_transaction."))); errhint("You might need to increase max_pred_locks_per_transaction.")));
if (!found) if (!found)
{
SHMQueueInit(&(target->predicateLocks)); SHMQueueInit(&(target->predicateLocks));
target->priorVersionOfRow = NULL;
target->nextVersionOfRow = NULL;
}
/* We've got the sxact and target, make sure they're joined. */ /* We've got the sxact and target, make sure they're joined. */
locktag.myTarget = target; locktag.myTarget = target;
...@@ -2125,8 +2090,6 @@ PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag) ...@@ -2125,8 +2090,6 @@ PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
hash_search_with_hash_value(LocalPredicateLockHash, hash_search_with_hash_value(LocalPredicateLockHash,
targettag, targettaghash, targettag, targettaghash,
HASH_ENTER, &found); HASH_ENTER, &found);
/* We should not hold the lock (but its entry might still exist) */
Assert(!found || !locallock->held);
locallock->held = true; locallock->held = true;
if (!found) if (!found)
locallock->childLocks = 0; locallock->childLocks = 0;
...@@ -2215,6 +2178,7 @@ PredicateLockTuple(const Relation relation, const HeapTuple tuple) ...@@ -2215,6 +2178,7 @@ PredicateLockTuple(const Relation relation, const HeapTuple tuple)
{ {
PREDICATELOCKTARGETTAG tag; PREDICATELOCKTARGETTAG tag;
ItemPointer tid; ItemPointer tid;
TransactionId targetxmin;
if (SkipSerialization(relation)) if (SkipSerialization(relation))
return; return;
...@@ -2224,15 +2188,16 @@ PredicateLockTuple(const Relation relation, const HeapTuple tuple) ...@@ -2224,15 +2188,16 @@ PredicateLockTuple(const Relation relation, const HeapTuple tuple)
*/ */
if (relation->rd_index == NULL) if (relation->rd_index == NULL)
{ {
TransactionId myxid = GetTopTransactionIdIfAny(); TransactionId myxid;
targetxmin = HeapTupleHeaderGetXmin(tuple->t_data);
myxid = GetTopTransactionIdIfAny();
if (TransactionIdIsValid(myxid)) if (TransactionIdIsValid(myxid))
{ {
TransactionId xid = HeapTupleHeaderGetXmin(tuple->t_data); if (TransactionIdFollowsOrEquals(targetxmin, TransactionXmin))
if (TransactionIdFollowsOrEquals(xid, TransactionXmin))
{ {
xid = SubTransGetTopmostTransaction(xid); TransactionId xid = SubTransGetTopmostTransaction(targetxmin);
if (TransactionIdEquals(xid, myxid)) if (TransactionIdEquals(xid, myxid))
{ {
/* We wrote it; we already have a write lock. */ /* We wrote it; we already have a write lock. */
...@@ -2241,6 +2206,8 @@ PredicateLockTuple(const Relation relation, const HeapTuple tuple) ...@@ -2241,6 +2206,8 @@ PredicateLockTuple(const Relation relation, const HeapTuple tuple)
} }
} }
} }
else
targetxmin = InvalidTransactionId;
/* /*
* Do quick-but-not-definitive test for a relation lock first. This will * Do quick-but-not-definitive test for a relation lock first. This will
...@@ -2259,116 +2226,78 @@ PredicateLockTuple(const Relation relation, const HeapTuple tuple) ...@@ -2259,116 +2226,78 @@ PredicateLockTuple(const Relation relation, const HeapTuple tuple)
relation->rd_node.dbNode, relation->rd_node.dbNode,
relation->rd_id, relation->rd_id,
ItemPointerGetBlockNumber(tid), ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid)); ItemPointerGetOffsetNumber(tid),
targetxmin);
PredicateLockAcquire(&tag); PredicateLockAcquire(&tag);
} }
/* /*
* If the old tuple has any predicate locks, create a lock target for the * If the old tuple has any predicate locks, copy them to the new target.
* new tuple and point them at each other. Conflict detection needs to *
* look for locks against prior versions of the row. * This is called at an UPDATE, where any predicate locks held on the old
* tuple need to be copied to the new tuple, because logically they both
* represent the same row. A lock taken before the update must conflict
* with anyone locking the same row after the update.
*/ */
void void
PredicateLockTupleRowVersionLink(const Relation relation, PredicateLockTupleRowVersionLink(const Relation relation,
const HeapTuple oldTuple, const HeapTuple oldTuple,
const HeapTuple newTuple) const HeapTuple newTuple)
{ {
PREDICATELOCKTARGETTAG oldtargettag; PREDICATELOCKTARGETTAG oldtupletag;
PREDICATELOCKTARGETTAG newtargettag; PREDICATELOCKTARGETTAG oldpagetag;
PREDICATELOCKTARGET *oldtarget; PREDICATELOCKTARGETTAG newtupletag;
PREDICATELOCKTARGET *newtarget; BlockNumber oldblk,
PREDICATELOCKTARGET *next; newblk;
uint32 oldtargettaghash; OffsetNumber oldoff,
LWLockId oldpartitionLock; newoff;
uint32 newtargettaghash; TransactionId oldxmin,
LWLockId newpartitionLock; newxmin;
bool found;
oldblk = ItemPointerGetBlockNumber(&(oldTuple->t_self));
SET_PREDICATELOCKTARGETTAG_TUPLE(oldtargettag, oldoff = ItemPointerGetOffsetNumber(&(oldTuple->t_self));
oldxmin = HeapTupleHeaderGetXmin(oldTuple->t_data);
newblk = ItemPointerGetBlockNumber(&(newTuple->t_self));
newoff = ItemPointerGetOffsetNumber(&(newTuple->t_self));
newxmin = HeapTupleHeaderGetXmin(newTuple->t_data);
SET_PREDICATELOCKTARGETTAG_TUPLE(oldtupletag,
relation->rd_node.dbNode, relation->rd_node.dbNode,
relation->rd_id, relation->rd_id,
ItemPointerGetBlockNumber(&(oldTuple->t_self)), oldblk,
ItemPointerGetOffsetNumber(&(oldTuple->t_self))); oldoff,
oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag); oldxmin);
oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
SET_PREDICATELOCKTARGETTAG_TUPLE(newtargettag, SET_PREDICATELOCKTARGETTAG_PAGE(oldpagetag,
relation->rd_node.dbNode,
relation->rd_id,
oldblk);
SET_PREDICATELOCKTARGETTAG_TUPLE(newtupletag,
relation->rd_node.dbNode, relation->rd_node.dbNode,
relation->rd_id, relation->rd_id,
ItemPointerGetBlockNumber(&(newTuple->t_self)), newblk,
ItemPointerGetOffsetNumber(&(newTuple->t_self))); newoff,
newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag); newxmin);
newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
/* Lock lower numbered partition first. */
if (oldpartitionLock < newpartitionLock)
{
LWLockAcquire(oldpartitionLock, LW_SHARED);
LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
}
else if (newpartitionLock < oldpartitionLock)
{
LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
LWLockAcquire(oldpartitionLock, LW_SHARED);
}
else
LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
oldtarget = (PREDICATELOCKTARGET *)
hash_search_with_hash_value(PredicateLockTargetHash,
&oldtargettag, oldtargettaghash,
HASH_FIND, NULL);
/* Only need to link if there is an old target already. */
if (oldtarget)
{
LWLockAcquire(PredicateLockNextRowLinkLock, LW_EXCLUSIVE);
/* Guard against stale pointers from rollback. */
next = oldtarget->nextVersionOfRow;
if (next != NULL)
{
next->priorVersionOfRow = NULL;
oldtarget->nextVersionOfRow = NULL;
}
/* Find or create the new target, and link old and new. */
newtarget = (PREDICATELOCKTARGET *)
hash_search_with_hash_value(PredicateLockTargetHash,
&newtargettag, newtargettaghash,
HASH_ENTER, &found);
if (!newtarget)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You might need to increase max_pred_locks_per_transaction.")));
if (!found)
{
SHMQueueInit(&(newtarget->predicateLocks));
newtarget->nextVersionOfRow = NULL;
}
else
Assert(newtarget->priorVersionOfRow == NULL);
newtarget->priorVersionOfRow = oldtarget; /*
oldtarget->nextVersionOfRow = newtarget; * A page-level lock on the page containing the old tuple counts too.
* Anyone holding a lock on the page is logically holding a lock on
* the old tuple, so we need to acquire a lock on his behalf on the
* new tuple too. However, if the new tuple is on the same page as the
* old one, the old page-level lock already covers the new tuple.
*
* A relation-level lock always covers both tuple versions, so we don't
* need to worry about those here.
*/
LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
LWLockRelease(PredicateLockNextRowLinkLock); TransferPredicateLocksToNewTarget(oldtupletag, newtupletag, false);
} if (newblk != oldblk)
TransferPredicateLocksToNewTarget(oldpagetag, newtupletag, false);
/* Release lower number partition last. */ LWLockRelease(SerializablePredicateLockListLock);
if (oldpartitionLock < newpartitionLock)
{
LWLockRelease(newpartitionLock);
LWLockRelease(oldpartitionLock);
}
else if (newpartitionLock < oldpartitionLock)
{
LWLockRelease(oldpartitionLock);
LWLockRelease(newpartitionLock);
}
else
LWLockRelease(newpartitionLock);
} }
...@@ -2437,6 +2366,17 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) ...@@ -2437,6 +2366,17 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
* removeOld is set (by using the reserved entry in * removeOld is set (by using the reserved entry in
* PredicateLockTargetHash for scratch space). * PredicateLockTargetHash for scratch space).
* *
* Warning: the "removeOld" option should be used only with care,
* because this function does not (indeed, can not) update other
* backends' LocalPredicateLockHash. If we are only adding new
* entries, this is not a problem: the local lock table is used only
* as a hint, so missing entries for locks that are held are
* OK. Having entries for locks that are no longer held, as can happen
* when using "removeOld", is not in general OK. We can only use it
* safely when replacing a lock with a coarser-granularity lock that
* covers it, or if we are absolutely certain that no one will need to
* refer to that lock in the future.
*
* Caller must hold SerializablePredicateLockListLock. * Caller must hold SerializablePredicateLockListLock.
*/ */
static bool static bool
...@@ -2533,11 +2473,7 @@ TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldtargettag, ...@@ -2533,11 +2473,7 @@ TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldtargettag,
/* If we created a new entry, initialize it */ /* If we created a new entry, initialize it */
if (!found) if (!found)
{
SHMQueueInit(&(newtarget->predicateLocks)); SHMQueueInit(&(newtarget->predicateLocks));
newtarget->priorVersionOfRow = NULL;
newtarget->nextVersionOfRow = NULL;
}
newpredlocktag.myTarget = newtarget; newpredlocktag.myTarget = newtarget;
...@@ -2704,7 +2640,14 @@ PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno, ...@@ -2704,7 +2640,14 @@ PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno,
&newtargettag); &newtargettag);
Assert(success); Assert(success);
/* Move the locks to the parent. This shouldn't fail. */ /*
* Move the locks to the parent. This shouldn't fail.
*
* Note that here we are removing locks held by other
* backends, leading to a possible inconsistency in their
* local lock hash table. This is OK because we're replacing
* it with a lock that covers the old one.
*/
success = TransferPredicateLocksToNewTarget(oldtargettag, success = TransferPredicateLocksToNewTarget(oldtargettag,
newtargettag, newtargettag,
true); true);
...@@ -2727,36 +2670,19 @@ void ...@@ -2727,36 +2670,19 @@ void
PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno, PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno,
const BlockNumber newblkno) const BlockNumber newblkno)
{ {
PREDICATELOCKTARGETTAG oldtargettag; /*
PREDICATELOCKTARGETTAG newtargettag; * Page combines differ from page splits in that we ought to be
bool success; * able to remove the locks on the old page after transferring
* them to the new page, instead of duplicating them. However,
* because we can't edit other backends' local lock tables,
if (SkipSplitTracking(relation)) * removing the old lock would leave them with an entry in their
return; * LocalPredicateLockHash for a lock they're not holding, which
* isn't acceptable. So we wind up having to do the same work as a
Assert(oldblkno != newblkno); * page split, acquiring a lock on the new page and keeping the old
Assert(BlockNumberIsValid(oldblkno)); * page locked too. That can lead to some false positives, but
Assert(BlockNumberIsValid(newblkno)); * should be rare in practice.
*/
SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag, PredicateLockPageSplit(relation, oldblkno, newblkno);
relation->rd_node.dbNode,
relation->rd_id,
oldblkno);
SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
relation->rd_node.dbNode,
relation->rd_id,
newblkno);
LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
/* Move the locks. This shouldn't fail. */
success = TransferPredicateLocksToNewTarget(oldtargettag,
newtargettag,
true);
Assert(success);
LWLockRelease(SerializablePredicateLockListLock);
} }
/* /*
...@@ -3132,9 +3058,6 @@ ClearOldPredicateLocks(void) ...@@ -3132,9 +3058,6 @@ ClearOldPredicateLocks(void)
{ {
SERIALIZABLEXACT *finishedSxact; SERIALIZABLEXACT *finishedSxact;
PREDICATELOCK *predlock; PREDICATELOCK *predlock;
int i;
HASH_SEQ_STATUS seqstat;
PREDICATELOCKTARGET *locktarget;
LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE); LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
finishedSxact = (SERIALIZABLEXACT *) finishedSxact = (SERIALIZABLEXACT *)
...@@ -3232,35 +3155,6 @@ ClearOldPredicateLocks(void) ...@@ -3232,35 +3155,6 @@ ClearOldPredicateLocks(void)
LWLockRelease(SerializablePredicateLockListLock); LWLockRelease(SerializablePredicateLockListLock);
LWLockRelease(SerializableFinishedListLock); LWLockRelease(SerializableFinishedListLock);
if (!PredXact->NeedTargetLinkCleanup)
return;
/*
* Clean up any targets which were disconnected from a prior version with
* no predicate locks attached.
*/
for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
LWLockAcquire(PredicateLockNextRowLinkLock, LW_SHARED);
hash_seq_init(&seqstat, PredicateLockTargetHash);
while ((locktarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
{
if (SHMQueueEmpty(&locktarget->predicateLocks)
&& locktarget->priorVersionOfRow == NULL
&& locktarget->nextVersionOfRow == NULL)
{
hash_search(PredicateLockTargetHash, &locktarget->tag,
HASH_REMOVE, NULL);
}
}
PredXact->NeedTargetLinkCleanup = false;
LWLockRelease(PredicateLockNextRowLinkLock);
for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
LWLockRelease(FirstPredicateLockMgrLock + i);
} }
/* /*
...@@ -3676,38 +3570,15 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation, ...@@ -3676,38 +3570,15 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation,
} }
/* /*
* Check a particular target for rw-dependency conflict in. This will * Check a particular target for rw-dependency conflict in.
* also check prior versions of a tuple, if any.
*/ */
static void static void
CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
{
PREDICATELOCKTARGETTAG nexttargettag = { 0 };
PREDICATELOCKTARGETTAG thistargettag;
for (;;)
{
if (!CheckSingleTargetForConflictsIn(targettag, &nexttargettag))
break;
thistargettag = nexttargettag;
targettag = &thistargettag;
}
}
/*
* Check a particular target for rw-dependency conflict in. If the tuple
* has prior versions, returns true and *nexttargettag is set to the tag
* of the prior tuple version.
*/
static bool
CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
PREDICATELOCKTARGETTAG *nexttargettag)
{ {
uint32 targettaghash; uint32 targettaghash;
LWLockId partitionLock; LWLockId partitionLock;
PREDICATELOCKTARGET *target; PREDICATELOCKTARGET *target;
PREDICATELOCK *predlock; PREDICATELOCK *predlock;
bool hasnexttarget = false;
Assert(MySerializableXact != InvalidSerializableXact); Assert(MySerializableXact != InvalidSerializableXact);
...@@ -3717,7 +3588,6 @@ CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag, ...@@ -3717,7 +3588,6 @@ CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
targettaghash = PredicateLockTargetTagHashCode(targettag); targettaghash = PredicateLockTargetTagHashCode(targettag);
partitionLock = PredicateLockHashPartitionLock(targettaghash); partitionLock = PredicateLockHashPartitionLock(targettaghash);
LWLockAcquire(partitionLock, LW_SHARED); LWLockAcquire(partitionLock, LW_SHARED);
LWLockAcquire(PredicateLockNextRowLinkLock, LW_SHARED);
target = (PREDICATELOCKTARGET *) target = (PREDICATELOCKTARGET *)
hash_search_with_hash_value(PredicateLockTargetHash, hash_search_with_hash_value(PredicateLockTargetHash,
targettag, targettaghash, targettag, targettaghash,
...@@ -3725,21 +3595,9 @@ CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag, ...@@ -3725,21 +3595,9 @@ CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
if (!target) if (!target)
{ {
/* Nothing has this target locked; we're done here. */ /* Nothing has this target locked; we're done here. */
LWLockRelease(PredicateLockNextRowLinkLock);
LWLockRelease(partitionLock); LWLockRelease(partitionLock);
return false; return;
}
/*
* If the target is linked to a prior version of the row, save the tag so
* that it can be used for iterative calls to this function.
*/
if (target->priorVersionOfRow != NULL)
{
*nexttargettag = target->priorVersionOfRow->tag;
hasnexttarget = true;
} }
LWLockRelease(PredicateLockNextRowLinkLock);
/* /*
* Each lock for an overlapping transaction represents a conflict: a * Each lock for an overlapping transaction represents a conflict: a
...@@ -3828,17 +3686,25 @@ CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag, ...@@ -3828,17 +3686,25 @@ CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
hash_search_with_hash_value(LocalPredicateLockHash, hash_search_with_hash_value(LocalPredicateLockHash,
targettag, targettaghash, targettag, targettaghash,
HASH_FIND, NULL); HASH_FIND, NULL);
Assert(locallock != NULL);
Assert(locallock->held);
locallock->held = false;
if (locallock->childLocks == 0) /*
* Remove entry in local lock table if it exists and has
* no children. It's OK if it doesn't exist; that means
* the lock was transferred to a new target by a
* different backend.
*/
if (locallock != NULL)
{ {
rmlocallock = (LOCALPREDICATELOCK *) locallock->held = false;
hash_search_with_hash_value(LocalPredicateLockHash,
targettag, targettaghash, if (locallock->childLocks == 0)
HASH_REMOVE, NULL); {
Assert(rmlocallock == locallock); rmlocallock = (LOCALPREDICATELOCK *)
hash_search_with_hash_value(LocalPredicateLockHash,
targettag, targettaghash,
HASH_REMOVE, NULL);
Assert(rmlocallock == locallock);
}
} }
DecrementParentLocks(targettag); DecrementParentLocks(targettag);
...@@ -3848,7 +3714,7 @@ CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag, ...@@ -3848,7 +3714,7 @@ CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
* the target, bail out before re-acquiring the locks. * the target, bail out before re-acquiring the locks.
*/ */
if (rmtarget) if (rmtarget)
return hasnexttarget; return;
/* /*
* The list has been altered. Start over at the front. * The list has been altered. Start over at the front.
...@@ -3895,8 +3761,6 @@ CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag, ...@@ -3895,8 +3761,6 @@ CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
} }
LWLockRelease(SerializableXactHashLock); LWLockRelease(SerializableXactHashLock);
LWLockRelease(partitionLock); LWLockRelease(partitionLock);
return hasnexttarget;
} }
/* /*
...@@ -3943,7 +3807,8 @@ CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple, ...@@ -3943,7 +3807,8 @@ CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
relation->rd_node.dbNode, relation->rd_node.dbNode,
relation->rd_id, relation->rd_id,
ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)), ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)),
ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid))); ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid)),
HeapTupleHeaderGetXmin(tuple->t_data));
CheckTargetForConflictsIn(&targettag); CheckTargetForConflictsIn(&targettag);
} }
......
...@@ -78,7 +78,6 @@ typedef enum LWLockId ...@@ -78,7 +78,6 @@ typedef enum LWLockId
SerializableFinishedListLock, SerializableFinishedListLock,
SerializablePredicateLockListLock, SerializablePredicateLockListLock,
OldSerXidLock, OldSerXidLock,
PredicateLockNextRowLinkLock,
/* Individual lock IDs end here */ /* Individual lock IDs end here */
FirstBufMappingLock, FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
......
...@@ -150,8 +150,6 @@ typedef struct PredXactListData ...@@ -150,8 +150,6 @@ typedef struct PredXactListData
SerCommitSeqNo HavePartialClearedThrough; /* have cleared through this SerCommitSeqNo HavePartialClearedThrough; /* have cleared through this
* seq no */ * seq no */
SERIALIZABLEXACT *OldCommittedSxact; /* shared copy of dummy sxact */ SERIALIZABLEXACT *OldCommittedSxact; /* shared copy of dummy sxact */
bool NeedTargetLinkCleanup; /* to save cleanup effort for rare
* case */
PredXactListElement element; PredXactListElement element;
} PredXactListData; } PredXactListData;
...@@ -231,9 +229,13 @@ typedef struct SERIALIZABLEXID ...@@ -231,9 +229,13 @@ typedef struct SERIALIZABLEXID
/* /*
* The PREDICATELOCKTARGETTAG struct identifies a database object which can * The PREDICATELOCKTARGETTAG struct identifies a database object which can
* be the target of predicate locks. It is designed to fit into 16 bytes * be the target of predicate locks.
* with no padding. Note that this would need adjustment if we widen Oid or *
* BlockNumber to more than 32 bits. * Note that the hash function being used doesn't properly respect tag
* length -- it will go to a four byte boundary past the end of the tag.
* If you change this struct, make sure any slack space is initialized,
* so that any random bytes in the middle or at the end are not included
* in the hash.
* *
* TODO SSI: If we always use the same fields for the same type of value, we * TODO SSI: If we always use the same fields for the same type of value, we
* should rename these. Holding off until it's clear there are no exceptions. * should rename these. Holding off until it's clear there are no exceptions.
...@@ -247,8 +249,8 @@ typedef struct PREDICATELOCKTARGETTAG ...@@ -247,8 +249,8 @@ typedef struct PREDICATELOCKTARGETTAG
uint32 locktag_field1; /* a 32-bit ID field */ uint32 locktag_field1; /* a 32-bit ID field */
uint32 locktag_field2; /* a 32-bit ID field */ uint32 locktag_field2; /* a 32-bit ID field */
uint32 locktag_field3; /* a 32-bit ID field */ uint32 locktag_field3; /* a 32-bit ID field */
uint16 locktag_field4; /* a 16-bit ID field */ uint32 locktag_field4; /* a 32-bit ID field */
uint16 locktag_field5; /* a 16-bit ID field */ uint32 locktag_field5; /* a 32-bit ID field */
} PREDICATELOCKTARGETTAG; } PREDICATELOCKTARGETTAG;
/* /*
...@@ -260,12 +262,11 @@ typedef struct PREDICATELOCKTARGETTAG ...@@ -260,12 +262,11 @@ typedef struct PREDICATELOCKTARGETTAG
* already have one. An entry is removed when the last lock is removed from * already have one. An entry is removed when the last lock is removed from
* its list. * its list.
* *
* Because a check for predicate locks on a tuple target should also find * Because a particular target might become obsolete, due to update to a new
* locks on previous versions of the same row, if there are any created by * version, before the reading transaction is obsolete, we need some way to
* overlapping transactions, we keep a pointer to the target for the prior * prevent errors from reuse of a tuple ID. Rather than attempting to clean
* version of the row. We also keep a pointer to the next version of the * up the targets as the related tuples are pruned or vacuumed, we check the
* row, so that when we no longer have any predicate locks and the back * xmin on access. This should be far less costly.
* pointer is clear, we can clean up the prior pointer for the next version.
*/ */
typedef struct PREDICATELOCKTARGET PREDICATELOCKTARGET; typedef struct PREDICATELOCKTARGET PREDICATELOCKTARGET;
...@@ -277,15 +278,6 @@ struct PREDICATELOCKTARGET ...@@ -277,15 +278,6 @@ struct PREDICATELOCKTARGET
/* data */ /* data */
SHM_QUEUE predicateLocks; /* list of PREDICATELOCK objects assoc. with SHM_QUEUE predicateLocks; /* list of PREDICATELOCK objects assoc. with
* predicate lock target */ * predicate lock target */
/*
* The following two pointers are only used for tuple locks, and are only
* consulted for conflict detection and cleanup; not for granularity
* promotion.
*/
PREDICATELOCKTARGET *priorVersionOfRow; /* what other locks to check */
PREDICATELOCKTARGET *nextVersionOfRow; /* who has pointer here for
* more targets */
}; };
...@@ -387,30 +379,32 @@ typedef struct PredicateLockData ...@@ -387,30 +379,32 @@ typedef struct PredicateLockData
(locktag).locktag_field2 = (reloid), \ (locktag).locktag_field2 = (reloid), \
(locktag).locktag_field3 = InvalidBlockNumber, \ (locktag).locktag_field3 = InvalidBlockNumber, \
(locktag).locktag_field4 = InvalidOffsetNumber, \ (locktag).locktag_field4 = InvalidOffsetNumber, \
(locktag).locktag_field5 = 0) (locktag).locktag_field5 = InvalidTransactionId)
#define SET_PREDICATELOCKTARGETTAG_PAGE(locktag,dboid,reloid,blocknum) \ #define SET_PREDICATELOCKTARGETTAG_PAGE(locktag,dboid,reloid,blocknum) \
((locktag).locktag_field1 = (dboid), \ ((locktag).locktag_field1 = (dboid), \
(locktag).locktag_field2 = (reloid), \ (locktag).locktag_field2 = (reloid), \
(locktag).locktag_field3 = (blocknum), \ (locktag).locktag_field3 = (blocknum), \
(locktag).locktag_field4 = InvalidOffsetNumber, \ (locktag).locktag_field4 = InvalidOffsetNumber, \
(locktag).locktag_field5 = 0) (locktag).locktag_field5 = InvalidTransactionId)
#define SET_PREDICATELOCKTARGETTAG_TUPLE(locktag,dboid,reloid,blocknum,offnum) \ #define SET_PREDICATELOCKTARGETTAG_TUPLE(locktag,dboid,reloid,blocknum,offnum,xmin) \
((locktag).locktag_field1 = (dboid), \ ((locktag).locktag_field1 = (dboid), \
(locktag).locktag_field2 = (reloid), \ (locktag).locktag_field2 = (reloid), \
(locktag).locktag_field3 = (blocknum), \ (locktag).locktag_field3 = (blocknum), \
(locktag).locktag_field4 = (offnum), \ (locktag).locktag_field4 = (offnum), \
(locktag).locktag_field5 = 0) (locktag).locktag_field5 = (xmin))
#define GET_PREDICATELOCKTARGETTAG_DB(locktag) \ #define GET_PREDICATELOCKTARGETTAG_DB(locktag) \
((locktag).locktag_field1) ((Oid) (locktag).locktag_field1)
#define GET_PREDICATELOCKTARGETTAG_RELATION(locktag) \ #define GET_PREDICATELOCKTARGETTAG_RELATION(locktag) \
((locktag).locktag_field2) ((Oid) (locktag).locktag_field2)
#define GET_PREDICATELOCKTARGETTAG_PAGE(locktag) \ #define GET_PREDICATELOCKTARGETTAG_PAGE(locktag) \
((locktag).locktag_field3) ((BlockNumber) (locktag).locktag_field3)
#define GET_PREDICATELOCKTARGETTAG_OFFSET(locktag) \ #define GET_PREDICATELOCKTARGETTAG_OFFSET(locktag) \
((locktag).locktag_field4) ((OffsetNumber) (locktag).locktag_field4)
#define GET_PREDICATELOCKTARGETTAG_XMIN(locktag) \
((TransactionId) (locktag).locktag_field5)
#define GET_PREDICATELOCKTARGETTAG_TYPE(locktag) \ #define GET_PREDICATELOCKTARGETTAG_TYPE(locktag) \
(((locktag).locktag_field4 != InvalidOffsetNumber) ? PREDLOCKTAG_TUPLE : \ (((locktag).locktag_field4 != InvalidOffsetNumber) ? PREDLOCKTAG_TUPLE : \
(((locktag).locktag_field3 != InvalidBlockNumber) ? PREDLOCKTAG_PAGE : \ (((locktag).locktag_field3 != InvalidBlockNumber) ? PREDLOCKTAG_PAGE : \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment