Commit f868a814 authored by Tom Lane's avatar Tom Lane

Fix longstanding recursion hazard in sinval message processing.

LockRelationOid and sibling routines supposed that, if our session already
holds the lock they were asked to acquire, they could skip calling
AcceptInvalidationMessages on the grounds that we must have already read
any remote sinval messages issued against the relation being locked.
This is normally true, but there's a critical special case where it's not:
processing inside AcceptInvalidationMessages might attempt to access system
relations, resulting in a recursive call to acquire a relation lock.

Hence, if the outer call had acquired that same system catalog lock, we'd
fall through, despite the possibility that there's an as-yet-unread sinval
message for that system catalog.  This could, for example, result in
failure to access a system catalog or index that had just been processed
by VACUUM FULL.  This is the explanation for buildfarm failures we've been
seeing intermittently for the past three months.  The bug is far older
than that, but commits a54e1f15 et al added a new recursion case within
AcceptInvalidationMessages that is apparently easier to hit than any
previous case.

To fix this, we must not skip calling AcceptInvalidationMessages until
we have *finished* a call to it since acquiring a relation lock, not
merely acquired the lock.  (There's already adequate logic inside
AcceptInvalidationMessages to deal with being called recursively.)
Fortunately, we can implement that at trivial cost, by adding a flag
to LOCALLOCK hashtable entries that tracks whether we know we have
completed such a call.

There is an API hazard added by this patch for external callers of
LockAcquire: if anything is testing for LOCKACQUIRE_ALREADY_HELD,
it might be fooled by the new return code LOCKACQUIRE_ALREADY_CLEAR
into thinking the lock wasn't already held.  This should be a fail-soft
condition, though, unless something very bizarre is being done in
response to the test.

Also, I added an additional output argument to LockAcquireExtended,
assuming that that probably isn't called by any outside code given
the very limited usefulness of its additional functionality.

Back-patch to all supported branches.

Discussion: https://postgr.es/m/12259.1532117714@sss.pgh.pa.us
parent 8582b4d0
......@@ -661,7 +661,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
LockAcquireExtended(&locktag, AccessExclusiveLock, true, false, false);
(void) LockAcquireExtended(&locktag, AccessExclusiveLock, true, false,
false, NULL);
}
static void
......
......@@ -105,11 +105,12 @@ void
LockRelationOid(Oid relid, LOCKMODE lockmode)
{
LOCKTAG tag;
LOCALLOCK *locallock;
LockAcquireResult res;
SetLocktagRelationOid(&tag, relid);
res = LockAcquire(&tag, lockmode, false, false);
res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
/*
* Now that we have the lock, check for invalidation messages, so that we
......@@ -120,9 +121,18 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
* relcache entry in an undesirable way. (In the case where our own xact
* modifies the rel, the relcache update happens via
* CommandCounterIncrement, not here.)
*
* However, in corner cases where code acts on tables (usually catalogs)
* recursively, we might get here while still processing invalidation
* messages in some outer execution of this function or a sibling. The
* "cleared" status of the lock tells us whether we really are done
* absorbing relevant inval messages.
*/
if (res != LOCKACQUIRE_ALREADY_HELD)
if (res != LOCKACQUIRE_ALREADY_CLEAR)
{
AcceptInvalidationMessages();
MarkLockClear(locallock);
}
}
/*
......@@ -138,11 +148,12 @@ bool
ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
{
LOCKTAG tag;
LOCALLOCK *locallock;
LockAcquireResult res;
SetLocktagRelationOid(&tag, relid);
res = LockAcquire(&tag, lockmode, false, true);
res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
......@@ -151,8 +162,11 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
* Now that we have the lock, check for invalidation messages; see notes
* in LockRelationOid.
*/
if (res != LOCKACQUIRE_ALREADY_HELD)
if (res != LOCKACQUIRE_ALREADY_CLEAR)
{
AcceptInvalidationMessages();
MarkLockClear(locallock);
}
return true;
}
......@@ -199,20 +213,24 @@ void
LockRelation(Relation relation, LOCKMODE lockmode)
{
LOCKTAG tag;
LOCALLOCK *locallock;
LockAcquireResult res;
SET_LOCKTAG_RELATION(tag,
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
res = LockAcquire(&tag, lockmode, false, false);
res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
/*
* Now that we have the lock, check for invalidation messages; see notes
* in LockRelationOid.
*/
if (res != LOCKACQUIRE_ALREADY_HELD)
if (res != LOCKACQUIRE_ALREADY_CLEAR)
{
AcceptInvalidationMessages();
MarkLockClear(locallock);
}
}
/*
......@@ -226,13 +244,14 @@ bool
ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
{
LOCKTAG tag;
LOCALLOCK *locallock;
LockAcquireResult res;
SET_LOCKTAG_RELATION(tag,
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
res = LockAcquire(&tag, lockmode, false, true);
res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
......@@ -241,8 +260,11 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
* Now that we have the lock, check for invalidation messages; see notes
* in LockRelationOid.
*/
if (res != LOCKACQUIRE_ALREADY_HELD)
if (res != LOCKACQUIRE_ALREADY_CLEAR)
{
AcceptInvalidationMessages();
MarkLockClear(locallock);
}
return true;
}
......
......@@ -669,6 +669,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
* LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true
* LOCKACQUIRE_OK lock successfully acquired
* LOCKACQUIRE_ALREADY_HELD incremented count for lock already held
* LOCKACQUIRE_ALREADY_CLEAR incremented count for lock already clear
*
* In the normal case where dontWait=false and the caller doesn't need to
* distinguish a freshly acquired lock from one already taken earlier in
......@@ -685,7 +686,8 @@ LockAcquire(const LOCKTAG *locktag,
bool sessionLock,
bool dontWait)
{
return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
true, NULL);
}
/*
......@@ -696,13 +698,17 @@ LockAcquire(const LOCKTAG *locktag,
* caller to note that the lock table is full and then begin taking
* extreme action to reduce the number of other lock holders before
* retrying the action.
*
* If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
* table entry if a lock is successfully acquired, or NULL if not.
*/
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait,
bool reportMemoryError)
bool reportMemoryError,
LOCALLOCK **locallockp)
{
LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
LockMethod lockMethodTable;
......@@ -766,9 +772,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
locallock->proclock = NULL;
locallock->hashcode = LockTagHashCode(&(localtag.lock));
locallock->nLocks = 0;
locallock->holdsStrongLockCount = false;
locallock->lockCleared = false;
locallock->numLockOwners = 0;
locallock->maxLockOwners = 8;
locallock->holdsStrongLockCount = false;
locallock->lockOwners = NULL; /* in case next line fails */
locallock->lockOwners = (LOCALLOCKOWNER *)
MemoryContextAlloc(TopMemoryContext,
......@@ -789,13 +796,22 @@ LockAcquireExtended(const LOCKTAG *locktag,
}
hashcode = locallock->hashcode;
if (locallockp)
*locallockp = locallock;
/*
* If we already hold the lock, we can just increase the count locally.
*
* If lockCleared is already set, caller need not worry about absorbing
* sinval messages related to the lock's object.
*/
if (locallock->nLocks > 0)
{
GrantLockLocal(locallock, owner);
return LOCKACQUIRE_ALREADY_HELD;
if (locallock->lockCleared)
return LOCKACQUIRE_ALREADY_CLEAR;
else
return LOCKACQUIRE_ALREADY_HELD;
}
/*
......@@ -877,6 +893,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
hashcode))
{
AbortStrongLockAcquire();
if (locallock->nLocks == 0)
RemoveLocalLock(locallock);
if (locallockp)
*locallockp = NULL;
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
......@@ -911,6 +931,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
{
AbortStrongLockAcquire();
LWLockRelease(partitionLock);
if (locallock->nLocks == 0)
RemoveLocalLock(locallock);
if (locallockp)
*locallockp = NULL;
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
......@@ -976,6 +1000,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
LWLockRelease(partitionLock);
if (locallock->nLocks == 0)
RemoveLocalLock(locallock);
if (locallockp)
*locallockp = NULL;
return LOCKACQUIRE_NOT_AVAIL;
}
......@@ -1645,6 +1671,20 @@ GrantAwaitedLock(void)
GrantLockLocal(awaitedLock, awaitedOwner);
}
/*
* MarkLockClear -- mark an acquired lock as "clear"
*
* This means that we know we have absorbed all sinval messages that other
* sessions generated before we acquired this lock, and so we can confidently
* assume we know about any catalog changes protected by this lock.
*/
void
MarkLockClear(LOCALLOCK *locallock)
{
Assert(locallock->nLocks > 0);
locallock->lockCleared = true;
}
/*
* WaitOnLock -- wait to acquire a lock
*
......@@ -1909,6 +1949,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
if (locallock->nLocks > 0)
return true;
/*
* At this point we can no longer suppose we are clear of invalidation
* messages related to this lock. Although we'll delete the LOCALLOCK
* object before any intentional return from this routine, it seems worth
* the trouble to explicitly reset lockCleared right now, just in case
* some error prevents us from deleting the LOCALLOCK.
*/
locallock->lockCleared = false;
/* Attempt fast release of any lock eligible for the fast path. */
if (EligibleForRelationFastPath(locktag, lockmode) &&
FastPathLocalUseCount > 0)
......
......@@ -408,9 +408,10 @@ typedef struct LOCALLOCK
PROCLOCK *proclock; /* associated PROCLOCK object, if any */
uint32 hashcode; /* copy of LOCKTAG's hash value */
int64 nLocks; /* total number of times lock is held */
bool holdsStrongLockCount; /* bumped FastPathStrongRelationLocks */
bool lockCleared; /* we read all sinval msgs for lock */
int numLockOwners; /* # of relevant ResourceOwners */
int maxLockOwners; /* allocated size of array */
bool holdsStrongLockCount; /* bumped FastPathStrongRelationLocks */
LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
} LOCALLOCK;
......@@ -472,7 +473,8 @@ typedef enum
{
LOCKACQUIRE_NOT_AVAIL, /* lock not available, and dontWait=true */
LOCKACQUIRE_OK, /* lock successfully acquired */
LOCKACQUIRE_ALREADY_HELD /* incremented count for lock already held */
LOCKACQUIRE_ALREADY_HELD, /* incremented count for lock already held */
LOCKACQUIRE_ALREADY_CLEAR /* incremented count for lock already clear */
} LockAcquireResult;
/* Deadlock states identified by DeadLockCheck() */
......@@ -528,8 +530,10 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait,
bool report_memory_error);
bool reportMemoryError,
LOCALLOCK **locallockp);
extern void AbortStrongLockAcquire(void);
extern void MarkLockClear(LOCALLOCK *locallock);
extern bool LockRelease(const LOCKTAG *locktag,
LOCKMODE lockmode, bool sessionLock);
extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment