Commit ff539da3 authored by Simon Riggs's avatar Simon Riggs

Cleanup slots during drop database

Automatically drop all logical replication slots associated with a
database when the database is dropped. Previously we threw an ERROR
if a slot existed. Now we throw ERROR only if a slot is active in
the database being dropped.

Craig Ringer
parent 4d33a7f2
...@@ -18876,7 +18876,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); ...@@ -18876,7 +18876,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
<entry> <entry>
Drops the physical or logical replication slot Drops the physical or logical replication slot
named <parameter>slot_name</parameter>. Same as replication protocol named <parameter>slot_name</parameter>. Same as replication protocol
command <literal>DROP_REPLICATION_SLOT</>. command <literal>DROP_REPLICATION_SLOT</>. For logical slots, this must
be called when connected to the same database the slot was created on.
</entry> </entry>
</row> </row>
......
...@@ -2034,6 +2034,8 @@ The commands accepted in walsender mode are: ...@@ -2034,6 +2034,8 @@ The commands accepted in walsender mode are:
<para> <para>
Drops a replication slot, freeing any reserved server-side resources. If Drops a replication slot, freeing any reserved server-side resources. If
the slot is currently in use by an active connection, this command fails. the slot is currently in use by an active connection, this command fails.
If the slot is a logical slot that was created in a database other than
the database the walsender is connected to, this command fails.
</para> </para>
<variablelist> <variablelist>
<varlistentry> <varlistentry>
......
...@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok) ...@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok)
errmsg("cannot drop the currently open database"))); errmsg("cannot drop the currently open database")));
/* /*
* Check whether there are, possibly unconnected, logical slots that refer * Check whether there are active logical slots that refer to the
* to the to-be-dropped database. The database lock we are holding * to-be-dropped database. The database lock we are holding prevents the
* prevents the creation of new slots using the database. * creation of new slots using the database or existing slots becoming
* active.
*/ */
if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active)) (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
if (nslots_active)
{
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE), (errcode(ERRCODE_OBJECT_IN_USE),
errmsg("database \"%s\" is used by a logical replication slot", errmsg("database \"%s\" is used by an active logical replication slot",
dbname), dbname),
errdetail_plural("There is %d slot, %d of them active.", errdetail_plural("There is %d active slot",
"There are %d slots, %d of them active.", "There are %d active slots",
nslots, nslots_active, nslots_active)));
nslots, nslots_active))); }
/* /*
* Check for other backends in the target database. (Because we hold the * Check for other backends in the target database. (Because we hold the
...@@ -914,6 +917,11 @@ dropdb(const char *dbname, bool missing_ok) ...@@ -914,6 +917,11 @@ dropdb(const char *dbname, bool missing_ok)
*/ */
dropDatabaseDependencies(db_id); dropDatabaseDependencies(db_id);
/*
* Drop db-specific replication slots.
*/
ReplicationSlotsDropDBSlots(db_id);
/* /*
* Drop pages for this database that are in the shared buffer cache. This * Drop pages for this database that are in the shared buffer cache. This
* is important to ensure that no remaining backend tries to write out a * is important to ensure that no remaining backend tries to write out a
...@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record) ...@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record)
* InitPostgres() cannot fully re-execute concurrently. This * InitPostgres() cannot fully re-execute concurrently. This
* avoids backends re-connecting automatically to same database, * avoids backends re-connecting automatically to same database,
* which can happen in some cases. * which can happen in some cases.
*
* This will lock out walsenders trying to connect to db-specific
* slots for logical decoding too, so it's safe for us to drop slots.
*/ */
LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock); LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
ResolveRecoveryConflictWithDatabase(xlrec->db_id); ResolveRecoveryConflictWithDatabase(xlrec->db_id);
} }
/* Drop any database-specific replication slots */
ReplicationSlotsDropDBSlots(xlrec->db_id);
/* Drop pages for this database that are in the shared buffer cache */ /* Drop pages for this database that are in the shared buffer cache */
DropDatabaseBuffers(xlrec->db_id); DropDatabaseBuffers(xlrec->db_id);
......
...@@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) ...@@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
return false; return false;
} }
/*
* ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
* passed database oid. The caller should hold an exclusive lock on the
* pg_database oid for the database to prevent creation of new slots on the db
* or replay from existing slots.
*
* This routine isn't as efficient as it could be - but we don't drop databases
* often, especially databases with lots of slots.
*
* Another session that concurrently acquires an existing slot on the target DB
* (most likely to drop it) may cause this function to ERROR. If that happens
* it may have dropped some but not all slots.
*/
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
int i;
if (max_replication_slots <= 0)
return;
restart:
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s;
NameData slotname;
int active_pid;
s = &ReplicationSlotCtl->replication_slots[i];
/* cannot change while ReplicationSlotCtlLock is held */
if (!s->in_use)
continue;
/* only logical slots are database specific, skip */
if (!SlotIsLogical(s))
continue;
/* not our database, skip */
if (s->data.database != dboid)
continue;
/* Claim the slot, as if ReplicationSlotAcquire()ing. */
SpinLockAcquire(&s->mutex);
strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
NameStr(slotname)[NAMEDATALEN-1] = '\0';
active_pid = s->active_pid;
if (active_pid == 0)
{
MyReplicationSlot = s;
s->active_pid = MyProcPid;
}
SpinLockRelease(&s->mutex);
/*
* We might fail here if the slot was active. Even though we hold an
* exclusive lock on the database object a logical slot for that DB can
* still be active if it's being dropped by a backend connected to
* another DB or is otherwise acquired.
*
* It's an unlikely race that'll only arise from concurrent user action,
* so we'll just bail out.
*/
if (active_pid)
elog(ERROR, "replication slot %s is in use by pid %d",
NameStr(slotname), active_pid);
/*
* To avoid largely duplicating ReplicationSlotDropAcquired() or
* complicating it with already_locked flags for ProcArrayLock,
* ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
* just release our ReplicationSlotControlLock to drop the slot.
*
* For safety we'll restart our scan from the beginning each
* time we release the lock.
*/
LWLockRelease(ReplicationSlotControlLock);
ReplicationSlotDropAcquired();
goto restart;
}
LWLockRelease(ReplicationSlotControlLock);
/* recompute limits once after all slots are dropped */
ReplicationSlotsComputeRequiredXmin(false);
ReplicationSlotsComputeRequiredLSN();
}
/* /*
* Check whether the server's configuration supports using replication * Check whether the server's configuration supports using replication
......
...@@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); ...@@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
extern void ReplicationSlotsComputeRequiredLSN(void); extern void ReplicationSlotsComputeRequiredLSN(void);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern void StartupReplicationSlots(void); extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void); extern void CheckPointReplicationSlots(void);
......
...@@ -7,7 +7,7 @@ use strict; ...@@ -7,7 +7,7 @@ use strict;
use warnings; use warnings;
use PostgresNode; use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 5; use Test::More tests => 16;
# Initialize master node # Initialize master node
my $node_master = get_new_node('master'); my $node_master = get_new_node('master');
...@@ -54,7 +54,7 @@ my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logi ...@@ -54,7 +54,7 @@ my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logi
is($stdout_sql, $expected, 'got expected output from SQL decoding session'); is($stdout_sql, $expected, 'got expected output from SQL decoding session');
my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"); my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
diag "waiting to replay $endpos"; print "waiting to replay $endpos\n";
my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1'); my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
chomp($stdout_recv); chomp($stdout_recv);
...@@ -64,5 +64,41 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo ...@@ -64,5 +64,41 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo
chomp($stdout_recv); chomp($stdout_recv);
is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot'); is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
'replaying logical slot from another database fails');
$node_master->safe_psql('otherdb', qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]);
# make sure you can't drop a slot while active
my $pg_recvlogical = IPC::Run::start(['pg_recvlogical', '-d', $node_master->connstr('otherdb'), '-S', 'otherdb_slot', '-f', '-', '--start']);
$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)");
is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 3,
'dropping a DB with inactive logical slots fails');
$pg_recvlogical->kill_kill;
is($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
'logical slot still exists');
$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)");
is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 0,
'dropping a DB with inactive logical slots succeeds');
is($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
'logical slot was actually dropped with DB');
# Restarting a node with wal_level = logical that has existing
# slots must succeed, but decoding from those slots must fail.
$node_master->safe_psql('postgres', 'ALTER SYSTEM SET wal_level = replica');
is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'logical', 'wal_level is still logical before restart');
$node_master->restart;
is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'replica', 'wal_level is replica');
isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
'restored slot catalog_xmin is nonzero');
is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
'reading from slot with wal_level < logical fails');
is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
'can drop logical slot while wal_level = replica');
is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
# done with the node # done with the node
$node_master->stop; $node_master->stop;
...@@ -15,12 +15,15 @@ ...@@ -15,12 +15,15 @@
# This module uses the first approach to show that timeline following # This module uses the first approach to show that timeline following
# on a logical slot works. # on a logical slot works.
# #
# (For convenience, it also tests some recovery-related operations
# on logical slots).
#
use strict; use strict;
use warnings; use warnings;
use PostgresNode; use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 10; use Test::More tests => 13;
use RecursiveCopy; use RecursiveCopy;
use File::Copy; use File::Copy;
use IPC::Run (); use IPC::Run ();
...@@ -50,6 +53,16 @@ $node_master->safe_psql('postgres', ...@@ -50,6 +53,16 @@ $node_master->safe_psql('postgres',
$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);"); $node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
$node_master->safe_psql('postgres', $node_master->safe_psql('postgres',
"INSERT INTO decoding(blah) VALUES ('beforebb');"); "INSERT INTO decoding(blah) VALUES ('beforebb');");
# We also want to verify that DROP DATABASE on a standby with a logical
# slot works. This isn't strictly related to timeline following, but
# the only way to get a logical slot on a standby right now is to use
# the same physical copy trick, so:
$node_master->safe_psql('postgres', 'CREATE DATABASE dropme;');
$node_master->safe_psql('dropme',
"SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');"
);
$node_master->safe_psql('postgres', 'CHECKPOINT;'); $node_master->safe_psql('postgres', 'CHECKPOINT;');
my $backup_name = 'b1'; my $backup_name = 'b1';
...@@ -68,6 +81,17 @@ $node_replica->append_conf( ...@@ -68,6 +81,17 @@ $node_replica->append_conf(
$node_replica->start; $node_replica->start;
# If we drop 'dropme' on the master, the standby should drop the
# db and associated slot.
is($node_master->psql('postgres', 'DROP DATABASE dropme'), 0,
'dropped DB with logical slot OK on master');
$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert'));
is($node_replica->safe_psql('postgres', q[SELECT 1 FROM pg_database WHERE datname = 'dropme']), '',
'dropped DB dropme on standby');
is($node_master->slot('dropme_slot')->{'slot_name'}, undef,
'logical slot was actually dropped on standby');
# Back to testing failover...
$node_master->safe_psql('postgres', $node_master->safe_psql('postgres',
"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');" "SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
); );
...@@ -99,10 +123,13 @@ isnt($phys_slot->{'catalog_xmin'}, '', ...@@ -99,10 +123,13 @@ isnt($phys_slot->{'catalog_xmin'}, '',
cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'}, cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
'xmin on physical slot must not be lower than catalog_xmin'); 'xmin on physical slot must not be lower than catalog_xmin');
$node_master->safe_psql('postgres', 'CHECKPOINT');
# Boom, crash # Boom, crash
$node_master->stop('immediate'); $node_master->stop('immediate');
$node_replica->promote; $node_replica->promote;
print "waiting for replica to come up\n";
$node_replica->poll_query_until('postgres', $node_replica->poll_query_until('postgres',
"SELECT NOT pg_is_in_recovery();"); "SELECT NOT pg_is_in_recovery();");
...@@ -154,5 +181,4 @@ $stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup', ...@@ -154,5 +181,4 @@ $stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup',
chomp($stdout); chomp($stdout);
is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup'); is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
# We don't need the standby anymore
$node_replica->teardown_node(); $node_replica->teardown_node();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment