Commit e04a9ccd authored by Andres Freund's avatar Andres Freund

Consistency improvements for slot and decoding code.

Change the order of checks in similar functions to be the same; remove
a parameter that's not needed anymore; rename a memory context and
expand a couple of comments.

Per review comments from Amit Kapila
parent 4d92b158
...@@ -6270,7 +6270,7 @@ StartupXLOG(void) ...@@ -6270,7 +6270,7 @@ StartupXLOG(void)
* Initialize replication slots, before there's a chance to remove * Initialize replication slots, before there's a chance to remove
* required resources. * required resources.
*/ */
StartupReplicationSlots(checkPoint.redo); StartupReplicationSlots();
/* /*
* Startup logical state, needs to be setup now so we have proper data * Startup logical state, needs to be setup now so we have proper data
......
...@@ -125,7 +125,7 @@ StartupDecodingContext(List *output_plugin_options, ...@@ -125,7 +125,7 @@ StartupDecodingContext(List *output_plugin_options,
slot = MyReplicationSlot; slot = MyReplicationSlot;
context = AllocSetContextCreate(CurrentMemoryContext, context = AllocSetContextCreate(CurrentMemoryContext,
"Changeset Extraction Context", "Logical Decoding Context",
ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
......
...@@ -829,7 +829,7 @@ CheckPointReplicationSlots(void) ...@@ -829,7 +829,7 @@ CheckPointReplicationSlots(void)
* needs to be run before we start crash recovery. * needs to be run before we start crash recovery.
*/ */
void void
StartupReplicationSlots(XLogRecPtr checkPointRedo) StartupReplicationSlots(void)
{ {
DIR *replication_dir; DIR *replication_dir;
struct dirent *replication_de; struct dirent *replication_de;
......
...@@ -46,13 +46,15 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) ...@@ -46,13 +46,15 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
HeapTuple tuple; HeapTuple tuple;
Datum result; Datum result;
check_permissions(); Assert(!MyReplicationSlot);
CheckSlotRequirements();
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type"); elog(ERROR, "return type must be a row type");
check_permissions();
CheckSlotRequirements();
/* acquire replication slot, this will check for conflicting names */ /* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
...@@ -87,6 +89,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) ...@@ -87,6 +89,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
Datum values[2]; Datum values[2];
bool nulls[2]; bool nulls[2];
Assert(!MyReplicationSlot);
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type"); elog(ERROR, "return type must be a row type");
...@@ -94,10 +98,11 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) ...@@ -94,10 +98,11 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
CheckLogicalDecodingRequirements(); CheckLogicalDecodingRequirements();
Assert(!MyReplicationSlot);
/* /*
* Acquire a logical decoding slot, this will check for conflicting names. * Acquire a logical decoding slot, this will check for conflicting
* names. Initially create it as ephemeral - that allows us to nicely
* handle errors during initialization because it'll get dropped if this
* transaction fails. We'll make it persistent at the end.
*/ */
ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL); ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
......
...@@ -781,6 +781,11 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ...@@ -781,6 +781,11 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
else else
{ {
CheckLogicalDecodingRequirements(); CheckLogicalDecodingRequirements();
/*
* Initially create the slot as ephemeral - that allows us to nicely
* handle errors during initialization because it'll get dropped if
* this transaction fails. We'll make it persistent at the end.
*/
ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL); ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
} }
...@@ -1682,8 +1687,8 @@ ProcessStandbyHSFeedbackMessage(void) ...@@ -1682,8 +1687,8 @@ ProcessStandbyHSFeedbackMessage(void)
* If we're using a replication slot we reserve the xmin via that, * If we're using a replication slot we reserve the xmin via that,
* otherwise via the walsender's PGXACT entry. * otherwise via the walsender's PGXACT entry.
* *
* XXX: It might make sense to introduce ephemeral slots and always use * XXX: It might make sense to generalize the ephemeral slot concept and
* the slot mechanism. * always use the slot mechanism to handle the feedback xmin.
*/ */
if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */ if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
PhysicalReplicationSlotNewXmin(feedbackXmin); PhysicalReplicationSlotNewXmin(feedbackXmin);
......
...@@ -164,7 +164,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void); ...@@ -164,7 +164,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void StartupReplicationSlots(XLogRecPtr checkPointRedo); extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void); extern void CheckPointReplicationSlots(void);
extern void CheckSlotRequirements(void); extern void CheckSlotRequirements(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment