Commit b04aeb0a authored by Tom Lane's avatar Tom Lane

Add assertions that we hold some relevant lock during relation open.

Opening a relation with no lock at all is unsafe; there's no guarantee
that we'll see a consistent state of the relevant catalog entries.
While use of MVCC scans to read the catalogs partially addresses that
complaint, it's still possible to switch to a new catalog snapshot
partway through loading the relcache entry.  Moreover, whether or not
you trust the reasoning behind sometimes using less than
AccessExclusiveLock for ALTER TABLE, that reasoning is certainly not
valid if concurrent users of the table don't hold a lock corresponding
to the operation they want to perform.

Hence, add some assertion-build-only checks that require any caller
of relation_open(x, NoLock) to hold at least AccessShareLock.  This
isn't a full solution, since we can't verify that the lock level is
semantically appropriate for the action --- but it's definitely of
some use, because it's already caught two bugs.

We can also assert that callers of addRangeTableEntryForRelation()
hold at least the lock level specified for the new RTE.

Amit Langote and Tom Lane

Discussion: https://postgr.es/m/16565.1538327894@sss.pgh.pa.us
parent b66827ca
...@@ -1137,6 +1137,14 @@ relation_open(Oid relationId, LOCKMODE lockmode) ...@@ -1137,6 +1137,14 @@ relation_open(Oid relationId, LOCKMODE lockmode)
if (!RelationIsValid(r)) if (!RelationIsValid(r))
elog(ERROR, "could not open relation with OID %u", relationId); elog(ERROR, "could not open relation with OID %u", relationId);
/*
* If we didn't get the lock ourselves, assert that caller holds one,
* except in bootstrap mode where no locks are used.
*/
Assert(lockmode != NoLock ||
IsBootstrapProcessingMode() ||
CheckRelationLockedByMe(r, AccessShareLock, true));
/* Make note that we've accessed a temporary relation */ /* Make note that we've accessed a temporary relation */
if (RelationUsesLocalBuffers(r)) if (RelationUsesLocalBuffers(r))
MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL; MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL;
...@@ -1183,6 +1191,10 @@ try_relation_open(Oid relationId, LOCKMODE lockmode) ...@@ -1183,6 +1191,10 @@ try_relation_open(Oid relationId, LOCKMODE lockmode)
if (!RelationIsValid(r)) if (!RelationIsValid(r))
elog(ERROR, "could not open relation with OID %u", relationId); elog(ERROR, "could not open relation with OID %u", relationId);
/* If we didn't get the lock ourselves, assert that caller holds one */
Assert(lockmode != NoLock ||
CheckRelationLockedByMe(r, AccessShareLock, true));
/* Make note that we've accessed a temporary relation */ /* Make note that we've accessed a temporary relation */
if (RelationUsesLocalBuffers(r)) if (RelationUsesLocalBuffers(r))
MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL; MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL;
...@@ -8084,6 +8096,8 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool * ...@@ -8084,6 +8096,8 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
idx_rel = RelationIdGetRelation(replidindex); idx_rel = RelationIdGetRelation(replidindex);
Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
/* deform tuple, so we have fast access to columns */ /* deform tuple, so we have fast access to columns */
heap_deform_tuple(tp, desc, values, nulls); heap_deform_tuple(tp, desc, values, nulls);
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "parser/parse_enr.h" #include "parser/parse_enr.h"
#include "parser/parse_relation.h" #include "parser/parse_relation.h"
#include "parser/parse_type.h" #include "parser/parse_type.h"
#include "storage/lmgr.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/rel.h" #include "utils/rel.h"
...@@ -1272,7 +1273,7 @@ addRangeTableEntry(ParseState *pstate, ...@@ -1272,7 +1273,7 @@ addRangeTableEntry(ParseState *pstate,
* *
* lockmode is the lock type required for query execution; it must be one * lockmode is the lock type required for query execution; it must be one
* of AccessShareLock, RowShareLock, or RowExclusiveLock depending on the * of AccessShareLock, RowShareLock, or RowExclusiveLock depending on the
* RTE's role within the query. The caller should always hold that lock mode * RTE's role within the query. The caller must hold that lock mode
* or a stronger one. * or a stronger one.
* *
* Note: properly, lockmode should be declared LOCKMODE not int, but that * Note: properly, lockmode should be declared LOCKMODE not int, but that
...@@ -1295,6 +1296,7 @@ addRangeTableEntryForRelation(ParseState *pstate, ...@@ -1295,6 +1296,7 @@ addRangeTableEntryForRelation(ParseState *pstate,
Assert(lockmode == AccessShareLock || Assert(lockmode == AccessShareLock ||
lockmode == RowShareLock || lockmode == RowShareLock ||
lockmode == RowExclusiveLock); lockmode == RowExclusiveLock);
Assert(CheckRelationLockedByMe(rel, lockmode, true));
rte->rtekind = RTE_RELATION; rte->rtekind = RTE_RELATION;
rte->alias = alias; rte->alias = alias;
......
...@@ -287,6 +287,51 @@ UnlockRelation(Relation relation, LOCKMODE lockmode) ...@@ -287,6 +287,51 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
LockRelease(&tag, lockmode, false); LockRelease(&tag, lockmode, false);
} }
/*
* CheckRelationLockedByMe
*
* Returns true if current transaction holds a lock on 'relation' of mode
* 'lockmode'. If 'orstronger' is true, a stronger lockmode is also OK.
* ("Stronger" is defined as "numerically higher", which is a bit
* semantically dubious but is OK for the purposes we use this for.)
*/
bool
CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
{
LOCKTAG tag;
SET_LOCKTAG_RELATION(tag,
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
if (LockHeldByMe(&tag, lockmode))
return true;
if (orstronger)
{
LOCKMODE slockmode;
for (slockmode = lockmode + 1;
slockmode <= MaxLockMode;
slockmode++)
{
if (LockHeldByMe(&tag, slockmode))
{
#ifdef NOT_USED
/* Sometimes this might be useful for debugging purposes */
elog(WARNING, "lock mode %s substituted for %s on relation %s",
GetLockmodeName(tag.locktag_lockmethodid, slockmode),
GetLockmodeName(tag.locktag_lockmethodid, lockmode),
RelationGetRelationName(relation));
#endif
return true;
}
}
}
return false;
}
/* /*
* LockHasWaitersRelation * LockHasWaitersRelation
* *
......
...@@ -563,6 +563,30 @@ DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2) ...@@ -563,6 +563,30 @@ DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
return false; return false;
} }
/*
* LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode'
* by the current transaction
*/
bool
LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
{
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
/*
* See if there is a LOCALLOCK entry for this lock and lockmode
*/
MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
localtag.lock = *locktag;
localtag.mode = lockmode;
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
(void *) &localtag,
HASH_FIND, NULL);
return (locallock && locallock->nLocks > 0);
}
/* /*
* LockHasWaiters -- look up 'locktag' and check if releasing this * LockHasWaiters -- look up 'locktag' and check if releasing this
* lock would wake up other processes waiting for it. * lock would wake up other processes waiting for it.
......
...@@ -45,6 +45,8 @@ extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode); ...@@ -45,6 +45,8 @@ extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode);
extern void LockRelation(Relation relation, LOCKMODE lockmode); extern void LockRelation(Relation relation, LOCKMODE lockmode);
extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode); extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
extern void UnlockRelation(Relation relation, LOCKMODE lockmode); extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode,
bool orstronger);
extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode); extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode);
extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode); extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
......
...@@ -540,6 +540,7 @@ extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks); ...@@ -540,6 +540,7 @@ extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
extern void LockReleaseSession(LOCKMETHODID lockmethodid); extern void LockReleaseSession(LOCKMETHODID lockmethodid);
extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks); extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks); extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
extern bool LockHasWaiters(const LOCKTAG *locktag, extern bool LockHasWaiters(const LOCKTAG *locktag,
LOCKMODE lockmode, bool sessionLock); LOCKMODE lockmode, bool sessionLock);
extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag, extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
......
...@@ -45,11 +45,15 @@ typedef int LOCKMODE; ...@@ -45,11 +45,15 @@ typedef int LOCKMODE;
#define AccessExclusiveLock 8 /* ALTER TABLE, DROP TABLE, VACUUM FULL, #define AccessExclusiveLock 8 /* ALTER TABLE, DROP TABLE, VACUUM FULL,
* and unqualified LOCK TABLE */ * and unqualified LOCK TABLE */
#define MaxLockMode 8
/* WAL representation of an AccessExclusiveLock on a table */
typedef struct xl_standby_lock typedef struct xl_standby_lock
{ {
TransactionId xid; /* xid of holder of AccessExclusiveLock */ TransactionId xid; /* xid of holder of AccessExclusiveLock */
Oid dbOid; Oid dbOid; /* DB containing table */
Oid relOid; Oid relOid; /* OID of table */
} xl_standby_lock; } xl_standby_lock;
#endif /* LOCKDEF_H_ */ #endif /* LOCKDEF_H_ */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment