Commit 3e1683d3 authored by Tom Lane's avatar Tom Lane

Fix, or at least ameliorate, bugs in logicalrep_worker_launch().

If we failed to get a background worker slot, the code just walked
away from the logicalrep-worker slot it already had, leaving that
looking like the worker is still starting up.  This led to an indefinite
hang in subscription startup, as reported by Thomas Munro.  We must
release the slot on failure.

Also fix a thinko: we must capture the worker slot's generation before
releasing LogicalRepWorkerLock the first time, else testing to see if
it's changed is pretty meaningless.

BTW, the CHECK_FOR_INTERRUPTS() in WaitForReplicationWorkerAttach is a
ticking time bomb, even without considering the possibility of elog(ERROR)
in one of the other functions it calls.  Really, this entire business needs
a redesign with some actual thought about error recovery.  But for now
I'm just band-aiding the case observed in testing.

Back-patch to v10 where this code was added.

Discussion: https://postgr.es/m/CAEepm=2bP3TBMFBArP6o20AZaRduWjMnjCjt22hSdnA-EvrtCw@mail.gmail.com
parent 4b17c894
...@@ -168,14 +168,11 @@ get_subscription_list(void) ...@@ -168,14 +168,11 @@ get_subscription_list(void)
*/ */
static void static void
WaitForReplicationWorkerAttach(LogicalRepWorker *worker, WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
uint16 generation,
BackgroundWorkerHandle *handle) BackgroundWorkerHandle *handle)
{ {
BgwHandleStatus status; BgwHandleStatus status;
int rc; int rc;
uint16 generation;
/* Remember generation for future identification. */
generation = worker->generation;
for (;;) for (;;)
{ {
...@@ -282,7 +279,7 @@ logicalrep_workers_find(Oid subid, bool only_running) ...@@ -282,7 +279,7 @@ logicalrep_workers_find(Oid subid, bool only_running)
} }
/* /*
* Start new apply background worker. * Start new apply background worker, if possible.
*/ */
void void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
...@@ -290,6 +287,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, ...@@ -290,6 +287,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
{ {
BackgroundWorker bgw; BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle; BackgroundWorkerHandle *bgw_handle;
uint16 generation;
int i; int i;
int slot = 0; int slot = 0;
LogicalRepWorker *worker = NULL; LogicalRepWorker *worker = NULL;
...@@ -406,6 +404,9 @@ retry: ...@@ -406,6 +404,9 @@ retry:
worker->reply_lsn = InvalidXLogRecPtr; worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time); TIMESTAMP_NOBEGIN(worker->reply_time);
/* Before releasing lock, remember generation for future identification. */
generation = worker->generation;
LWLockRelease(LogicalRepWorkerLock); LWLockRelease(LogicalRepWorkerLock);
/* Register the new dynamic worker. */ /* Register the new dynamic worker. */
...@@ -428,6 +429,12 @@ retry: ...@@ -428,6 +429,12 @@ retry:
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{ {
/* Failed to start worker, so clean up the worker slot. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
Assert(generation == worker->generation);
logicalrep_worker_cleanup(worker);
LWLockRelease(LogicalRepWorkerLock);
ereport(WARNING, ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"), errmsg("out of background worker slots"),
...@@ -436,7 +443,7 @@ retry: ...@@ -436,7 +443,7 @@ retry:
} }
/* Now wait until it attaches. */ /* Now wait until it attaches. */
WaitForReplicationWorkerAttach(worker, bgw_handle); WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
} }
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment