Commit 52861058 authored by Simon Riggs's avatar Simon Riggs

Cascading replication feature for streaming log-based replication.

Standby servers can now have WALSender processes, which can work with
either WALReceiver or archive_commands to pass data. Fully updated
docs, including new conceptual terms of sending server, upstream and
downstream servers. WALSenders terminated when promote to master.

Fujii Masao, review, rework and doc rewrite by Simon Riggs
parent 3d4890c0
This diff is collapsed.
...@@ -877,8 +877,66 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' ...@@ -877,8 +877,66 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
network delay, or that the standby is under heavy load. network delay, or that the standby is under heavy load.
</para> </para>
</sect3> </sect3>
</sect2>
<sect2 id="cascading-replication">
<title>Cascading Replication</title>
<indexterm zone="high-availability">
<primary>Cascading Replication</primary>
</indexterm>
<para>
The cascading replication feature allows a standby server to accept replication
connections and stream WAL records to other standbys, acting as a relay.
This can be used to reduce the number of direct connections to the master
and also to minimise inter-site bandwidth overheads.
</para>
<para>
A standby acting as both a receiver and a sender is known as a cascading
standby. Standbys that are more directly connected to the master are known
as upstream servers, while those standby servers further away are downstream
servers. Cascading replication does not place limits on the number or
arrangement of downstream servers, though each standby connects to only
one upstream server which eventually links to a single master/primary
server.
</para>
<para>
A cascading standby sends not only WAL records received from the
master but also those restored from the archive. So even if the replication
connection in some upstream connection is terminated, streaming replication
continues downstream for as long as new WAL records are available.
</para>
<para>
Cascading replication is currently asynchronous. Synchronous replication
(see <xref linkend="synchronous-replication">) settings have no effect on
cascading replication at present.
</para>
<para>
Hot Standby feedback propagates upstream, whatever the cascaded arrangement.
</para>
<para>
Promoting a cascading standby terminates the immediate downstream replication
connections which it serves. This is because the timeline becomes different
between standbys, and they can no longer continue replication. The
effected standby(s) may reconnect to reestablish streaming replication.
</para>
<para>
To use cascading replication, set up the cascading standby so that it can
accept replication connections, i.e., set <varname>max_wal_senders</>,
<varname>hot_standby</> and authentication option (see
<xref linkend="streaming-replication"> and <xref linkend="hot-standby">).
Also set <varname>primary_conninfo</> in the downstream standby to point
to the cascading standby.
</para>
</sect2> </sect2>
<sect2 id="synchronous-replication"> <sect2 id="synchronous-replication">
<title>Synchronous Replication</title> <title>Synchronous Replication</title>
...@@ -955,7 +1013,9 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' ...@@ -955,7 +1013,9 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
confirmation that the commit record has been received. These parameters confirmation that the commit record has been received. These parameters
allow the administrator to specify which standby servers should be allow the administrator to specify which standby servers should be
synchronous standbys. Note that the configuration of synchronous synchronous standbys. Note that the configuration of synchronous
replication is mainly on the master. replication is mainly on the master. Named standbys must be directly
connected to the master; the master knows nothing about downstream
standby servers using cascaded replication.
</para> </para>
<para> <para>
......
This diff is collapsed.
...@@ -2316,6 +2316,26 @@ reaper(SIGNAL_ARGS) ...@@ -2316,6 +2316,26 @@ reaper(SIGNAL_ARGS)
ReachedNormalRunning = true; ReachedNormalRunning = true;
pmState = PM_RUN; pmState = PM_RUN;
/*
* Kill any walsenders to force the downstream standby(s) to
* reread the timeline history file, adjust their timelines and
* establish replication connections again. This is required
* because the timeline of cascading standby is not consistent
* with that of cascaded one just after failover. We LOG this
* message since we need to leave a record to explain this
* disconnection.
*
* XXX should avoid the need for disconnection. When we do,
* am_cascading_walsender should be replaced with RecoveryInProgress()
*/
if (max_wal_senders > 0)
{
ereport(LOG,
(errmsg("terminating walsender all processes to force cascaded"
"standby(s) to update timeline and reconnect")));
SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND);
}
/* /*
* Crank up the background writer, if we didn't do that already * Crank up the background writer, if we didn't do that already
* when we entered consistent recovery state. It doesn't matter * when we entered consistent recovery state. It doesn't matter
......
...@@ -339,6 +339,11 @@ SendBaseBackup(BaseBackupCmd *cmd) ...@@ -339,6 +339,11 @@ SendBaseBackup(BaseBackupCmd *cmd)
MemoryContext old_context; MemoryContext old_context;
basebackup_options opt; basebackup_options opt;
if (am_cascading_walsender)
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("recovery is still in progress, can't accept WAL streaming connections for backup")));
parse_basebackup_options(cmd->options, &opt); parse_basebackup_options(cmd->options, &opt);
backup_context = AllocSetContextCreate(CurrentMemoryContext, backup_context = AllocSetContextCreate(CurrentMemoryContext,
......
...@@ -469,6 +469,13 @@ SyncRepGetStandbyPriority(void) ...@@ -469,6 +469,13 @@ SyncRepGetStandbyPriority(void)
int priority = 0; int priority = 0;
bool found = false; bool found = false;
/*
* Since synchronous cascade replication is not allowed, we always
* set the priority of cascading walsender to zero.
*/
if (am_cascading_walsender)
return 0;
/* Need a modifiable copy of string */ /* Need a modifiable copy of string */
rawstring = pstrdup(SyncRepStandbyNames); rawstring = pstrdup(SyncRepStandbyNames);
......
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "replication/walprotocol.h" #include "replication/walprotocol.h"
#include "replication/walreceiver.h" #include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/pmsignal.h" #include "storage/pmsignal.h"
#include "storage/procarray.h" #include "storage/procarray.h"
...@@ -564,8 +565,10 @@ XLogWalRcvFlush(bool dying) ...@@ -564,8 +565,10 @@ XLogWalRcvFlush(bool dying)
} }
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
/* Signal the startup process that new WAL has arrived */ /* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery(); WakeupRecovery();
if (AllowCascadeReplication())
WalSndWakeup();
/* Report XLOG streaming progress in PS display */ /* Report XLOG streaming progress in PS display */
if (update_process_title) if (update_process_title)
...@@ -625,7 +628,7 @@ XLogWalRcvSendReply(void) ...@@ -625,7 +628,7 @@ XLogWalRcvSendReply(void)
/* Construct a new message */ /* Construct a new message */
reply_message.write = LogstreamResult.Write; reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush; reply_message.flush = LogstreamResult.Flush;
reply_message.apply = GetXLogReplayRecPtr(); reply_message.apply = GetXLogReplayRecPtr(NULL);
reply_message.sendTime = now; reply_message.sendTime = now;
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
......
...@@ -48,6 +48,7 @@ ...@@ -48,6 +48,7 @@
#include "replication/basebackup.h" #include "replication/basebackup.h"
#include "replication/replnodes.h" #include "replication/replnodes.h"
#include "replication/walprotocol.h" #include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/ipc.h" #include "storage/ipc.h"
...@@ -70,6 +71,7 @@ WalSnd *MyWalSnd = NULL; ...@@ -70,6 +71,7 @@ WalSnd *MyWalSnd = NULL;
/* Global state */ /* Global state */
bool am_walsender = false; /* Am I a walsender process ? */ bool am_walsender = false; /* Am I a walsender process ? */
bool am_cascading_walsender = false; /* Am I cascading WAL to another standby ? */
/* User-settable parameters for walsender */ /* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
...@@ -135,10 +137,7 @@ WalSenderMain(void) ...@@ -135,10 +137,7 @@ WalSenderMain(void)
{ {
MemoryContext walsnd_context; MemoryContext walsnd_context;
if (RecoveryInProgress()) am_cascading_walsender = RecoveryInProgress();
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("recovery is still in progress, can't accept WAL streaming connections")));
/* Create a per-walsender data structure in shared memory */ /* Create a per-walsender data structure in shared memory */
InitWalSnd(); InitWalSnd();
...@@ -165,6 +164,12 @@ WalSenderMain(void) ...@@ -165,6 +164,12 @@ WalSenderMain(void)
/* Unblock signals (they were blocked when the postmaster forked us) */ /* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig); PG_SETMASK(&UnBlockSig);
/*
* Use the recovery target timeline ID during recovery
*/
if (am_cascading_walsender)
ThisTimeLineID = GetRecoveryTargetTLI();
/* Tell the standby that walsender is ready for receiving commands */ /* Tell the standby that walsender is ready for receiving commands */
ReadyForQuery(DestRemote); ReadyForQuery(DestRemote);
...@@ -290,7 +295,7 @@ IdentifySystem(void) ...@@ -290,7 +295,7 @@ IdentifySystem(void)
GetSystemIdentifier()); GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
logptr = GetInsertRecPtr(); logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X", snprintf(xpos, sizeof(xpos), "%X/%X",
logptr.xlogid, logptr.xrecoff); logptr.xlogid, logptr.xrecoff);
...@@ -364,19 +369,13 @@ StartReplication(StartReplicationCmd *cmd) ...@@ -364,19 +369,13 @@ StartReplication(StartReplicationCmd *cmd)
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
/* /*
* Check that we're logging enough information in the WAL for * We assume here that we're logging enough information in the WAL for
* log-shipping. * log-shipping, since this is checked in PostmasterMain().
* *
* NOTE: This only checks the current value of wal_level. Even if the * NOTE: wal_level can only change at shutdown, so in most cases it is
* current setting is not 'minimal', there can be old WAL in the pg_xlog * difficult for there to be WAL data that we can still see that was written
* directory that was created with 'minimal'. So this is not bulletproof, * at wal_level='minimal'.
* the purpose is just to give a user-friendly error message that hints
* how to configure the system correctly.
*/ */
if (wal_level == WAL_LEVEL_MINIMAL)
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
/* /*
* When we first start replication the standby will be behind the primary. * When we first start replication the standby will be behind the primary.
...@@ -601,7 +600,8 @@ ProcessStandbyReplyMessage(void) ...@@ -601,7 +600,8 @@ ProcessStandbyReplyMessage(void)
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
} }
SyncRepReleaseWaiters(); if (!am_cascading_walsender)
SyncRepReleaseWaiters();
} }
/* /*
...@@ -764,6 +764,8 @@ WalSndLoop(void) ...@@ -764,6 +764,8 @@ WalSndLoop(void)
/* /*
* When SIGUSR2 arrives, we send any outstanding logs up to the * When SIGUSR2 arrives, we send any outstanding logs up to the
* shutdown checkpoint record (i.e., the latest record) and exit. * shutdown checkpoint record (i.e., the latest record) and exit.
* This may be a normal termination at shutdown, or a promotion,
* the walsender is not sure which.
*/ */
if (walsender_ready_to_stop && !pq_is_send_pending()) if (walsender_ready_to_stop && !pq_is_send_pending())
{ {
...@@ -933,7 +935,7 @@ WalSndKill(int code, Datum arg) ...@@ -933,7 +935,7 @@ WalSndKill(int code, Datum arg)
} }
/* /*
* Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
* *
* XXX probably this should be improved to suck data directly from the * XXX probably this should be improved to suck data directly from the
* WAL buffers when possible. * WAL buffers when possible.
...@@ -944,15 +946,21 @@ WalSndKill(int code, Datum arg) ...@@ -944,15 +946,21 @@ WalSndKill(int code, Datum arg)
* more than one. * more than one.
*/ */
void void
XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) XLogRead(char *buf, XLogRecPtr startptr, Size count)
{ {
XLogRecPtr startRecPtr = recptr; char *p;
char path[MAXPGPATH]; XLogRecPtr recptr;
Size nbytes;
uint32 lastRemovedLog; uint32 lastRemovedLog;
uint32 lastRemovedSeg; uint32 lastRemovedSeg;
uint32 log; uint32 log;
uint32 seg; uint32 seg;
retry:
p = buf;
recptr = startptr;
nbytes = count;
while (nbytes > 0) while (nbytes > 0)
{ {
uint32 startoff; uint32 startoff;
...@@ -963,6 +971,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) ...@@ -963,6 +971,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg)) if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
{ {
char path[MAXPGPATH];
/* Switch to another logfile segment */ /* Switch to another logfile segment */
if (sendFile >= 0) if (sendFile >= 0)
close(sendFile); close(sendFile);
...@@ -1014,7 +1024,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) ...@@ -1014,7 +1024,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
else else
segbytes = nbytes; segbytes = nbytes;
readbytes = read(sendFile, buf, segbytes); readbytes = read(sendFile, p, segbytes);
if (readbytes <= 0) if (readbytes <= 0)
ereport(ERROR, ereport(ERROR,
(errcode_for_file_access(), (errcode_for_file_access(),
...@@ -1027,7 +1037,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) ...@@ -1027,7 +1037,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
sendOff += readbytes; sendOff += readbytes;
nbytes -= readbytes; nbytes -= readbytes;
buf += readbytes; p += readbytes;
} }
/* /*
...@@ -1038,7 +1048,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) ...@@ -1038,7 +1048,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
* already have been overwritten with new WAL records. * already have been overwritten with new WAL records.
*/ */
XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg); XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
XLByteToSeg(startRecPtr, log, seg); XLByteToSeg(startptr, log, seg);
if (log < lastRemovedLog || if (log < lastRemovedLog ||
(log == lastRemovedLog && seg <= lastRemovedSeg)) (log == lastRemovedLog && seg <= lastRemovedSeg))
{ {
...@@ -1050,6 +1060,32 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) ...@@ -1050,6 +1060,32 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
errmsg("requested WAL segment %s has already been removed", errmsg("requested WAL segment %s has already been removed",
filename))); filename)));
} }
/*
* During recovery, the currently-open WAL file might be replaced with
* the file of the same name retrieved from archive. So we always need
* to check what we read was valid after reading into the buffer. If it's
* invalid, we try to open and read the file again.
*/
if (am_cascading_walsender)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = MyWalSnd;
bool reload;
SpinLockAcquire(&walsnd->mutex);
reload = walsnd->needreload;
walsnd->needreload = false;
SpinLockRelease(&walsnd->mutex);
if (reload && sendFile >= 0)
{
close(sendFile);
sendFile = -1;
goto retry;
}
}
} }
/* /*
...@@ -1082,7 +1118,7 @@ XLogSend(char *msgbuf, bool *caughtup) ...@@ -1082,7 +1118,7 @@ XLogSend(char *msgbuf, bool *caughtup)
* subsequently crashes and restarts, slaves must not have applied any WAL * subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master. * that gets lost on the master.
*/ */
SendRqstPtr = GetFlushRecPtr(); SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
/* Quick exit if nothing to do */ /* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr)) if (XLByteLE(SendRqstPtr, sentPtr))
...@@ -1187,6 +1223,28 @@ XLogSend(char *msgbuf, bool *caughtup) ...@@ -1187,6 +1223,28 @@ XLogSend(char *msgbuf, bool *caughtup)
return; return;
} }
/*
* Request walsenders to reload the currently-open WAL file
*/
void
WalSndRqstFileReload(void)
{
int i;
for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
if (walsnd->pid == 0)
continue;
SpinLockAcquire(&walsnd->mutex);
walsnd->needreload = true;
SpinLockRelease(&walsnd->mutex);
}
}
/* SIGHUP: set flag to re-read config file at next convenient time */ /* SIGHUP: set flag to re-read config file at next convenient time */
static void static void
WalSndSigHupHandler(SIGNAL_ARGS) WalSndSigHupHandler(SIGNAL_ARGS)
......
...@@ -221,6 +221,9 @@ extern int wal_level; ...@@ -221,6 +221,9 @@ extern int wal_level;
/* Do we need to WAL-log information required only for Hot Standby? */ /* Do we need to WAL-log information required only for Hot Standby? */
#define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY) #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY)
/* Can we allow the standby to accept replication connection from another standby? */
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
#ifdef WAL_DEBUG #ifdef WAL_DEBUG
extern bool XLOG_DEBUG; extern bool XLOG_DEBUG;
#endif #endif
...@@ -292,7 +295,8 @@ extern bool RecoveryInProgress(void); ...@@ -292,7 +295,8 @@ extern bool RecoveryInProgress(void);
extern bool HotStandbyActive(void); extern bool HotStandbyActive(void);
extern bool XLogInsertAllowed(void); extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern XLogRecPtr GetXLogReplayRecPtr(void); extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
extern XLogRecPtr GetStandbyFlushRecPtr(void);
extern void UpdateControlFile(void); extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void); extern uint64 GetSystemIdentifier(void);
......
...@@ -35,6 +35,7 @@ typedef struct WalSnd ...@@ -35,6 +35,7 @@ typedef struct WalSnd
pid_t pid; /* this walsender's process id, or 0 */ pid_t pid; /* this walsender's process id, or 0 */
WalSndState state; /* this walsender's state */ WalSndState state; /* this walsender's state */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */
bool needreload; /* does currently-open file need to be reloaded? */
/* /*
* The xlog locations that have been written, flushed, and applied by * The xlog locations that have been written, flushed, and applied by
...@@ -92,6 +93,7 @@ extern WalSndCtlData *WalSndCtl; ...@@ -92,6 +93,7 @@ extern WalSndCtlData *WalSndCtl;
/* global state */ /* global state */
extern bool am_walsender; extern bool am_walsender;
extern bool am_cascading_walsender;
extern volatile sig_atomic_t walsender_shutdown_requested; extern volatile sig_atomic_t walsender_shutdown_requested;
extern volatile sig_atomic_t walsender_ready_to_stop; extern volatile sig_atomic_t walsender_ready_to_stop;
...@@ -106,7 +108,8 @@ extern Size WalSndShmemSize(void); ...@@ -106,7 +108,8 @@ extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void); extern void WalSndShmemInit(void);
extern void WalSndWakeup(void); extern void WalSndWakeup(void);
extern void WalSndSetState(WalSndState state); extern void WalSndSetState(WalSndState state);
extern void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
extern void WalSndRqstFileReload(void);
extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS); extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment