Commit de829ddf authored by Thomas Munro

Add condition variable for walreceiver shutdown.

Use this new CV to wait for walreceiver shutdown without a sleep/poll
loop, while also benefiting from standard postmaster death handling.

Discussion: https://postgr.es/m/CA%2BhUKGK1607VmtrDUHQXrsooU%3Dap4g4R2yaoByWOOA3m8xevUQ%40mail.gmail.com
parent 600f2f50
...@@ -1766,6 +1766,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ...@@ -1766,6 +1766,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting for confirmation from a remote server during synchronous <entry>Waiting for confirmation from a remote server during synchronous
replication.</entry> replication.</entry>
</row> </row>
<row>
<entry><literal>WalrcvExit</literal></entry>
<entry>Waiting for the walreceiver to exit.</entry>
</row>
<row> <row>
<entry><literal>XactGroupUpdate</literal></entry> <entry><literal>XactGroupUpdate</literal></entry>
<entry>Waiting for the group leader to update transaction status at <entry>Waiting for the group leader to update transaction status at
......
...@@ -4124,6 +4124,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) ...@@ -4124,6 +4124,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_SYNC_REP: case WAIT_EVENT_SYNC_REP:
event_name = "SyncRep"; event_name = "SyncRep";
break; break;
case WAIT_EVENT_WALRCV_EXIT:
event_name = "WalrcvExit";
break;
case WAIT_EVENT_XACT_GROUP_UPDATE: case WAIT_EVENT_XACT_GROUP_UPDATE:
event_name = "XactGroupUpdate"; event_name = "XactGroupUpdate";
break; break;
......
...@@ -207,6 +207,7 @@ WalReceiverMain(void) ...@@ -207,6 +207,7 @@ WalReceiverMain(void)
case WALRCV_STOPPED: case WALRCV_STOPPED:
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
proc_exit(1); proc_exit(1);
break; break;
...@@ -784,6 +785,8 @@ WalRcvDie(int code, Datum arg) ...@@ -784,6 +785,8 @@ WalRcvDie(int code, Datum arg)
walrcv->latch = NULL; walrcv->latch = NULL;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
/* Terminate the connection gracefully. */ /* Terminate the connection gracefully. */
if (wrconn != NULL) if (wrconn != NULL)
walrcv_disconnect(wrconn); walrcv_disconnect(wrconn);
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <signal.h> #include <signal.h>
#include "access/xlog_internal.h" #include "access/xlog_internal.h"
#include "pgstat.h"
#include "postmaster/startup.h" #include "postmaster/startup.h"
#include "replication/walreceiver.h" #include "replication/walreceiver.h"
#include "storage/pmsignal.h" #include "storage/pmsignal.h"
...@@ -62,6 +63,7 @@ WalRcvShmemInit(void) ...@@ -62,6 +63,7 @@ WalRcvShmemInit(void)
/* First time through, so initialize */ /* First time through, so initialize */
MemSet(WalRcv, 0, WalRcvShmemSize()); MemSet(WalRcv, 0, WalRcvShmemSize());
WalRcv->walRcvState = WALRCV_STOPPED; WalRcv->walRcvState = WALRCV_STOPPED;
ConditionVariableInit(&WalRcv->walRcvStoppedCV);
SpinLockInit(&WalRcv->mutex); SpinLockInit(&WalRcv->mutex);
pg_atomic_init_u64(&WalRcv->writtenUpto, 0); pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
WalRcv->latch = NULL; WalRcv->latch = NULL;
...@@ -95,12 +97,18 @@ WalRcvRunning(void) ...@@ -95,12 +97,18 @@ WalRcvRunning(void)
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
{ {
SpinLockAcquire(&walrcv->mutex); bool stopped = false;
SpinLockAcquire(&walrcv->mutex);
if (walrcv->walRcvState == WALRCV_STARTING) if (walrcv->walRcvState == WALRCV_STARTING)
{
state = walrcv->walRcvState = WALRCV_STOPPED; state = walrcv->walRcvState = WALRCV_STOPPED;
stopped = true;
}
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
if (stopped)
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
} }
} }
...@@ -140,12 +148,18 @@ WalRcvStreaming(void) ...@@ -140,12 +148,18 @@ WalRcvStreaming(void)
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
{ {
SpinLockAcquire(&walrcv->mutex); bool stopped = false;
SpinLockAcquire(&walrcv->mutex);
if (walrcv->walRcvState == WALRCV_STARTING) if (walrcv->walRcvState == WALRCV_STARTING)
{
state = walrcv->walRcvState = WALRCV_STOPPED; state = walrcv->walRcvState = WALRCV_STOPPED;
stopped = true;
}
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
if (stopped)
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
} }
} }
...@@ -165,6 +179,7 @@ ShutdownWalRcv(void) ...@@ -165,6 +179,7 @@ ShutdownWalRcv(void)
{ {
WalRcvData *walrcv = WalRcv; WalRcvData *walrcv = WalRcv;
pid_t walrcvpid = 0; pid_t walrcvpid = 0;
bool stopped = false;
/* /*
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
...@@ -178,6 +193,7 @@ ShutdownWalRcv(void) ...@@ -178,6 +193,7 @@ ShutdownWalRcv(void)
break; break;
case WALRCV_STARTING: case WALRCV_STARTING:
walrcv->walRcvState = WALRCV_STOPPED; walrcv->walRcvState = WALRCV_STOPPED;
stopped = true;
break; break;
case WALRCV_STREAMING: case WALRCV_STREAMING:
...@@ -191,6 +207,10 @@ ShutdownWalRcv(void) ...@@ -191,6 +207,10 @@ ShutdownWalRcv(void)
} }
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
/* Unnecessary but consistent. */
if (stopped)
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
/* /*
* Signal walreceiver process if it was still running. * Signal walreceiver process if it was still running.
*/ */
...@@ -201,16 +221,11 @@ ShutdownWalRcv(void) ...@@ -201,16 +221,11 @@ ShutdownWalRcv(void)
* Wait for walreceiver to acknowledge its death by setting state to * Wait for walreceiver to acknowledge its death by setting state to
* WALRCV_STOPPED. * WALRCV_STOPPED.
*/ */
ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
while (WalRcvRunning()) while (WalRcvRunning())
{ ConditionVariableSleep(&walrcv->walRcvStoppedCV,
/* WAIT_EVENT_WALRCV_EXIT);
* This possibly-long loop needs to handle interrupts of startup ConditionVariableCancelSleep();
* process.
*/
HandleStartupProcInterrupts();
pg_usleep(100000); /* 100ms */
}
} }
/* /*
......
...@@ -1009,6 +1009,7 @@ typedef enum ...@@ -1009,6 +1009,7 @@ typedef enum
WAIT_EVENT_REPLICATION_SLOT_DROP, WAIT_EVENT_REPLICATION_SLOT_DROP,
WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP, WAIT_EVENT_SYNC_REP,
WAIT_EVENT_WALRCV_EXIT,
WAIT_EVENT_XACT_GROUP_UPDATE WAIT_EVENT_XACT_GROUP_UPDATE
} WaitEventIPC; } WaitEventIPC;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "port/atomics.h" #include "port/atomics.h"
#include "replication/logicalproto.h" #include "replication/logicalproto.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "storage/condition_variable.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "storage/spin.h" #include "storage/spin.h"
#include "utils/tuplestore.h" #include "utils/tuplestore.h"
...@@ -62,6 +63,7 @@ typedef struct ...@@ -62,6 +63,7 @@ typedef struct
*/ */
pid_t pid; pid_t pid;
WalRcvState walRcvState; WalRcvState walRcvState;
ConditionVariable walRcvStoppedCV;
pg_time_t startTime; pg_time_t startTime;
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment