Commit 52667d56 authored by Tom Lane's avatar Tom Lane

Rethink the locking mechanisms used for CREATE/DROP/RENAME DATABASE.

The former approach used ExclusiveLock on pg_database, which being a
cluster-wide lock meant only one of these operations could proceed at
a time; worse, it also blocked all incoming connections in ReverifyMyDatabase.
Now that we have LockSharedObject(), we can use locks of different types
applied to databases considered as objects.  This allows much more
flexible management of the interlocking: two CREATE DATABASEs need not
block each other, and need not block connections except to the template
database being used.  Similarly DROP DATABASE doesn't block unrelated
operations.  The locking used in flatfiles.c is also much narrower in
scope than before.  Per recent proposal.
parent cb98e6fb
<!-- $PostgreSQL: pgsql/doc/src/sgml/manage-ag.sgml,v 2.45 2006/03/10 19:10:48 momjian Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/manage-ag.sgml,v 2.46 2006/05/04 16:07:28 tgl Exp $ -->
<chapter id="managing-databases"> <chapter id="managing-databases">
<title>Managing Databases</title> <title>Managing Databases</title>
...@@ -166,6 +166,7 @@ CREATE DATABASE <replaceable>dbname</> OWNER <replaceable>rolename</>; ...@@ -166,6 +166,7 @@ CREATE DATABASE <replaceable>dbname</> OWNER <replaceable>rolename</>;
<programlisting> <programlisting>
createdb -O <replaceable>rolename</> <replaceable>dbname</> createdb -O <replaceable>rolename</> <replaceable>dbname</>
</programlisting> </programlisting>
from the shell.
You must be a superuser to be allowed to create a database for You must be a superuser to be allowed to create a database for
someone else (that is, for a role you are not a member of). someone else (that is, for a role you are not a member of).
</para> </para>
...@@ -220,19 +221,15 @@ createdb -T template0 <replaceable>dbname</> ...@@ -220,19 +221,15 @@ createdb -T template0 <replaceable>dbname</>
<para> <para>
It is possible to create additional template databases, and indeed It is possible to create additional template databases, and indeed
one might copy any database in a cluster by specifying its name one may copy any database in a cluster by specifying its name
as the template for <command>CREATE DATABASE</>. It is important to as the template for <command>CREATE DATABASE</>. It is important to
understand, however, that this is not (yet) intended as understand, however, that this is not (yet) intended as
a general-purpose <quote><command>COPY DATABASE</command></quote> facility. In particular, it is a general-purpose <quote><command>COPY DATABASE</command></quote> facility.
essential that the source database be idle (no data-altering transactions The principal limitation is that no other sessions can be connected to
in progress) the source database while it is being copied. <command>CREATE
for the duration of the copying operation. <command>CREATE DATABASE</> DATABASE</> will fail if any other connection exists when it starts;
will check otherwise, new connections to the source database are locked out
that no session (other than itself) is connected to until <command>CREATE DATABASE</> completes.
the source database at the start of the operation, but this does not
guarantee that changes cannot be made while the copy proceeds, which
would result in an inconsistent copied database. Therefore,
we recommend that databases used as templates be treated as read-only.
</para> </para>
<para> <para>
......
<!-- <!--
$PostgreSQL: pgsql/doc/src/sgml/ref/create_database.sgml,v 1.44 2005/07/31 17:19:17 tgl Exp $ $PostgreSQL: pgsql/doc/src/sgml/ref/create_database.sgml,v 1.45 2006/05/04 16:07:29 tgl Exp $
PostgreSQL documentation PostgreSQL documentation
--> -->
...@@ -45,7 +45,7 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable> ...@@ -45,7 +45,7 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
<para> <para>
Normally, the creator becomes the owner of the new database. Normally, the creator becomes the owner of the new database.
Superusers can create databases owned by other users using the Superusers can create databases owned by other users, by using the
<literal>OWNER</> clause. They can even create databases owned by <literal>OWNER</> clause. They can even create databases owned by
users with no special privileges. Non-superusers with <literal>CREATEDB</> users with no special privileges. Non-superusers with <literal>CREATEDB</>
privilege can only create databases owned by themselves. privilege can only create databases owned by themselves.
...@@ -104,7 +104,8 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable> ...@@ -104,7 +104,8 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
Character set encoding to use in the new database. Specify Character set encoding to use in the new database. Specify
a string constant (e.g., <literal>'SQL_ASCII'</literal>), a string constant (e.g., <literal>'SQL_ASCII'</literal>),
or an integer encoding number, or <literal>DEFAULT</literal> or an integer encoding number, or <literal>DEFAULT</literal>
to use the default encoding. The character sets supported by the to use the default encoding (namely, the encoding of the
template database). The character sets supported by the
<productname>PostgreSQL</productname> server are described in <productname>PostgreSQL</productname> server are described in
<xref linkend="multibyte-charset-supported">. <xref linkend="multibyte-charset-supported">.
</para> </para>
...@@ -169,7 +170,11 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable> ...@@ -169,7 +170,11 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
Although it is possible to copy a database other than <literal>template1</> Although it is possible to copy a database other than <literal>template1</>
by specifying its name as the template, this is not (yet) intended as by specifying its name as the template, this is not (yet) intended as
a general-purpose <quote><command>COPY DATABASE</command></quote> facility. a general-purpose <quote><command>COPY DATABASE</command></quote> facility.
We recommend that databases used as templates be treated as read-only. The principal limitation is that no other sessions can be connected to
the template database while it is being copied. <command>CREATE
DATABASE</> will fail if any other connection exists when it starts;
otherwise, new connections to the template database are locked out
until <command>CREATE DATABASE</> completes.
See <xref linkend="manage-ag-templatedbs"> for more information. See <xref linkend="manage-ag-templatedbs"> for more information.
</para> </para>
...@@ -220,6 +225,16 @@ CREATE DATABASE music ENCODING 'LATIN1'; ...@@ -220,6 +225,16 @@ CREATE DATABASE music ENCODING 'LATIN1';
implementation-defined. implementation-defined.
</para> </para>
</refsect1> </refsect1>
<refsect1>
<title>See Also</title>
<simplelist type="inline">
<member><xref linkend="sql-alterdatabase" endterm="sql-alterdatabase-title"></member>
<member><xref linkend="sql-dropdatabase" endterm="sql-dropdatabase-title"></member>
</simplelist>
</refsect1>
</refentry> </refentry>
<!-- Keep this comment at the end of the file <!-- Keep this comment at the end of the file
......
<!-- <!--
$PostgreSQL: pgsql/doc/src/sgml/ref/drop_database.sgml,v 1.21 2005/11/22 15:24:17 adunstan Exp $ $PostgreSQL: pgsql/doc/src/sgml/ref/drop_database.sgml,v 1.22 2006/05/04 16:07:29 tgl Exp $
PostgreSQL documentation PostgreSQL documentation
--> -->
...@@ -86,7 +86,7 @@ DROP DATABASE [ IF EXISTS ] <replaceable class="PARAMETER">name</replaceable> ...@@ -86,7 +86,7 @@ DROP DATABASE [ IF EXISTS ] <replaceable class="PARAMETER">name</replaceable>
<title>Compatibility</title> <title>Compatibility</title>
<para> <para>
The is no <command>DROP DATABASE</command> statement in the SQL standard. There is no <command>DROP DATABASE</command> statement in the SQL standard.
</para> </para>
</refsect1> </refsect1>
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.8 2006/03/05 15:58:23 momjian Exp $ * $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.9 2006/05/04 16:07:29 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -37,7 +37,6 @@ ...@@ -37,7 +37,6 @@
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/syscache.h" #include "utils/syscache.h"
...@@ -911,12 +910,6 @@ shdepLockAndCheckObject(Oid classId, Oid objectId) ...@@ -911,12 +910,6 @@ shdepLockAndCheckObject(Oid classId, Oid objectId)
/* AccessShareLock should be OK, since we are not modifying the object */ /* AccessShareLock should be OK, since we are not modifying the object */
LockSharedObject(classId, objectId, 0, AccessShareLock); LockSharedObject(classId, objectId, 0, AccessShareLock);
/*
* We have to recognize sinval updates here, else our local syscache may
* still contain the object even if it was just dropped.
*/
AcceptInvalidationMessages();
switch (classId) switch (classId)
{ {
case AuthIdRelationId: case AuthIdRelationId:
......
...@@ -3,19 +3,17 @@ ...@@ -3,19 +3,17 @@
* dbcommands.c * dbcommands.c
* Database management commands (create/drop database). * Database management commands (create/drop database).
* *
* Note: database creation/destruction commands take ExclusiveLock on * Note: database creation/destruction commands use exclusive locks on
* pg_database to ensure that no two proceed in parallel. We must use * the database objects (as expressed by LockSharedObject()) to avoid
* at least this level of locking to ensure that no two backends try to * stepping on each others' toes. Formerly we used table-level locks
* write the flat-file copy of pg_database at once. We avoid using * on pg_database, but that's too coarse-grained.
* AccessExclusiveLock since there's no need to lock out ordinary readers
* of pg_database.
* *
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.180 2006/05/03 22:45:26 tgl Exp $ * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.181 2006/05/04 16:07:29 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -53,7 +51,8 @@ ...@@ -53,7 +51,8 @@
/* non-export function prototypes */ /* non-export function prototypes */
static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP, static bool get_db_info(const char *name, LOCKMODE lockmode,
Oid *dbIdP, Oid *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
Oid *dbLastSysOidP, Oid *dbLastSysOidP,
TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP, TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
...@@ -80,13 +79,12 @@ createdb(const CreatedbStmt *stmt) ...@@ -80,13 +79,12 @@ createdb(const CreatedbStmt *stmt)
TransactionId src_frozenxid; TransactionId src_frozenxid;
Oid src_deftablespace; Oid src_deftablespace;
volatile Oid dst_deftablespace; volatile Oid dst_deftablespace;
volatile Relation pg_database_rel; Relation pg_database_rel;
HeapTuple tuple; HeapTuple tuple;
TupleDesc pg_database_dsc;
Datum new_record[Natts_pg_database]; Datum new_record[Natts_pg_database];
char new_record_nulls[Natts_pg_database]; char new_record_nulls[Natts_pg_database];
Oid dboid; Oid dboid;
volatile Oid datdba; Oid datdba;
ListCell *option; ListCell *option;
DefElem *dtablespacename = NULL; DefElem *dtablespacename = NULL;
DefElem *downer = NULL; DefElem *downer = NULL;
...@@ -96,8 +94,8 @@ createdb(const CreatedbStmt *stmt) ...@@ -96,8 +94,8 @@ createdb(const CreatedbStmt *stmt)
char *dbname = stmt->dbname; char *dbname = stmt->dbname;
char *dbowner = NULL; char *dbowner = NULL;
const char *dbtemplate = NULL; const char *dbtemplate = NULL;
volatile int encoding = -1; int encoding = -1;
volatile int dbconnlimit = -1; int dbconnlimit = -1;
/* don't call this in a transaction block */ /* don't call this in a transaction block */
PreventTransactionChain((void *) stmt, "CREATE DATABASE"); PreventTransactionChain((void *) stmt, "CREATE DATABASE");
...@@ -216,31 +214,25 @@ createdb(const CreatedbStmt *stmt) ...@@ -216,31 +214,25 @@ createdb(const CreatedbStmt *stmt)
check_is_member_of_role(GetUserId(), datdba); check_is_member_of_role(GetUserId(), datdba);
/* /*
* Check for db name conflict. There is a race condition here, since * Lookup database (template) to be cloned, and obtain share lock on it.
* another backend could create the same DB name before we commit. * ShareLock allows two CREATE DATABASEs to work from the same template
* However, holding an exclusive lock on pg_database for the whole time we * concurrently, while ensuring no one is busy dropping it in parallel
* are copying the source database doesn't seem like a good idea, so * (which would be Very Bad since we'd likely get an incomplete copy
* accept possibility of race to create. We will check again after we * without knowing it). This also prevents any new connections from being
* grab the exclusive lock. * made to the source until we finish copying it, so we can be sure it
*/ * won't change underneath us.
if (get_db_info(dbname, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_DATABASE),
errmsg("database \"%s\" already exists", dbname)));
/*
* Lookup database (template) to be cloned.
*/ */
if (!dbtemplate) if (!dbtemplate)
dbtemplate = "template1"; /* Default template database name */ dbtemplate = "template1"; /* Default template database name */
if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding, if (!get_db_info(dbtemplate, ShareLock,
&src_dboid, &src_owner, &src_encoding,
&src_istemplate, &src_allowconn, &src_lastsysoid, &src_istemplate, &src_allowconn, &src_lastsysoid,
&src_vacuumxid, &src_frozenxid, &src_deftablespace)) &src_vacuumxid, &src_frozenxid, &src_deftablespace))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE), (errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("template database \"%s\" does not exist", dbtemplate))); errmsg("template database \"%s\" does not exist",
dbtemplate)));
/* /*
* Permission check: to copy a DB that's not marked datistemplate, you * Permission check: to copy a DB that's not marked datistemplate, you
...@@ -258,8 +250,7 @@ createdb(const CreatedbStmt *stmt) ...@@ -258,8 +250,7 @@ createdb(const CreatedbStmt *stmt)
/* /*
* The source DB can't have any active backends, except this one * The source DB can't have any active backends, except this one
* (exception is to allow CREATE DB while connected to template1). * (exception is to allow CREATE DB while connected to template1).
* Otherwise we might copy inconsistent data. This check is not * Otherwise we might copy inconsistent data.
* bulletproof, since someone might connect while we are copying...
*/ */
if (DatabaseHasActiveBackends(src_dboid, true)) if (DatabaseHasActiveBackends(src_dboid, true))
ereport(ERROR, ereport(ERROR,
...@@ -346,14 +337,65 @@ createdb(const CreatedbStmt *stmt) ...@@ -346,14 +337,65 @@ createdb(const CreatedbStmt *stmt)
src_vacuumxid = src_frozenxid = GetCurrentTransactionId(); src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
/* /*
* Preassign OID for pg_database tuple, so that we can compute db path. We * Check for db name conflict. This is just to give a more friendly
* have to open pg_database to do this, but we don't want to take * error message than "unique index violation". There's a race condition
* ExclusiveLock yet, so just do it and close again. * but we're willing to accept the less friendly message in that case.
*/ */
pg_database_rel = heap_open(DatabaseRelationId, AccessShareLock); if (OidIsValid(get_database_oid(dbname)))
dboid = GetNewOid(pg_database_rel); ereport(ERROR,
heap_close(pg_database_rel, AccessShareLock); (errcode(ERRCODE_DUPLICATE_DATABASE),
pg_database_rel = NULL; errmsg("database \"%s\" already exists", dbname)));
/*
* Insert a new tuple into pg_database. This establishes our ownership
* of the new database name (anyone else trying to insert the same name
* will block on the unique index, and fail after we commit). It also
* assigns the OID that the new database will have.
*/
pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
/* Form tuple */
MemSet(new_record, 0, sizeof(new_record));
MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
new_record[Anum_pg_database_datname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(dbname));
new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
/*
* We deliberately set datconfig and datacl to defaults (NULL), rather
* than copying them from the template database. Copying datacl would
* be a bad idea when the owner is not the same as the template's
* owner. It's more debatable whether datconfig should be copied.
*/
new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
tuple = heap_formtuple(RelationGetDescr(pg_database_rel),
new_record, new_record_nulls);
dboid = simple_heap_insert(pg_database_rel, tuple);
/* Update indexes */
CatalogUpdateIndexes(pg_database_rel, tuple);
/*
* Now generate additional catalog entries associated with the new DB
*/
/* Register owner dependency */
recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
/* Create pg_shdepend entries for objects within database */
copyTemplateDependencies(src_dboid, dboid);
/* /*
* Force dirty buffers out to disk, to ensure source database is * Force dirty buffers out to disk, to ensure source database is
...@@ -434,64 +476,6 @@ createdb(const CreatedbStmt *stmt) ...@@ -434,64 +476,6 @@ createdb(const CreatedbStmt *stmt)
heap_endscan(scan); heap_endscan(scan);
heap_close(rel, AccessShareLock); heap_close(rel, AccessShareLock);
/*
* Now OK to grab exclusive lock on pg_database.
*/
pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock);
/* Check to see if someone else created same DB name meanwhile. */
if (get_db_info(dbname, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_DATABASE),
errmsg("database \"%s\" already exists", dbname)));
/*
* Insert a new tuple into pg_database
*/
pg_database_dsc = RelationGetDescr(pg_database_rel);
/* Form tuple */
MemSet(new_record, 0, sizeof(new_record));
MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
new_record[Anum_pg_database_datname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(dbname));
new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
/*
* We deliberately set datconfig and datacl to defaults (NULL), rather
* than copying them from the template database. Copying datacl would
* be a bad idea when the owner is not the same as the template's
* owner. It's more debatable whether datconfig should be copied.
*/
new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
HeapTupleSetOid(tuple, dboid); /* override heap_insert's OID
* selection */
simple_heap_insert(pg_database_rel, tuple);
/* Update indexes */
CatalogUpdateIndexes(pg_database_rel, tuple);
/* Register owner dependency */
recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
/* Create pg_shdepend entries for objects within database */
copyTemplateDependencies(src_dboid, dboid);
/* /*
* We force a checkpoint before committing. This effectively means * We force a checkpoint before committing. This effectively means
* that committed XLOG_DBASE_CREATE operations will never need to be * that committed XLOG_DBASE_CREATE operations will never need to be
...@@ -523,6 +507,12 @@ createdb(const CreatedbStmt *stmt) ...@@ -523,6 +507,12 @@ createdb(const CreatedbStmt *stmt)
*/ */
RequestCheckpoint(true, false); RequestCheckpoint(true, false);
/*
* Close pg_database, but keep lock till commit (this is important
* to prevent any risk of deadlock failure while updating flat file)
*/
heap_close(pg_database_rel, NoLock);
/* /*
* Set flag to update flat database file at commit. * Set flag to update flat database file at commit.
*/ */
...@@ -530,9 +520,9 @@ createdb(const CreatedbStmt *stmt) ...@@ -530,9 +520,9 @@ createdb(const CreatedbStmt *stmt)
} }
PG_CATCH(); PG_CATCH();
{ {
/* Don't hold pg_database lock while doing recursive remove */ /* Release lock on source database before doing recursive remove */
if (pg_database_rel != NULL) UnlockSharedObject(DatabaseRelationId, src_dboid, 0,
heap_close(pg_database_rel, ExclusiveLock); ShareLock);
/* Throw away any successfully copied subdirectories */ /* Throw away any successfully copied subdirectories */
remove_dbtablespaces(dboid); remove_dbtablespaces(dboid);
...@@ -540,10 +530,6 @@ createdb(const CreatedbStmt *stmt) ...@@ -540,10 +530,6 @@ createdb(const CreatedbStmt *stmt)
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
/* Close pg_database, but keep exclusive lock till commit */
/* This has to be outside the PG_TRY */
heap_close(pg_database_rel, NoLock);
} }
...@@ -568,20 +554,15 @@ dropdb(const char *dbname, bool missing_ok) ...@@ -568,20 +554,15 @@ dropdb(const char *dbname, bool missing_ok)
errmsg("cannot drop the currently open database"))); errmsg("cannot drop the currently open database")));
/* /*
* Obtain exclusive lock on pg_database. We need this to ensure that no * Look up the target database's OID, and get exclusive lock on it.
* new backend starts up in the target database while we are deleting it. * We need this to ensure that no new backend starts up in the target
* (Actually, a new backend might still manage to start up, because it * database while we are deleting it (see postinit.c), and that no one is
* isn't able to lock pg_database while starting. But it will detect its * using it as a CREATE DATABASE template or trying to delete it for
* error in ReverifyMyDatabase and shut down before any serious damage is * themselves.
* done. See postinit.c.)
*
* An ExclusiveLock, rather than AccessExclusiveLock, is sufficient since
* ReverifyMyDatabase takes RowShareLock. This allows ordinary readers of
* pg_database to proceed in parallel.
*/ */
pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock); pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(dbname, &db_id, NULL, NULL, if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
&db_istemplate, NULL, NULL, NULL, NULL, NULL)) &db_istemplate, NULL, NULL, NULL, NULL, NULL))
{ {
if (!missing_ok) if (!missing_ok)
...@@ -592,17 +573,18 @@ dropdb(const char *dbname, bool missing_ok) ...@@ -592,17 +573,18 @@ dropdb(const char *dbname, bool missing_ok)
} }
else else
{ {
/* Close pg_database, release the lock, since we changed nothing */ /* Close pg_database, release the lock, since we changed nothing */
heap_close(pgdbrel, ExclusiveLock); heap_close(pgdbrel, RowExclusiveLock);
ereport(NOTICE, ereport(NOTICE,
(errmsg("database \"%s\" does not exist, skipping", (errmsg("database \"%s\" does not exist, skipping",
dbname))); dbname)));
return; return;
} }
} }
/*
* Permission checks
*/
if (!pg_database_ownercheck(db_id, GetUserId())) if (!pg_database_ownercheck(db_id, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
dbname); dbname);
...@@ -618,7 +600,8 @@ dropdb(const char *dbname, bool missing_ok) ...@@ -618,7 +600,8 @@ dropdb(const char *dbname, bool missing_ok)
errmsg("cannot drop a template database"))); errmsg("cannot drop a template database")));
/* /*
* Check for active backends in the target database. * Check for active backends in the target database. (Because we hold
* the database lock, no new ones can start after this.)
*/ */
if (DatabaseHasActiveBackends(db_id, false)) if (DatabaseHasActiveBackends(db_id, false))
ereport(ERROR, ereport(ERROR,
...@@ -640,8 +623,7 @@ dropdb(const char *dbname, bool missing_ok) ...@@ -640,8 +623,7 @@ dropdb(const char *dbname, bool missing_ok)
ReleaseSysCache(tup); ReleaseSysCache(tup);
/* /*
* Delete any comments associated with the database * Delete any comments associated with the database.
*
*/ */
DeleteSharedComments(db_id, DatabaseRelationId); DeleteSharedComments(db_id, DatabaseRelationId);
...@@ -675,7 +657,10 @@ dropdb(const char *dbname, bool missing_ok) ...@@ -675,7 +657,10 @@ dropdb(const char *dbname, bool missing_ok)
*/ */
remove_dbtablespaces(db_id); remove_dbtablespaces(db_id);
/* Close pg_database, but keep exclusive lock till commit */ /*
* Close pg_database, but keep lock till commit (this is important
* to prevent any risk of deadlock failure while updating flat file)
*/
heap_close(pgdbrel, NoLock); heap_close(pgdbrel, NoLock);
/* /*
...@@ -691,29 +676,18 @@ dropdb(const char *dbname, bool missing_ok) ...@@ -691,29 +676,18 @@ dropdb(const char *dbname, bool missing_ok)
void void
RenameDatabase(const char *oldname, const char *newname) RenameDatabase(const char *oldname, const char *newname)
{ {
HeapTuple tup, Oid db_id;
newtup; HeapTuple newtup;
Relation rel; Relation rel;
SysScanDesc scan,
scan2;
ScanKeyData key,
key2;
/* /*
* Obtain ExclusiveLock so that no new session gets started while the * Look up the target database's OID, and get exclusive lock on it.
* rename is in progress. * We need this for the same reasons as DROP DATABASE.
*/ */
rel = heap_open(DatabaseRelationId, ExclusiveLock); rel = heap_open(DatabaseRelationId, RowExclusiveLock);
ScanKeyInit(&key,
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(oldname));
scan = systable_beginscan(rel, DatabaseNameIndexId, true,
SnapshotNow, 1, &key);
tup = systable_getnext(scan); if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
if (!HeapTupleIsValid(tup)) NULL, NULL, NULL, NULL, NULL, NULL))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE), (errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", oldname))); errmsg("database \"%s\" does not exist", oldname)));
...@@ -724,36 +698,29 @@ RenameDatabase(const char *oldname, const char *newname) ...@@ -724,36 +698,29 @@ RenameDatabase(const char *oldname, const char *newname)
* be an actual problem besides a little confusion, so think about this * be an actual problem besides a little confusion, so think about this
* and decide. * and decide.
*/ */
if (HeapTupleGetOid(tup) == MyDatabaseId) if (db_id == MyDatabaseId)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("current database may not be renamed"))); errmsg("current database may not be renamed")));
/* /*
* Make sure the database does not have active sessions. Might not be * Make sure the database does not have active sessions. This is the
* necessary, but it's consistent with other database operations. * same concern as above, but applied to other sessions.
*/ */
if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false)) if (DatabaseHasActiveBackends(db_id, false))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE), (errcode(ERRCODE_OBJECT_IN_USE),
errmsg("database \"%s\" is being accessed by other users", errmsg("database \"%s\" is being accessed by other users",
oldname))); oldname)));
/* make sure the new name doesn't exist */ /* make sure the new name doesn't exist */
ScanKeyInit(&key2, if (OidIsValid(get_database_oid(newname)))
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(newname));
scan2 = systable_beginscan(rel, DatabaseNameIndexId, true,
SnapshotNow, 1, &key2);
if (HeapTupleIsValid(systable_getnext(scan2)))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_DATABASE), (errcode(ERRCODE_DUPLICATE_DATABASE),
errmsg("database \"%s\" already exists", newname))); errmsg("database \"%s\" already exists", newname)));
systable_endscan(scan2);
/* must be owner */ /* must be owner */
if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId())) if (!pg_database_ownercheck(db_id, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
oldname); oldname);
...@@ -764,14 +731,19 @@ RenameDatabase(const char *oldname, const char *newname) ...@@ -764,14 +731,19 @@ RenameDatabase(const char *oldname, const char *newname)
errmsg("permission denied to rename database"))); errmsg("permission denied to rename database")));
/* rename */ /* rename */
newtup = heap_copytuple(tup); newtup = SearchSysCacheCopy(DATABASEOID,
ObjectIdGetDatum(db_id),
0, 0, 0);
if (!HeapTupleIsValid(newtup))
elog(ERROR, "cache lookup failed for database %u", db_id);
namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname); namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
simple_heap_update(rel, &newtup->t_self, newtup); simple_heap_update(rel, &newtup->t_self, newtup);
CatalogUpdateIndexes(rel, newtup); CatalogUpdateIndexes(rel, newtup);
systable_endscan(scan); /*
* Close pg_database, but keep lock till commit (this is important
/* Close pg_database, but keep exclusive lock till commit */ * to prevent any risk of deadlock failure while updating flat file)
*/
heap_close(rel, NoLock); heap_close(rel, NoLock);
/* /*
...@@ -821,7 +793,9 @@ AlterDatabase(AlterDatabaseStmt *stmt) ...@@ -821,7 +793,9 @@ AlterDatabase(AlterDatabaseStmt *stmt)
connlimit = intVal(dconnlimit->arg); connlimit = intVal(dconnlimit->arg);
/* /*
* We don't need ExclusiveLock since we aren't updating the flat file. * Get the old tuple. We don't need a lock on the database per se,
* because we're not going to do anything that would mess up incoming
* connections.
*/ */
rel = heap_open(DatabaseRelationId, RowExclusiveLock); rel = heap_open(DatabaseRelationId, RowExclusiveLock);
ScanKeyInit(&scankey, ScanKeyInit(&scankey,
...@@ -891,7 +865,9 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt) ...@@ -891,7 +865,9 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
valuestr = flatten_set_variable_args(stmt->variable, stmt->value); valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
/* /*
* We don't need ExclusiveLock since we aren't updating the flat file. * Get the old tuple. We don't need a lock on the database per se,
* because we're not going to do anything that would mess up incoming
* connections.
*/ */
rel = heap_open(DatabaseRelationId, RowExclusiveLock); rel = heap_open(DatabaseRelationId, RowExclusiveLock);
ScanKeyInit(&scankey, ScanKeyInit(&scankey,
...@@ -974,7 +950,9 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId) ...@@ -974,7 +950,9 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
Form_pg_database datForm; Form_pg_database datForm;
/* /*
* We don't need ExclusiveLock since we aren't updating the flat file. * Get the old tuple. We don't need a lock on the database per se,
* because we're not going to do anything that would mess up incoming
* connections.
*/ */
rel = heap_open(DatabaseRelationId, RowExclusiveLock); rel = heap_open(DatabaseRelationId, RowExclusiveLock);
ScanKeyInit(&scankey, ScanKeyInit(&scankey,
...@@ -1077,72 +1055,127 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId) ...@@ -1077,72 +1055,127 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
* Helper functions * Helper functions
*/ */
/*
* Look up info about the database named "name". If the database exists,
* obtain the specified lock type on it, fill in any of the remaining
* parameters that aren't NULL, and return TRUE. If no such database,
* return FALSE.
*/
static bool static bool
get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP, get_db_info(const char *name, LOCKMODE lockmode,
Oid *dbIdP, Oid *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
Oid *dbLastSysOidP, Oid *dbLastSysOidP,
TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP, TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
Oid *dbTablespace) Oid *dbTablespace)
{ {
bool result = false;
Relation relation; Relation relation;
ScanKeyData scanKey;
SysScanDesc scan;
HeapTuple tuple;
bool gottuple;
AssertArg(name); AssertArg(name);
/* Caller may wish to grab a better lock on pg_database beforehand... */ /* Caller may wish to grab a better lock on pg_database beforehand... */
relation = heap_open(DatabaseRelationId, AccessShareLock); relation = heap_open(DatabaseRelationId, AccessShareLock);
ScanKeyInit(&scanKey, /*
Anum_pg_database_datname, * Loop covers the rare case where the database is renamed before we
BTEqualStrategyNumber, F_NAMEEQ, * can lock it. We try again just in case we can find a new one of
NameGetDatum(name)); * the same name.
*/
for (;;)
{
ScanKeyData scanKey;
SysScanDesc scan;
HeapTuple tuple;
Oid dbOid;
scan = systable_beginscan(relation, DatabaseNameIndexId, true, /*
SnapshotNow, 1, &scanKey); * there's no syscache for database-indexed-by-name,
* so must do it the hard way
*/
ScanKeyInit(&scanKey,
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(name));
tuple = systable_getnext(scan); scan = systable_beginscan(relation, DatabaseNameIndexId, true,
SnapshotNow, 1, &scanKey);
gottuple = HeapTupleIsValid(tuple); tuple = systable_getnext(scan);
if (gottuple)
{ if (!HeapTupleIsValid(tuple))
Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple); {
/* definitely no database of that name */
/* oid of the database */ systable_endscan(scan);
if (dbIdP) break;
*dbIdP = HeapTupleGetOid(tuple); }
/* oid of the owner */
if (ownerIdP) dbOid = HeapTupleGetOid(tuple);
*ownerIdP = dbform->datdba;
/* character encoding */ systable_endscan(scan);
if (encodingP)
*encodingP = dbform->encoding; /*
/* allowed as template? */ * Now that we have a database OID, we can try to lock the DB.
if (dbIsTemplateP) */
*dbIsTemplateP = dbform->datistemplate; if (lockmode != NoLock)
/* allowing connections? */ LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
if (dbAllowConnP)
*dbAllowConnP = dbform->datallowconn; /*
/* last system OID used in database */ * And now, re-fetch the tuple by OID. If it's still there and
if (dbLastSysOidP) * still the same name, we win; else, drop the lock and loop
*dbLastSysOidP = dbform->datlastsysoid; * back to try again.
/* limit of vacuumed XIDs */ */
if (dbVacuumXidP) tuple = SearchSysCache(DATABASEOID,
*dbVacuumXidP = dbform->datvacuumxid; ObjectIdGetDatum(dbOid),
/* limit of frozen XIDs */ 0, 0, 0);
if (dbFrozenXidP) if (HeapTupleIsValid(tuple))
*dbFrozenXidP = dbform->datfrozenxid; {
/* default tablespace for this database */ Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
if (dbTablespace)
*dbTablespace = dbform->dattablespace; if (strcmp(name, NameStr(dbform->datname)) == 0)
{
/* oid of the database */
if (dbIdP)
*dbIdP = dbOid;
/* oid of the owner */
if (ownerIdP)
*ownerIdP = dbform->datdba;
/* character encoding */
if (encodingP)
*encodingP = dbform->encoding;
/* allowed as template? */
if (dbIsTemplateP)
*dbIsTemplateP = dbform->datistemplate;
/* allowing connections? */
if (dbAllowConnP)
*dbAllowConnP = dbform->datallowconn;
/* last system OID used in database */
if (dbLastSysOidP)
*dbLastSysOidP = dbform->datlastsysoid;
/* limit of vacuumed XIDs */
if (dbVacuumXidP)
*dbVacuumXidP = dbform->datvacuumxid;
/* limit of frozen XIDs */
if (dbFrozenXidP)
*dbFrozenXidP = dbform->datfrozenxid;
/* default tablespace for this database */
if (dbTablespace)
*dbTablespace = dbform->dattablespace;
ReleaseSysCache(tuple);
result = true;
break;
}
/* can only get here if it was just renamed */
ReleaseSysCache(tuple);
}
if (lockmode != NoLock)
UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
} }
systable_endscan(scan);
heap_close(relation, AccessShareLock); heap_close(relation, AccessShareLock);
return gottuple; return result;
} }
/* Check if current user has createdb privileges */ /* Check if current user has createdb privileges */
...@@ -1234,8 +1267,6 @@ remove_dbtablespaces(Oid db_id) ...@@ -1234,8 +1267,6 @@ remove_dbtablespaces(Oid db_id)
* get_database_oid - given a database name, look up the OID * get_database_oid - given a database name, look up the OID
* *
* Returns InvalidOid if database name not found. * Returns InvalidOid if database name not found.
*
* This is not actually used in this file, but is exported for use elsewhere.
*/ */
Oid Oid
get_database_oid(const char *dbname) get_database_oid(const char *dbname)
...@@ -1277,8 +1308,6 @@ get_database_oid(const char *dbname) ...@@ -1277,8 +1308,6 @@ get_database_oid(const char *dbname)
* get_database_name - given a database OID, look up the name * get_database_name - given a database OID, look up the name
* *
* Returns a palloc'd string, or NULL if no such database. * Returns a palloc'd string, or NULL if no such database.
*
* This is not actually used in this file, but is exported for use elsewhere.
*/ */
char * char *
get_database_name(Oid dbid) get_database_name(Oid dbid)
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/commands/user.c,v 1.170 2006/03/05 15:58:25 momjian Exp $ * $PostgreSQL: pgsql/src/backend/commands/user.c,v 1.171 2006/05/04 16:07:29 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -274,10 +274,9 @@ CreateRole(CreateRoleStmt *stmt) ...@@ -274,10 +274,9 @@ CreateRole(CreateRoleStmt *stmt)
/* /*
* Check the pg_authid relation to be certain the role doesn't already * Check the pg_authid relation to be certain the role doesn't already
* exist. Note we secure exclusive lock because we need to protect our * exist.
* eventual update of the flat auth file.
*/ */
pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock); pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock);
pg_authid_dsc = RelationGetDescr(pg_authid_rel); pg_authid_dsc = RelationGetDescr(pg_authid_rel);
tuple = SearchSysCache(AUTHNAME, tuple = SearchSysCache(AUTHNAME,
...@@ -377,8 +376,8 @@ CreateRole(CreateRoleStmt *stmt) ...@@ -377,8 +376,8 @@ CreateRole(CreateRoleStmt *stmt)
GetUserId(), false); GetUserId(), false);
/* /*
* Now we can clean up; but keep lock until commit (to avoid possible * Close pg_authid, but keep lock till commit (this is important
* deadlock when commit code tries to acquire lock). * to prevent any risk of deadlock failure while updating flat file)
*/ */
heap_close(pg_authid_rel, NoLock); heap_close(pg_authid_rel, NoLock);
...@@ -538,10 +537,9 @@ AlterRole(AlterRoleStmt *stmt) ...@@ -538,10 +537,9 @@ AlterRole(AlterRoleStmt *stmt)
validUntil = strVal(dvalidUntil->arg); validUntil = strVal(dvalidUntil->arg);
/* /*
* Scan the pg_authid relation to be certain the user exists. Note we * Scan the pg_authid relation to be certain the user exists.
* secure exclusive lock to protect our update of the flat auth file.
*/ */
pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock); pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock);
pg_authid_dsc = RelationGetDescr(pg_authid_rel); pg_authid_dsc = RelationGetDescr(pg_authid_rel);
tuple = SearchSysCache(AUTHNAME, tuple = SearchSysCache(AUTHNAME,
...@@ -697,8 +695,8 @@ AlterRole(AlterRoleStmt *stmt) ...@@ -697,8 +695,8 @@ AlterRole(AlterRoleStmt *stmt)
false); false);
/* /*
* Now we can clean up; but keep lock until commit (to avoid possible * Close pg_authid, but keep lock till commit (this is important
* deadlock when commit code tries to acquire lock). * to prevent any risk of deadlock failure while updating flat file)
*/ */
heap_close(pg_authid_rel, NoLock); heap_close(pg_authid_rel, NoLock);
...@@ -726,10 +724,6 @@ AlterRoleSet(AlterRoleSetStmt *stmt) ...@@ -726,10 +724,6 @@ AlterRoleSet(AlterRoleSetStmt *stmt)
valuestr = flatten_set_variable_args(stmt->variable, stmt->value); valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
/*
* RowExclusiveLock is sufficient, because we don't need to update the
* flat auth file.
*/
rel = heap_open(AuthIdRelationId, RowExclusiveLock); rel = heap_open(AuthIdRelationId, RowExclusiveLock);
oldtuple = SearchSysCache(AUTHNAME, oldtuple = SearchSysCache(AUTHNAME,
PointerGetDatum(stmt->role), PointerGetDatum(stmt->role),
...@@ -799,6 +793,7 @@ AlterRoleSet(AlterRoleSetStmt *stmt) ...@@ -799,6 +793,7 @@ AlterRoleSet(AlterRoleSetStmt *stmt)
CatalogUpdateIndexes(rel, newtuple); CatalogUpdateIndexes(rel, newtuple);
ReleaseSysCache(oldtuple); ReleaseSysCache(oldtuple);
/* needn't keep lock since we won't be updating the flat file */
heap_close(rel, RowExclusiveLock); heap_close(rel, RowExclusiveLock);
} }
...@@ -820,11 +815,9 @@ DropRole(DropRoleStmt *stmt) ...@@ -820,11 +815,9 @@ DropRole(DropRoleStmt *stmt)
/* /*
* Scan the pg_authid relation to find the Oid of the role(s) to be * Scan the pg_authid relation to find the Oid of the role(s) to be
* deleted. Note we secure exclusive lock on pg_authid, because we need * deleted.
* to protect our update of the flat auth file. A regular writer's lock
* on pg_auth_members is sufficient though.
*/ */
pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock); pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock);
pg_auth_members_rel = heap_open(AuthMemRelationId, RowExclusiveLock); pg_auth_members_rel = heap_open(AuthMemRelationId, RowExclusiveLock);
foreach(item, stmt->roles) foreach(item, stmt->roles)
...@@ -960,7 +953,7 @@ DropRole(DropRoleStmt *stmt) ...@@ -960,7 +953,7 @@ DropRole(DropRoleStmt *stmt)
/* /*
* Now we can clean up; but keep locks until commit (to avoid possible * Now we can clean up; but keep locks until commit (to avoid possible
* deadlock when commit code tries to acquire lock). * deadlock failure while updating flat file)
*/ */
heap_close(pg_auth_members_rel, NoLock); heap_close(pg_auth_members_rel, NoLock);
heap_close(pg_authid_rel, NoLock); heap_close(pg_authid_rel, NoLock);
...@@ -989,8 +982,7 @@ RenameRole(const char *oldname, const char *newname) ...@@ -989,8 +982,7 @@ RenameRole(const char *oldname, const char *newname)
int i; int i;
Oid roleid; Oid roleid;
/* ExclusiveLock because we need to update the flat auth file */ rel = heap_open(AuthIdRelationId, RowExclusiveLock);
rel = heap_open(AuthIdRelationId, ExclusiveLock);
dsc = RelationGetDescr(rel); dsc = RelationGetDescr(rel);
oldtuple = SearchSysCache(AUTHNAME, oldtuple = SearchSysCache(AUTHNAME,
...@@ -1080,6 +1072,11 @@ RenameRole(const char *oldname, const char *newname) ...@@ -1080,6 +1072,11 @@ RenameRole(const char *oldname, const char *newname)
CatalogUpdateIndexes(rel, newtuple); CatalogUpdateIndexes(rel, newtuple);
ReleaseSysCache(oldtuple); ReleaseSysCache(oldtuple);
/*
* Close pg_authid, but keep lock till commit (this is important
* to prevent any risk of deadlock failure while updating flat file)
*/
heap_close(rel, NoLock); heap_close(rel, NoLock);
/* /*
...@@ -1108,11 +1105,8 @@ GrantRole(GrantRoleStmt *stmt) ...@@ -1108,11 +1105,8 @@ GrantRole(GrantRoleStmt *stmt)
grantee_ids = roleNamesToIds(stmt->grantee_roles); grantee_ids = roleNamesToIds(stmt->grantee_roles);
/* /* AccessShareLock is enough since we aren't modifying pg_authid */
* Even though this operation doesn't change pg_authid, we must secure pg_authid_rel = heap_open(AuthIdRelationId, AccessShareLock);
* exclusive lock on it to protect our update of the flat auth file.
*/
pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock);
/* /*
* Step through all of the granted roles and add/remove entries for the * Step through all of the granted roles and add/remove entries for the
...@@ -1136,6 +1130,10 @@ GrantRole(GrantRoleStmt *stmt) ...@@ -1136,6 +1130,10 @@ GrantRole(GrantRoleStmt *stmt)
stmt->admin_opt); stmt->admin_opt);
} }
/*
* Close pg_authid, but keep lock till commit (this is important
* to prevent any risk of deadlock failure while updating flat file)
*/
heap_close(pg_authid_rel, NoLock); heap_close(pg_authid_rel, NoLock);
/* /*
...@@ -1237,8 +1235,7 @@ roleNamesToIds(List *memberNames) ...@@ -1237,8 +1235,7 @@ roleNamesToIds(List *memberNames)
* grantorId: who is granting the membership * grantorId: who is granting the membership
* admin_opt: granting admin option? * admin_opt: granting admin option?
* *
* Note: caller is responsible for holding ExclusiveLock on pg_authid, * Note: caller is responsible for calling auth_file_update_needed().
* and for calling auth_file_update_needed().
*/ */
static void static void
AddRoleMems(const char *rolename, Oid roleid, AddRoleMems(const char *rolename, Oid roleid,
...@@ -1283,7 +1280,6 @@ AddRoleMems(const char *rolename, Oid roleid, ...@@ -1283,7 +1280,6 @@ AddRoleMems(const char *rolename, Oid roleid,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to set grantor"))); errmsg("must be superuser to set grantor")));
/* We need only regular writer's lock on pg_auth_members */
pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock); pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock);
pg_authmem_dsc = RelationGetDescr(pg_authmem_rel); pg_authmem_dsc = RelationGetDescr(pg_authmem_rel);
...@@ -1363,8 +1359,8 @@ AddRoleMems(const char *rolename, Oid roleid, ...@@ -1363,8 +1359,8 @@ AddRoleMems(const char *rolename, Oid roleid,
} }
/* /*
* Now we can clean up; but keep lock until commit (to avoid possible * Close pg_authmem, but keep lock till commit (this is important
* deadlock when commit code tries to acquire lock). * to prevent any risk of deadlock failure while updating flat file)
*/ */
heap_close(pg_authmem_rel, NoLock); heap_close(pg_authmem_rel, NoLock);
} }
...@@ -1378,8 +1374,7 @@ AddRoleMems(const char *rolename, Oid roleid, ...@@ -1378,8 +1374,7 @@ AddRoleMems(const char *rolename, Oid roleid,
* memberIds: OIDs of roles to del * memberIds: OIDs of roles to del
* admin_opt: remove admin option only? * admin_opt: remove admin option only?
* *
* Note: caller is responsible for holding ExclusiveLock on pg_authid, * Note: caller is responsible for calling auth_file_update_needed().
* and for calling auth_file_update_needed().
*/ */
static void static void
DelRoleMems(const char *rolename, Oid roleid, DelRoleMems(const char *rolename, Oid roleid,
...@@ -1418,7 +1413,6 @@ DelRoleMems(const char *rolename, Oid roleid, ...@@ -1418,7 +1413,6 @@ DelRoleMems(const char *rolename, Oid roleid,
rolename))); rolename)));
} }
/* We need only regular writer's lock on pg_auth_members */
pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock); pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock);
pg_authmem_dsc = RelationGetDescr(pg_authmem_rel); pg_authmem_dsc = RelationGetDescr(pg_authmem_rel);
...@@ -1478,8 +1472,8 @@ DelRoleMems(const char *rolename, Oid roleid, ...@@ -1478,8 +1472,8 @@ DelRoleMems(const char *rolename, Oid roleid,
} }
/* /*
* Now we can clean up; but keep lock until commit (to avoid possible * Close pg_authmem, but keep lock till commit (this is important
* deadlock when commit code tries to acquire lock). * to prevent any risk of deadlock failure while updating flat file)
*/ */
heap_close(pg_authmem_rel, NoLock); heap_close(pg_authmem_rel, NoLock);
} }
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.82 2006/03/05 15:58:38 momjian Exp $ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.83 2006/05/04 16:07:29 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -478,6 +478,9 @@ LockSharedObject(Oid classid, Oid objid, uint16 objsubid, ...@@ -478,6 +478,9 @@ LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
objsubid); objsubid);
(void) LockAcquire(&tag, false, lockmode, false, false); (void) LockAcquire(&tag, false, lockmode, false, false);
/* Make sure syscaches are up-to-date with any changes we waited for */
AcceptInvalidationMessages();
} }
/* /*
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/utils/init/flatfiles.c,v 1.17 2006/03/05 15:58:46 momjian Exp $ * $PostgreSQL: pgsql/src/backend/utils/init/flatfiles.c,v 1.18 2006/05/04 16:07:29 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -768,25 +768,49 @@ AtEOXact_UpdateFlatFiles(bool isCommit) ...@@ -768,25 +768,49 @@ AtEOXact_UpdateFlatFiles(bool isCommit)
CommandCounterIncrement(); CommandCounterIncrement();
/* /*
* We use ExclusiveLock to ensure that only one backend writes the flat * Open and lock the needed catalog(s).
* file(s) at a time. That's sufficient because it's okay to allow plain
* reads of the tables in parallel. There is some chance of a deadlock
* here (if we were triggered by a user update of one of the tables, which
* likely won't have gotten a strong enough lock), so get the locks we
* need before writing anything.
* *
* For writing the auth file, it's sufficient to ExclusiveLock pg_authid; * Even though we only need AccessShareLock, this could theoretically fail
* we take just regular AccessShareLock on pg_auth_members. * due to deadlock. In practice, however, our transaction already holds
* RowExclusiveLock or better (it couldn't have updated the catalog
* without such a lock). This implies that dbcommands.c and other places
* that force flat-file updates must not follow the common practice of
* dropping catalog locks before commit.
*/ */
if (database_file_update_subid != InvalidSubTransactionId) if (database_file_update_subid != InvalidSubTransactionId)
drel = heap_open(DatabaseRelationId, ExclusiveLock); drel = heap_open(DatabaseRelationId, AccessShareLock);
if (auth_file_update_subid != InvalidSubTransactionId) if (auth_file_update_subid != InvalidSubTransactionId)
{ {
arel = heap_open(AuthIdRelationId, ExclusiveLock); arel = heap_open(AuthIdRelationId, AccessShareLock);
mrel = heap_open(AuthMemRelationId, AccessShareLock); mrel = heap_open(AuthMemRelationId, AccessShareLock);
} }
/*
* Obtain special locks to ensure that two transactions don't try to write
* the same flat file concurrently. Quite aside from any direct risks of
* corrupted output, the winning writer probably wouldn't have seen the
* other writer's updates. By taking a lock and holding it till commit,
* we ensure that whichever updater goes second will see the other
* updater's changes as committed, and thus the final state of the file
* will include all updates.
*
* We use a lock on "database 0" to protect writing the pg_database flat
* file, and a lock on "role 0" to protect the auth file. This is a bit
* ugly but it's not worth inventing any more-general convention. (Any
* two locktags that are never used for anything else would do.)
*
* This is safe against deadlock as long as these are the very last locks
* acquired during the transaction.
*/
if (database_file_update_subid != InvalidSubTransactionId)
LockSharedObject(DatabaseRelationId, InvalidOid, 0,
AccessExclusiveLock);
if (auth_file_update_subid != InvalidSubTransactionId)
LockSharedObject(AuthIdRelationId, InvalidOid, 0,
AccessExclusiveLock);
/* Okay to write the files */ /* Okay to write the files */
if (database_file_update_subid != InvalidSubTransactionId) if (database_file_update_subid != InvalidSubTransactionId)
{ {
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.165 2006/05/03 22:45:26 tgl Exp $ * $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.166 2006/05/04 16:07:29 tgl Exp $
* *
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
...@@ -16,14 +16,10 @@ ...@@ -16,14 +16,10 @@
#include "postgres.h" #include "postgres.h"
#include <fcntl.h> #include <fcntl.h>
#include <sys/file.h>
#include <math.h>
#include <unistd.h> #include <unistd.h>
#include "access/genam.h"
#include "access/heapam.h" #include "access/heapam.h"
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_authid.h" #include "catalog/pg_authid.h"
#include "catalog/pg_database.h" #include "catalog/pg_database.h"
...@@ -42,7 +38,6 @@ ...@@ -42,7 +38,6 @@
#include "storage/smgr.h" #include "storage/smgr.h"
#include "utils/acl.h" #include "utils/acl.h"
#include "utils/flatfiles.h" #include "utils/flatfiles.h"
#include "utils/fmgroids.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/portal.h" #include "utils/portal.h"
#include "utils/relcache.h" #include "utils/relcache.h"
...@@ -51,7 +46,7 @@ ...@@ -51,7 +46,7 @@
static bool FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace); static bool FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace);
static void ReverifyMyDatabase(const char *name, bool am_superuser); static void CheckMyDatabase(const char *name, bool am_superuser);
static void InitCommunication(void); static void InitCommunication(void);
static void ShutdownPostgres(int code, Datum arg); static void ShutdownPostgres(int code, Datum arg);
static bool ThereIsAtLeastOneRole(void); static bool ThereIsAtLeastOneRole(void);
...@@ -72,7 +67,7 @@ static bool ThereIsAtLeastOneRole(void); ...@@ -72,7 +67,7 @@ static bool ThereIsAtLeastOneRole(void);
* file" copy of pg_database that is helpfully maintained by flatfiles.c. * file" copy of pg_database that is helpfully maintained by flatfiles.c.
* This is subject to various race conditions, so after we have the * This is subject to various race conditions, so after we have the
* transaction infrastructure started, we have to recheck the information; * transaction infrastructure started, we have to recheck the information;
* see ReverifyMyDatabase. * see InitPostgres.
*/ */
static bool static bool
FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace) FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace)
...@@ -108,76 +103,35 @@ FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace) ...@@ -108,76 +103,35 @@ FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace)
} }
/* /*
* ReverifyMyDatabase -- recheck info obtained by FindMyDatabase * CheckMyDatabase -- fetch information from the pg_database entry for our DB
*
* Since FindMyDatabase cannot lock pg_database, the information it read
* could be stale; for example we might have attached to a database that's in
* process of being destroyed by dropdb(). This routine is called after
* we have all the locking and other infrastructure running --- now we can
* check that we are really attached to a valid database.
*
* In reality, if dropdb() is running in parallel with our startup,
* it's pretty likely that we will have failed before now, due to being
* unable to read some of the system tables within the doomed database.
* This routine just exists to make *sure* we have not started up in an
* invalid database. If we quit now, we should have managed to avoid
* creating any serious problems.
*
* This is also a handy place to fetch the database encoding info out
* of pg_database.
*
* To avoid having to read pg_database more times than necessary
* during session startup, this place is also fitting to check CONNECT
* privilege and set up any database-specific configuration variables.
*/ */
static void static void
ReverifyMyDatabase(const char *name, bool am_superuser) CheckMyDatabase(const char *name, bool am_superuser)
{ {
Relation pgdbrel;
SysScanDesc pgdbscan;
ScanKeyData key;
HeapTuple tup; HeapTuple tup;
Form_pg_database dbform; Form_pg_database dbform;
/* /* Fetch our real pg_database row */
* Because we grab RowShareLock here, we can be sure that dropdb() is not tup = SearchSysCache(DATABASEOID,
* running in parallel with us (any more). ObjectIdGetDatum(MyDatabaseId),
*/ 0, 0, 0);
pgdbrel = heap_open(DatabaseRelationId, RowShareLock); if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for database %u", MyDatabaseId);
ScanKeyInit(&key, dbform = (Form_pg_database) GETSTRUCT(tup);
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(name));
pgdbscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
SnapshotNow, 1, &key);
tup = systable_getnext(pgdbscan);
if (!HeapTupleIsValid(tup) ||
HeapTupleGetOid(tup) != MyDatabaseId)
{
/* OOPS */
heap_close(pgdbrel, RowShareLock);
/* /* This recheck is strictly paranoia */
* The only real problem I could have created is to load dirty buffers if (strcmp(name, NameStr(dbform->datname)) != 0)
* for the dead database into shared buffer cache; if I did, some
* other backend will eventually try to write them and die in
* mdblindwrt. Flush any such pages to forestall trouble.
*/
DropDatabaseBuffers(MyDatabaseId);
/* Now I can commit hara-kiri with a clear conscience... */
ereport(FATAL, ereport(FATAL,
(errcode(ERRCODE_UNDEFINED_DATABASE), (errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\", OID %u, has disappeared from pg_database", errmsg("database \"%s\" has disappeared from pg_database",
name, MyDatabaseId))); name),
} errdetail("Database OID %u now seems to belong to \"%s\".",
MyDatabaseId, NameStr(dbform->datname))));
dbform = (Form_pg_database) GETSTRUCT(tup);
/* /*
* These next checks are not enforced when in standalone mode, so that * Check permissions to connect to the database.
*
* These checks are not enforced when in standalone mode, so that
* there is a way to recover from disabling all access to all databases, * there is a way to recover from disabling all access to all databases,
* for example "UPDATE pg_database SET datallowconn = false;". * for example "UPDATE pg_database SET datallowconn = false;".
* *
...@@ -246,8 +200,8 @@ ReverifyMyDatabase(const char *name, bool am_superuser) ...@@ -246,8 +200,8 @@ ReverifyMyDatabase(const char *name, bool am_superuser)
Datum datum; Datum datum;
bool isnull; bool isnull;
datum = heap_getattr(tup, Anum_pg_database_datconfig, datum = SysCacheGetAttr(DATABASEOID, tup, Anum_pg_database_datconfig,
RelationGetDescr(pgdbrel), &isnull); &isnull);
if (!isnull) if (!isnull)
{ {
ArrayType *a = DatumGetArrayTypeP(datum); ArrayType *a = DatumGetArrayTypeP(datum);
...@@ -256,8 +210,7 @@ ReverifyMyDatabase(const char *name, bool am_superuser) ...@@ -256,8 +210,7 @@ ReverifyMyDatabase(const char *name, bool am_superuser)
} }
} }
systable_endscan(pgdbscan); ReleaseSysCache(tup);
heap_close(pgdbrel, RowShareLock);
} }
...@@ -337,9 +290,11 @@ InitPostgres(const char *dbname, const char *username) ...@@ -337,9 +290,11 @@ InitPostgres(const char *dbname, const char *username)
bool bootstrap = IsBootstrapProcessingMode(); bool bootstrap = IsBootstrapProcessingMode();
bool autovacuum = IsAutoVacuumProcess(); bool autovacuum = IsAutoVacuumProcess();
bool am_superuser; bool am_superuser;
char *fullpath;
/* /*
* Set up the global variables holding database id and path. * Set up the global variables holding database id and path. But note
* we won't actually try to touch the database just yet.
* *
* We take a shortcut in the bootstrap case, otherwise we have to look up * We take a shortcut in the bootstrap case, otherwise we have to look up
* the db name in pg_database. * the db name in pg_database.
...@@ -348,55 +303,24 @@ InitPostgres(const char *dbname, const char *username) ...@@ -348,55 +303,24 @@ InitPostgres(const char *dbname, const char *username)
{ {
MyDatabaseId = TemplateDbOid; MyDatabaseId = TemplateDbOid;
MyDatabaseTableSpace = DEFAULTTABLESPACE_OID; MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
SetDatabasePath(GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace));
} }
else else
{ {
char *fullpath;
/*
* Formerly we validated DataDir here, but now that's done earlier.
*/
/* /*
* Find oid and tablespace of the database we're about to open. Since * Find oid and tablespace of the database we're about to open. Since
* we're not yet up and running we have to use the hackish * we're not yet up and running we have to use the hackish
* FindMyDatabase. * FindMyDatabase, which looks in the flat-file copy of pg_database.
*/ */
if (!FindMyDatabase(dbname, &MyDatabaseId, &MyDatabaseTableSpace)) if (!FindMyDatabase(dbname, &MyDatabaseId, &MyDatabaseTableSpace))
ereport(FATAL, ereport(FATAL,
(errcode(ERRCODE_UNDEFINED_DATABASE), (errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", errmsg("database \"%s\" does not exist",
dbname))); dbname)));
fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
/* Verify the database path */
if (access(fullpath, F_OK) == -1)
{
if (errno == ENOENT)
ereport(FATAL,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist",
dbname),
errdetail("The database subdirectory \"%s\" is missing.",
fullpath)));
else
ereport(FATAL,
(errcode_for_file_access(),
errmsg("could not access directory \"%s\": %m",
fullpath)));
}
ValidatePgVersion(fullpath);
SetDatabasePath(fullpath);
} }
/* fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
* Code after this point assumes we are in the proper directory!
*/ SetDatabasePath(fullpath);
/* /*
* Finish filling in the PGPROC struct, and add it to the ProcArray. * Finish filling in the PGPROC struct, and add it to the ProcArray.
...@@ -459,10 +383,78 @@ InitPostgres(const char *dbname, const char *username) ...@@ -459,10 +383,78 @@ InitPostgres(const char *dbname, const char *username)
*/ */
on_shmem_exit(ShutdownPostgres, 0); on_shmem_exit(ShutdownPostgres, 0);
/* start a new transaction here before access to db */ /*
* Start a new transaction here before first access to db
*/
if (!bootstrap) if (!bootstrap)
StartTransactionCommand(); StartTransactionCommand();
/*
* Now that we have a transaction, we can take locks. Take a writer's
* lock on the database we are trying to connect to. If there is
* a concurrently running DROP DATABASE on that database, this will
* block us until it finishes (and has updated the flat file copy
* of pg_database).
*
* Note that the lock is not held long, only until the end of this
* startup transaction. This is OK since we are already advertising
* our use of the database in the PGPROC array; anyone trying a DROP
* DATABASE after this point will see us there.
*
* Note: use of RowExclusiveLock here is reasonable because we envision
* our session as being a concurrent writer of the database. If we had
* a way of declaring a session as being guaranteed-read-only, we could
* use AccessShareLock for such sessions and thereby not conflict against
* CREATE DATABASE.
*/
if (!bootstrap)
LockSharedObject(DatabaseRelationId, MyDatabaseId, 0,
RowExclusiveLock);
/*
* Recheck the flat file copy of pg_database to make sure the target
* database hasn't gone away. If there was a concurrent DROP DATABASE,
* this ensures we will die cleanly without creating a mess.
*/
if (!bootstrap)
{
Oid dbid2;
Oid tsid2;
if (!FindMyDatabase(dbname, &dbid2, &tsid2) ||
dbid2 != MyDatabaseId || tsid2 != MyDatabaseTableSpace)
ereport(FATAL,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist",
dbname),
errdetail("It seems to have just been dropped or renamed.")));
}
/*
* Now we should be able to access the database directory safely.
* Verify it's there and looks reasonable.
*/
if (!bootstrap)
{
if (access(fullpath, F_OK) == -1)
{
if (errno == ENOENT)
ereport(FATAL,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist",
dbname),
errdetail("The database subdirectory \"%s\" is missing.",
fullpath)));
else
ereport(FATAL,
(errcode_for_file_access(),
errmsg("could not access directory \"%s\": %m",
fullpath)));
}
ValidatePgVersion(fullpath);
}
/* /*
* It's now possible to do real access to the system catalogs. * It's now possible to do real access to the system catalogs.
* *
...@@ -499,22 +491,21 @@ InitPostgres(const char *dbname, const char *username) ...@@ -499,22 +491,21 @@ InitPostgres(const char *dbname, const char *username)
am_superuser = superuser(); am_superuser = superuser();
} }
/* set up ACL framework (so ReverifyMyDatabase can check permissions) */ /* set up ACL framework (so CheckMyDatabase can check permissions) */
initialize_acl(); initialize_acl();
/* /*
* Unless we are bootstrapping, double-check that InitMyDatabaseInfo() got * Read the real pg_database row for our database, check permissions
* a correct result. We can't do this until all the database-access * and set up database-specific GUC settings. We can't do this until all
* infrastructure is up. (Also, it wants to know if the user is a * the database-access infrastructure is up. (Also, it wants to know if
* superuser, so the above stuff has to happen first.) * the user is a superuser, so the above stuff has to happen first.)
*/ */
if (!bootstrap) if (!bootstrap)
ReverifyMyDatabase(dbname, am_superuser); CheckMyDatabase(dbname, am_superuser);
/* /*
* Final phase of relation cache startup: write a new cache file if * Final phase of relation cache startup: write a new cache file if
* necessary. This is done after ReverifyMyDatabase to avoid writing a * necessary. (XXX this could be folded back into Phase2)
* cache file into a dead database.
*/ */
RelationCacheInitializePhase3(); RelationCacheInitializePhase3();
...@@ -530,7 +521,7 @@ InitPostgres(const char *dbname, const char *username) ...@@ -530,7 +521,7 @@ InitPostgres(const char *dbname, const char *username)
/* /*
* Initialize various default states that can't be set up until we've * Initialize various default states that can't be set up until we've
* selected the active user and done ReverifyMyDatabase. * selected the active user and gotten the right GUC settings.
*/ */
/* set default namespace search path */ /* set default namespace search path */
...@@ -587,13 +578,13 @@ ThereIsAtLeastOneRole(void) ...@@ -587,13 +578,13 @@ ThereIsAtLeastOneRole(void)
HeapScanDesc scan; HeapScanDesc scan;
bool result; bool result;
pg_authid_rel = heap_open(AuthIdRelationId, AccessExclusiveLock); pg_authid_rel = heap_open(AuthIdRelationId, AccessShareLock);
scan = heap_beginscan(pg_authid_rel, SnapshotNow, 0, NULL); scan = heap_beginscan(pg_authid_rel, SnapshotNow, 0, NULL);
result = (heap_getnext(scan, ForwardScanDirection) != NULL); result = (heap_getnext(scan, ForwardScanDirection) != NULL);
heap_endscan(scan); heap_endscan(scan);
heap_close(pg_authid_rel, AccessExclusiveLock); heap_close(pg_authid_rel, AccessShareLock);
return result; return result;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment