Commit 814f1d8b authored by Thomas Munro's avatar Thomas Munro

Use condition variables for ProcSignalBarriers.

Instead of a poll/sleep loop, use a condition variable for precise
wake-up whenever a backend's pss_barrierGeneration advances.

Discussion: https://postgr.es/m/CA+hUKGLdemy2gBm80kz20GTe6hNVwoErE8KwcJk6-U56oStjtg@mail.gmail.com
parent 8bdb1332
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "storage/condition_variable.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "storage/proc.h" #include "storage/proc.h"
...@@ -59,10 +60,11 @@ ...@@ -59,10 +60,11 @@
*/ */
typedef struct typedef struct
{ {
pid_t pss_pid; volatile pid_t pss_pid;
sig_atomic_t pss_signalFlags[NUM_PROCSIGNALS]; volatile sig_atomic_t pss_signalFlags[NUM_PROCSIGNALS];
pg_atomic_uint64 pss_barrierGeneration; pg_atomic_uint64 pss_barrierGeneration;
pg_atomic_uint32 pss_barrierCheckMask; pg_atomic_uint32 pss_barrierCheckMask;
ConditionVariable pss_barrierCV;
} ProcSignalSlot; } ProcSignalSlot;
/* /*
...@@ -93,7 +95,7 @@ typedef struct ...@@ -93,7 +95,7 @@ typedef struct
((flags) &= ~(((uint32) 1) << (uint32) (type))) ((flags) &= ~(((uint32) 1) << (uint32) (type)))
static ProcSignalHeader *ProcSignal = NULL; static ProcSignalHeader *ProcSignal = NULL;
static volatile ProcSignalSlot *MyProcSignalSlot = NULL; static ProcSignalSlot *MyProcSignalSlot = NULL;
static bool CheckProcSignal(ProcSignalReason reason); static bool CheckProcSignal(ProcSignalReason reason);
static void CleanupProcSignalState(int status, Datum arg); static void CleanupProcSignalState(int status, Datum arg);
...@@ -142,6 +144,7 @@ ProcSignalShmemInit(void) ...@@ -142,6 +144,7 @@ ProcSignalShmemInit(void)
MemSet(slot->pss_signalFlags, 0, sizeof(slot->pss_signalFlags)); MemSet(slot->pss_signalFlags, 0, sizeof(slot->pss_signalFlags));
pg_atomic_init_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX); pg_atomic_init_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX);
pg_atomic_init_u32(&slot->pss_barrierCheckMask, 0); pg_atomic_init_u32(&slot->pss_barrierCheckMask, 0);
ConditionVariableInit(&slot->pss_barrierCV);
} }
} }
} }
...@@ -156,7 +159,7 @@ ProcSignalShmemInit(void) ...@@ -156,7 +159,7 @@ ProcSignalShmemInit(void)
void void
ProcSignalInit(int pss_idx) ProcSignalInit(int pss_idx)
{ {
volatile ProcSignalSlot *slot; ProcSignalSlot *slot;
uint64 barrier_generation; uint64 barrier_generation;
Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots); Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots);
...@@ -208,7 +211,7 @@ static void ...@@ -208,7 +211,7 @@ static void
CleanupProcSignalState(int status, Datum arg) CleanupProcSignalState(int status, Datum arg)
{ {
int pss_idx = DatumGetInt32(arg); int pss_idx = DatumGetInt32(arg);
volatile ProcSignalSlot *slot; ProcSignalSlot *slot;
slot = &ProcSignal->psh_slot[pss_idx - 1]; slot = &ProcSignal->psh_slot[pss_idx - 1];
Assert(slot == MyProcSignalSlot); Assert(slot == MyProcSignalSlot);
...@@ -237,6 +240,7 @@ CleanupProcSignalState(int status, Datum arg) ...@@ -237,6 +240,7 @@ CleanupProcSignalState(int status, Datum arg)
* no barrier waits block on it. * no barrier waits block on it.
*/ */
pg_atomic_write_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX); pg_atomic_write_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX);
ConditionVariableBroadcast(&slot->pss_barrierCV);
slot->pss_pid = 0; slot->pss_pid = 0;
} }
...@@ -391,13 +395,11 @@ EmitProcSignalBarrier(ProcSignalBarrierType type) ...@@ -391,13 +395,11 @@ EmitProcSignalBarrier(ProcSignalBarrierType type)
void void
WaitForProcSignalBarrier(uint64 generation) WaitForProcSignalBarrier(uint64 generation)
{ {
long timeout = 125L;
Assert(generation <= pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration)); Assert(generation <= pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration));
for (int i = NumProcSignalSlots - 1; i >= 0; i--) for (int i = NumProcSignalSlots - 1; i >= 0; i--)
{ {
volatile ProcSignalSlot *slot = &ProcSignal->psh_slot[i]; ProcSignalSlot *slot = &ProcSignal->psh_slot[i];
uint64 oldval; uint64 oldval;
/* /*
...@@ -409,20 +411,11 @@ WaitForProcSignalBarrier(uint64 generation) ...@@ -409,20 +411,11 @@ WaitForProcSignalBarrier(uint64 generation)
oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration); oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
while (oldval < generation) while (oldval < generation)
{ {
int events; ConditionVariableSleep(&slot->pss_barrierCV,
WAIT_EVENT_PROC_SIGNAL_BARRIER);
CHECK_FOR_INTERRUPTS();
events =
WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
timeout, WAIT_EVENT_PROC_SIGNAL_BARRIER);
ResetLatch(MyLatch);
oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration); oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
if (events & WL_TIMEOUT)
timeout = Min(timeout * 2, 1000L);
} }
ConditionVariableCancelSleep();
} }
/* /*
...@@ -589,6 +582,7 @@ ProcessProcSignalBarrier(void) ...@@ -589,6 +582,7 @@ ProcessProcSignalBarrier(void)
* next called. * next called.
*/ */
pg_atomic_write_u64(&MyProcSignalSlot->pss_barrierGeneration, shared_gen); pg_atomic_write_u64(&MyProcSignalSlot->pss_barrierGeneration, shared_gen);
ConditionVariableBroadcast(&MyProcSignalSlot->pss_barrierCV);
} }
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment