Commit 5c21ad07 authored by Andres Freund's avatar Andres Freund

Code review for recent slot.c changes.

parent df1a699e
...@@ -410,7 +410,7 @@ ReplicationSlotRelease(void) ...@@ -410,7 +410,7 @@ ReplicationSlotRelease(void)
* Cleanup all temporary slots created in current session. * Cleanup all temporary slots created in current session.
*/ */
void void
ReplicationSlotCleanup() ReplicationSlotCleanup(void)
{ {
int i; int i;
...@@ -802,12 +802,12 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) ...@@ -802,12 +802,12 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
* pg_database oid for the database to prevent creation of new slots on the db * pg_database oid for the database to prevent creation of new slots on the db
* or replay from existing slots. * or replay from existing slots.
* *
* This routine isn't as efficient as it could be - but we don't drop databases
* often, especially databases with lots of slots.
*
* Another session that concurrently acquires an existing slot on the target DB * Another session that concurrently acquires an existing slot on the target DB
* (most likely to drop it) may cause this function to ERROR. If that happens * (most likely to drop it) may cause this function to ERROR. If that happens
* it may have dropped some but not all slots. * it may have dropped some but not all slots.
*
* This routine isn't as efficient as it could be - but we don't drop
* databases often, especially databases with lots of slots.
*/ */
void void
ReplicationSlotsDropDBSlots(Oid dboid) ReplicationSlotsDropDBSlots(Oid dboid)
...@@ -822,7 +822,7 @@ restart: ...@@ -822,7 +822,7 @@ restart:
for (i = 0; i < max_replication_slots; i++) for (i = 0; i < max_replication_slots; i++)
{ {
ReplicationSlot *s; ReplicationSlot *s;
NameData slotname; char *slotname;
int active_pid; int active_pid;
s = &ReplicationSlotCtl->replication_slots[i]; s = &ReplicationSlotCtl->replication_slots[i];
...@@ -839,10 +839,10 @@ restart: ...@@ -839,10 +839,10 @@ restart:
if (s->data.database != dboid) if (s->data.database != dboid)
continue; continue;
/* Claim the slot, as if ReplicationSlotAcquire()ing. */ /* acquire slot, so ReplicationSlotDropAcquired can be reused */
SpinLockAcquire(&s->mutex); SpinLockAcquire(&s->mutex);
strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN); /* can't change while ReplicationSlotControlLock is held */
NameStr(slotname)[NAMEDATALEN-1] = '\0'; slotname = NameStr(s->data.name);
active_pid = s->active_pid; active_pid = s->active_pid;
if (active_pid == 0) if (active_pid == 0)
{ {
...@@ -852,36 +852,32 @@ restart: ...@@ -852,36 +852,32 @@ restart:
SpinLockRelease(&s->mutex); SpinLockRelease(&s->mutex);
/* /*
* We might fail here if the slot was active. Even though we hold an * Even though we hold an exclusive lock on the database object a
* exclusive lock on the database object a logical slot for that DB can * logical slot for that DB can still be active, e.g. if it's
* still be active if it's being dropped by a backend connected to * concurrently being dropped by a backend connected to another DB.
* another DB or is otherwise acquired.
* *
* It's an unlikely race that'll only arise from concurrent user action, * That's fairly unlikely in practice, so we'll just bail out.
* so we'll just bail out.
*/ */
if (active_pid) if (active_pid)
elog(ERROR, "replication slot %s is in use by pid %d", ereport(ERROR,
NameStr(slotname), active_pid); (errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d",
slotname, active_pid)));
/* /*
* To avoid largely duplicating ReplicationSlotDropAcquired() or * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
* complicating it with already_locked flags for ProcArrayLock, * holding ReplicationSlotControlLock over filesystem operations,
* ReplicationSlotControlLock and ReplicationSlotAllocationLock, we * release ReplicationSlotControlLock and use
* just release our ReplicationSlotControlLock to drop the slot. * ReplicationSlotDropAcquired.
* *
* For safety we'll restart our scan from the beginning each * As that means the set of slots could change, restart scan from the
* time we release the lock. * beginning each time we release the lock.
*/ */
LWLockRelease(ReplicationSlotControlLock); LWLockRelease(ReplicationSlotControlLock);
ReplicationSlotDropAcquired(); ReplicationSlotDropAcquired();
goto restart; goto restart;
} }
LWLockRelease(ReplicationSlotControlLock); LWLockRelease(ReplicationSlotControlLock);
/* recompute limits once after all slots are dropped */
ReplicationSlotsComputeRequiredXmin(false);
ReplicationSlotsComputeRequiredLSN();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment