Commit 808969d0 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Add a message type header to the CopyData messages sent from primary

to standby in streaming replication. While we only have one message type
at the moment, adding a message type header makes this easier to extend.
parent 47c5b8f5
<!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.77 2010/01/15 09:18:59 heikki Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.78 2010/02/03 09:47:19 heikki Exp $ -->
<chapter id="protocol"> <chapter id="protocol">
<title>Frontend/Backend Protocol</title> <title>Frontend/Backend Protocol</title>
...@@ -4179,12 +4179,65 @@ The commands accepted in walsender mode are: ...@@ -4179,12 +4179,65 @@ The commands accepted in walsender mode are:
already been recycled. On success, server responds with a already been recycled. On success, server responds with a
CopyOutResponse message, and backend starts to stream WAL as CopyData CopyOutResponse message, and backend starts to stream WAL as CopyData
messages. messages.
The payload in CopyData message consists of the following format.
</para> </para>
<para> <para>
The payload in each CopyData message consists of an XLogRecPtr, <variablelist>
indicating the starting point of the WAL in the message, immediately <varlistentry>
followed by the WAL data itself. <term>
XLogData (B)
</term>
<listitem>
<para>
<variablelist>
<varlistentry>
<term>
Byte1('w')
</term>
<listitem>
<para>
Identifies the message as WAL data.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Int32
</term>
<listitem>
<para>
The log file number of the LSN, indicating the starting point of
the WAL in the message.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Int32
</term>
<listitem>
<para>
The byte offset of the LSN, indicating the starting point of
the WAL in the message.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Byte<replaceable>n</replaceable>
</term>
<listitem>
<para>
Data that forms part of WAL data stream.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
</variablelist>
</para> </para>
<para> <para>
A single WAL record is never split across two CopyData messages. When A single WAL record is never split across two CopyData messages. When
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.2 2010/01/20 11:58:44 heikki Exp $ * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -48,8 +48,8 @@ static char *recvBuf = NULL; ...@@ -48,8 +48,8 @@ static char *recvBuf = NULL;
/* Prototypes for interface functions */ /* Prototypes for interface functions */
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, static bool libpqrcv_receive(int timeout, unsigned char *type,
int *len); char **buffer, int *len);
static void libpqrcv_disconnect(void); static void libpqrcv_disconnect(void);
/* Prototypes for private functions */ /* Prototypes for private functions */
...@@ -236,13 +236,13 @@ libpqrcv_disconnect(void) ...@@ -236,13 +236,13 @@ libpqrcv_disconnect(void)
} }
/* /*
* Receive any WAL records available from XLOG stream, blocking for * Receive a message available from XLOG stream, blocking for
* maximum of 'timeout' ms. * maximum of 'timeout' ms.
* *
* Returns: * Returns:
* *
* True if data was received. *recptr, *buffer and *len are set to * True if data was received. *type, *buffer and *len are set to
* the WAL location of the received data, buffer holding it, and length, * the type of the received data, buffer holding it, and length,
* respectively. * respectively.
* *
* False if no data was available within timeout, or wait was interrupted * False if no data was available within timeout, or wait was interrupted
...@@ -254,7 +254,7 @@ libpqrcv_disconnect(void) ...@@ -254,7 +254,7 @@ libpqrcv_disconnect(void)
* ereports on error. * ereports on error.
*/ */
static bool static bool
libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
{ {
int rawlen; int rawlen;
...@@ -275,14 +275,14 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) ...@@ -275,14 +275,14 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
if (PQconsumeInput(streamConn) == 0) if (PQconsumeInput(streamConn) == 0)
ereport(ERROR, ereport(ERROR,
(errmsg("could not read xlog records: %s", (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(streamConn))));
} }
justconnected = false; justconnected = false;
/* Receive CopyData message */ /* Receive CopyData message */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1); rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0) /* no records available yet, then return */ if (rawlen == 0) /* no data available yet, then return */
return false; return false;
if (rawlen == -1) /* end-of-streaming or error */ if (rawlen == -1) /* end-of-streaming or error */
{ {
...@@ -297,22 +297,18 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) ...@@ -297,22 +297,18 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
} }
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("could not read xlog records: %s", (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(streamConn))));
} }
if (rawlen < -1) if (rawlen < -1)
ereport(ERROR, ereport(ERROR,
(errmsg("could not read xlog records: %s", (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(streamConn))));
if (rawlen < sizeof(XLogRecPtr)) /* Return received messages to caller */
ereport(ERROR, *type = *((unsigned char *) recvBuf);
(errmsg("invalid WAL message received from primary"))); *buffer = recvBuf + sizeof(*type);
*len = rawlen - sizeof(*type);
/* Return received WAL records to caller */
*recptr = *((XLogRecPtr *) recvBuf);
*buffer = recvBuf + sizeof(XLogRecPtr);
*len = rawlen - sizeof(XLogRecPtr);
return true; return true;
} }
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -135,6 +135,7 @@ static void WalRcvQuickDieHandler(SIGNAL_ARGS); ...@@ -135,6 +135,7 @@ static void WalRcvQuickDieHandler(SIGNAL_ARGS);
/* Prototypes for private functions */ /* Prototypes for private functions */
static void WalRcvDie(int code, Datum arg); static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void); static void XLogWalRcvFlush(void);
...@@ -258,7 +259,7 @@ WalReceiverMain(void) ...@@ -258,7 +259,7 @@ WalReceiverMain(void)
/* Loop until end-of-streaming or error */ /* Loop until end-of-streaming or error */
for (;;) for (;;)
{ {
XLogRecPtr recptr; unsigned char type;
char *buf; char *buf;
int len; int len;
...@@ -287,17 +288,17 @@ WalReceiverMain(void) ...@@ -287,17 +288,17 @@ WalReceiverMain(void)
} }
/* Wait a while for data to arrive */ /* Wait a while for data to arrive */
if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len)) if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{ {
/* Write received WAL records to disk */ /* Accept the received data, and process it */
XLogWalRcvWrite(buf, len, recptr); XLogWalRcvProcessMsg(type, buf, len);
/* Receive any more WAL records we can without sleeping */ /* Receive any more data we can without sleeping */
while(walrcv_receive(0, &recptr, &buf, &len)) while(walrcv_receive(0, &type, &buf, &len))
XLogWalRcvWrite(buf, len, recptr); XLogWalRcvProcessMsg(type, buf, len);
/* /*
* Now that we've written some records, flush them to disk and * If we've written some records, flush them to disk and
* let the startup process know about them. * let the startup process know about them.
*/ */
XLogWalRcvFlush(); XLogWalRcvFlush();
...@@ -375,6 +376,36 @@ WalRcvQuickDieHandler(SIGNAL_ARGS) ...@@ -375,6 +376,36 @@ WalRcvQuickDieHandler(SIGNAL_ARGS)
exit(2); exit(2);
} }
/*
* Accept the message from XLOG stream, and process it.
*/
static void
XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
{
switch (type)
{
case 'w': /* WAL records */
{
XLogRecPtr recptr;
if (len < sizeof(XLogRecPtr))
ereport(ERROR,
(errmsg("invalid WAL message received from primary")));
recptr = *((XLogRecPtr *) buf);
buf += sizeof(XLogRecPtr);
len -= sizeof(XLogRecPtr);
XLogWalRcvWrite(buf, len, recptr);
break;
}
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid replication message type %d",
type)));
}
}
/* /*
* Write XLOG data to disk. * Write XLOG data to disk.
*/ */
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.4 2010/01/27 16:41:09 heikki Exp $ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.5 2010/02/03 09:47:19 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -659,6 +659,7 @@ XLogSend(StringInfo outMsg) ...@@ -659,6 +659,7 @@ XLogSend(StringInfo outMsg)
* have the same byte order. If they have different byte order, we * have the same byte order. If they have different byte order, we
* don't reach here. * don't reach here.
*/ */
pq_sendbyte(outMsg, 'w');
pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
if (endptr.xlogid != startptr.xlogid) if (endptr.xlogid != startptr.xlogid)
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
* *
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
* *
* $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.5 2010/01/27 15:27:51 heikki Exp $ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.6 2010/02/03 09:47:19 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -66,7 +66,8 @@ extern WalRcvData *WalRcv; ...@@ -66,7 +66,8 @@ extern WalRcvData *WalRcv;
typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint); typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect; extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len); typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive; extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_disconnect_type) (void); typedef void (*walrcv_disconnect_type) (void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment