Commit cd142e03 authored by Amit Kapila's avatar Amit Kapila

Make pg_replication_origin_drop safe against concurrent drops.

Currently, we get the origin id from the name and then drop the origin by
taking ExclusiveLock on ReplicationOriginRelationId. So, two concurrent
sessions can get the id from the name at the same time and then when they
try to drop the origin, one of the sessions will get the either
"tuple concurrently deleted" or "cache lookup failed for replication
origin ..".

To prevent this race condition we do the entire operation under lock. This
obviates the need for replorigin_drop() API and we have removed it so if
any extension authors are using it they need to instead use
replorigin_drop_by_name. See it's usage in pg_replication_origin_drop().

Author: Peter Smith
Reviewed-by: Amit Kapila, Euler Taveira, Petr Jelinek, and Alvaro
Herrera
Discussion: https://www.postgresql.org/message-id/CAHut%2BPuW8DWV5fskkMWWMqzt-x7RPcNQOtJQBp6SdwyRghCk7A%40mail.gmail.com
parent 31c7fb41
...@@ -926,7 +926,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ...@@ -926,7 +926,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ListCell *lc; ListCell *lc;
char originname[NAMEDATALEN]; char originname[NAMEDATALEN];
char *err = NULL; char *err = NULL;
RepOriginId originid;
WalReceiverConn *wrconn = NULL; WalReceiverConn *wrconn = NULL;
StringInfoData cmd; StringInfoData cmd;
Form_pg_subscription form; Form_pg_subscription form;
...@@ -1050,9 +1049,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ...@@ -1050,9 +1049,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* Remove the origin tracking if exists. */ /* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid); snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true); replorigin_drop_by_name(originname, true, false);
if (originid != InvalidRepOriginId)
replorigin_drop(originid, false);
/* /*
* If there is no slot associated with the subscription, we can finish * If there is no slot associated with the subscription, we can finish
......
...@@ -322,27 +322,15 @@ replorigin_create(char *roname) ...@@ -322,27 +322,15 @@ replorigin_create(char *roname)
return roident; return roident;
} }
/* /*
* Drop replication origin. * Helper function to drop a replication origin.
*
* Needs to be called in a transaction.
*/ */
void static void
replorigin_drop(RepOriginId roident, bool nowait) replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
{ {
HeapTuple tuple; HeapTuple tuple;
Relation rel;
int i; int i;
Assert(IsTransactionState());
/*
* To interlock against concurrent drops, we hold ExclusiveLock on
* pg_replication_origin throughout this function.
*/
rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
/* /*
* First, clean up the slot state info, if there is any matching slot. * First, clean up the slot state info, if there is any matching slot.
*/ */
...@@ -415,11 +403,40 @@ restart: ...@@ -415,11 +403,40 @@ restart:
ReleaseSysCache(tuple); ReleaseSysCache(tuple);
CommandCounterIncrement(); CommandCounterIncrement();
/* now release lock again */
table_close(rel, ExclusiveLock);
} }
/*
* Drop replication origin (by name).
*
* Needs to be called in a transaction.
*/
void
replorigin_drop_by_name(char *name, bool missing_ok, bool nowait)
{
RepOriginId roident;
Relation rel;
Assert(IsTransactionState());
/*
* To interlock against concurrent drops, we hold ExclusiveLock on
* pg_replication_origin till xact commit.
*
* XXX We can optimize this by acquiring the lock on a specific origin by
* using LockSharedObject if required. However, for that, we first to
* acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
* the specific origin and then re-check if the origin still exists.
*/
rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
roident = replorigin_by_name(name, missing_ok);
if (OidIsValid(roident))
replorigin_drop_guts(rel, roident, nowait);
/* We keep the lock on pg_replication_origin until commit */
table_close(rel, NoLock);
}
/* /*
* Lookup replication origin via its oid and return the name. * Lookup replication origin via its oid and return the name.
...@@ -1256,16 +1273,12 @@ Datum ...@@ -1256,16 +1273,12 @@ Datum
pg_replication_origin_drop(PG_FUNCTION_ARGS) pg_replication_origin_drop(PG_FUNCTION_ARGS)
{ {
char *name; char *name;
RepOriginId roident;
replorigin_check_prerequisites(false, false); replorigin_check_prerequisites(false, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
roident = replorigin_by_name(name, false); replorigin_drop_by_name(name, false, true);
Assert(OidIsValid(roident));
replorigin_drop(roident, true);
pfree(name); pfree(name);
......
...@@ -40,7 +40,7 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; ...@@ -40,7 +40,7 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
/* API for querying & manipulating replication origins */ /* API for querying & manipulating replication origins */
extern RepOriginId replorigin_by_name(char *name, bool missing_ok); extern RepOriginId replorigin_by_name(char *name, bool missing_ok);
extern RepOriginId replorigin_create(char *name); extern RepOriginId replorigin_create(char *name);
extern void replorigin_drop(RepOriginId roident, bool nowait); extern void replorigin_drop_by_name(char *name, bool missing_ok, bool nowait);
extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok, extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
char **roname); char **roname);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment