Commit 4240e429 authored by Robert Haas's avatar Robert Haas

Try to acquire relation locks in RangeVarGetRelid.

In the previous coding, we would look up a relation in RangeVarGetRelid,
lock the resulting OID, and then AcceptInvalidationMessages().  While
this was sufficient to ensure that we noticed any changes to the
relation definition before building the relcache entry, it didn't
handle the possibility that the name we looked up no longer referenced
the same OID.  This was particularly problematic in the case where a
table had been dropped and recreated: we'd latch on to the entry for
the old relation and fail later on.  Now, we acquire the relation lock
inside RangeVarGetRelid, and retry the name lookup if we notice that
invalidation messages have been processed meanwhile.  Many operations
that would previously have failed with an error in the presence of
concurrent DDL will now succeed.

There is a good deal of work remaining to be done here: many callers
of RangeVarGetRelid still pass NoLock for one reason or another.  In
addition, nothing in this patch guards against the possibility that
the meaning of an unqualified name might change due to the creation
of a relation in a schema earlier in the user's search path than the
one where it was previously found.  Furthermore, there's nothing at
all here to guard against similar race conditions for non-relations.
For all that, it's a start.

Noah Misch and Robert Haas
parent 9d522cb3
......@@ -975,26 +975,11 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
{
Oid relOid;
/*
* Check for shared-cache-inval messages before trying to open the
* relation. This is needed to cover the case where the name identifies a
* rel that has been dropped and recreated since the start of our
* transaction: if we don't flush the old syscache entry then we'll latch
* onto that entry and suffer an error when we do RelationIdGetRelation.
* Note that relation_open does not need to do this, since a relation's
* OID never changes.
*
* We skip this if asked for NoLock, on the assumption that the caller has
* already ensured some appropriate lock is held.
*/
if (lockmode != NoLock)
AcceptInvalidationMessages();
/* Look up the appropriate relation using namespace search */
relOid = RangeVarGetRelid(relation, false);
/* Look up and lock the appropriate relation using namespace search */
relOid = RangeVarGetRelid(relation, lockmode, false, false);
/* Let relation_open do the rest */
return relation_open(relOid, lockmode);
return relation_open(relOid, NoLock);
}
/* ----------------
......@@ -1012,30 +997,15 @@ relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode,
{
Oid relOid;
/*
* Check for shared-cache-inval messages before trying to open the
* relation. This is needed to cover the case where the name identifies a
* rel that has been dropped and recreated since the start of our
* transaction: if we don't flush the old syscache entry then we'll latch
* onto that entry and suffer an error when we do RelationIdGetRelation.
* Note that relation_open does not need to do this, since a relation's
* OID never changes.
*
* We skip this if asked for NoLock, on the assumption that the caller has
* already ensured some appropriate lock is held.
*/
if (lockmode != NoLock)
AcceptInvalidationMessages();
/* Look up the appropriate relation using namespace search */
relOid = RangeVarGetRelid(relation, missing_ok);
/* Look up and lock the appropriate relation using namespace search */
relOid = RangeVarGetRelid(relation, lockmode, missing_ok, false);
/* Return NULL on not-found */
if (!OidIsValid(relOid))
return NULL;
/* Let relation_open do the rest */
return relation_open(relOid, lockmode);
return relation_open(relOid, NoLock);
}
/* ----------------
......
......@@ -583,6 +583,11 @@ ExecGrantStmt_oids(InternalGrant *istmt)
* objectNamesToOids
*
* Turn a list of object names of a given type into an Oid list.
*
* XXX: This function doesn't take any sort of locks on the objects whose
* names it looks up. In the face of concurrent DDL, we might easily latch
* onto an old version of an object, causing the GRANT or REVOKE statement
* to fail.
*/
static List *
objectNamesToOids(GrantObjectType objtype, List *objnames)
......@@ -601,7 +606,7 @@ objectNamesToOids(GrantObjectType objtype, List *objnames)
RangeVar *relvar = (RangeVar *) lfirst(cell);
Oid relOid;
relOid = RangeVarGetRelid(relvar, false);
relOid = RangeVarGetRelid(relvar, NoLock, false, false);
objects = lappend_oid(objects, relOid);
}
break;
......
......@@ -2942,7 +2942,8 @@ reindex_relation(Oid relid, int flags)
/*
* Open and lock the relation. ShareLock is sufficient since we only need
* to prevent schema and data changes in it.
* to prevent schema and data changes in it. The lock level used here
* should match ReindexTable().
*/
rel = heap_open(relid, ShareLock);
......
......@@ -44,6 +44,8 @@
#include "parser/parse_func.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/sinval.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
......@@ -215,14 +217,20 @@ Datum pg_is_other_temp_schema(PG_FUNCTION_ARGS);
* Given a RangeVar describing an existing relation,
* select the proper namespace and look up the relation OID.
*
* If the relation is not found, return InvalidOid if failOK = true,
* If the relation is not found, return InvalidOid if missing_ok = true,
* otherwise raise an error.
*
* If nowait = true, throw an error if we'd have to wait for a lock.
*/
Oid
RangeVarGetRelid(const RangeVar *relation, bool failOK)
RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok,
bool nowait)
{
uint64 inval_count;
Oid namespaceId;
Oid relId;
Oid oldRelId = InvalidOid;
bool retry = false;
/*
* We check the catalog name and then ignore it.
......@@ -237,13 +245,38 @@ RangeVarGetRelid(const RangeVar *relation, bool failOK)
relation->relname)));
}
/*
* DDL operations can change the results of a name lookup. Since all
* such operations will generate invalidation messages, we keep track
* of whether any such messages show up while we're performing the
* operation, and retry until either (1) no more invalidation messages
* show up or (2) the answer doesn't change.
*
* But if lockmode = NoLock, then we assume that either the caller is OK
* with the answer changing under them, or that they already hold some
* appropriate lock, and therefore return the first answer we get without
* checking for invalidation messages. Also, if the requested lock is
* already held, no LockRelationOid will not AcceptInvalidationMessages,
* so we may fail to notice a change. We could protect against that case
* by calling AcceptInvalidationMessages() before beginning this loop,
* but that would add a significant amount overhead, so for now we don't.
*/
for (;;)
{
/*
* Remember this value, so that, after looking up the relation name
* and locking its OID, we can check whether any invalidation messages
* have been processed that might require a do-over.
*/
inval_count = SharedInvalidMessageCounter;
/*
* Some non-default relpersistence value may have been specified. The
* parser never generates such a RangeVar in simple DML, but it can happen
* in contexts such as "CREATE TEMP TABLE foo (f1 int PRIMARY KEY)". Such
* a command will generate an added CREATE INDEX operation, which must be
* careful to find the temp table, even when pg_temp is not first in the
* search path.
* parser never generates such a RangeVar in simple DML, but it can
* happen in contexts such as "CREATE TEMP TABLE foo (f1 int PRIMARY
* KEY)". Such a command will generate an added CREATE INDEX
* operation, which must be careful to find the temp table, even when
* pg_temp is not first in the search path.
*/
if (relation->relpersistence == RELPERSISTENCE_TEMP)
{
......@@ -268,7 +301,65 @@ RangeVarGetRelid(const RangeVar *relation, bool failOK)
relId = RelnameGetRelid(relation->relname);
}
if (!OidIsValid(relId) && !failOK)
/*
* If no lock requested, we assume the caller knows what they're
* doing. They should have already acquired a heavyweight lock on
* this relation earlier in the processing of this same statement,
* so it wouldn't be appropriate to AcceptInvalidationMessages()
* here, as that might pull the rug out from under them.
*/
if (lockmode == NoLock)
break;
/*
* If, upon retry, we get back the same OID we did last time, then
* the invalidation messages we processed did not change the final
* answer. So we're done.
*/
if (retry && relId == oldRelId)
break;
/*
* Lock relation. This will also accept any pending invalidation
* messages. If we got back InvalidOid, indicating not found, then
* there's nothing to lock, but we accept invalidation messages
* anyway, to flush any negative catcache entries that may be
* lingering.
*/
if (!OidIsValid(relId))
AcceptInvalidationMessages();
else if (!nowait)
LockRelationOid(relId, lockmode);
else if (!ConditionalLockRelationOid(relId, lockmode))
{
if (relation->schemaname)
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s.%s\"",
relation->schemaname, relation->relname)));
else
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s\"",
relation->relname)));
}
/*
* If no invalidation message were processed, we're done!
*/
if (inval_count == SharedInvalidMessageCounter)
break;
/*
* Something may have changed. Let's repeat the name lookup, to
* make sure this name still references the same relation it did
* previously.
*/
retry = true;
oldRelId = relId;
}
if (!OidIsValid(relId) && !missing_ok)
{
if (relation->schemaname)
ereport(ERROR,
......
......@@ -108,7 +108,12 @@ ExecRenameStmt(RenameStmt *stmt)
CheckRelationOwnership(stmt->relation, true);
relid = RangeVarGetRelid(stmt->relation, false);
/*
* Lock level used here should match what will be taken later,
* in RenameRelation, renameatt, or renametrig.
*/
relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock,
false, false);
switch (stmt->renameType)
{
......
......@@ -1531,9 +1531,15 @@ ReindexIndex(RangeVar *indexRelation)
Oid indOid;
HeapTuple tuple;
indOid = RangeVarGetRelid(indexRelation, false);
/*
* XXX: This is not safe in the presence of concurrent DDL. We should
* take AccessExclusiveLock here, but that would violate the rule that
* indexes should only be locked after their parent tables. For now,
* we live with it.
*/
indOid = RangeVarGetRelid(indexRelation, NoLock, false, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid));
if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", indOid);
if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
......@@ -1562,7 +1568,8 @@ ReindexTable(RangeVar *relation)
Oid heapOid;
HeapTuple tuple;
heapOid = RangeVarGetRelid(relation, false);
/* The lock level used here should match reindex_relation(). */
heapOid = RangeVarGetRelid(relation, ShareLock, false, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid));
if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
elog(ERROR, "cache lookup failed for relation %u", heapOid);
......
......@@ -24,8 +24,8 @@
#include "utils/acl.h"
#include "utils/lsyscache.h"
static void LockTableRecurse(Oid reloid, RangeVar *rv,
LOCKMODE lockmode, bool nowait, bool recurse);
static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait,
bool recurse);
/*
......@@ -36,17 +36,6 @@ LockTableCommand(LockStmt *lockstmt)
{
ListCell *p;
/*
* Iterate over the list and process the named relations one at a time
*/
foreach(p, lockstmt->relations)
{
RangeVar *relation = (RangeVar *) lfirst(p);
bool recurse = interpretInhOption(relation->inhOpt);
Oid reloid;
reloid = RangeVarGetRelid(relation, false);
/*
* During recovery we only accept these variations: LOCK TABLE foo IN
* ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo
......@@ -56,80 +45,31 @@ LockTableCommand(LockStmt *lockstmt)
if (lockstmt->mode > RowExclusiveLock)
PreventCommandDuringRecovery("LOCK TABLE");
LockTableRecurse(reloid, relation,
lockstmt->mode, lockstmt->nowait, recurse);
/*
* Iterate over the list and process the named relations one at a time
*/
foreach(p, lockstmt->relations)
{
RangeVar *rv = (RangeVar *) lfirst(p);
Relation rel;
bool recurse = interpretInhOption(rv->inhOpt);
Oid reloid;
reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait);
rel = relation_open(reloid, NoLock);
LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse);
}
}
/*
* Apply LOCK TABLE recursively over an inheritance tree
*
* At top level, "rv" is the original command argument; we use it to throw
* an appropriate error message if the relation isn't there. Below top level,
* "rv" is NULL and we should just silently ignore any dropped child rel.
*/
static void
LockTableRecurse(Oid reloid, RangeVar *rv,
LOCKMODE lockmode, bool nowait, bool recurse)
LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse)
{
Relation rel;
AclResult aclresult;
/*
* Acquire the lock. We must do this first to protect against concurrent
* drops. Note that a lock against an already-dropped relation's OID
* won't fail.
*/
if (nowait)
{
if (!ConditionalLockRelationOid(reloid, lockmode))
{
/* try to throw error by name; relation could be deleted... */
char *relname = rv ? rv->relname : get_rel_name(reloid);
if (relname)
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s\"",
relname)));
else
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation with OID %u",
reloid)));
}
}
else
LockRelationOid(reloid, lockmode);
/*
* Now that we have the lock, check to see if the relation really exists
* or not.
*/
rel = try_relation_open(reloid, NoLock);
if (!rel)
{
/* Release useless lock */
UnlockRelationOid(reloid, lockmode);
/* At top level, throw error; otherwise, ignore this child rel */
if (rv)
{
if (rv->schemaname)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s.%s\" does not exist",
rv->schemaname, rv->relname)));
else
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" does not exist",
rv->relname)));
}
return;
}
Oid reloid = RelationGetRelid(rel);
/* Verify adequate privilege */
if (lockmode == AccessShareLock)
......@@ -159,12 +99,48 @@ LockTableRecurse(Oid reloid, RangeVar *rv,
{
List *children = find_inheritance_children(reloid, NoLock);
ListCell *lc;
Relation childrel;
foreach(lc, children)
{
Oid childreloid = lfirst_oid(lc);
LockTableRecurse(childreloid, NULL, lockmode, nowait, recurse);
/*
* Acquire the lock, to protect against concurrent drops. Note
* that a lock against an already-dropped relation's OID won't
* fail.
*/
if (!nowait)
LockRelationOid(childreloid, lockmode);
else if (!ConditionalLockRelationOid(childreloid, lockmode))
{
/* try to throw error by name; relation could be deleted... */
char *relname = get_rel_name(childreloid);
if (relname)
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s\"",
relname)));
else
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation with OID %u",
reloid)));
}
/*
* Now that we have the lock, check to see if the relation really
* exists or not.
*/
childrel = try_relation_open(childreloid, NoLock);
if (!childrel)
{
/* Release useless lock */
UnlockRelationOid(childreloid, lockmode);
}
LockTableRecurse(childrel, lockmode, nowait, recurse);
}
}
......
......@@ -427,8 +427,8 @@ AlterSequence(AlterSeqStmt *stmt)
FormData_pg_sequence new;
List *owned_by;
/* open and AccessShareLock sequence */
relid = RangeVarGetRelid(stmt->sequence, false);
/* Open and lock sequence. */
relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false);
init_sequence(relid, &elm, &seqrel);
/* allow ALTER to sequence owner only */
......@@ -507,7 +507,16 @@ nextval(PG_FUNCTION_ARGS)
Oid relid;
sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
relid = RangeVarGetRelid(sequence, false);
/*
* XXX: This is not safe in the presence of concurrent DDL, but
* acquiring a lock here is more expensive than letting nextval_internal
* do it, since the latter maintains a cache that keeps us from hitting
* the lock manager more than once per transaction. It's not clear
* whether the performance penalty is material in practice, but for now,
* we do it this way.
*/
relid = RangeVarGetRelid(sequence, NoLock, false, false);
PG_RETURN_INT64(nextval_internal(relid));
}
......
......@@ -764,8 +764,15 @@ RemoveRelations(DropStmt *drop)
*/
AcceptInvalidationMessages();
/* Look up the appropriate relation using namespace search */
relOid = RangeVarGetRelid(rel, true);
/*
* Look up the appropriate relation using namespace search.
*
* XXX: Doing this without a lock is unsafe in the presence of
* concurrent DDL, but acquiring a lock here might violate the rule
* that a table must be locked before its corresponding index.
* So, for now, we ignore the hazard.
*/
relOid = RangeVarGetRelid(rel, NoLock, true, false);
/* Not there? */
if (!OidIsValid(relOid))
......@@ -2234,6 +2241,8 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*
* Lock level used here should match ExecRenameStmt
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
......
......@@ -194,7 +194,17 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
RelationGetRelationName(rel))));
if (stmt->isconstraint && stmt->constrrel != NULL)
constrrelid = RangeVarGetRelid(stmt->constrrel, false);
{
/*
* We must take a lock on the target relation to protect against
* concurrent drop. It's not clear that AccessShareLock is strong
* enough, but we certainly need at least that much... otherwise,
* we might end up creating a pg_constraint entry referencing a
* nonexistent table.
*/
constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false,
false);
}
/* permission checks */
if (!isInternal)
......@@ -1020,11 +1030,15 @@ ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid)
* DropTrigger - drop an individual trigger by name
*/
void
DropTrigger(Oid relid, const char *trigname, DropBehavior behavior,
DropTrigger(RangeVar *relation, const char *trigname, DropBehavior behavior,
bool missing_ok)
{
Oid relid;
ObjectAddress object;
/* lock level should match RemoveTriggerById */
relid = RangeVarGetRelid(relation, ShareRowExclusiveLock, false, false);
object.classId = TriggerRelationId;
object.objectId = get_trigger_oid(relid, trigname, missing_ok);
object.objectSubId = 0;
......
......@@ -318,7 +318,16 @@ get_rel_oids(Oid relid, const RangeVar *vacrel)
/* Process a specific relation */
Oid relid;
relid = RangeVarGetRelid(vacrel, false);
/*
* Since we don't take a lock here, the relation might be gone,
* or the RangeVar might no longer refer to the OID we look up
* here. In the former case, VACUUM will do nothing; in the
* latter case, it will process the OID we looked up here, rather
* than the new one. Neither is ideal, but there's little practical
* alternative, since we're going to commit this transaction and
* begin a new one between now and then.
*/
relid = RangeVarGetRelid(vacrel, NoLock, false, false);
/* Make a relation list entry for this guy */
oldcontext = MemoryContextSwitchTo(vac_context);
......
......@@ -273,11 +273,17 @@ searchRangeTable(ParseState *pstate, RangeVar *relation)
* If it's an unqualified name, check for possible CTE matches. A CTE
* hides any real relation matches. If no CTE, look for a matching
* relation.
*
* NB: It's not critical that RangeVarGetRelid return the correct answer
* here in the face of concurrent DDL. If it doesn't, the worst case
* scenario is a less-clear error message. Also, the tables involved in
* the query are already locked, which reduces the number of cases in
* which surprising behavior can occur. So we do the name lookup unlocked.
*/
if (!relation->schemaname)
cte = scanNameSpaceForCTE(pstate, refname, &ctelevelsup);
if (!cte)
relId = RangeVarGetRelid(relation, true);
relId = RangeVarGetRelid(relation, NoLock, true, false);
/* Now look for RTEs matching either the relation/CTE or the alias */
for (levelsup = 0;
......
......@@ -108,8 +108,14 @@ LookupTypeName(ParseState *pstate, const TypeName *typeName,
break;
}
/* look up the field */
relid = RangeVarGetRelid(rel, false);
/*
* Look up the field.
*
* XXX: As no lock is taken here, this might fail in the presence
* of concurrent DDL. But taking a lock would carry a performance
* penalty and would also require a permissions check.
*/
relid = RangeVarGetRelid(rel, NoLock, false, false);
attnum = get_attnum(relid, field);
if (attnum == InvalidAttrNumber)
ereport(ERROR,
......
......@@ -196,11 +196,15 @@ DefineRule(RuleStmt *stmt, const char *queryString)
Node *whereClause;
Oid relId;
/* Parse analysis ... */
/* Parse analysis. */
transformRuleStmt(stmt, queryString, &actions, &whereClause);
/* ... find the relation ... */
relId = RangeVarGetRelid(stmt->relation, false);
/*
* Find and lock the relation. Lock level should match
* DefineQueryRewrite.
*/
relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false,
false);
/* ... and execute */
DefineQueryRewrite(stmt->rulename,
......@@ -242,6 +246,8 @@ DefineQueryRewrite(char *rulename,
* grab ShareRowExclusiveLock to lock out insert/update/delete actions and
* to ensure that we lock out current CREATE RULE statements; but because
* of race conditions in access to catalog entries, we can't do that yet.
*
* Note that this lock level should match the one used in DefineRule.
*/
event_relation = heap_open(event_relid, AccessExclusiveLock);
......
......@@ -19,6 +19,7 @@
#include "access/sysattr.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_rewrite.h"
#include "miscadmin.h"
#include "rewrite/rewriteRemove.h"
......@@ -37,13 +38,18 @@
* Delete a rule given its name.
*/
void
RemoveRewriteRule(Oid owningRel, const char *ruleName, DropBehavior behavior,
bool missing_ok)
RemoveRewriteRule(RangeVar *relation, const char *ruleName,
DropBehavior behavior, bool missing_ok)
{
HeapTuple tuple;
Oid eventRelationOid;
Oid owningRel;
ObjectAddress object;
/* should match RemoveRewriteRuleById */
owningRel = RangeVarGetRelid(relation, ShareUpdateExclusiveLock,
false, false);
/*
* Find the tuple for the target rule.
*/
......
......@@ -22,6 +22,9 @@
#include "utils/inval.h"
uint64 SharedInvalidMessageCounter;
/*
* Because backends sitting idle will not be reading sinval events, we
* need a way to give an idle backend a swift kick in the rear and make
......@@ -90,6 +93,7 @@ ReceiveSharedInvalidMessages(
{
SharedInvalidationMessage *msg = &messages[nextmsg++];
SharedInvalidMessageCounter++;
invalFunction(msg);
}
......@@ -106,6 +110,7 @@ ReceiveSharedInvalidMessages(
{
/* got a reset message */
elog(DEBUG4, "cache state reset");
SharedInvalidMessageCounter++;
resetFunction();
break; /* nothing more to do */
}
......@@ -118,6 +123,7 @@ ReceiveSharedInvalidMessages(
{
SharedInvalidationMessage *msg = &messages[nextmsg++];
SharedInvalidMessageCounter++;
invalFunction(msg);
}
......
......@@ -81,10 +81,11 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
/*
* Now that we have the lock, check for invalidation messages, so that we
* will update or flush any stale relcache entry before we try to use it.
* We can skip this in the not-uncommon case that we already had the same
* type of lock being requested, since then no one else could have
* modified the relcache entry in an undesirable way. (In the case where
* our own xact modifies the rel, the relcache update happens via
* RangeVarGetRelid() specifically relies on us for this. We can skip
* this in the not-uncommon case that we already had the same type of lock
* being requested, since then no one else could have modified the
* relcache entry in an undesirable way. (In the case where our own xact
* modifies the rel, the relcache update happens via
* CommandCounterIncrement, not here.)
*/
if (res != LOCKACQUIRE_ALREADY_HELD)
......
......@@ -77,7 +77,15 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs)
Oid relOid;
HeapTuple tuple;
relOid = RangeVarGetRelid(rel, false);
/*
* XXX: This is unsafe in the presence of concurrent DDL, since it is
* called before acquiring any lock on the target relation. However,
* locking the target relation (especially using something like
* AccessExclusiveLock) before verifying that the user has permissions
* is not appealing either.
*/
relOid = RangeVarGetRelid(rel, NoLock, false, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
if (!HeapTupleIsValid(tuple)) /* should not happen */
elog(ERROR, "cache lookup failed for relation %u", relOid);
......@@ -1111,20 +1119,17 @@ standard_ProcessUtility(Node *parsetree,
case T_DropPropertyStmt:
{
DropPropertyStmt *stmt = (DropPropertyStmt *) parsetree;
Oid relId;
relId = RangeVarGetRelid(stmt->relation, false);
switch (stmt->removeType)
{
case OBJECT_RULE:
/* RemoveRewriteRule checks permissions */
RemoveRewriteRule(relId, stmt->property,
RemoveRewriteRule(stmt->relation, stmt->property,
stmt->behavior, stmt->missing_ok);
break;
case OBJECT_TRIGGER:
/* DropTrigger checks permissions */
DropTrigger(relId, stmt->property,
DropTrigger(stmt->relation, stmt->property,
stmt->behavior, stmt->missing_ok);
break;
default:
......
......@@ -1939,7 +1939,8 @@ convert_table_name(text *tablename)
relrv = makeRangeVarFromNameList(textToQualifiedNameList(tablename));
return RangeVarGetRelid(relrv, false);
/* We might not even have permissions on this relation; don't lock it. */
return RangeVarGetRelid(relrv, NoLock, false, false);
}
/*
......
......@@ -823,7 +823,9 @@ regclassin(PG_FUNCTION_ARGS)
*/
names = stringToQualifiedNameList(class_name_or_oid);
result = RangeVarGetRelid(makeRangeVarFromNameList(names), false);
/* We might not even have permissions on this relation; don't lock it. */
result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false,
false);
PG_RETURN_OID(result);
}
......@@ -1294,7 +1296,9 @@ text_regclass(PG_FUNCTION_ARGS)
RangeVar *rv;
rv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
result = RangeVarGetRelid(rv, false);
/* We might not even have permissions on this relation; don't lock it. */
result = RangeVarGetRelid(rv, NoLock, false, false);
PG_RETURN_OID(result);
}
......
......@@ -385,8 +385,9 @@ pg_get_viewdef_name(PG_FUNCTION_ARGS)
RangeVar *viewrel;
Oid viewoid;
/* Look up view name. Can't lock it - we might not have privileges. */
viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname));
viewoid = RangeVarGetRelid(viewrel, false);
viewoid = RangeVarGetRelid(viewrel, NoLock, false, false);
PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, 0)));
}
......@@ -403,8 +404,10 @@ pg_get_viewdef_name_ext(PG_FUNCTION_ARGS)
Oid viewoid;
prettyFlags = pretty ? PRETTYFLAG_PAREN | PRETTYFLAG_INDENT : 0;
/* Look up view name. Can't lock it - we might not have privileges. */
viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname));
viewoid = RangeVarGetRelid(viewrel, false);
viewoid = RangeVarGetRelid(viewrel, NoLock, false, false);
PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, prettyFlags)));
}
......@@ -1567,9 +1570,9 @@ pg_get_serial_sequence(PG_FUNCTION_ARGS)
SysScanDesc scan;
HeapTuple tup;
/* Get the OID of the table */
/* Look up table name. Can't lock it - we might not have privileges. */
tablerv = makeRangeVarFromNameList(textToQualifiedNameList(tablename));
tableOid = RangeVarGetRelid(tablerv, false);
tableOid = RangeVarGetRelid(tablerv, NoLock, false, false);
/* Get the number of the column */
column = text_to_cstring(columnname);
......
......@@ -15,6 +15,7 @@
#define NAMESPACE_H
#include "nodes/primnodes.h"
#include "storage/lock.h"
/*
......@@ -47,7 +48,8 @@ typedef struct OverrideSearchPath
} OverrideSearchPath;
extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK);
extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode,
bool missing_ok, bool nowait);
extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
......
......@@ -112,7 +112,7 @@ extern Oid CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
Oid constraintOid, Oid indexOid,
bool isInternal);
extern void DropTrigger(Oid relid, const char *trigname,
extern void DropTrigger(RangeVar *relation, const char *trigname,
DropBehavior behavior, bool missing_ok);
extern void RemoveTriggerById(Oid trigOid);
extern Oid get_trigger_oid(Oid relid, const char *name, bool missing_ok);
......
......@@ -17,7 +17,7 @@
#include "nodes/parsenodes.h"
extern void RemoveRewriteRule(Oid owningRel, const char *ruleName,
extern void RemoveRewriteRule(RangeVar *relation, const char *ruleName,
DropBehavior behavior, bool missing_ok);
extern void RemoveRewriteRuleById(Oid ruleOid);
......
......@@ -116,6 +116,10 @@ typedef union
} SharedInvalidationMessage;
/* Counter of messages processed; don't worry about overflow. */
extern uint64 SharedInvalidMessageCounter;
extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs,
int n);
extern void ReceiveSharedInvalidMessages(
......
......@@ -1708,7 +1708,8 @@ plpgsql_parse_cwordtype(List *idents)
relvar = makeRangeVar(strVal(linitial(idents)),
strVal(lsecond(idents)),
-1);
classOid = RangeVarGetRelid(relvar, true);
/* Can't lock relation - we might not have privileges. */
classOid = RangeVarGetRelid(relvar, NoLock, true, false);
if (!OidIsValid(classOid))
goto done;
fldname = strVal(lthird(idents));
......@@ -1804,16 +1805,11 @@ plpgsql_parse_cwordrowtype(List *idents)
/* Avoid memory leaks in long-term function context */
oldCxt = MemoryContextSwitchTo(compile_tmp_cxt);
/* Lookup the relation */
/* Look up relation name. Can't lock it - we might not have privileges. */
relvar = makeRangeVar(strVal(linitial(idents)),
strVal(lsecond(idents)),
-1);
classOid = RangeVarGetRelid(relvar, true);
if (!OidIsValid(classOid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s.%s\" does not exist",
strVal(linitial(idents)), strVal(lsecond(idents)))));
classOid = RangeVarGetRelid(relvar, NoLock, false, false);
MemoryContextSwitchTo(oldCxt);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment