Commit cb2d158c authored by Heikki Linnakangas's avatar Heikki Linnakangas

Fix locking while setting flags in MySerializableXact.

Even if a flag is modified only by the backend owning the transaction, it's
not safe to modify it without a lock. Another backend might be setting or
clearing a different flag in the flags field concurrently, and that
operation might be lost because setting or clearing a bit in a word is not
atomic.

Make did-write flag a simple backend-private boolean variable, because it
was only set or tested in the owning backend (except when committing a
prepared transaction, but it's not worthwhile to optimize for the case of a
read-only prepared transaction). This also eliminates the need to add
locking where that flag is set.

Also, set the did-write flag when doing DDL operations like DROP TABLE or
TRUNCATE -- that was missed earlier.
parent d69149ed
...@@ -392,10 +392,11 @@ static HTAB *LocalPredicateLockHash = NULL; ...@@ -392,10 +392,11 @@ static HTAB *LocalPredicateLockHash = NULL;
/* /*
* Keep a pointer to the currently-running serializable transaction (if any) * Keep a pointer to the currently-running serializable transaction (if any)
* for quick reference. * for quick reference. Also, remember if we have written anything that could
* TODO SSI: Remove volatile qualifier and the then-unnecessary casts? * cause a rw-conflict.
*/ */
static volatile SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact; static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
static bool MyXactDidWrite = false;
/* local functions */ /* local functions */
...@@ -1424,20 +1425,30 @@ GetSafeSnapshot(Snapshot origSnapshot) ...@@ -1424,20 +1425,30 @@ GetSafeSnapshot(Snapshot origSnapshot)
if (MySerializableXact == InvalidSerializableXact) if (MySerializableXact == InvalidSerializableXact)
return snapshot; /* no concurrent r/w xacts; it's safe */ return snapshot; /* no concurrent r/w xacts; it's safe */
MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING; LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
/* /*
* Wait for concurrent transactions to finish. Stop early if one of * Wait for concurrent transactions to finish. Stop early if one of
* them marked us as conflicted. * them marked us as conflicted.
*/ */
MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
while (!(SHMQueueEmpty((SHM_QUEUE *) while (!(SHMQueueEmpty((SHM_QUEUE *)
&MySerializableXact->possibleUnsafeConflicts) || &MySerializableXact->possibleUnsafeConflicts) ||
SxactIsROUnsafe(MySerializableXact))) SxactIsROUnsafe(MySerializableXact)))
{
LWLockRelease(SerializableXactHashLock);
ProcWaitForSignal(); ProcWaitForSignal();
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
}
MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING; MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
if (!SxactIsROUnsafe(MySerializableXact)) if (!SxactIsROUnsafe(MySerializableXact))
{
LWLockRelease(SerializableXactHashLock);
break; /* success */ break; /* success */
}
LWLockRelease(SerializableXactHashLock);
/* else, need to retry... */ /* else, need to retry... */
ereport(DEBUG2, ereport(DEBUG2,
...@@ -1600,6 +1611,7 @@ RegisterSerializableTransactionInt(Snapshot snapshot) ...@@ -1600,6 +1611,7 @@ RegisterSerializableTransactionInt(Snapshot snapshot)
} }
MySerializableXact = sxact; MySerializableXact = sxact;
MyXactDidWrite = false; /* haven't written anything yet */
LWLockRelease(SerializableXactHashLock); LWLockRelease(SerializableXactHashLock);
...@@ -1635,24 +1647,24 @@ RegisterPredicateLockingXid(const TransactionId xid) ...@@ -1635,24 +1647,24 @@ RegisterPredicateLockingXid(const TransactionId xid)
if (MySerializableXact == InvalidSerializableXact) if (MySerializableXact == InvalidSerializableXact)
return; return;
/* This should only be done once per transaction. */
Assert(MySerializableXact->topXid == InvalidTransactionId);
/* We should have a valid XID and be at the top level. */ /* We should have a valid XID and be at the top level. */
Assert(TransactionIdIsValid(xid)); Assert(TransactionIdIsValid(xid));
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
/* This should only be done once per transaction. */
Assert(MySerializableXact->topXid == InvalidTransactionId);
MySerializableXact->topXid = xid; MySerializableXact->topXid = xid;
sxidtag.xid = xid; sxidtag.xid = xid;
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash, sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
&sxidtag, &sxidtag,
HASH_ENTER, &found); HASH_ENTER, &found);
Assert(sxid != NULL);
Assert(!found); Assert(!found);
/* Initialize the structure. */ /* Initialize the structure. */
sxid->myXact = (SERIALIZABLEXACT *) MySerializableXact; sxid->myXact = MySerializableXact;
LWLockRelease(SerializableXactHashLock); LWLockRelease(SerializableXactHashLock);
} }
...@@ -1881,7 +1893,7 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) ...@@ -1881,7 +1893,7 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
PREDICATELOCK *predlock; PREDICATELOCK *predlock;
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
sxact = (SERIALIZABLEXACT *) MySerializableXact; sxact = MySerializableXact;
predlock = (PREDICATELOCK *) predlock = (PREDICATELOCK *)
SHMQueueNext(&(sxact->predicateLocks), SHMQueueNext(&(sxact->predicateLocks),
&(sxact->predicateLocks), &(sxact->predicateLocks),
...@@ -2200,8 +2212,7 @@ PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag) ...@@ -2200,8 +2212,7 @@ PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
locallock->childLocks = 0; locallock->childLocks = 0;
/* Actually create the lock */ /* Actually create the lock */
CreatePredicateLock(targettag, targettaghash, CreatePredicateLock(targettag, targettaghash, MySerializableXact);
(SERIALIZABLEXACT *) MySerializableXact);
/* /*
* Lock has been acquired. Check whether it should be promoted to a * Lock has been acquired. Check whether it should be promoted to a
...@@ -3042,7 +3053,7 @@ ReleasePredicateLocks(const bool isCommit) ...@@ -3042,7 +3053,7 @@ ReleasePredicateLocks(const bool isCommit)
Assert(IsolationIsSerializable()); Assert(IsolationIsSerializable());
/* We'd better not already be on the cleanup list. */ /* We'd better not already be on the cleanup list. */
Assert(!SxactIsOnFinishedList((SERIALIZABLEXACT *) MySerializableXact)); Assert(!SxactIsOnFinishedList(MySerializableXact));
topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact); topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);
...@@ -3070,7 +3081,7 @@ ReleasePredicateLocks(const bool isCommit) ...@@ -3070,7 +3081,7 @@ ReleasePredicateLocks(const bool isCommit)
MySerializableXact->flags |= SXACT_FLAG_COMMITTED; MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo); MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
/* Recognize implicit read-only transaction (commit without write). */ /* Recognize implicit read-only transaction (commit without write). */
if (!(MySerializableXact->flags & SXACT_FLAG_DID_WRITE)) if (!MyXactDidWrite)
MySerializableXact->flags |= SXACT_FLAG_READ_ONLY; MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
} }
else else
...@@ -3218,7 +3229,7 @@ ReleasePredicateLocks(const bool isCommit) ...@@ -3218,7 +3229,7 @@ ReleasePredicateLocks(const bool isCommit)
/* Mark conflicted if necessary. */ /* Mark conflicted if necessary. */
if (isCommit if (isCommit
&& (MySerializableXact->flags & SXACT_FLAG_DID_WRITE) && MyXactDidWrite
&& SxactHasConflictOut(MySerializableXact) && SxactHasConflictOut(MySerializableXact)
&& (MySerializableXact->SeqNo.earliestOutConflictCommit && (MySerializableXact->SeqNo.earliestOutConflictCommit
<= roXact->SeqNo.lastCommitBeforeSnapshot)) <= roXact->SeqNo.lastCommitBeforeSnapshot))
...@@ -3282,8 +3293,7 @@ ReleasePredicateLocks(const bool isCommit) ...@@ -3282,8 +3293,7 @@ ReleasePredicateLocks(const bool isCommit)
(SHM_QUEUE *) &(MySerializableXact->finishedLink)); (SHM_QUEUE *) &(MySerializableXact->finishedLink));
if (!isCommit) if (!isCommit)
ReleaseOneSerializableXact((SERIALIZABLEXACT *) MySerializableXact, ReleaseOneSerializableXact(MySerializableXact, false, false);
false, false);
LWLockRelease(SerializableFinishedListLock); LWLockRelease(SerializableFinishedListLock);
...@@ -3291,6 +3301,7 @@ ReleasePredicateLocks(const bool isCommit) ...@@ -3291,6 +3301,7 @@ ReleasePredicateLocks(const bool isCommit)
ClearOldPredicateLocks(); ClearOldPredicateLocks();
MySerializableXact = InvalidSerializableXact; MySerializableXact = InvalidSerializableXact;
MyXactDidWrite = false;
/* Delete per-transaction lock table */ /* Delete per-transaction lock table */
if (LocalPredicateLockHash != NULL) if (LocalPredicateLockHash != NULL)
...@@ -3851,7 +3862,7 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation, ...@@ -3851,7 +3862,7 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation,
return; return;
} }
if (RWConflictExists((SERIALIZABLEXACT *) MySerializableXact, sxact)) if (RWConflictExists(MySerializableXact, sxact))
{ {
/* We don't want duplicate conflict records in the list. */ /* We don't want duplicate conflict records in the list. */
LWLockRelease(SerializableXactHashLock); LWLockRelease(SerializableXactHashLock);
...@@ -3862,7 +3873,7 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation, ...@@ -3862,7 +3873,7 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation,
* Flag the conflict. But first, if this conflict creates a dangerous * Flag the conflict. But first, if this conflict creates a dangerous
* structure, ereport an error. * structure, ereport an error.
*/ */
FlagRWConflict((SERIALIZABLEXACT *) MySerializableXact, sxact); FlagRWConflict(MySerializableXact, sxact);
LWLockRelease(SerializableXactHashLock); LWLockRelease(SerializableXactHashLock);
} }
...@@ -3944,7 +3955,7 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) ...@@ -3944,7 +3955,7 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
&& (!SxactIsCommitted(sxact) && (!SxactIsCommitted(sxact)
|| TransactionIdPrecedes(GetTransactionSnapshot()->xmin, || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
sxact->finishedBefore)) sxact->finishedBefore))
&& !RWConflictExists(sxact, (SERIALIZABLEXACT *) MySerializableXact)) && !RWConflictExists(sxact, MySerializableXact))
{ {
LWLockRelease(SerializableXactHashLock); LWLockRelease(SerializableXactHashLock);
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
...@@ -3957,10 +3968,9 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) ...@@ -3957,10 +3968,9 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
&& (!SxactIsCommitted(sxact) && (!SxactIsCommitted(sxact)
|| TransactionIdPrecedes(GetTransactionSnapshot()->xmin, || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
sxact->finishedBefore)) sxact->finishedBefore))
&& !RWConflictExists(sxact, && !RWConflictExists(sxact, MySerializableXact))
(SERIALIZABLEXACT *) MySerializableXact))
{ {
FlagRWConflict(sxact, (SERIALIZABLEXACT *) MySerializableXact); FlagRWConflict(sxact, MySerializableXact);
} }
LWLockRelease(SerializableXactHashLock); LWLockRelease(SerializableXactHashLock);
...@@ -4065,7 +4075,11 @@ CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple, ...@@ -4065,7 +4075,11 @@ CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
errdetail("Cancelled on identification as a pivot, during conflict in checking."), errdetail("Cancelled on identification as a pivot, during conflict in checking."),
errhint("The transaction might succeed if retried."))); errhint("The transaction might succeed if retried.")));
MySerializableXact->flags |= SXACT_FLAG_DID_WRITE; /*
* We're doing a write which might cause rw-conflicts now or later.
* Memorize that fact.
*/
MyXactDidWrite = true;
/* /*
* It is important that we check for locks from the finest granularity to * It is important that we check for locks from the finest granularity to
...@@ -4150,6 +4164,12 @@ CheckTableForSerializableConflictIn(const Relation relation) ...@@ -4150,6 +4164,12 @@ CheckTableForSerializableConflictIn(const Relation relation)
if (SkipSerialization(relation)) if (SkipSerialization(relation))
return; return;
/*
* We're doing a write which might cause rw-conflicts now or later.
* Memorize that fact.
*/
MyXactDidWrite = true;
Assert(relation->rd_index == NULL); /* not an index relation */ Assert(relation->rd_index == NULL); /* not an index relation */
dbId = relation->rd_node.dbNode; dbId = relation->rd_node.dbNode;
...@@ -4192,10 +4212,10 @@ CheckTableForSerializableConflictIn(const Relation relation) ...@@ -4192,10 +4212,10 @@ CheckTableForSerializableConflictIn(const Relation relation)
offsetof(PREDICATELOCK, targetLink)); offsetof(PREDICATELOCK, targetLink));
if (predlock->tag.myXact != MySerializableXact if (predlock->tag.myXact != MySerializableXact
&& !RWConflictExists(predlock->tag.myXact, && !RWConflictExists(predlock->tag.myXact, MySerializableXact))
(SERIALIZABLEXACT *) MySerializableXact)) {
FlagRWConflict(predlock->tag.myXact, FlagRWConflict(predlock->tag.myXact, MySerializableXact);
(SERIALIZABLEXACT *) MySerializableXact); }
predlock = nextpredlock; predlock = nextpredlock;
} }
...@@ -4506,7 +4526,7 @@ AtPrepare_PredicateLocks(void) ...@@ -4506,7 +4526,7 @@ AtPrepare_PredicateLocks(void)
TwoPhasePredicateXactRecord *xactRecord; TwoPhasePredicateXactRecord *xactRecord;
TwoPhasePredicateLockRecord *lockRecord; TwoPhasePredicateLockRecord *lockRecord;
sxact = (SERIALIZABLEXACT *) MySerializableXact; sxact = MySerializableXact;
xactRecord = &(record.data.xactRecord); xactRecord = &(record.data.xactRecord);
lockRecord = &(record.data.lockRecord); lockRecord = &(record.data.lockRecord);
...@@ -4583,6 +4603,7 @@ PostPrepare_PredicateLocks(TransactionId xid) ...@@ -4583,6 +4603,7 @@ PostPrepare_PredicateLocks(TransactionId xid)
LocalPredicateLockHash = NULL; LocalPredicateLockHash = NULL;
MySerializableXact = InvalidSerializableXact; MySerializableXact = InvalidSerializableXact;
MyXactDidWrite = false;
} }
/* /*
...@@ -4609,6 +4630,8 @@ PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit) ...@@ -4609,6 +4630,8 @@ PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
/* Release its locks */ /* Release its locks */
MySerializableXact = sxid->myXact; MySerializableXact = sxid->myXact;
MyXactDidWrite = true; /* conservatively assume that we wrote
* something */
ReleasePredicateLocks(isCommit); ReleasePredicateLocks(isCommit);
} }
......
...@@ -99,14 +99,13 @@ typedef struct SERIALIZABLEXACT ...@@ -99,14 +99,13 @@ typedef struct SERIALIZABLEXACT
*/ */
#define SXACT_FLAG_CONFLICT_OUT 0x00000004 #define SXACT_FLAG_CONFLICT_OUT 0x00000004
#define SXACT_FLAG_READ_ONLY 0x00000008 #define SXACT_FLAG_READ_ONLY 0x00000008
#define SXACT_FLAG_DID_WRITE 0x00000010 #define SXACT_FLAG_MARKED_FOR_DEATH 0x00000010
#define SXACT_FLAG_MARKED_FOR_DEATH 0x00000020 #define SXACT_FLAG_DEFERRABLE_WAITING 0x00000020
#define SXACT_FLAG_DEFERRABLE_WAITING 0x00000040 #define SXACT_FLAG_RO_SAFE 0x00000040
#define SXACT_FLAG_RO_SAFE 0x00000080 #define SXACT_FLAG_RO_UNSAFE 0x00000080
#define SXACT_FLAG_RO_UNSAFE 0x00000100 #define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000100
#define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000200 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000200
#define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400 #define SXACT_FLAG_PREPARED 0x00000400
#define SXACT_FLAG_PREPARED 0x00000800
/* /*
* The following types are used to provide an ad hoc list for holding * The following types are used to provide an ad hoc list for holding
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment