Commit 1379fd53 authored by Amit Kapila's avatar Amit Kapila

Introduce the 'force' option for the Drop Database command.

This new option terminates the other sessions connected to the target
database and then drop it.  To terminate other sessions, the current user
must have desired permissions (same as pg_terminate_backend()).  We don't
allow to terminate the sessions if prepared transactions, active logical
replication slots or subscriptions are present in the target database.

Author: Pavel Stehule with changes by me
Reviewed-by: Dilip Kumar, Vignesh C, Ibrar Ahmed, Anthony Nowocien,
Ryan Lambert and Amit Kapila
Discussion: https://postgr.es/m/CAP_rwwmLJJbn70vLOZFpxGw3XD7nLB_7+NKz46H5EOO2k5H7OQ@mail.gmail.com
parent 112caf90
...@@ -21,7 +21,11 @@ PostgreSQL documentation ...@@ -21,7 +21,11 @@ PostgreSQL documentation
<refsynopsisdiv> <refsynopsisdiv>
<synopsis> <synopsis>
DROP DATABASE [ IF EXISTS ] <replaceable class="parameter">name</replaceable> DROP DATABASE [ IF EXISTS ] <replaceable class="parameter">name</replaceable> [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
<phrase>where <replaceable class="parameter">option</replaceable> can be:</phrase>
FORCE
</synopsis> </synopsis>
</refsynopsisdiv> </refsynopsisdiv>
...@@ -32,9 +36,11 @@ DROP DATABASE [ IF EXISTS ] <replaceable class="parameter">name</replaceable> ...@@ -32,9 +36,11 @@ DROP DATABASE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
<command>DROP DATABASE</command> drops a database. It removes the <command>DROP DATABASE</command> drops a database. It removes the
catalog entries for the database and deletes the directory catalog entries for the database and deletes the directory
containing the data. It can only be executed by the database owner. containing the data. It can only be executed by the database owner.
Also, it cannot be executed while you or anyone else are connected It cannot be executed while you are connected to the target database.
to the target database. (Connect to <literal>postgres</literal> or any (Connect to <literal>postgres</literal> or any other database to issue this
other database to issue this command.) command.)
Also, if anyone else is connected to the target database, this command will
fail unless you use the <literal>FORCE</literal> option described below.
</para> </para>
<para> <para>
...@@ -64,6 +70,25 @@ DROP DATABASE [ IF EXISTS ] <replaceable class="parameter">name</replaceable> ...@@ -64,6 +70,25 @@ DROP DATABASE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><literal>FORCE</literal></term>
<listitem>
<para>
Attempt to terminate all existing connections to the target database.
It doesn't terminate if prepared transactions, active logical replication
slots or subscriptions are present in the target database.
</para>
<para>
This will fail if the current user has no permissions to terminate other
connections. Required permissions are the same as with
<literal>pg_terminate_backend</literal>, described in
<xref linkend="functions-admin-signal"/>. This will also fail if we
are not able to terminate connections.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
</refsect1> </refsect1>
......
...@@ -810,7 +810,7 @@ createdb_failure_callback(int code, Datum arg) ...@@ -810,7 +810,7 @@ createdb_failure_callback(int code, Datum arg)
* DROP DATABASE * DROP DATABASE
*/ */
void void
dropdb(const char *dbname, bool missing_ok) dropdb(const char *dbname, bool missing_ok, bool force)
{ {
Oid db_id; Oid db_id;
bool db_istemplate; bool db_istemplate;
...@@ -910,6 +910,14 @@ dropdb(const char *dbname, bool missing_ok) ...@@ -910,6 +910,14 @@ dropdb(const char *dbname, bool missing_ok)
"There are %d subscriptions.", "There are %d subscriptions.",
nsubscriptions, nsubscriptions))); nsubscriptions, nsubscriptions)));
/*
* Attempt to terminate all existing connections to the target database if
* the user has requested to do so.
*/
if (force)
TerminateOtherDBBackends(db_id);
/* /*
* Check for other backends in the target database. (Because we hold the * Check for other backends in the target database. (Because we hold the
* database lock, no new ones can start after this.) * database lock, no new ones can start after this.)
...@@ -1430,6 +1438,30 @@ movedb_failure_callback(int code, Datum arg) ...@@ -1430,6 +1438,30 @@ movedb_failure_callback(int code, Datum arg)
(void) rmtree(dstpath, true); (void) rmtree(dstpath, true);
} }
/*
* Process options and call dropdb function.
*/
void
DropDatabase(ParseState *pstate, DropdbStmt *stmt)
{
bool force = false;
ListCell *lc;
foreach(lc, stmt->options)
{
DefElem *opt = (DefElem *) lfirst(lc);
if (strcmp(opt->defname, "force") == 0)
force = true;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
parser_errposition(pstate, opt->location)));
}
dropdb(stmt->dbname, stmt->missing_ok, force);
}
/* /*
* ALTER DATABASE name ... * ALTER DATABASE name ...
......
...@@ -3868,6 +3868,7 @@ _copyDropdbStmt(const DropdbStmt *from) ...@@ -3868,6 +3868,7 @@ _copyDropdbStmt(const DropdbStmt *from)
COPY_STRING_FIELD(dbname); COPY_STRING_FIELD(dbname);
COPY_SCALAR_FIELD(missing_ok); COPY_SCALAR_FIELD(missing_ok);
COPY_NODE_FIELD(options);
return newnode; return newnode;
} }
......
...@@ -1676,6 +1676,7 @@ _equalDropdbStmt(const DropdbStmt *a, const DropdbStmt *b) ...@@ -1676,6 +1676,7 @@ _equalDropdbStmt(const DropdbStmt *a, const DropdbStmt *b)
{ {
COMPARE_STRING_FIELD(dbname); COMPARE_STRING_FIELD(dbname);
COMPARE_SCALAR_FIELD(missing_ok); COMPARE_SCALAR_FIELD(missing_ok);
COMPARE_NODE_FIELD(options);
return true; return true;
} }
......
...@@ -310,6 +310,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); ...@@ -310,6 +310,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <defelt> vac_analyze_option_elem %type <defelt> vac_analyze_option_elem
%type <list> vac_analyze_option_list %type <list> vac_analyze_option_list
%type <node> vac_analyze_option_arg %type <node> vac_analyze_option_arg
%type <defelt> drop_option
%type <boolean> opt_or_replace %type <boolean> opt_or_replace
opt_grant_grant_option opt_grant_admin_option opt_grant_grant_option opt_grant_admin_option
opt_nowait opt_if_exists opt_with_data opt_nowait opt_if_exists opt_with_data
...@@ -406,6 +407,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); ...@@ -406,6 +407,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
TriggerTransitions TriggerReferencing TriggerTransitions TriggerReferencing
publication_name_list publication_name_list
vacuum_relation_list opt_vacuum_relation_list vacuum_relation_list opt_vacuum_relation_list
drop_option_list
%type <list> group_by_list %type <list> group_by_list
%type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> group_by_item empty_grouping_set rollup_clause cube_clause
...@@ -10213,7 +10215,7 @@ AlterDatabaseSetStmt: ...@@ -10213,7 +10215,7 @@ AlterDatabaseSetStmt:
/***************************************************************************** /*****************************************************************************
* *
* DROP DATABASE [ IF EXISTS ] * DROP DATABASE [ IF EXISTS ] dbname [ [ WITH ] ( options ) ]
* *
* This is implicitly CASCADE, no need for drop behavior * This is implicitly CASCADE, no need for drop behavior
*****************************************************************************/ *****************************************************************************/
...@@ -10223,6 +10225,7 @@ DropdbStmt: DROP DATABASE database_name ...@@ -10223,6 +10225,7 @@ DropdbStmt: DROP DATABASE database_name
DropdbStmt *n = makeNode(DropdbStmt); DropdbStmt *n = makeNode(DropdbStmt);
n->dbname = $3; n->dbname = $3;
n->missing_ok = false; n->missing_ok = false;
n->options = NULL;
$$ = (Node *)n; $$ = (Node *)n;
} }
| DROP DATABASE IF_P EXISTS database_name | DROP DATABASE IF_P EXISTS database_name
...@@ -10230,10 +10233,48 @@ DropdbStmt: DROP DATABASE database_name ...@@ -10230,10 +10233,48 @@ DropdbStmt: DROP DATABASE database_name
DropdbStmt *n = makeNode(DropdbStmt); DropdbStmt *n = makeNode(DropdbStmt);
n->dbname = $5; n->dbname = $5;
n->missing_ok = true; n->missing_ok = true;
n->options = NULL;
$$ = (Node *)n; $$ = (Node *)n;
} }
| DROP DATABASE database_name opt_with '(' drop_option_list ')'
{
DropdbStmt *n = makeNode(DropdbStmt);
n->dbname = $3;
n->missing_ok = false;
n->options = $6;
$$ = (Node *)n;
}
| DROP DATABASE IF_P EXISTS database_name opt_with '(' drop_option_list ')'
{
DropdbStmt *n = makeNode(DropdbStmt);
n->dbname = $5;
n->missing_ok = true;
n->options = $8;
$$ = (Node *)n;
}
;
drop_option_list:
drop_option
{
$$ = list_make1((Node *) $1);
}
| drop_option_list ',' drop_option
{
$$ = lappend($1, (Node *) $3);
}
; ;
/*
* Currently only the FORCE option is supported, but the syntax is designed
* to be extensible so that we can add more options in the future if required.
*/
drop_option:
FORCE
{
$$ = makeDefElem("force", NULL, @1);
}
;
/***************************************************************************** /*****************************************************************************
* *
......
...@@ -52,6 +52,8 @@ ...@@ -52,6 +52,8 @@
#include "access/xact.h" #include "access/xact.h"
#include "access/xlog.h" #include "access/xlog.h"
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/pg_authid.h"
#include "commands/dbcommands.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include "storage/proc.h" #include "storage/proc.h"
...@@ -2970,6 +2972,118 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared) ...@@ -2970,6 +2972,118 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
return true; /* timed out, still conflicts */ return true; /* timed out, still conflicts */
} }
/*
* Terminate existing connections to the specified database. This routine
* is used by the DROP DATABASE command when user has asked to forcefully
* drop the database.
*
* The current backend is always ignored; it is caller's responsibility to
* check whether the current backend uses the given DB, if it's important.
*
* It doesn't allow to terminate the connections even if there is a one
* backend with the prepared transaction in the target database.
*/
void
TerminateOtherDBBackends(Oid databaseId)
{
ProcArrayStruct *arrayP = procArray;
List *pids = NIL;
int nprepared = 0;
int i;
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (i = 0; i < procArray->numProcs; i++)
{
int pgprocno = arrayP->pgprocnos[i];
PGPROC *proc = &allProcs[pgprocno];
if (proc->databaseId != databaseId)
continue;
if (proc == MyProc)
continue;
if (proc->pid != 0)
pids = lappend_int(pids, proc->pid);
else
nprepared++;
}
LWLockRelease(ProcArrayLock);
if (nprepared > 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("database \"%s\" is being used by prepared transaction",
get_database_name(databaseId)),
errdetail_plural("There is %d prepared transaction using the database.",
"There are %d prepared transactions using the database.",
nprepared,
nprepared)));
if (pids)
{
ListCell *lc;
/*
* Check whether we have the necessary rights to terminate other
* sessions. We don't terminate any session untill we ensure that we
* have rights on all the sessions to be terminated. These checks are
* the same as we do in pg_terminate_backend.
*
* In this case we don't raise some warnings - like "PID %d is not a
* PostgreSQL server process", because for us already finished session
* is not a problem.
*/
foreach(lc, pids)
{
int pid = lfirst_int(lc);
PGPROC *proc = BackendPidGetProc(pid);
if (proc != NULL)
{
/* Only allow superusers to signal superuser-owned backends. */
if (superuser_arg(proc->roleId) && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be a superuser to terminate superuser process"))));
/* Users can signal backends they have role membership in. */
if (!has_privs_of_role(GetUserId(), proc->roleId) &&
!has_privs_of_role(GetUserId(), DEFAULT_ROLE_SIGNAL_BACKENDID))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be a member of the role whose process is being terminated or member of pg_signal_backend"))));
}
}
/*
* There's a race condition here: once we release the ProcArrayLock,
* it's possible for the session to exit before we issue kill. That
* race condition possibility seems too unlikely to worry about. See
* pg_signal_backend.
*/
foreach(lc, pids)
{
int pid = lfirst_int(lc);
PGPROC *proc = BackendPidGetProc(pid);
if (proc != NULL)
{
/*
* If we have setsid(), signal the backend's whole process
* group
*/
#ifdef HAVE_SETSID
(void) kill(-pid, SIGTERM);
#else
(void) kill(pid, SIGTERM);
#endif
}
}
}
}
/* /*
* ProcArraySetReplicationSlotXmin * ProcArraySetReplicationSlotXmin
* *
......
...@@ -595,13 +595,9 @@ standard_ProcessUtility(PlannedStmt *pstmt, ...@@ -595,13 +595,9 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break; break;
case T_DropdbStmt: case T_DropdbStmt:
{ /* no event triggers for global objects */
DropdbStmt *stmt = (DropdbStmt *) parsetree; PreventInTransactionBlock(isTopLevel, "DROP DATABASE");
DropDatabase(pstate, (DropdbStmt *) parsetree);
/* no event triggers for global objects */
PreventInTransactionBlock(isTopLevel, "DROP DATABASE");
dropdb(stmt->dbname, stmt->missing_ok);
}
break; break;
/* Query-level asynchronous notification */ /* Query-level asynchronous notification */
......
...@@ -2844,6 +2844,10 @@ psql_completion(const char *text, int start, int end) ...@@ -2844,6 +2844,10 @@ psql_completion(const char *text, int start, int end)
COMPLETE_WITH_FUNCTION_ARG(prev2_wd); COMPLETE_WITH_FUNCTION_ARG(prev2_wd);
else if (Matches("DROP", "FOREIGN")) else if (Matches("DROP", "FOREIGN"))
COMPLETE_WITH("DATA WRAPPER", "TABLE"); COMPLETE_WITH("DATA WRAPPER", "TABLE");
else if (Matches("DROP", "DATABASE", MatchAny))
COMPLETE_WITH("WITH (");
else if (HeadMatches("DROP", "DATABASE") && (ends_with(prev_wd, '(')))
COMPLETE_WITH("FORCE");
/* DROP INDEX */ /* DROP INDEX */
else if (Matches("DROP", "INDEX")) else if (Matches("DROP", "INDEX"))
......
...@@ -20,7 +20,8 @@ ...@@ -20,7 +20,8 @@
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
extern Oid createdb(ParseState *pstate, const CreatedbStmt *stmt); extern Oid createdb(ParseState *pstate, const CreatedbStmt *stmt);
extern void dropdb(const char *dbname, bool missing_ok); extern void dropdb(const char *dbname, bool missing_ok, bool force);
extern void DropDatabase(ParseState *pstate, DropdbStmt *stmt);
extern ObjectAddress RenameDatabase(const char *oldname, const char *newname); extern ObjectAddress RenameDatabase(const char *oldname, const char *newname);
extern Oid AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel); extern Oid AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel);
extern Oid AlterDatabaseSet(AlterDatabaseSetStmt *stmt); extern Oid AlterDatabaseSet(AlterDatabaseSetStmt *stmt);
......
...@@ -3145,6 +3145,7 @@ typedef struct DropdbStmt ...@@ -3145,6 +3145,7 @@ typedef struct DropdbStmt
NodeTag type; NodeTag type;
char *dbname; /* database to drop */ char *dbname; /* database to drop */
bool missing_ok; /* skip error if db is missing? */ bool missing_ok; /* skip error if db is missing? */
List *options; /* currently only FORCE is supported */
} DropdbStmt; } DropdbStmt;
/* ---------------------- /* ----------------------
......
...@@ -113,6 +113,7 @@ extern void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conf ...@@ -113,6 +113,7 @@ extern void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conf
extern int CountUserBackends(Oid roleid); extern int CountUserBackends(Oid roleid);
extern bool CountOtherDBBackends(Oid databaseId, extern bool CountOtherDBBackends(Oid databaseId,
int *nbackends, int *nprepared); int *nbackends, int *nprepared);
extern void TerminateOtherDBBackends(Oid databaseId);
extern void XidCacheRemoveRunningXids(TransactionId xid, extern void XidCacheRemoveRunningXids(TransactionId xid,
int nxids, const TransactionId *xids, int nxids, const TransactionId *xids,
......
...@@ -330,3 +330,13 @@ HINT: Specify the argument list to select the routine unambiguously. ...@@ -330,3 +330,13 @@ HINT: Specify the argument list to select the routine unambiguously.
-- cleanup -- cleanup
DROP PROCEDURE test_ambiguous_procname(int); DROP PROCEDURE test_ambiguous_procname(int);
DROP PROCEDURE test_ambiguous_procname(text); DROP PROCEDURE test_ambiguous_procname(text);
-- This test checks both the functionality of 'if exists' and the syntax
-- of the drop database command.
drop database test_database_exists (force);
ERROR: database "test_database_exists" does not exist
drop database test_database_exists with (force);
ERROR: database "test_database_exists" does not exist
drop database if exists test_database_exists (force);
NOTICE: database "test_database_exists" does not exist, skipping
drop database if exists test_database_exists with (force);
NOTICE: database "test_database_exists" does not exist, skipping
...@@ -295,3 +295,10 @@ DROP ROUTINE IF EXISTS test_ambiguous_procname; ...@@ -295,3 +295,10 @@ DROP ROUTINE IF EXISTS test_ambiguous_procname;
-- cleanup -- cleanup
DROP PROCEDURE test_ambiguous_procname(int); DROP PROCEDURE test_ambiguous_procname(int);
DROP PROCEDURE test_ambiguous_procname(text); DROP PROCEDURE test_ambiguous_procname(text);
-- This test checks both the functionality of 'if exists' and the syntax
-- of the drop database command.
drop database test_database_exists (force);
drop database test_database_exists with (force);
drop database if exists test_database_exists (force);
drop database if exists test_database_exists with (force);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment