Commit 1632ea43 authored by Alvaro Herrera's avatar Alvaro Herrera

Return ReplicationSlotAcquire API to its original form

Per 96540f80; the awkward API introduced by c6550776 is no
longer needed.

Author: Andres Freund <andres@anarazel.de>
Reviewed-by: default avatarÁlvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20210408020913.zzprrlvqyvlt5cyy@alap3.anarazel.de
parent b676ac44
...@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ...@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
else else
end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
(void) ReplicationSlotAcquire(NameStr(*name), SAB_Error); ReplicationSlotAcquire(NameStr(*name), true);
PG_TRY(); PG_TRY();
{ {
......
...@@ -99,8 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL; ...@@ -99,8 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 0; /* the maximum number of replication int max_replication_slots = 0; /* the maximum number of replication
* slots */ * slots */
static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
const char *name, SlotAcquireBehavior behavior);
static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot); static void ReplicationSlotDropPtr(ReplicationSlot *slot);
...@@ -374,34 +372,16 @@ SearchNamedReplicationSlot(const char *name, bool need_lock) ...@@ -374,34 +372,16 @@ SearchNamedReplicationSlot(const char *name, bool need_lock)
/* /*
* Find a previously created slot and mark it as used by this process. * Find a previously created slot and mark it as used by this process.
* *
* The return value is only useful if behavior is SAB_Inquire, in which * An error is raised if nowait is true and the slot is currently in use. If
* it's zero if we successfully acquired the slot, -1 if the slot no longer * nowait is false, we sleep until the slot is released by the owning process.
* exists, or the PID of the owning process otherwise. If behavior is
* SAB_Error, then trying to acquire an owned slot is an error.
* If SAB_Block, we sleep until the slot is released by the owning process.
*/
int
ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
{
return ReplicationSlotAcquireInternal(NULL, name, behavior);
}
/*
* Mark the specified slot as used by this process.
*
* Only one of slot and name can be specified.
* If slot == NULL, search for the slot with the given name.
*
* See comments about the return value in ReplicationSlotAcquire().
*/ */
static int void
ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name, ReplicationSlotAcquire(const char *name, bool nowait)
SlotAcquireBehavior behavior)
{ {
ReplicationSlot *s; ReplicationSlot *s;
int active_pid; int active_pid;
AssertArg((slot == NULL) ^ (name == NULL)); AssertArg(name != NULL);
retry: retry:
Assert(MyReplicationSlot == NULL); Assert(MyReplicationSlot == NULL);
...@@ -412,17 +392,15 @@ retry: ...@@ -412,17 +392,15 @@ retry:
* Search for the slot with the specified name if the slot to acquire is * Search for the slot with the specified name if the slot to acquire is
* not given. If the slot is not found, we either return -1 or error out. * not given. If the slot is not found, we either return -1 or error out.
*/ */
s = slot ? slot : SearchNamedReplicationSlot(name, false); s = SearchNamedReplicationSlot(name, false);
if (s == NULL || !s->in_use) if (s == NULL || !s->in_use)
{ {
LWLockRelease(ReplicationSlotControlLock); LWLockRelease(ReplicationSlotControlLock);
if (behavior == SAB_Inquire)
return -1;
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", errmsg("replication slot \"%s\" does not exist",
name ? name : NameStr(slot->data.name)))); name)));
} }
/* /*
...@@ -436,7 +414,7 @@ retry: ...@@ -436,7 +414,7 @@ retry:
* (We may end up not sleeping, but we don't want to do this while * (We may end up not sleeping, but we don't want to do this while
* holding the spinlock.) * holding the spinlock.)
*/ */
if (behavior == SAB_Block) if (!nowait)
ConditionVariablePrepareToSleep(&s->active_cv); ConditionVariablePrepareToSleep(&s->active_cv);
SpinLockAcquire(&s->mutex); SpinLockAcquire(&s->mutex);
...@@ -456,13 +434,11 @@ retry: ...@@ -456,13 +434,11 @@ retry:
*/ */
if (active_pid != MyProcPid) if (active_pid != MyProcPid)
{ {
if (behavior == SAB_Error) if (!nowait)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE), (errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d", errmsg("replication slot \"%s\" is active for PID %d",
NameStr(s->data.name), active_pid))); NameStr(s->data.name), active_pid)));
else if (behavior == SAB_Inquire)
return active_pid;
/* Wait here until we get signaled, and then restart */ /* Wait here until we get signaled, and then restart */
ConditionVariableSleep(&s->active_cv, ConditionVariableSleep(&s->active_cv,
...@@ -470,7 +446,7 @@ retry: ...@@ -470,7 +446,7 @@ retry:
ConditionVariableCancelSleep(); ConditionVariableCancelSleep();
goto retry; goto retry;
} }
else if (behavior == SAB_Block) else if (!nowait)
ConditionVariableCancelSleep(); /* no sleep needed after all */ ConditionVariableCancelSleep(); /* no sleep needed after all */
/* Let everybody know we've modified this slot */ /* Let everybody know we've modified this slot */
...@@ -478,9 +454,6 @@ retry: ...@@ -478,9 +454,6 @@ retry:
/* We made this slot active, so it's ours now. */ /* We made this slot active, so it's ours now. */
MyReplicationSlot = s; MyReplicationSlot = s;
/* success */
return 0;
} }
/* /*
...@@ -588,7 +561,7 @@ ReplicationSlotDrop(const char *name, bool nowait) ...@@ -588,7 +561,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
{ {
Assert(MyReplicationSlot == NULL); Assert(MyReplicationSlot == NULL);
(void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block); ReplicationSlotAcquire(name, nowait);
ReplicationSlotDropAcquired(); ReplicationSlotDropAcquired();
} }
...@@ -1271,8 +1244,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) ...@@ -1271,8 +1244,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
WAIT_EVENT_REPLICATION_SLOT_DROP); WAIT_EVENT_REPLICATION_SLOT_DROP);
/* /*
* Re-acquire lock and start over; we expect to invalidate the slot * Re-acquire lock and start over; we expect to invalidate the
* next time (unless another process acquires the slot in the * slot next time (unless another process acquires the slot in the
* meantime). * meantime).
*/ */
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
......
...@@ -639,7 +639,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) ...@@ -639,7 +639,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID)); moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
/* Acquire the slot so we "own" it */ /* Acquire the slot so we "own" it */
(void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error); ReplicationSlotAcquire(NameStr(*slotname), true);
/* A slot whose restart_lsn has never been reserved cannot be advanced */ /* A slot whose restart_lsn has never been reserved cannot be advanced */
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
......
...@@ -601,7 +601,7 @@ StartReplication(StartReplicationCmd *cmd) ...@@ -601,7 +601,7 @@ StartReplication(StartReplicationCmd *cmd)
if (cmd->slotname) if (cmd->slotname)
{ {
(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error); ReplicationSlotAcquire(cmd->slotname, true);
if (SlotIsLogical(MyReplicationSlot)) if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
...@@ -1137,7 +1137,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) ...@@ -1137,7 +1137,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
Assert(!MyReplicationSlot); Assert(!MyReplicationSlot);
(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error); ReplicationSlotAcquire(cmd->slotname, true);
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
ereport(ERROR, ereport(ERROR,
......
...@@ -37,14 +37,6 @@ typedef enum ReplicationSlotPersistency ...@@ -37,14 +37,6 @@ typedef enum ReplicationSlotPersistency
RS_TEMPORARY RS_TEMPORARY
} ReplicationSlotPersistency; } ReplicationSlotPersistency;
/* For ReplicationSlotAcquire, q.v. */
typedef enum SlotAcquireBehavior
{
SAB_Error,
SAB_Block,
SAB_Inquire
} SlotAcquireBehavior;
/* /*
* On-Disk data of a replication slot, preserved across restarts. * On-Disk data of a replication slot, preserved across restarts.
*/ */
...@@ -208,7 +200,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, ...@@ -208,7 +200,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
extern void ReplicationSlotPersist(void); extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait); extern void ReplicationSlotDrop(const char *name, bool nowait);
extern int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior); extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void); extern void ReplicationSlotRelease(void);
extern void ReplicationSlotCleanup(void); extern void ReplicationSlotCleanup(void);
extern void ReplicationSlotSave(void); extern void ReplicationSlotSave(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment