Commit fbb1d7d7 authored by Tom Lane's avatar Tom Lane

Allow CREATE/ALTER DATABASE to manipulate datistemplate and datallowconn.

Historically these database properties could be manipulated only by
manually updating pg_database, which is error-prone and only possible for
superusers.  But there seems no good reason not to allow database owners to
set them for their databases, so invent CREATE/ALTER DATABASE options to do
that.  Adjust a couple of places that were doing it the hard way to use the
commands instead.

Vik Fearing, reviewed by Pavel Stehule
parent 15c82efd
...@@ -540,9 +540,8 @@ set_frozenxids(void) ...@@ -540,9 +540,8 @@ set_frozenxids(void)
*/ */
if (strcmp(datallowconn, "f") == 0) if (strcmp(datallowconn, "f") == 0)
PQclear(executeQueryOrDie(conn_template1, PQclear(executeQueryOrDie(conn_template1,
"UPDATE pg_catalog.pg_database " "ALTER DATABASE %s ALLOW_CONNECTIONS = true",
"SET datallowconn = true " quote_identifier(datname)));
"WHERE datname = '%s'", datname));
conn = connectToServer(&new_cluster, datname); conn = connectToServer(&new_cluster, datname);
...@@ -558,9 +557,8 @@ set_frozenxids(void) ...@@ -558,9 +557,8 @@ set_frozenxids(void)
/* Reset datallowconn flag */ /* Reset datallowconn flag */
if (strcmp(datallowconn, "f") == 0) if (strcmp(datallowconn, "f") == 0)
PQclear(executeQueryOrDie(conn_template1, PQclear(executeQueryOrDie(conn_template1,
"UPDATE pg_catalog.pg_database " "ALTER DATABASE %s ALLOW_CONNECTIONS = false",
"SET datallowconn = false " quote_identifier(datname)));
"WHERE datname = '%s'", datname));
} }
PQclear(dbres); PQclear(dbres);
......
...@@ -25,6 +25,8 @@ ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> [ [ WITH ] <rep ...@@ -25,6 +25,8 @@ ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> [ [ WITH ] <rep
<phrase>where <replaceable class="PARAMETER">option</replaceable> can be:</phrase> <phrase>where <replaceable class="PARAMETER">option</replaceable> can be:</phrase>
IS_TEMPLATE <replaceable class="PARAMETER">istemplate</replaceable>
ALLOW_CONNECTIONS <replaceable class="PARAMETER">allowconn</replaceable>
CONNECTION LIMIT <replaceable class="PARAMETER">connlimit</replaceable> CONNECTION LIMIT <replaceable class="PARAMETER">connlimit</replaceable>
ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> RENAME TO <replaceable>new_name</replaceable> ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
...@@ -107,6 +109,26 @@ ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> RESET ALL ...@@ -107,6 +109,26 @@ ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> RESET ALL
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term><replaceable class="parameter">istemplate</replaceable></term>
<listitem>
<para>
If true, then this database can be cloned by any user with CREATEDB
privileges; if false, then only superusers or the owner of the
database can clone it.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">allowconn</replaceable></term>
<listitem>
<para>
If false then no one can connect to this database.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">connlimit</replaceable></term> <term><replaceable class="parameter">connlimit</replaceable></term>
<listitem> <listitem>
<para> <para>
......
...@@ -28,6 +28,8 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable> ...@@ -28,6 +28,8 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
[ LC_COLLATE [=] <replaceable class="parameter">lc_collate</replaceable> ] [ LC_COLLATE [=] <replaceable class="parameter">lc_collate</replaceable> ]
[ LC_CTYPE [=] <replaceable class="parameter">lc_ctype</replaceable> ] [ LC_CTYPE [=] <replaceable class="parameter">lc_ctype</replaceable> ]
[ TABLESPACE [=] <replaceable class="parameter">tablespace_name</replaceable> ] [ TABLESPACE [=] <replaceable class="parameter">tablespace_name</replaceable> ]
[ IS_TEMPLATE [=] <replaceable class="parameter">istemplate</replaceable> ]
[ ALLOW_CONNECTIONS [=] <replaceable class="parameter">allowconn</replaceable> ]
[ CONNECTION LIMIT [=] <replaceable class="parameter">connlimit</replaceable> ] ] [ CONNECTION LIMIT [=] <replaceable class="parameter">connlimit</replaceable> ] ]
</synopsis> </synopsis>
</refsynopsisdiv> </refsynopsisdiv>
...@@ -148,6 +150,28 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable> ...@@ -148,6 +150,28 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term><replaceable class="parameter">istemplate</replaceable></term>
<listitem>
<para>
If true, then this database can be cloned by any user with CREATEDB
privileges; if false (the default), then only superusers or the owner
of the database can clone it.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">allowconn</replaceable></term>
<listitem>
<para>
If false then no one can connect to this database. The default is
true, allowing connections (except as restricted by other mechanisms,
such as <literal>GRANT</>/<literal>REVOKE CONNECT</>).
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">connlimit</replaceable></term> <term><replaceable class="parameter">connlimit</replaceable></term>
<listitem> <listitem>
<para> <para>
......
...@@ -123,6 +123,8 @@ createdb(const CreatedbStmt *stmt) ...@@ -123,6 +123,8 @@ createdb(const CreatedbStmt *stmt)
DefElem *dencoding = NULL; DefElem *dencoding = NULL;
DefElem *dcollate = NULL; DefElem *dcollate = NULL;
DefElem *dctype = NULL; DefElem *dctype = NULL;
DefElem *distemplate = NULL;
DefElem *dallowconnections = NULL;
DefElem *dconnlimit = NULL; DefElem *dconnlimit = NULL;
char *dbname = stmt->dbname; char *dbname = stmt->dbname;
char *dbowner = NULL; char *dbowner = NULL;
...@@ -131,6 +133,8 @@ createdb(const CreatedbStmt *stmt) ...@@ -131,6 +133,8 @@ createdb(const CreatedbStmt *stmt)
char *dbctype = NULL; char *dbctype = NULL;
char *canonname; char *canonname;
int encoding = -1; int encoding = -1;
bool dbistemplate = false;
bool dballowconnections = true;
int dbconnlimit = -1; int dbconnlimit = -1;
int notherbackends; int notherbackends;
int npreparedxacts; int npreparedxacts;
...@@ -189,6 +193,22 @@ createdb(const CreatedbStmt *stmt) ...@@ -189,6 +193,22 @@ createdb(const CreatedbStmt *stmt)
errmsg("conflicting or redundant options"))); errmsg("conflicting or redundant options")));
dctype = defel; dctype = defel;
} }
else if (strcmp(defel->defname, "is_template") == 0)
{
if (distemplate)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
distemplate = defel;
}
else if (strcmp(defel->defname, "allow_connections") == 0)
{
if (dallowconnections)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
dallowconnections = defel;
}
else if (strcmp(defel->defname, "connection_limit") == 0) else if (strcmp(defel->defname, "connection_limit") == 0)
{ {
if (dconnlimit) if (dconnlimit)
...@@ -244,7 +264,10 @@ createdb(const CreatedbStmt *stmt) ...@@ -244,7 +264,10 @@ createdb(const CreatedbStmt *stmt)
dbcollate = defGetString(dcollate); dbcollate = defGetString(dcollate);
if (dctype && dctype->arg) if (dctype && dctype->arg)
dbctype = defGetString(dctype); dbctype = defGetString(dctype);
if (distemplate && distemplate->arg)
dbistemplate = defGetBoolean(distemplate);
if (dallowconnections && dallowconnections->arg)
dballowconnections = defGetBoolean(dallowconnections);
if (dconnlimit && dconnlimit->arg) if (dconnlimit && dconnlimit->arg)
{ {
dbconnlimit = defGetInt32(dconnlimit); dbconnlimit = defGetInt32(dconnlimit);
...@@ -487,8 +510,8 @@ createdb(const CreatedbStmt *stmt) ...@@ -487,8 +510,8 @@ createdb(const CreatedbStmt *stmt)
DirectFunctionCall1(namein, CStringGetDatum(dbcollate)); DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
new_record[Anum_pg_database_datctype - 1] = new_record[Anum_pg_database_datctype - 1] =
DirectFunctionCall1(namein, CStringGetDatum(dbctype)); DirectFunctionCall1(namein, CStringGetDatum(dbctype));
new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false); new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true); new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit); new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid); new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
...@@ -1328,7 +1351,11 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel) ...@@ -1328,7 +1351,11 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
ScanKeyData scankey; ScanKeyData scankey;
SysScanDesc scan; SysScanDesc scan;
ListCell *option; ListCell *option;
int connlimit = -1; bool dbistemplate = false;
bool dballowconnections = true;
int dbconnlimit = -1;
DefElem *distemplate = NULL;
DefElem *dallowconnections = NULL;
DefElem *dconnlimit = NULL; DefElem *dconnlimit = NULL;
DefElem *dtablespace = NULL; DefElem *dtablespace = NULL;
Datum new_record[Natts_pg_database]; Datum new_record[Natts_pg_database];
...@@ -1340,7 +1367,23 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel) ...@@ -1340,7 +1367,23 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
{ {
DefElem *defel = (DefElem *) lfirst(option); DefElem *defel = (DefElem *) lfirst(option);
if (strcmp(defel->defname, "connection_limit") == 0) if (strcmp(defel->defname, "is_template") == 0)
{
if (distemplate)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
distemplate = defel;
}
else if (strcmp(defel->defname, "allow_connections") == 0)
{
if (dallowconnections)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
dallowconnections = defel;
}
else if (strcmp(defel->defname, "connection_limit") == 0)
{ {
if (dconnlimit) if (dconnlimit)
ereport(ERROR, ereport(ERROR,
...@@ -1380,13 +1423,17 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel) ...@@ -1380,13 +1423,17 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
return InvalidOid; return InvalidOid;
} }
if (distemplate && distemplate->arg)
dbistemplate = defGetBoolean(distemplate);
if (dallowconnections && dallowconnections->arg)
dballowconnections = defGetBoolean(dallowconnections);
if (dconnlimit && dconnlimit->arg) if (dconnlimit && dconnlimit->arg)
{ {
connlimit = defGetInt32(dconnlimit); dbconnlimit = defGetInt32(dconnlimit);
if (connlimit < -1) if (dbconnlimit < -1)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid connection limit: %d", connlimit))); errmsg("invalid connection limit: %d", dbconnlimit)));
} }
/* /*
...@@ -1413,6 +1460,17 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel) ...@@ -1413,6 +1460,17 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
stmt->dbname); stmt->dbname);
/*
* In order to avoid getting locked out and having to go through
* standalone mode, we refuse to disallow connections to the database
* we're currently connected to. Lockout can still happen with concurrent
* sessions but the likeliness of that is not high enough to worry about.
*/
if (!dballowconnections && dboid == MyDatabaseId)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot disallow connections for current database")));
/* /*
* Build an updated tuple, perusing the information just obtained * Build an updated tuple, perusing the information just obtained
*/ */
...@@ -1420,9 +1478,19 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel) ...@@ -1420,9 +1478,19 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
MemSet(new_record_nulls, false, sizeof(new_record_nulls)); MemSet(new_record_nulls, false, sizeof(new_record_nulls));
MemSet(new_record_repl, false, sizeof(new_record_repl)); MemSet(new_record_repl, false, sizeof(new_record_repl));
if (distemplate)
{
new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
new_record_repl[Anum_pg_database_datistemplate - 1] = true;
}
if (dallowconnections)
{
new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
new_record_repl[Anum_pg_database_datallowconn - 1] = true;
}
if (dconnlimit) if (dconnlimit)
{ {
new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit); new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
new_record_repl[Anum_pg_database_datconnlimit - 1] = true; new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
} }
......
...@@ -2288,11 +2288,7 @@ make_template0(void) ...@@ -2288,11 +2288,7 @@ make_template0(void)
PG_CMD_DECL; PG_CMD_DECL;
const char **line; const char **line;
static const char *template0_setup[] = { static const char *template0_setup[] = {
"CREATE DATABASE template0;\n", "CREATE DATABASE template0 IS_TEMPLATE = true ALLOW_CONNECTIONS = false;\n",
"UPDATE pg_database SET "
" datistemplate = 't', "
" datallowconn = 'f' "
" WHERE datname = 'template0';\n",
/* /*
* We use the OID of template0 to determine lastsysoid * We use the OID of template0 to determine lastsysoid
......
...@@ -1374,19 +1374,15 @@ dumpCreateDB(PGconn *conn) ...@@ -1374,19 +1374,15 @@ dumpCreateDB(PGconn *conn)
appendPQExpBuffer(buf, " TABLESPACE = %s", appendPQExpBuffer(buf, " TABLESPACE = %s",
fmtId(dbtablespace)); fmtId(dbtablespace));
if (strcmp(dbistemplate, "t") == 0)
appendPQExpBuffer(buf, " IS_TEMPLATE = true");
if (strcmp(dbconnlimit, "-1") != 0) if (strcmp(dbconnlimit, "-1") != 0)
appendPQExpBuffer(buf, " CONNECTION LIMIT = %s", appendPQExpBuffer(buf, " CONNECTION LIMIT = %s",
dbconnlimit); dbconnlimit);
appendPQExpBufferStr(buf, ";\n"); appendPQExpBufferStr(buf, ";\n");
if (strcmp(dbistemplate, "t") == 0)
{
appendPQExpBufferStr(buf, "UPDATE pg_catalog.pg_database SET datistemplate = 't' WHERE datname = ");
appendStringLiteralConn(buf, dbname, conn);
appendPQExpBufferStr(buf, ";\n");
}
if (binary_upgrade) if (binary_upgrade)
{ {
appendPQExpBufferStr(buf, "-- For binary upgrade, set datfrozenxid.\n"); appendPQExpBufferStr(buf, "-- For binary upgrade, set datfrozenxid.\n");
......
...@@ -1021,7 +1021,8 @@ psql_completion(const char *text, int start, int end) ...@@ -1021,7 +1021,8 @@ psql_completion(const char *text, int start, int end)
pg_strcasecmp(prev2_wd, "DATABASE") == 0) pg_strcasecmp(prev2_wd, "DATABASE") == 0)
{ {
static const char *const list_ALTERDATABASE[] = static const char *const list_ALTERDATABASE[] =
{"RESET", "SET", "OWNER TO", "RENAME TO", "CONNECTION LIMIT", NULL}; {"RESET", "SET", "OWNER TO", "RENAME TO", "IS_TEMPLATE",
"ALLOW_CONNECTIONS", "CONNECTION LIMIT", NULL};
COMPLETE_WITH_LIST(list_ALTERDATABASE); COMPLETE_WITH_LIST(list_ALTERDATABASE);
} }
...@@ -2111,8 +2112,8 @@ psql_completion(const char *text, int start, int end) ...@@ -2111,8 +2112,8 @@ psql_completion(const char *text, int start, int end)
pg_strcasecmp(prev2_wd, "DATABASE") == 0) pg_strcasecmp(prev2_wd, "DATABASE") == 0)
{ {
static const char *const list_DATABASE[] = static const char *const list_DATABASE[] =
{"OWNER", "TEMPLATE", "ENCODING", "TABLESPACE", "CONNECTION LIMIT", {"OWNER", "TEMPLATE", "ENCODING", "TABLESPACE", "IS_TEMPLATE",
NULL}; "ALLOW_CONNECTIONS", "CONNECTION LIMIT", NULL};
COMPLETE_WITH_LIST(list_DATABASE); COMPLETE_WITH_LIST(list_DATABASE);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment