Commit 4eafdcc2 authored by Fujii Masao's avatar Fujii Masao

Prevent logical rep workers with removed subscriptions from starting.

Any logical rep workers must have their subscription entries in
pg_subscription. To ensure this, we need to prevent the launcher
from starting new worker corresponding to the subscription that
DROP SUBSCRIPTION command is removing. To implement this,
previously LogicalRepLauncherLock was introduced and held until
the end of transaction running DROP SUBSCRIPTION. But using
LWLock for that purpose was not valid.

Instead, this commit changes DROP SUBSCRIPTION so that it takes
AccessExclusiveLock on pg_subscription, in order to ensure that
the launcher cannot see any subscriptions being removed. Also this
commit gets rid of LogicalRepLauncherLock.

Patch by me, reviewed by Petr Jelinek

Discussion: https://www.postgresql.org/message-id/CAHGQGwHPi8ky-yANFfe0sgmhKtsYcQLTnKx07bW9S7-Rn1746w@mail.gmail.com
parent a9f66f92
...@@ -461,7 +461,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ...@@ -461,7 +461,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
if (stmt->drop_slot) if (stmt->drop_slot)
PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT"); PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT");
rel = heap_open(SubscriptionRelationId, RowExclusiveLock); /*
* Lock pg_subscription with AccessExclusiveLock to ensure
* that the launcher doesn't restart new worker during dropping
* the subscription
*/
rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId, tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
CStringGetDatum(stmt->subname)); CStringGetDatum(stmt->subname));
...@@ -528,14 +533,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ...@@ -528,14 +533,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* Clean up dependencies */ /* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Protect against launcher restarting the worker. */
LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
/* Kill the apply worker so that the slot becomes accessible. */ /* Kill the apply worker so that the slot becomes accessible. */
logicalrep_worker_stop(subid); logicalrep_worker_stop(subid);
LWLockRelease(LogicalRepLauncherLock);
/* Remove the origin tracking if exists. */ /* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid); snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true); originid = replorigin_by_name(originname, true);
......
...@@ -305,17 +305,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid) ...@@ -305,17 +305,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
/* /*
* Stop the logical replication worker and wait until it detaches from the * Stop the logical replication worker and wait until it detaches from the
* slot. * slot.
*
* The caller must hold LogicalRepLauncherLock to ensure that new workers are
* not being started during this function call.
*/ */
void void
logicalrep_worker_stop(Oid subid) logicalrep_worker_stop(Oid subid)
{ {
LogicalRepWorker *worker; LogicalRepWorker *worker;
Assert(LWLockHeldByMe(LogicalRepLauncherLock));
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid); worker = logicalrep_worker_find(subid);
...@@ -602,9 +597,6 @@ ApplyLauncherMain(Datum main_arg) ...@@ -602,9 +597,6 @@ ApplyLauncherMain(Datum main_arg)
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
oldctx = MemoryContextSwitchTo(subctx); oldctx = MemoryContextSwitchTo(subctx);
/* Block any concurrent DROP SUBSCRIPTION. */
LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
/* search for subscriptions to start or stop. */ /* search for subscriptions to start or stop. */
sublist = get_subscription_list(); sublist = get_subscription_list();
...@@ -628,8 +620,6 @@ ApplyLauncherMain(Datum main_arg) ...@@ -628,8 +620,6 @@ ApplyLauncherMain(Datum main_arg)
} }
} }
LWLockRelease(LogicalRepLauncherLock);
/* Switch back to original memory context. */ /* Switch back to original memory context. */
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */ /* Clean the temporary memory. */
......
...@@ -48,5 +48,4 @@ ReplicationOriginLock 40 ...@@ -48,5 +48,4 @@ ReplicationOriginLock 40
MultiXactTruncationLock 41 MultiXactTruncationLock 41
OldSnapshotTimeMapLock 42 OldSnapshotTimeMapLock 42
BackendRandomLock 43 BackendRandomLock 43
LogicalRepLauncherLock 44 LogicalRepWorkerLock 44
LogicalRepWorkerLock 45
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment