Commit abfd192b authored by Heikki Linnakangas

Allow a streaming replication standby to follow a timeline switch.

Before this patch, streaming replication would refuse to start replicating
if the timeline in the primary didn't exactly match the standby's. A mismatch
occurs, for example, when you have a master and two standbys, and you promote
one of the standbys to become the new master. Promotion bumps up the timeline
ID, and after that bump the other standby would refuse to continue.

There's significantly more timeline-related logic in streaming replication
now. First of all, when a standby connects to the primary, it will ask the
primary for any timeline history files that are missing from the standby.
The missing files are sent using a new replication command, TIMELINE_HISTORY,
and stored in the standby's pg_xlog directory. Using the timeline history
files, the standby can follow the latest timeline present in the primary
(recovery_target_timeline='latest'), just as it can follow new timelines
appearing in an archive directory.
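As a rough illustration of that first step, the sketch below lists which history files a standby would still have to request. It assumes the `%08X.history` naming shown in the `TIMELINE_HISTORY` documentation (`00000002.history`) and that timeline 1 never has a history file; `history_file_name()`, `missing_history_files()`, the `have_file` callback and the `only_tli2_present()` stub are hypothetical helpers, not functions from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

typedef unsigned int TimeLineID;

/* Build a history file name, e.g. timeline 2 -> "00000002.history". */
static void
history_file_name(char *buf, size_t len, TimeLineID tli)
{
    snprintf(buf, len, "%08X.history", tli);
}

/*
 * Hypothetical helper: collect the timelines 2..primary_tli whose history
 * files are not present locally (timeline 1 never has one). 'have_file'
 * stands in for a real existence check in pg_xlog. Returns the number of
 * timelines written to 'missing'; each would then be fetched with
 * TIMELINE_HISTORY.
 */
static int
missing_history_files(TimeLineID primary_tli,
                      bool (*have_file)(const char *name),
                      TimeLineID *missing, int max_missing)
{
    char        name[32];
    int         n = 0;

    for (TimeLineID tli = 2; tli <= primary_tli && n < max_missing; tli++)
    {
        history_file_name(name, sizeof(name), tli);
        if (!have_file(name))
            missing[n++] = tli;
    }
    return n;
}

/* Demo stub: pretend only the history file for timeline 2 exists locally. */
static bool
only_tli2_present(const char *name)
{
    return strcmp(name, "00000002.history") == 0;
}
```

With the stub above, a primary on timeline 4 would leave timelines 3 and 4 to be requested.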

START_REPLICATION now takes a TIMELINE parameter to specify exactly which
timeline to stream WAL from. This allows the standby to ask the primary to
send over WAL that precedes the promotion. The replication protocol is
changed slightly (in a backwards-compatible way, although there's little hope
of streaming replication working across major versions anyway) to allow
replication to stop when the end of a timeline is reached, putting the
walsender back into a state where it accepts replication commands.
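The `TIMELINE` parameter is simply appended to the command text. A minimal sketch of building it, modelled on the `snprintf()` call in `libpqrcv_startstreaming()` further down; `format_start_replication()` is a hypothetical name, and the 64-bit WAL position is printed as its high and low 32-bit halves in `%X/%X` form.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;    /* 64-bit WAL position, as in the diff */
typedef uint32_t TimeLineID;

/*
 * Hypothetical helper: format the command the way libpqrcv_startstreaming()
 * does, printing the WAL position as high/low 32-bit halves in hex.
 * Returns what snprintf() returns.
 */
static int
format_start_replication(char *buf, size_t len, XLogRecPtr startpoint,
                         TimeLineID tli)
{
    return snprintf(buf, len, "START_REPLICATION %X/%X TIMELINE %u",
                    (unsigned int) (startpoint >> 32),
                    (unsigned int) (startpoint & 0xFFFFFFFF),
                    (unsigned int) tli);
}
```

For example, a start point of `0x100000028` on timeline 2 yields `START_REPLICATION 1/28 TIMELINE 2`.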

Many thanks to Amit Kapila for testing and reviewing various versions of
this patch.
parent 52766871
@@ -912,10 +912,9 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
 </para>
 <para>
-Promoting a cascading standby terminates the immediate downstream replication
-connections which it serves. This is because the timeline becomes different
-between standbys, and they can no longer continue replication. The
-affected standby(s) may reconnect to reestablish streaming replication.
+If an upstream standby server is promoted to become new master, downstream
+servers will continue to stream from the new master if
+<varname>recovery_target_timeline</> is set to <literal>'latest'</>.
 </para>
 <para>
......
@@ -1018,14 +1018,21 @@
 </para>
 <para>
-There is another Copy-related mode called Copy-both, which allows
+There is another Copy-related mode called copy-both, which allows
 high-speed bulk data transfer to <emphasis>and</> from the server.
 Copy-both mode is initiated when a backend in walsender mode
 executes a <command>START_REPLICATION</command> statement. The
 backend sends a CopyBothResponse message to the frontend. Both
 the backend and the frontend may then send CopyData messages
-until the connection is terminated. See <xref
-linkend="protocol-replication">.
+until either end sends a CopyDone message. After the client
+sends a CopyDone message, the connection goes from copy-both mode to
+copy-out mode, and the client may not send any more CopyData messages.
+Similarly, when the server sends a CopyDone message, the connection
+goes into copy-in mode, and the server may not send any more CopyData
+messages. After both sides have sent a CopyDone message, the copy mode
+is terminated, and the backend reverts to the command-processing mode.
+See <xref linkend="protocol-replication"> for more information on the
+subprotocol transmitted over copy-both mode.
 </para>
 <para>
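The CopyDone rules in the new copy-both text amount to a small state machine. The sketch below models it; the enum and function names are illustrative only and do not come from libpq or the backend.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative states of a connection after START_REPLICATION succeeds. */
typedef enum
{
    MODE_COPY_BOTH,             /* both sides may send CopyData */
    MODE_COPY_OUT,              /* client sent CopyDone; only server may send */
    MODE_COPY_IN,               /* server sent CopyDone; only client may send */
    MODE_COMMAND                /* both sent CopyDone; back to commands */
} CopyMode;

/*
 * New mode after one side sends CopyDone. A second CopyDone from the same
 * side would be a protocol violation; this sketch simply ignores it.
 */
static CopyMode
after_copy_done(CopyMode mode, bool sent_by_client)
{
    switch (mode)
    {
        case MODE_COPY_BOTH:
            return sent_by_client ? MODE_COPY_OUT : MODE_COPY_IN;
        case MODE_COPY_OUT:
            return sent_by_client ? mode : MODE_COMMAND;
        case MODE_COPY_IN:
            return sent_by_client ? MODE_COMMAND : mode;
        default:
            return mode;
    }
}

/* May the given side still send CopyData in this mode? */
static bool
may_send_copydata(CopyMode mode, bool is_client)
{
    switch (mode)
    {
        case MODE_COPY_BOTH:
            return true;
        case MODE_COPY_OUT:
            return !is_client;
        case MODE_COPY_IN:
            return is_client;
        default:
            return false;
    }
}
```

The walsender's end-of-timeline path corresponds to the `MODE_COPY_IN` branch: the server sends CopyDone first, and the connection returns to command mode only after the client answers with its own CopyDone.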
@@ -1350,19 +1357,69 @@ The commands accepted in walsender mode are:
 </varlistentry>
 <varlistentry>
-<term>START_REPLICATION <replaceable>XXX</>/<replaceable>XXX</></term>
+<term>TIMELINE_HISTORY <replaceable class="parameter">tli</replaceable></term>
+<listitem>
+<para>
+Requests the server to send over the timeline history file for timeline
+<replaceable class="parameter">tli</replaceable>. Server replies with a
+result set of a single row, containing two fields:
+</para>
+<para>
+<variablelist>
+<varlistentry>
+<term>
+filename
+</term>
+<listitem>
+<para>
+Filename of the timeline history file, e.g. 00000002.history.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+content
+</term>
+<listitem>
+<para>
+Contents of the timeline history file.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>START_REPLICATION <replaceable class="parameter">XXX/XXX</> TIMELINE <replaceable class="parameter">tli</></term>
 <listitem>
 <para>
 Instructs server to start streaming WAL, starting at
-WAL position <replaceable>XXX</>/<replaceable>XXX</>.
+WAL position <replaceable class="parameter">XXX/XXX</> on timeline
+<replaceable class="parameter">tli</>.
 The server can reply with an error, e.g. if the requested section of WAL
 has already been recycled. On success, server responds with a
 CopyBothResponse message, and then starts to stream WAL to the frontend.
-WAL will continue to be streamed until the connection is broken;
-no further commands will be accepted. If the WAL sender process is
-terminated normally (during postmaster shutdown), it will send a
-CommandComplete message before exiting. This might not happen during an
-abnormal shutdown, of course.
+</para>
+
+<para>
+If the client requests a timeline that's not the latest, but is part of
+the history of the server, the server will stream all the WAL on that
+timeline starting from the requested startpoint, up to the point where
+the server switched to another timeline. If the client requests
+streaming at exactly the end of an old timeline, the server responds
+immediately with CommandComplete without entering COPY mode.
+</para>
+<para>
+After streaming all the WAL on a timeline that is not the latest one,
+the server will end streaming by exiting the COPY mode. When the client
+acknowledges this by also exiting COPY mode, the server responds with a
+CommandComplete message, and is ready to accept a new command.
 </para>
 <para>
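A hedged sketch of the walsender's choice when it receives `START_REPLICATION ... TIMELINE tli`, following the rules documented above. The `HistoryEntry` array (the server's past timelines and the position where each one ended) and `decide_start()` are hypothetical; in particular, treating a start point beyond the switch point as an error is an assumption, since the text above does not cover that case.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;

/* Hypothetical: an older timeline of the server and where it ended. */
typedef struct
{
    TimeLineID  tli;
    XLogRecPtr  end;            /* position where the server switched away */
} HistoryEntry;

typedef enum
{
    START_STREAM_OPEN_ENDED,    /* latest timeline: stream indefinitely */
    START_STREAM_UNTIL_SWITCH,  /* old timeline: stream up to *stop_at */
    START_COMMAND_COMPLETE,     /* exactly at the end: no COPY mode at all */
    START_ERROR                 /* not in this server's history (or past its end) */
} StartAction;

static StartAction
decide_start(TimeLineID requested, XLogRecPtr startpoint,
             TimeLineID current_tli,
             const HistoryEntry *past, int npast, XLogRecPtr *stop_at)
{
    if (requested == current_tli)
        return START_STREAM_OPEN_ENDED;

    for (int i = 0; i < npast; i++)
    {
        if (past[i].tli != requested)
            continue;
        if (startpoint == past[i].end)
            return START_COMMAND_COMPLETE;
        if (startpoint < past[i].end)
        {
            *stop_at = past[i].end;
            return START_STREAM_UNTIL_SWITCH;
        }
        return START_ERROR;     /* assumption: start beyond the switch point */
    }
    return START_ERROR;
}
```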
......
@@ -410,6 +410,89 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
     XLogArchiveNotify(histfname);
 }
+/*
+ * Writes a history file for given timeline and contents.
+ *
+ * Currently this is only used in the walreceiver process, and so there are
+ * no locking considerations. But we should be just as tense as XLogFileInit
+ * to avoid emplacing a bogus file.
+ */
+void
+writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
+{
+    char        path[MAXPGPATH];
+    char        tmppath[MAXPGPATH];
+    int         fd;
+    /*
+     * Write into a temp file name.
+     */
+    snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
+    unlink(tmppath);
+    /* do not use get_sync_bit() here --- want to fsync only at end of fill */
+    fd = OpenTransientFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
+                           S_IRUSR | S_IWUSR);
+    if (fd < 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not create file \"%s\": %m", tmppath)));
+    errno = 0;
+    if ((int) write(fd, content, size) != size)
+    {
+        int         save_errno = errno;
+        /*
+         * If we fail to make the file, delete it to release disk space
+         */
+        unlink(tmppath);
+        /* if write didn't set errno, assume problem is no disk space */
+        errno = save_errno ? save_errno : ENOSPC;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not write to file \"%s\": %m", tmppath)));
+    }
+    if (pg_fsync(fd) != 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not fsync file \"%s\": %m", tmppath)));
+    if (CloseTransientFile(fd))
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not close file \"%s\": %m", tmppath)));
+    /*
+     * Now move the completed history file into place with its final name.
+     */
+    TLHistoryFilePath(path, tli);
+    /*
+     * Prefer link() to rename() here just to be really sure that we don't
+     * overwrite an existing logfile. However, there shouldn't be one, so
+     * rename() is an acceptable substitute except for the truly paranoid.
+     */
+#if HAVE_WORKING_LINK
+    if (link(tmppath, path) < 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not link file \"%s\" to \"%s\": %m",
+                        tmppath, path)));
+    unlink(tmppath);
+#else
+    if (rename(tmppath, path) < 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not rename file \"%s\" to \"%s\": %m",
+                        tmppath, path)));
+#endif
+}
 /*
  * Returns true if 'expectedTLEs' contains a timeline with id 'tli'
  */
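`writeTimeLineHistoryFile()` uses the familiar write-to-a-temp-file, fsync, then `link()` pattern so that a half-written history file never appears under its final name. The sketch below shows the same pattern with plain POSIX calls instead of the backend's `OpenTransientFile()`/`ereport()` machinery; `install_file_atomically()`, `fail_and_cleanup()` and their error convention (return -1 with `errno` set) are illustrative assumptions, and unlike the patch the sketch always uses `link()`, so it needs a filesystem that supports hard links.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>

/* Close fd (if open), remove the temp file, and fail with errno preserved. */
static int
fail_and_cleanup(int fd, const char *tmppath)
{
    int         save_errno = errno;

    if (fd >= 0)
        close(fd);
    unlink(tmppath);
    errno = save_errno;
    return -1;
}

/*
 * Hypothetical helper: write 'content' to 'path' so that 'path' only ever
 * appears complete. Returns 0 on success, -1 with errno set on failure;
 * link() fails with EEXIST rather than replacing an existing file.
 */
static int
install_file_atomically(const char *tmppath, const char *path,
                        const char *content, size_t size)
{
    int         fd;

    unlink(tmppath);            /* remove leftovers of an earlier attempt */
    fd = open(tmppath, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
    if (fd < 0)
        return -1;

    errno = 0;
    if (write(fd, content, size) != (ssize_t) size)
    {
        if (errno == 0)
            errno = ENOSPC;     /* short write: assume the disk is full */
        return fail_and_cleanup(fd, tmppath);
    }
    if (fsync(fd) != 0)
        return fail_and_cleanup(fd, tmppath);
    if (close(fd) != 0)
        return fail_and_cleanup(-1, tmppath);

    if (link(tmppath, path) != 0)
        return fail_and_cleanup(-1, tmppath);
    unlink(tmppath);            /* the data now lives on under 'path' */
    return 0;
}
```

Calling it a second time for the same `path` fails with `EEXIST` and leaves no temporary file behind, which is the overwrite protection the patch's comment asks of `link()`.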
......
@@ -153,6 +153,7 @@ static XLogRecPtr LastRec;
 /* Local copy of WalRcv->receivedUpto */
 static XLogRecPtr receivedUpto = 0;
+static TimeLineID receiveTLI = 0;
 /*
  * During recovery, lastFullPageWrites keeps track of full_page_writes that
@@ -6366,6 +6367,12 @@ StartupXLOG(void)
         xlogctl->SharedRecoveryInProgress = false;
         SpinLockRelease(&xlogctl->info_lck);
     }
+
+    /*
+     * If there were cascading standby servers connected to us, nudge any
+     * wal sender processes to notice that we've been promoted.
+     */
+    WalSndWakeup();
 }
 /*
@@ -7626,7 +7633,7 @@ CreateRestartPoint(int flags)
         XLogRecPtr  endptr;
         /* Get the current (or recent) end of xlog */
-        endptr = GetStandbyFlushRecPtr(NULL);
+        endptr = GetStandbyFlushRecPtr();
         KeepLogSeg(endptr, &_logSegNo);
         _logSegNo--;
@@ -9087,13 +9094,10 @@ do_pg_abort_backup(void)
 /*
  * Get latest redo apply position.
  *
- * Optionally, returns the current recovery target timeline. Callers not
- * interested in that may pass NULL for targetTLI.
- *
  * Exported to allow WALReceiver to read the pointer directly.
  */
 XLogRecPtr
-GetXLogReplayRecPtr(TimeLineID *targetTLI)
+GetXLogReplayRecPtr(void)
 {
     /* use volatile pointer to prevent code rearrangement */
     volatile XLogCtlData *xlogctl = XLogCtl;
@@ -9101,8 +9105,6 @@ GetXLogReplayRecPtr(TimeLineID *targetTLI)
     SpinLockAcquire(&xlogctl->info_lck);
     recptr = xlogctl->lastReplayedEndRecPtr;
-    if (targetTLI)
-        *targetTLI = xlogctl->RecoveryTargetTLI;
     SpinLockRelease(&xlogctl->info_lck);
     return recptr;
@@ -9111,18 +9113,15 @@ GetXLogReplayRecPtr(TimeLineID *targetTLI)
 /*
  * Get current standby flush position, ie, the last WAL position
  * known to be fsync'd to disk in standby.
- *
- * If 'targetTLI' is not NULL, it's set to the current recovery target
- * timeline.
  */
 XLogRecPtr
-GetStandbyFlushRecPtr(TimeLineID *targetTLI)
+GetStandbyFlushRecPtr(void)
 {
     XLogRecPtr  receivePtr;
     XLogRecPtr  replayPtr;
-    receivePtr = GetWalRcvWriteRecPtr(NULL);
-    replayPtr = GetXLogReplayRecPtr(targetTLI);
+    receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
+    replayPtr = GetXLogReplayRecPtr();
     if (XLByteLT(receivePtr, replayPtr))
         return replayPtr;
@@ -9611,7 +9610,10 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
              * archive and pg_xlog before failover.
              */
             if (CheckForStandbyTrigger())
+            {
+                ShutdownWalRcv();
                 return false;
+            }
             /*
              * If primary_conninfo is set, launch walreceiver to try to
@@ -9626,8 +9628,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
             if (PrimaryConnInfo)
             {
                 XLogRecPtr  ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+                TimeLineID  tli = tliOfPointInHistory(ptr, expectedTLEs);
 
-                RequestXLogStreaming(ptr, PrimaryConnInfo);
+                if (curFileTLI > 0 && tli < curFileTLI)
+                    elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",
+                         (uint32) (ptr >> 32), (uint32) ptr,
+                         tli, curFileTLI);
+                curFileTLI = tli;
+                RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo);
             }
             /*
              * Move to XLOG_FROM_STREAM state in either case. We'll get
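The new code in `WaitForWALToBecomeAvailable()` asks `tliOfPointInHistory()` which timeline a WAL position belongs to, and refuses to move to an older timeline than the one the previous WAL file came from. The sketch below shows one way such a lookup can work; the `TimeLineSegment` layout (with `end == 0` meaning "still current") and both function names are assumptions made for illustration, not the actual layout of `expectedTLEs`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;

/*
 * Hypothetical history entry: WAL positions in [begin, end) belong to 'tli'.
 * The newest timeline has no end yet, modelled here as end == 0.
 */
typedef struct
{
    TimeLineID  tli;
    XLogRecPtr  begin;
    XLogRecPtr  end;
} TimeLineSegment;

/* Return the timeline that WAL position 'ptr' belongs to, or 0 if none. */
static TimeLineID
timeline_of_point(XLogRecPtr ptr, const TimeLineSegment *hist, int n)
{
    for (int i = 0; i < n; i++)
    {
        if (ptr >= hist[i].begin && (hist[i].end == 0 || ptr < hist[i].end))
            return hist[i].tli;
    }
    return 0;
}

/* Mirrors the sanity check in the hunk: never go back to an older timeline. */
static bool
timeline_goes_backwards(TimeLineID cur_file_tli, TimeLineID tli)
{
    return cur_file_tli > 0 && tli < cur_file_tli;
}
```

A switch point itself belongs to the newer timeline in this model, because each segment's `end` is exclusive.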
@@ -9653,10 +9661,10 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
              */
             /*
              * Before we leave XLOG_FROM_STREAM state, make sure that
-             * walreceiver is not running, so that it won't overwrite
-             * any WAL that we restore from archive.
+             * walreceiver is not active, so that it won't overwrite
+             * WAL that we restore from archive.
              */
-            if (WalRcvInProgress())
+            if (WalRcvStreaming())
                 ShutdownWalRcv();
             /*
@@ -9749,7 +9757,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
             /*
              * Check if WAL receiver is still active.
              */
-            if (!WalRcvInProgress())
+            if (!WalRcvStreaming())
             {
                 lastSourceFailed = true;
                 break;
@@ -9772,8 +9780,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
             {
                 XLogRecPtr  latestChunkStart;
-                receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
-                if (XLByteLT(RecPtr, receivedUpto))
+                receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
+                if (XLByteLT(RecPtr, receivedUpto) && receiveTLI == curFileTLI)
                 {
                     havedata = true;
                     if (!XLByteLT(RecPtr, latestChunkStart))
@@ -9888,8 +9896,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
 /*
  * Check to see whether the user-specified trigger file exists and whether a
- * promote request has arrived. If either condition holds, request postmaster
- * to shut down walreceiver, wait for it to exit, and return true.
+ * promote request has arrived. If either condition holds, return true.
  */
 static bool
 CheckForStandbyTrigger(void)
@@ -9904,7 +9911,6 @@ CheckForStandbyTrigger(void)
     {
         ereport(LOG,
                 (errmsg("received promote request")));
-        ShutdownWalRcv();
         ResetPromoteTriggered();
         triggered = true;
         return true;
@@ -9917,7 +9923,6 @@ CheckForStandbyTrigger(void)
     {
         ereport(LOG,
                 (errmsg("trigger file found: %s", TriggerFile)));
-        ShutdownWalRcv();
         unlink(TriggerFile);
         triggered = true;
         return true;
......
@@ -226,7 +226,7 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
     XLogRecPtr  recptr;
     char        location[MAXFNAMELEN];
-    recptr = GetWalRcvWriteRecPtr(NULL);
+    recptr = GetWalRcvWriteRecPtr(NULL, NULL);
     if (recptr == 0)
         PG_RETURN_NULL();
@@ -248,7 +248,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
     XLogRecPtr  recptr;
     char        location[MAXFNAMELEN];
-    recptr = GetXLogReplayRecPtr(NULL);
+    recptr = GetXLogReplayRecPtr();
     if (recptr == 0)
         PG_RETURN_NULL();
......
@@ -2563,27 +2563,6 @@ reaper(SIGNAL_ARGS)
             ReachedNormalRunning = true;
             pmState = PM_RUN;
-            /*
-             * Kill any walsenders to force the downstream standby(s) to
-             * reread the timeline history file, adjust their timelines and
-             * establish replication connections again. This is required
-             * because the timeline of cascading standby is not consistent
-             * with that of cascaded one just after failover. We LOG this
-             * message since we need to leave a record to explain this
-             * disconnection.
-             *
-             * XXX should avoid the need for disconnection. When we do,
-             * am_cascading_walsender should be replaced with
-             * RecoveryInProgress()
-             */
-            if (max_wal_senders > 0 && CountChildren(BACKEND_TYPE_WALSND) > 0)
-            {
-                ereport(LOG,
-                        (errmsg("terminating all walsender processes to force cascaded "
-                                "standby(s) to update timeline and reconnect")));
-                SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND);
-            }
             /*
              * Crank up the background tasks, if we didn't do that already
              * when we entered consistent recovery state. It doesn't matter
......
@@ -5,6 +5,8 @@
  * The Startup process initialises the server and performs any recovery
  * actions that have been specified. Notice that there is no "main loop"
  * since the Startup process ends as soon as initialisation is complete.
+ * (in standby mode, one can think of the replay loop as a main loop,
+ * though.)
  *
  *
  * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
......
@@ -56,6 +56,9 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
 static void SendXlogRecPtrResult(XLogRecPtr ptr);
+
+/* Was the backup currently in-progress initiated in recovery mode? */
+static bool backup_started_in_recovery = false;
 /*
  * Size of each block sent into the tar stream for larger files.
  *
@@ -94,6 +97,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
     XLogRecPtr  endptr;
     char       *labelfile;
+    backup_started_in_recovery = RecoveryInProgress();
+
     startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
     SendXlogRecPtrResult(startptr);
@@ -261,7 +266,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
          * http://lists.apple.com/archives/xcode-users/2003/Dec//msg000
          * 51.html
          */
-        XLogRead(buf, ptr, TAR_SEND_SIZE);
+        XLogRead(buf, ThisTimeLineID, ptr, TAR_SEND_SIZE);
         if (pq_putmessage('d', buf, TAR_SEND_SIZE))
             ereport(ERROR,
                     (errmsg("base backup could not send data, aborting backup")));
@@ -592,11 +597,19 @@ sendDir(char *path, int basepathlen, bool sizeonly)
         /*
          * Check if the postmaster has signaled us to exit, and abort with an
          * error in that case. The error handler further up will call
-         * do_pg_abort_backup() for us.
+         * do_pg_abort_backup() for us. Also check that if the backup was
+         * started while still in recovery, the server wasn't promoted.
+         * do_pg_stop_backup() will check that too, but it's better to stop
+         * the backup early than continue to the end and fail there.
          */
-        if (ProcDiePending || walsender_ready_to_stop)
+        CHECK_FOR_INTERRUPTS();
+        if (RecoveryInProgress() != backup_started_in_recovery)
             ereport(ERROR,
-                    (errmsg("shutdown requested, aborting active base backup")));
+                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                     errmsg("the standby was promoted during online backup"),
+                     errhint("This means that the backup being taken is corrupt "
+                             "and should not be used. "
+                             "Try taking another online backup.")));
         snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name);
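The base-backup change boils down to remembering the recovery state when the backup starts and comparing it before each file is sent. A minimal sketch, in which the `in_recovery` flag stands in for `RecoveryInProgress()` and all names are hypothetical:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for RecoveryInProgress(); flip it to simulate promotion. */
static bool in_recovery = true;

static bool
recovery_in_progress(void)
{
    return in_recovery;
}

/* Captured once when the backup starts, like backup_started_in_recovery. */
static bool started_in_recovery;

static void
start_backup(void)
{
    started_in_recovery = recovery_in_progress();
}

/*
 * Checked before each file is sent: false means the recovery state changed
 * since the backup began (a standby was promoted), so the backup must be
 * abandoned.
 */
static bool
backup_still_valid(void)
{
    return recovery_in_progress() == started_in_recovery;
}
```

Comparing against the saved state, rather than testing "still in recovery", keeps the same check harmless for a backup taken on a primary.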
......
@@ -46,9 +46,12 @@ static PGconn *streamConn = NULL;
 static char *recvBuf = NULL;
 /* Prototypes for interface functions */
-static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
-static bool libpqrcv_receive(int timeout, unsigned char *type,
-                 char **buffer, int *len);
+static void libpqrcv_connect(char *conninfo);
+static void libpqrcv_identify_system(TimeLineID *primary_tli);
+static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
+static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
+static void libpqrcv_endstreaming(void);
+static int  libpqrcv_receive(int timeout, char **buffer);
 static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
@@ -63,10 +66,17 @@ void
 _PG_init(void)
 {
     /* Tell walreceiver how to reach us */
-    if (walrcv_connect != NULL || walrcv_receive != NULL ||
-        walrcv_send != NULL || walrcv_disconnect != NULL)
+    if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
+        walrcv_readtimelinehistoryfile != NULL ||
+        walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
+        walrcv_receive != NULL || walrcv_send != NULL ||
+        walrcv_disconnect != NULL)
         elog(ERROR, "libpqwalreceiver already loaded");
     walrcv_connect = libpqrcv_connect;
+    walrcv_identify_system = libpqrcv_identify_system;
+    walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
+    walrcv_startstreaming = libpqrcv_startstreaming;
+    walrcv_endstreaming = libpqrcv_endstreaming;
     walrcv_receive = libpqrcv_receive;
     walrcv_send = libpqrcv_send;
     walrcv_disconnect = libpqrcv_disconnect;
@@ -75,16 +85,10 @@ _PG_init(void)
 /*
  * Establish the connection to the primary server for XLOG streaming
  */
-static bool
-libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+static void
+libpqrcv_connect(char *conninfo)
 {
     char        conninfo_repl[MAXCONNINFO + 75];
-    char       *primary_sysid;
-    char        standby_sysid[32];
-    TimeLineID  primary_tli;
-    TimeLineID  standby_tli;
-    PGresult   *res;
-    char        cmd[64];
     /*
      * Connect using deliberately undocumented parameter: replication. The
@@ -100,6 +104,18 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
         ereport(ERROR,
                 (errmsg("could not connect to the primary server: %s",
                         PQerrorMessage(streamConn))));
+}
+
+/*
+ * Check that primary's system identifier matches ours, and fetch the current
+ * timeline ID of the primary.
+ */
+static void
+libpqrcv_identify_system(TimeLineID *primary_tli)
+{
+    PGresult   *res;
+    char       *primary_sysid;
+    char        standby_sysid[32];
     /*
      * Get the system identifier and timeline ID as a DataRow message from the
@@ -126,7 +142,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
                         ntuples, nfields)));
     }
     primary_sysid = PQgetvalue(res, 0, 0);
-    primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+    *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
     /*
      * Confirm that the system identifier of the primary is the same as ours.
@@ -141,24 +157,37 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
                  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
                            primary_sysid, standby_sysid)));
     }
+    PQclear(res);
+}
 /*
- * Confirm that the current timeline of the primary is the same as the
- * recovery target timeline.
+ * Start streaming WAL data from given startpoint and timeline.
+ *
+ * Returns true if we switched successfully to copy-both mode. False
+ * means the server received the command and executed it successfully, but
+ * didn't switch to copy-mode. That means that there was no WAL on the
+ * requested timeline and starting point, because the server switched to
+ * another timeline at or before the requested starting point. On failure,
+ * throws an ERROR.
  */
-    standby_tli = GetRecoveryTargetTLI();
-    PQclear(res);
-    if (primary_tli != standby_tli)
-        ereport(ERROR,
-                (errmsg("timeline %u of the primary does not match recovery target timeline %u",
-                        primary_tli, standby_tli)));
-    ThisTimeLineID = primary_tli;
+static bool
+libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
+{
+    char        cmd[64];
+    PGresult   *res;
     /* Start streaming from the point requested by startup process */
-    snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
-             (uint32) (startpoint >> 32), (uint32) startpoint);
+    snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
+             (uint32) (startpoint >> 32), (uint32) startpoint,
+             tli);
     res = libpqrcv_PQexec(cmd);
-    if (PQresultStatus(res) != PGRES_COPY_BOTH)
+
+    if (PQresultStatus(res) == PGRES_COMMAND_OK)
+    {
+        PQclear(res);
+        return false;
+    }
+    else if (PQresultStatus(res) != PGRES_COPY_BOTH)
     {
         PQclear(res);
         ereport(ERROR,
@@ -166,11 +195,81 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
                         PQerrorMessage(streamConn))));
     }
     PQclear(res);
+    return true;
+}
-    ereport(LOG,
-            (errmsg("streaming replication successfully connected to primary")));
+/*
+ * Stop streaming WAL data.
+ */
+static void
+libpqrcv_endstreaming(void)
+{
+    PGresult   *res;
-    return true;
+    if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
+        ereport(ERROR,
+                (errmsg("could not send end-of-streaming message to primary: %s",
+                        PQerrorMessage(streamConn))));
+    /* Read the command result after COPY is finished */
+    while ((res = PQgetResult(streamConn)) != NULL)
+    {
+        if (PQresultStatus(res) != PGRES_COMMAND_OK)
+            ereport(ERROR,
+                    (errmsg("error reading result of streaming command: %s",
+                            PQerrorMessage(streamConn))));
+        /*
+         * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
+         * is also possible. However, at the moment this function is only
+         * called after receiving CopyDone from the backend - the walreceiver
+         * never terminates replication on its own initiative.
+         */
+        PQclear(res);
+    }
+}
+/*
+ * Fetch the timeline history file for 'tli' from primary.
+ */
+static void
+libpqrcv_readtimelinehistoryfile(TimeLineID tli,
+                                 char **filename, char **content, int *len)
+{
+    PGresult   *res;
+    char        cmd[64];
+    /*
+     * Request the primary to send over the history file for given timeline.
+     */
+    snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
+    res = libpqrcv_PQexec(cmd);
+    if (PQresultStatus(res) != PGRES_TUPLES_OK)
+    {
+        PQclear(res);
+        ereport(ERROR,
+                (errmsg("could not receive timeline history file from "
+                        "the primary server: %s",
+                        PQerrorMessage(streamConn))));
+    }
+    if (PQnfields(res) != 2 || PQntuples(res) != 1)
+    {
+        int         ntuples = PQntuples(res);
+        int         nfields = PQnfields(res);
+        PQclear(res);
+        ereport(ERROR,
+                (errmsg("invalid response from primary server"),
+                 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
+                           ntuples, nfields)));
+    }
+    *filename = pstrdup(PQgetvalue(res, 0, 0));
+    *len = PQgetlength(res, 0, 1);
+    *content = palloc(*len);
+    memcpy(*content, PQgetvalue(res, 0, 1), *len);
+    PQclear(res);
 }
 /*
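`libpqrcv_readtimelinehistoryfile()` copies the `content` column with `memcpy()` into a buffer of exactly `*len` bytes, so the result is not NUL-terminated and a consumer has to work from the length. The sketch below parses such a buffer. It assumes each line has the form `parentTLI<TAB>hi/lo<TAB>reason`, with the switch point in `%X/%X` notation, and that blank lines and `#` comments are skipped; neither those format details nor `parse_history()` are defined in this diff.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* One parsed line: the parent timeline and where the switch away from it happened. */
typedef struct
{
    unsigned int parent_tli;
    uint64_t    switchpoint;
} HistoryLine;

/*
 * Hypothetical parser for the 'len' bytes returned by TIMELINE_HISTORY.
 * Assumed line format: "<parentTLI>\t<hi>/<lo>\t<reason>". Blank lines and
 * lines starting with '#' are skipped. Returns the number of entries stored
 * in 'out', or -1 if a line is malformed or too long.
 */
static int
parse_history(const char *content, int len, HistoryLine *out, int max_out)
{
    const char *p = content;
    const char *end = content + len;
    int         n = 0;

    while (p < end && n < max_out)
    {
        char        line[256];
        const char *nl = memchr(p, '\n', (size_t) (end - p));
        size_t      linelen = nl ? (size_t) (nl - p) : (size_t) (end - p);
        unsigned int tli, hi, lo;

        if (linelen >= sizeof(line))
            return -1;
        memcpy(line, p, linelen);   /* copy out so sscanf() sees a C string */
        line[linelen] = '\0';
        p += linelen + (nl ? 1 : 0);

        if (linelen == 0 || line[0] == '#')
            continue;
        if (sscanf(line, "%u\t%X/%X", &tli, &hi, &lo) != 3)
            return -1;
        out[n].parent_tli = tli;
        out[n].switchpoint = ((uint64_t) hi << 32) | lo;
        n++;
    }
    return n;
}
```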
@@ -327,20 +426,19 @@ libpqrcv_disconnect(void)
  *
  * Returns:
  *
- * True if data was received. *type, *buffer and *len are set to
- * the type of the received data, buffer holding it, and length,
- * respectively.
+ * If data was received, returns the length of the data. *buffer is set to
+ * point to a buffer holding the received message. The buffer is only valid
+ * until the next libpqrcv_* call.
  *
- * False if no data was available within timeout, or wait was interrupted
+ * 0 if no data was available within timeout, or wait was interrupted
  * by signal.
  *
- * The buffer returned is only valid until the next call of this function or
- * libpq_connect/disconnect.
+ * -1 if the server ended the COPY.
  *
  * ereports on error.
  */
-static bool
-libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
+static int
+libpqrcv_receive(int timeout, char **buffer)
 {
     int         rawlen;
@@ -359,7 +457,7 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
         if (timeout > 0)
         {
             if (!libpq_select(timeout))
-                return false;
+                return 0;
         }
         if (PQconsumeInput(streamConn) == 0)
@@ -370,35 +468,35 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
 		/* Now that we've consumed some input, try again */
 		rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
 		if (rawlen == 0)
-			return false;
+			return 0;
 	}
 	if (rawlen == -1)			/* end-of-streaming or error */
 	{
 		PGresult   *res;
 		res = PQgetResult(streamConn);
-		if (PQresultStatus(res) == PGRES_COMMAND_OK)
+		if (PQresultStatus(res) == PGRES_COMMAND_OK ||
+			PQresultStatus(res) == PGRES_COPY_IN)
 		{
 			PQclear(res);
-			ereport(ERROR,
-					(errmsg("replication terminated by primary server")));
+			return -1;
 		}
+		else
+		{
 			PQclear(res);
 			ereport(ERROR,
 					(errmsg("could not receive data from WAL stream: %s",
 							PQerrorMessage(streamConn))));
+		}
 	}
 	if (rawlen < -1)
 		ereport(ERROR,
 				(errmsg("could not receive data from WAL stream: %s",
 						PQerrorMessage(streamConn))));
 	/* Return received messages to caller */
-	*type = *((unsigned char *) recvBuf);
-	*buffer = recvBuf + sizeof(*type);
-	*len = rawlen - sizeof(*type);
-	return true;
+	*buffer = recvBuf;
+	return rawlen;
 }
 /*
......
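With this change `libpqrcv_receive()` no longer splits off the message type: it returns the raw CopyData length (0 when nothing arrived within the timeout, -1 when the server ended the COPY) and leaves the type byte at the start of `*buffer`. The sketch below shows a caller written against that contract. `MockStream` and `mock_receive()` are hypothetical stand-ins for a live connection, and treating `'w'` as the WAL-data message type is an assumption of the sketch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical in-memory stand-in for the replication stream. */
typedef struct MockStream
{
	const char **msgs;			/* queued messages; first byte is the type */
	const int  *lens;			/* length of each message, type byte included */
	int			nmsgs;
	int			next;
	int			copy_ended;		/* has the sender ended the COPY? */
} MockStream;

/*
 * Mimics the new libpqrcv_receive() contract: returns the message length
 * (> 0) and points *buffer at it, 0 if nothing is available right now,
 * or -1 once the sender has ended the COPY.
 */
static int
mock_receive(MockStream *s, char **buffer)
{
	if (s->next < s->nmsgs)
	{
		*buffer = (char *) s->msgs[s->next];
		return s->lens[s->next++];
	}
	return s->copy_ended ? -1 : 0;
}

/*
 * Drain everything currently available, counting payload bytes of 'w'
 * messages and the number of other messages. Returns 1 if the sender
 * ended the COPY, 0 if the caller should wait and poll again.
 */
static int
drain_stream(MockStream *s, int *wal_bytes, int *other_msgs)
{
	char	   *buf;
	int			len;

	*wal_bytes = 0;
	*other_msgs = 0;
	while ((len = mock_receive(s, &buf)) != 0)
	{
		if (len < 0)
			return 1;			/* end of COPY: connection can take a new command */
		assert(len >= 1);		/* every message carries at least its type byte */
		if (buf[0] == 'w')
			*wal_bytes += len - 1;	/* payload follows the type byte */
		else
			(*other_msgs)++;
	}
	return 0;
}
```

Returning -1 instead of raising an error is what lets the receiving side notice that the primary has reached the end of a timeline and issue another command on the same connection.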
@@ -56,6 +56,7 @@ Node *replication_parse_result;
 %union {
 		char					*str;
 		bool					boolval;
+		int32					intval;
 		XLogRecPtr				recptr;
 		Node					*node;
@@ -65,22 +66,26 @@ Node *replication_parse_result;
 /* Non-keyword tokens */
 %token <str> SCONST
+%token <intval> ICONST
 %token <recptr> RECPTR
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_START_REPLICATION
+%token K_TIMELINE_HISTORY
 %token K_LABEL
 %token K_PROGRESS
 %token K_FAST
 %token K_NOWAIT
 %token K_WAL
-%token K_START_REPLICATION
+%token K_TIMELINE
 %type <node>	command
-%type <node>	base_backup start_replication identify_system
+%type <node>	base_backup start_replication identify_system timeline_history
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
+%type <intval>	opt_timeline
 %%
 firstcmd: command opt_semicolon
@@ -97,6 +102,7 @@ command:
 			identify_system
 			| base_backup
 			| start_replication
+			| timeline_history
 			;
 /*
@@ -153,15 +159,48 @@ base_backup_opt:
 			;
 /*
- * START_REPLICATION %X/%X
+ * START_REPLICATION %X/%X [TIMELINE %d]
  */
 start_replication:
-			K_START_REPLICATION RECPTR
+			K_START_REPLICATION RECPTR opt_timeline
 				{
 					StartReplicationCmd *cmd;
 					cmd = makeNode(StartReplicationCmd);
 					cmd->startpoint = $2;
+					cmd->timeline = $3;
+					$$ = (Node *) cmd;
+				}
+			;
+opt_timeline:
+			K_TIMELINE ICONST
+				{
+					if ($2 <= 0)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 (errmsg("invalid timeline %d", $2))));
+					$$ = $2;
+				}
+				| /* nothing */			{ $$ = 0; }
+			;
+/*
+ * TIMELINE_HISTORY %d
+ */
+timeline_history:
+			K_TIMELINE_HISTORY ICONST
+				{
+					TimeLineHistoryCmd *cmd;
+					if ($2 <= 0)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 (errmsg("invalid timeline %d", $2))));
+					cmd = makeNode(TimeLineHistoryCmd);
+					cmd->timeline = $2;
 					$$ = (Node *) cmd;
 				}
......
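The grammar now accepts an optional `TIMELINE` clause: `START_REPLICATION 0/3000000 TIMELINE 2` asks for WAL from timeline 2, omitting the clause leaves `timeline` at 0, and a timeline of 0 or less is rejected as a syntax error. The hand-written parser below only approximates those rules with `sscanf()` for illustration; it is not how the bison grammar is implemented, and `StartReplicationSketch` and `parse_start_replication()` are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical mirror of the two fields of StartReplicationCmd. */
typedef struct
{
	uint64_t	startpoint;		/* (hi << 32) | lo, as in XLogRecPtr */
	uint32_t	timeline;		/* 0 when no TIMELINE clause was given */
} StartReplicationSketch;

/* Returns 0 on success, -1 on a syntax error or an invalid timeline. */
static int
parse_start_replication(const char *cmd, StartReplicationSketch *out)
{
	unsigned int hi,
				lo;
	int			tli;
	int			used = 0;

	/* START_REPLICATION %X/%X */
	if (sscanf(cmd, "START_REPLICATION %X/%X%n", &hi, &lo, &used) != 2)
		return -1;
	out->startpoint = ((uint64_t) hi << 32) | lo;
	out->timeline = 0;			/* opt_timeline: nothing */

	cmd += used;
	while (*cmd == ' ')
		cmd++;
	if (*cmd == '\0')
		return 0;

	/* [TIMELINE %d] must be the rest of the command */
	used = 0;
	if (sscanf(cmd, "TIMELINE %d%n", &tli, &used) != 1 || cmd[used] != '\0')
		return -1;
	if (tli <= 0)
		return -1;				/* the grammar reports "invalid timeline %d" */
	out->timeline = (uint32_t) tli;
	return 0;
}
```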
@@ -15,6 +15,8 @@
  */
 #include "postgres.h"
+#include "utils/builtins.h"
 /* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
 #undef fprintf
 #define fprintf(file, fmt, msg)  ereport(ERROR, (errmsg_internal("%s", msg)))
@@ -49,6 +51,7 @@ xqstart {quote}
 xqdouble		{quote}{quote}
 xqinside		[^']+
+digit			[0-9]+
 hexdigit		[0-9A-Za-z]+
 quote			'
@@ -63,7 +66,9 @@ LABEL { return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
 PROGRESS			{ return K_PROGRESS; }
 WAL			{ return K_WAL; }
+TIMELINE			{ return K_TIMELINE; }
 START_REPLICATION	{ return K_START_REPLICATION; }
+TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
 ","				{ return ','; }
 ";"				{ return ';'; }
@@ -71,6 +76,11 @@ START_REPLICATION { return K_START_REPLICATION; }
 [\t]			;
 " "				;
+{digit}+		{
+					yylval.intval = pg_atoi(yytext, sizeof(int32), 0);
+					return ICONST;
+				}
 {hexdigit}+\/{hexdigit}+		{
 					uint32	hi,
 							lo;
......
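Adding `TIMELINE` next to `TIMELINE_HISTORY`, and `{digit}+` next to the `{hexdigit}+\/{hexdigit}+` rule, relies on flex's matching rule: the longest match wins, and among equally long matches the rule listed first wins. So `TIMELINE_HISTORY` is scanned as one token, and `0/3000000` becomes a `RECPTR` rather than an `ICONST` followed by leftover input. The function below imitates that rule for just these four patterns; it is an illustration with hypothetical names, not generated scanner code.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

typedef enum
{
	TOK_NONE,					/* no rule matches */
	TOK_TIMELINE,
	TOK_TIMELINE_HISTORY,
	TOK_ICONST,
	TOK_RECPTR
} SketchToken;

/* Length of the run of [0-9] at s. */
static size_t
span_digits(const char *s)
{
	size_t		n = 0;

	while (isdigit((unsigned char) s[n]))
		n++;
	return n;
}

/* Length of the run of [0-9A-Za-z] at s (the scanner's hexdigit class). */
static size_t
span_alnum(const char *s)
{
	size_t		n = 0;

	while (isalnum((unsigned char) s[n]))
		n++;
	return n;
}

/*
 * Pick the token at s: the longest match wins, and ties go to the rule
 * tried first (rules are tried in scanner order). Sets *len to the match
 * length.
 */
static SketchToken
next_token(const char *s, size_t *len)
{
	SketchToken tok = TOK_NONE;
	size_t		best = 0;
	size_t		n;

	if (strncmp(s, "TIMELINE", 8) == 0)
	{
		best = 8;
		tok = TOK_TIMELINE;
	}
	if (strncmp(s, "TIMELINE_HISTORY", 16) == 0 && 16 > best)
	{
		best = 16;
		tok = TOK_TIMELINE_HISTORY;
	}
	n = span_digits(s);
	if (n > best)
	{
		best = n;
		tok = TOK_ICONST;
	}
	n = span_alnum(s);
	if (n > 0 && s[n] == '/')
	{
		size_t		m = span_alnum(s + n + 1);

		if (m > 0 && n + 1 + m > best)
		{
			best = n + 1 + m;
			tok = TOK_RECPTR;
		}
	}
	*len = best;
	return tok;
}
```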
@@ -64,12 +64,13 @@ WalRcvShmemInit(void)
 		MemSet(WalRcv, 0, WalRcvShmemSize());
 		WalRcv->walRcvState = WALRCV_STOPPED;
 		SpinLockInit(&WalRcv->mutex);
+		InitSharedLatch(&WalRcv->latch);
 	}
 }
-/* Is walreceiver in progress (or starting up)? */
+/* Is walreceiver running (or starting up)? */
 bool
-WalRcvInProgress(void)
+WalRcvRunning(void)
 {
 	/* use volatile pointer to prevent code rearrangement */
 	volatile WalRcvData *walrcv = WalRcv;
@@ -110,6 +111,53 @@ WalRcvInProgress(void)
 		return false;
 }
+/*
+ * Is walreceiver running and streaming (or at least attempting to connect,
+ * or starting up)?
+ */
+bool
+WalRcvStreaming(void)
+{
+	/* use volatile pointer to prevent code rearrangement */
+	volatile WalRcvData *walrcv = WalRcv;
+	WalRcvState state;
+	pg_time_t	startTime;
+	SpinLockAcquire(&walrcv->mutex);
+	state = walrcv->walRcvState;
+	startTime = walrcv->startTime;
+	SpinLockRelease(&walrcv->mutex);
+	/*
+	 * If it has taken too long for walreceiver to start up, give up. Setting
+	 * the state to STOPPED ensures that if walreceiver later does start up
+	 * after all, it will see that it's not supposed to be running and die
+	 * without doing anything.
+	 */
+	if (state == WALRCV_STARTING)
+	{
+		pg_time_t	now = (pg_time_t) time(NULL);
+		if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
+		{
+			SpinLockAcquire(&walrcv->mutex);
+			if (walrcv->walRcvState == WALRCV_STARTING)
+				state = walrcv->walRcvState = WALRCV_STOPPED;
+			SpinLockRelease(&walrcv->mutex);
+		}
+	}
+	if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
+		state == WALRCV_RESTARTING)
+		return true;
+	else
+		return false;
+}
 /*
  * Stop walreceiver (if running) and wait for it to die.
  * Executed by the Startup process.
@@ -135,7 +183,9 @@ ShutdownWalRcv(void)
 			walrcv->walRcvState = WALRCV_STOPPED;
 			break;
-		case WALRCV_RUNNING:
+		case WALRCV_STREAMING:
+		case WALRCV_WAITING:
+		case WALRCV_RESTARTING:
 			walrcv->walRcvState = WALRCV_STOPPING;
 			/* fall through */
 		case WALRCV_STOPPING:
@@ -154,7 +204,7 @@ ShutdownWalRcv(void)
 	 * Wait for walreceiver to acknowledge its death by setting state to
 	 * WALRCV_STOPPED.
 	 */
-	while (WalRcvInProgress())
+	while (WalRcvRunning())
 	{
 		/*
 		 * This possibly-long loop needs to handle interrupts of startup
@@ -173,10 +223,11 @@ ShutdownWalRcv(void)
  * is a libpq connection string to use.
  */
 void
-RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
+RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
 {
 	/* use volatile pointer to prevent code rearrangement */
 	volatile WalRcvData *walrcv = WalRcv;
+	bool		launch = false;
 	pg_time_t	now = (pg_time_t) time(NULL);
 	/*
@@ -190,14 +241,22 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 	SpinLockAcquire(&walrcv->mutex);
-	/* It better be stopped before we try to restart it */
-	Assert(walrcv->walRcvState == WALRCV_STOPPED);
+	/* It better be stopped if we try to restart it */
+	Assert(walrcv->walRcvState == WALRCV_STOPPED ||
+		   walrcv->walRcvState == WALRCV_WAITING);
 	if (conninfo != NULL)
 		strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
 	else
 		walrcv->conninfo[0] = '\0';
+	if (walrcv->walRcvState == WALRCV_STOPPED)
+	{
+		launch = true;
 		walrcv->walRcvState = WALRCV_STARTING;
+	}
+	else
+		walrcv->walRcvState = WALRCV_RESTARTING;
 	walrcv->startTime = now;
 	/*
@@ -210,10 +269,14 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 		walrcv->latestChunkStart = recptr;
 	}
 	walrcv->receiveStart = recptr;
+	walrcv->receiveStartTLI = tli;
 	SpinLockRelease(&walrcv->mutex);
+	if (launch)
 		SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
+	else
+		SetLatch(&walrcv->latch);
 }
 /*
@@ -221,10 +284,11 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
  *
  * Optionally, returns the previous chunk start, that is the first byte
  * written in the most recent walreceiver flush cycle. Callers not
- * interested in that value may pass NULL for latestChunkStart.
+ * interested in that value may pass NULL for latestChunkStart. Same for
+ * receiveTLI.
  */
 XLogRecPtr
-GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
+GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 {
 	/* use volatile pointer to prevent code rearrangement */
 	volatile WalRcvData *walrcv = WalRcv;
@@ -234,6 +298,8 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
 	recptr = walrcv->receivedUpto;
 	if (latestChunkStart)
 		*latestChunkStart = walrcv->latestChunkStart;
+	if (receiveTLI)
+		*receiveTLI = walrcv->receivedTLI;
 	SpinLockRelease(&walrcv->mutex);
 	return recptr;
@@ -258,7 +324,7 @@ GetReplicationApplyDelay(void)
 	receivePtr = walrcv->receivedUpto;
 	SpinLockRelease(&walrcv->mutex);
-	replayPtr = GetXLogReplayRecPtr(NULL);
+	replayPtr = GetXLogReplayRecPtr();
 	if (XLByteEQ(receivePtr, replayPtr))
 		return 0;
......
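After this change `RequestXLogStreaming()` does not always launch a new process: a walreceiver that is still alive in `WALRCV_WAITING` is moved to `WALRCV_RESTARTING` and woken through its latch, and only a `WALRCV_STOPPED` one is launched via the postmaster. `WalRcvStreaming()` additionally gives up on a launch that has stayed in `WALRCV_STARTING` for longer than `WALRCV_STARTUP_TIMEOUT`. The single-threaded model below captures just those transitions, without the spinlock, latch or postmaster signal; `RcvModel`, `RequestAction` and the two functions are hypothetical, and the timeout is passed in as a parameter rather than assuming its value.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Same states as the patched WalRcvState enum. */
typedef enum
{
	WALRCV_STOPPED,
	WALRCV_STARTING,
	WALRCV_STREAMING,
	WALRCV_WAITING,
	WALRCV_RESTARTING,
	WALRCV_STOPPING
} WalRcvState;

/* Hypothetical: which wake-up mechanism the real code would use. */
typedef enum
{
	ACTION_SIGNAL_POSTMASTER,	/* SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER) */
	ACTION_SET_LATCH			/* SetLatch(&walrcv->latch) */
} RequestAction;

/* Hypothetical model of the shared WalRcvData fields involved. */
typedef struct
{
	WalRcvState state;
	time_t		startTime;
} RcvModel;

/* Models the launch-or-restart decision in RequestXLogStreaming(). */
static RequestAction
request_streaming(RcvModel *rcv, time_t now)
{
	bool		launch = false;

	assert(rcv->state == WALRCV_STOPPED || rcv->state == WALRCV_WAITING);
	if (rcv->state == WALRCV_STOPPED)
	{
		launch = true;
		rcv->state = WALRCV_STARTING;
	}
	else
		rcv->state = WALRCV_RESTARTING;
	rcv->startTime = now;
	return launch ? ACTION_SIGNAL_POSTMASTER : ACTION_SET_LATCH;
}

/* Models WalRcvStreaming(), including the startup timeout. */
static bool
is_streaming(RcvModel *rcv, time_t now, time_t startup_timeout)
{
	if (rcv->state == WALRCV_STARTING &&
		now - rcv->startTime > startup_timeout)
		rcv->state = WALRCV_STOPPED;	/* a late starter will see this and exit */

	return rcv->state == WALRCV_STREAMING ||
		rcv->state == WALRCV_STARTING ||
		rcv->state == WALRCV_RESTARTING;
}
```

The effect is that a walreceiver which has merely stopped streaming is woken up and reused instead of being relaunched through the postmaster.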
@@ -34,6 +34,7 @@ extern bool existsTimeLineHistory(TimeLineID probeTLI);
 extern TimeLineID findNewestTimeLine(TimeLineID startTLI);
 extern void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
 					 XLogRecPtr switchpoint, char *reason);
+extern void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size);
 extern bool tliInHistory(TimeLineID tli, List *expectedTLIs);
 extern TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history);
 extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history);
......
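The new `writeTimeLineHistoryFile()` lets the standby store a history file it received via `TIMELINE_HISTORY` byte for byte. A common way to make such a write safe is to write a temporary file and then `rename()` it into place, so a reader never sees a half-written history file. The sketch below assumes that approach and the `%08X.history` naming, and uses plain stdio in a caller-supplied directory; `write_history_file()` and the temporary file name are hypothetical, and a durable version would also fsync the file and its directory, which is omitted here.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical sketch: store a received history file as
 * <dir>/<tli as %08X>.history, writing a temporary file first and renaming
 * it into place. Returns 0 on success, -1 on any I/O error.
 */
static int
write_history_file(const char *dir, unsigned int tli,
				   const char *content, size_t size)
{
	char		tmppath[1024];
	char		path[1024];
	FILE	   *fp;

	snprintf(tmppath, sizeof(tmppath), "%s/xlogtemp.history", dir);
	snprintf(path, sizeof(path), "%s/%08X.history", dir, tli);

	fp = fopen(tmppath, "wb");
	if (fp == NULL)
		return -1;
	/* the content arrives with an explicit length, so write exactly size bytes */
	if (fwrite(content, 1, size, fp) != size)
	{
		fclose(fp);
		remove(tmppath);
		return -1;
	}
	if (fclose(fp) != 0)
	{
		remove(tmppath);
		return -1;
	}
	/* on POSIX systems rename() atomically replaces any existing file */
	if (rename(tmppath, path) != 0)
	{
		remove(tmppath);
		return -1;
	}
	return 0;
}
```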
@@ -283,8 +283,8 @@ extern bool RecoveryInProgress(void);
 extern bool HotStandbyActive(void);
 extern bool XLogInsertAllowed(void);
 extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
-extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI);
-extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI);
+extern XLogRecPtr GetXLogReplayRecPtr(void);
+extern XLogRecPtr GetStandbyFlushRecPtr(void);
 extern XLogRecPtr GetXLogInsertRecPtr(void);
 extern XLogRecPtr GetXLogWriteRecPtr(void);
 extern bool RecoveryIsPaused(void);
......
@@ -407,6 +407,7 @@ typedef enum NodeTag
 	T_IdentifySystemCmd,
 	T_BaseBackupCmd,
 	T_StartReplicationCmd,
+	T_TimeLineHistoryCmd,
 	/*
 	 * TAGS FOR RANDOM OTHER STUFF
......
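Registering `T_TimeLineHistoryCmd` in the `NodeTag` enum is what lets the walsender tell the new command apart from the existing ones after parsing: every node struct stores its tag in its first field, so code can read the tag through a generic pointer and then cast to the concrete struct. The self-contained sketch below shows that tag-based dispatch with simplified local copies of the replnodes.h structs; `makeNode()` is not reproduced, the walsender's real dispatch code is not part of this view, and `describe_command()` is a hypothetical name.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint32_t TimeLineID;
typedef uint64_t XLogRecPtr;

/* Simplified subset of NodeTag. */
typedef enum NodeTag
{
	T_IdentifySystemCmd,
	T_BaseBackupCmd,
	T_StartReplicationCmd,
	T_TimeLineHistoryCmd
} NodeTag;

/* Every node begins with its tag, so any node can be inspected as a Node. */
typedef struct Node
{
	NodeTag		type;
} Node;

typedef struct StartReplicationCmd
{
	NodeTag		type;
	TimeLineID	timeline;
	XLogRecPtr	startpoint;
} StartReplicationCmd;

typedef struct TimeLineHistoryCmd
{
	NodeTag		type;
	TimeLineID	timeline;
} TimeLineHistoryCmd;

/* Hypothetical dispatcher: describe a parsed command based on its tag. */
static void
describe_command(const Node *node, char *out, size_t outlen)
{
	switch (node->type)
	{
		case T_StartReplicationCmd:
			{
				const StartReplicationCmd *cmd = (const StartReplicationCmd *) node;

				snprintf(out, outlen, "stream from %X/%X on timeline %u",
						 (unsigned int) (cmd->startpoint >> 32),
						 (unsigned int) cmd->startpoint,
						 (unsigned int) cmd->timeline);
				break;
			}
		case T_TimeLineHistoryCmd:
			snprintf(out, outlen, "send history of timeline %u",
					 (unsigned int) ((const TimeLineHistoryCmd *) node)->timeline);
			break;
		default:
			snprintf(out, outlen, "other command");
			break;
	}
}
```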
@@ -46,7 +46,19 @@ typedef struct BaseBackupCmd
 typedef struct StartReplicationCmd
 {
 	NodeTag		type;
+	TimeLineID	timeline;
 	XLogRecPtr	startpoint;
 } StartReplicationCmd;
+/* ----------------------
+ *		TIMELINE_HISTORY command
+ * ----------------------
+ */
+typedef struct TimeLineHistoryCmd
+{
+	NodeTag		type;
+	TimeLineID	timeline;
+} TimeLineHistoryCmd;
 #endif   /* REPLNODES_H */
@@ -14,6 +14,7 @@
 #include "access/xlog.h"
 #include "access/xlogdefs.h"
+#include "storage/latch.h"
 #include "storage/spin.h"
 #include "pgtime.h"
@@ -40,7 +41,9 @@ typedef enum
 	WALRCV_STOPPED,				/* stopped and mustn't start up again */
 	WALRCV_STARTING,			/* launched, but the process hasn't
 								 * initialized yet */
-	WALRCV_RUNNING,				/* walreceiver is running */
+	WALRCV_STREAMING,			/* walreceiver is streaming */
+	WALRCV_WAITING,				/* stopped streaming, waiting for orders */
+	WALRCV_RESTARTING,			/* asked to restart streaming */
 	WALRCV_STOPPING				/* requested to stop, but still running */
 } WalRcvState;
@@ -57,19 +60,23 @@ typedef struct
 	pg_time_t	startTime;
 	/*
-	 * receiveStart is the first byte position that will be received. When
-	 * startup process starts the walreceiver, it sets receiveStart to the
-	 * point where it wants the streaming to begin.
+	 * receiveStart and receiveStartTLI indicate the first byte position
+	 * and timeline that will be received. When startup process starts the
+	 * walreceiver, it sets these to the point where it wants the streaming
+	 * to begin.
 	 */
 	XLogRecPtr	receiveStart;
+	TimeLineID	receiveStartTLI;
 	/*
 	 * receivedUpto-1 is the last byte position that has already been
-	 * received. At the first startup of walreceiver, receivedUpto is set to
-	 * receiveStart. After that, walreceiver updates this whenever it flushes
-	 * the received WAL to disk.
+	 * received, and receivedTLI is the timeline it came from. At the first
+	 * startup of walreceiver, these are set to receiveStart and
+	 * receiveStartTLI. After that, walreceiver updates these whenever it
+	 * flushes the received WAL to disk.
 	 */
 	XLogRecPtr	receivedUpto;
+	TimeLineID	receivedTLI;
 	/*
 	 * latestChunkStart is the starting byte position of the current "batch"
@@ -97,16 +104,34 @@ typedef struct
 	char		conninfo[MAXCONNINFO];
 	slock_t		mutex;			/* locks shared variables shown above */
+	/*
+	 * Latch used by startup process to wake up walreceiver after telling it
+	 * where to start streaming (after setting receiveStart and
+	 * receiveStartTLI).
+	 */
+	Latch		latch;
 } WalRcvData;
 extern WalRcvData *WalRcv;
 /* libpqwalreceiver hooks */
-typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
+typedef void (*walrcv_connect_type) (char *conninfo);
 extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
-typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
-												 char **buffer, int *len);
+typedef void (*walrcv_identify_system_type) (TimeLineID *primary_tli);
+extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;
+typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
+extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
+typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
+extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
+typedef void (*walrcv_endstreaming_type) (void);
+extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
+typedef int (*walrcv_receive_type) (int timeout, char **buffer);
 extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
 typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
@@ -122,9 +147,10 @@ extern void WalReceiverMain(void) __attribute__((noreturn));
 extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
 extern void ShutdownWalRcv(void);
-extern bool WalRcvInProgress(void);
-extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
-extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern bool WalRcvStreaming(void);
+extern bool WalRcvRunning(void);
+extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo);
+extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
......
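The single `walrcv_connect(conninfo, startpoint)` hook is now split into steps: connect, `walrcv_identify_system()` to learn the primary's timeline, `walrcv_readtimelinehistoryfile()` to fetch history files the standby is missing, `walrcv_startstreaming(tli, startpoint)`, a `walrcv_receive()` loop that ends when it returns -1, and `walrcv_endstreaming()`, after which the connection can accept another command. The sketch below drives a hook table in that order against fake implementations. The call order is inferred from the typedefs and the commit message, not from walreceiver.c (collapsed in this view), and every struct and function name in it is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int TimeLineID;
typedef unsigned long long XLogRecPtr;

/* Hypothetical hook table mirroring the new walrcv_* typedefs. */
typedef struct
{
	void		(*connect) (char *conninfo);
	void		(*identify_system) (TimeLineID *primary_tli);
	void		(*readtimelinehistoryfile) (TimeLineID tli, char **filename,
											char **content, int *size);
	bool		(*startstreaming) (TimeLineID tli, XLogRecPtr startpoint);
	void		(*endstreaming) (void);
	int			(*receive) (int timeout, char **buffer);
} WalRcvHooks;

/* Fake primary on timeline 3 that sends two messages per streaming run. */
static int	fake_remaining;
static int	history_requests;

static void fake_connect(char *conninfo) { (void) conninfo; }
static void fake_identify_system(TimeLineID *tli) { *tli = 3; }
static void fake_endstreaming(void) { }

static void
fake_readtimelinehistoryfile(TimeLineID tli, char **filename,
							 char **content, int *size)
{
	(void) tli;
	*filename = NULL;			/* a real implementation returns the file */
	*content = NULL;
	*size = 0;
	history_requests++;
}

static bool
fake_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
{
	(void) tli;
	(void) startpoint;
	fake_remaining = 2;
	return true;
}

static int
fake_receive(int timeout, char **buffer)
{
	static char msg[] = "w";

	(void) timeout;
	if (fake_remaining == 0)
		return -1;				/* primary ended the COPY */
	fake_remaining--;
	*buffer = msg;
	return 1;
}

/*
 * One streaming cycle. Simplification: assumes the standby already has
 * history files up to local_tli and fetches the rest. Returns the number
 * of messages received before the primary ended the COPY.
 */
static int
run_streaming_cycle(const WalRcvHooks *h, char *conninfo, TimeLineID local_tli,
					TimeLineID stream_tli, XLogRecPtr startpoint)
{
	TimeLineID	primary_tli;
	char	   *buf;
	int			len;
	int			nmsgs = 0;

	h->connect(conninfo);
	h->identify_system(&primary_tli);
	for (TimeLineID tli = local_tli + 1; tli <= primary_tli; tli++)
	{
		char	   *fname;
		char	   *content;
		int			size;

		h->readtimelinehistoryfile(tli, &fname, &content, &size);
		/* the real walreceiver would now store the file in pg_xlog */
	}
	if (!h->startstreaming(stream_tli, startpoint))
		return 0;
	while ((len = h->receive(100, &buf)) != -1)
	{
		if (len > 0)
			nmsgs++;
	}
	h->endstreaming();			/* connection is free for the next command */
	return nmsgs;
}
```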
@@ -19,7 +19,6 @@
 /* global state */
 extern bool am_walsender;
 extern bool am_cascading_walsender;
-extern volatile sig_atomic_t walsender_ready_to_stop;
 extern bool wake_wal_senders;
 /* user-settable parameters */
......
@@ -95,7 +95,7 @@ extern WalSndCtlData *WalSndCtl;
 extern void WalSndSetState(WalSndState state);
-extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+extern void XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count);
 /*
  * Internal functions for parsing the replication grammar, in repl_gram.y and
......
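`XLogRead()` now takes the timeline explicitly because the timeline ID is part of every WAL segment file name, so the same range of WAL positions lives in differently named files on different timelines. The sketch below computes a segment file name from a timeline and a WAL position, assuming the default 16 MB segment size and the `%08X%08X%08X` naming (timeline, then the high and low parts of the segment number); `wal_segment_name()` and the `SKETCH_*` macros are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_SEG_SIZE ((uint64_t) 16 * 1024 * 1024)	/* assumed 16 MB */
#define SKETCH_SEGS_PER_XLOGID (UINT64_C(0x100000000) / SKETCH_SEG_SIZE)

/* Hypothetical: name of the WAL segment holding 'ptr' on timeline 'tli'. */
static void
wal_segment_name(char *out, size_t outlen, uint32_t tli, uint64_t ptr)
{
	uint64_t	segno = ptr / SKETCH_SEG_SIZE;

	snprintf(out, outlen, "%08X%08X%08X",
			 (unsigned int) tli,
			 (unsigned int) (segno / SKETCH_SEGS_PER_XLOGID),
			 (unsigned int) (segno % SKETCH_SEGS_PER_XLOGID));
}
```

For example, position 0/3000000 maps to `000000010000000000000003` on timeline 1 but to `000000020000000000000003` on timeline 2.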
@@ -2245,7 +2245,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 {
 	if (!conn)
 		return -1;
-	if (conn->asyncStatus != PGASYNC_COPY_IN)
+	if (conn->asyncStatus != PGASYNC_COPY_IN &&
+		conn->asyncStatus != PGASYNC_COPY_BOTH)
 	{
 		printfPQExpBuffer(&conn->errorMessage,
 						  libpq_gettext("no COPY in progress\n"));
@@ -2305,6 +2306,9 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 	}
 	/* Return to active duty */
+	if (conn->asyncStatus == PGASYNC_COPY_BOTH)
+		conn->asyncStatus = PGASYNC_COPY_OUT;
+	else
 		conn->asyncStatus = PGASYNC_BUSY;
 	resetPQExpBuffer(&conn->errorMessage);
......
@@ -1484,7 +1484,12 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
 		 * expect the state was already changed.
 		 */
 		if (msgLength == -1)
+		{
+			if (conn->asyncStatus == PGASYNC_COPY_BOTH)
+				conn->asyncStatus = PGASYNC_COPY_IN;
+			else
 				conn->asyncStatus = PGASYNC_BUSY;
+		}
 		return msgLength;		/* end-of-copy or error */
 	}
 	if (msgLength == 0)
......
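The two libpq changes let a CopyBoth exchange, as used by streaming replication, be closed one direction at a time: when the client calls `PQputCopyEnd()` it can still receive (`PGASYNC_COPY_BOTH` becomes `PGASYNC_COPY_OUT`), and when the server's end-of-copy arrives it can still send (`PGASYNC_COPY_BOTH` becomes `PGASYNC_COPY_IN`). Only after both directions have ended does the connection return to `PGASYNC_BUSY` to collect the command result. The model below replays those transitions; the enum is a local copy for illustration, not libpq's internal header, and the function names are hypothetical.

```c
#include <assert.h>

/* Local copy of the asyncStatus values involved (illustration only). */
typedef enum
{
	ASYNC_BUSY,
	ASYNC_COPY_IN,				/* client may still send CopyData */
	ASYNC_COPY_OUT,				/* client may still receive CopyData */
	ASYNC_COPY_BOTH				/* both directions open */
} AsyncStatus;

/*
 * Models the state change made by PQputCopyEnd(). Returns 1 on success,
 * -1 if no COPY that the client can end is in progress.
 */
static int
client_ends_copy(AsyncStatus *st)
{
	if (*st != ASYNC_COPY_IN && *st != ASYNC_COPY_BOTH)
		return -1;				/* "no COPY in progress" */
	*st = (*st == ASYNC_COPY_BOTH) ? ASYNC_COPY_OUT : ASYNC_BUSY;
	return 1;
}

/* Models pqGetCopyData3() seeing the server's end-of-copy (msgLength == -1). */
static void
server_ended_copy(AsyncStatus *st)
{
	*st = (*st == ASYNC_COPY_BOTH) ? ASYNC_COPY_IN : ASYNC_BUSY;
}
```

This is also why `libpqrcv_receive()` now accepts `PGRES_COPY_IN` from `PQgetResult()` after the server ends the stream: the client side of the copy is still open at that point.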