Commit 723d0184 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Use a latch to make startup process wake up and replay immediately when

new WAL arrives via streaming replication. This reduces the latency, and
also allows us to use a longer polling interval, which is good for energy
efficiency.

We still need to poll to check for the appearance of a trigger file, but
the interval is now 5 seconds (instead of 100ms), like when waiting for
a new WAL segment to appear in WAL archive.
parent 236b6bc2
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.434 2010/08/30 15:37:41 sriggs Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.435 2010/09/15 10:35:05 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -46,6 +46,7 @@ ...@@ -46,6 +46,7 @@
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/pmsignal.h" #include "storage/pmsignal.h"
#include "storage/procarray.h" #include "storage/procarray.h"
#include "storage/smgr.h" #include "storage/smgr.h"
...@@ -392,6 +393,13 @@ typedef struct XLogCtlData ...@@ -392,6 +393,13 @@ typedef struct XLogCtlData
*/ */
bool SharedRecoveryInProgress; bool SharedRecoveryInProgress;
/*
* recoveryWakeupLatch is used to wake up the startup process to
* continue WAL replay, if it is waiting for WAL to arrive or failover
* trigger file to appear.
*/
Latch recoveryWakeupLatch;
/* /*
* During recovery, we keep a copy of the latest checkpoint record here. * During recovery, we keep a copy of the latest checkpoint record here.
* Used by the background writer when it wants to create a restartpoint. * Used by the background writer when it wants to create a restartpoint.
...@@ -4840,6 +4848,7 @@ XLOGShmemInit(void) ...@@ -4840,6 +4848,7 @@ XLOGShmemInit(void)
XLogCtl->SharedRecoveryInProgress = true; XLogCtl->SharedRecoveryInProgress = true;
XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
SpinLockInit(&XLogCtl->info_lck); SpinLockInit(&XLogCtl->info_lck);
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
/* /*
* If we are not in bootstrap mode, pg_control should already exist. Read * If we are not in bootstrap mode, pg_control should already exist. Read
...@@ -5814,6 +5823,13 @@ StartupXLOG(void) ...@@ -5814,6 +5823,13 @@ StartupXLOG(void)
(errmsg("starting archive recovery"))); (errmsg("starting archive recovery")));
} }
/*
* Take ownership of the wakup latch if we're going to sleep during
* recovery.
*/
if (StandbyMode)
OwnLatch(&XLogCtl->recoveryWakeupLatch);
if (read_backup_label(&checkPointLoc)) if (read_backup_label(&checkPointLoc))
{ {
/* /*
...@@ -6274,6 +6290,13 @@ StartupXLOG(void) ...@@ -6274,6 +6290,13 @@ StartupXLOG(void)
if (WalRcvInProgress()) if (WalRcvInProgress())
elog(PANIC, "wal receiver still active"); elog(PANIC, "wal receiver still active");
/*
* We don't need the latch anymore. It's not strictly necessary to disown
* it, but let's do it for the sake of tidyness.
*/
if (StandbyMode)
DisownLatch(&XLogCtl->recoveryWakeupLatch);
/* /*
* We are now done reading the xlog from stream. Turn off streaming * We are now done reading the xlog from stream. Turn off streaming
* recovery to force fetching the files (which would be required at end of * recovery to force fetching the files (which would be required at end of
...@@ -9139,6 +9162,13 @@ startupproc_quickdie(SIGNAL_ARGS) ...@@ -9139,6 +9162,13 @@ startupproc_quickdie(SIGNAL_ARGS)
} }
/* SIGUSR1: let latch facility handle the signal */
static void
StartupProcSigUsr1Handler(SIGNAL_ARGS)
{
latch_sigusr1_handler();
}
/* SIGHUP: set flag to re-read config file at next convenient time */ /* SIGHUP: set flag to re-read config file at next convenient time */
static void static void
StartupProcSigHupHandler(SIGNAL_ARGS) StartupProcSigHupHandler(SIGNAL_ARGS)
...@@ -9213,7 +9243,7 @@ StartupProcessMain(void) ...@@ -9213,7 +9243,7 @@ StartupProcessMain(void)
else else
pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, SIG_IGN); pqsignal(SIGUSR1, StartupProcSigUsr1Handler);
pqsignal(SIGUSR2, SIG_IGN); pqsignal(SIGUSR2, SIG_IGN);
/* /*
...@@ -9397,16 +9427,17 @@ retry: ...@@ -9397,16 +9427,17 @@ retry:
} }
/* /*
* Data not here yet, so check for trigger then sleep. * Data not here yet, so check for trigger then sleep for
* five seconds like in the WAL file polling case below.
*/ */
if (CheckForStandbyTrigger()) if (CheckForStandbyTrigger())
goto triggered; goto triggered;
/* /*
* When streaming is active, we want to react quickly when * Wait for more WAL to arrive, or timeout to be reached
* the next WAL record arrives, so sleep only a bit.
*/ */
pg_usleep(100000L); /* 100ms */ WaitLatch(&XLogCtl->recoveryWakeupLatch, 5000000L);
ResetLatch(&XLogCtl->recoveryWakeupLatch);
} }
else else
{ {
...@@ -9681,3 +9712,13 @@ CheckForStandbyTrigger(void) ...@@ -9681,3 +9712,13 @@ CheckForStandbyTrigger(void)
} }
return false; return false;
} }
/*
* Wake up startup process to replay newly arrived WAL, or to notice that
* failover has been requested.
*/
void
WakeupRecovery(void)
{
SetLatch(&XLogCtl->recoveryWakeupLatch);
}
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.16 2010/07/06 19:18:57 momjian Exp $ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.17 2010/09/15 10:35:05 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -529,6 +529,9 @@ XLogWalRcvFlush(void) ...@@ -529,6 +529,9 @@ XLogWalRcvFlush(void)
walrcv->receivedUpto = LogstreamResult.Flush; walrcv->receivedUpto = LogstreamResult.Flush;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
/* Signal the startup process that new WAL has arrived */
WakeupRecovery();
/* Report XLOG streaming progress in PS display */ /* Report XLOG streaming progress in PS display */
if (update_process_title) if (update_process_title)
{ {
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.116 2010/08/12 23:24:54 rhaas Exp $ * $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.117 2010/09/15 10:35:05 heikki Exp $
*/ */
#ifndef XLOG_H #ifndef XLOG_H
#define XLOG_H #define XLOG_H
...@@ -303,5 +303,6 @@ extern TimeLineID GetRecoveryTargetTLI(void); ...@@ -303,5 +303,6 @@ extern TimeLineID GetRecoveryTargetTLI(void);
extern void HandleStartupProcInterrupts(void); extern void HandleStartupProcInterrupts(void);
extern void StartupProcessMain(void); extern void StartupProcessMain(void);
extern void WakeupRecovery(void);
#endif /* XLOG_H */ #endif /* XLOG_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment