Commit d3d41469 authored by Robert Haas's avatar Robert Haas

Allow bidirectional copy messages in streaming replication mode.

Fujii Masao.  Review by Alvaro Herrera, Tom Lane, and myself.
parent 20f39642
......@@ -2194,6 +2194,16 @@ ExecStatusType PQresultStatus(const PGresult *res);
</listitem>
</varlistentry>
<varlistentry id="libpq-pgres-copy-both">
<term><literal>PGRES_COPY_BOTH</literal></term>
<listitem>
<para>
Copy In/Out (to and from server) data transfer started. This is
currently used only for streaming replication.
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-pgres-bad-response">
<term><literal>PGRES_BAD_RESPONSE</literal></term>
<listitem>
......
......@@ -1033,12 +1033,25 @@
</para>
<para>
The CopyInResponse and CopyOutResponse messages include fields that
inform the frontend of the number of columns per row and the format
codes being used for each column. (As of the present implementation,
all columns in a given <command>COPY</> operation will use the same
format, but the message design does not assume this.)
There is another Copy-related mode called Copy-both, which allows
high-speed bulk data transfer to <emphasis>and</> from the server.
Copy-both mode is initiated when a backend in walsender mode
executes a <command>START_REPLICATION</command> statement. The
backend sends a CopyBothResponse message to the frontend. Both
the backend and the frontend may then send CopyData messages
until the connection is terminated. See see <xref
linkend="protocol-replication">.
</para>
<para>
The CopyInResponse, CopyOutResponse and CopyBothResponse messages
include fields that inform the frontend of the number of columns
per row and the format codes being used for each column. (As of
the present implementation, all columns in a given <command>COPY</>
operation will use the same format, but the message design does not
assume this.)
</para>
</sect2>
<sect2 id="protocol-async">
......@@ -1344,7 +1357,7 @@ The commands accepted in walsender mode are:
WAL position <replaceable>XXX</>/<replaceable>XXX</>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
CopyOutResponse message, and then starts to stream WAL to the frontend.
CopyBothResponse message, and then starts to stream WAL to the frontend.
WAL will continue to be streamed until the connection is broken;
no further commands will be accepted.
</para>
......@@ -2694,6 +2707,79 @@ CopyOutResponse (B)
</varlistentry>
<varlistentry>
<term>
CopyBothResponse (B)
</term>
<listitem>
<para>
<variablelist>
<varlistentry>
<term>
Byte1('W')
</term>
<listitem>
<para>
Identifies the message as a Start Copy Both response.
This message is used only for Streaming Replication.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Int32
</term>
<listitem>
<para>
Length of message contents in bytes, including self.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Int8
</term>
<listitem>
<para>
0 indicates the overall <command>COPY</command> format
is textual (rows separated by newlines, columns
separated by separator characters, etc). 1 indicates
the overall copy format is binary (similar to DataRow
format). See <xref linkend="sql-copy"> for more information.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Int16
</term>
<listitem>
<para>
The number of columns in the data to be copied
(denoted <replaceable>N</> below).
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Int16[<replaceable>N</>]
</term>
<listitem>
<para>
The format codes to be used for each column.
Each must presently be zero (text) or one (binary).
All must be zero if the overall copy format is textual.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
DataRow (B)
......
......@@ -50,6 +50,7 @@ static char *recvBuf = NULL;
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
......@@ -64,10 +65,11 @@ _PG_init(void)
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
walrcv_disconnect != NULL)
walrcv_send != NULL || walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
}
......@@ -157,7 +159,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
PQclear(res);
ereport(ERROR,
......@@ -303,6 +305,7 @@ libpqrcv_PQexec(const char *query)
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
PQstatus(streamConn) == CONNECTION_BAD)
break;
}
......@@ -398,3 +401,18 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
return true;
}
/*
* Send a message to XLOG stream.
*
* ereports on error.
*/
static void
libpqrcv_send(const char *buffer, int nbytes)
{
if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
PQflush(streamConn))
ereport(ERROR,
(errmsg("could not send data to WAL stream: %s",
PQerrorMessage(streamConn))));
}
......@@ -57,6 +57,7 @@ bool am_walreceiver;
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
......@@ -247,7 +248,7 @@ WalReceiverMain(void)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
walrcv_disconnect == NULL)
walrcv_send == NULL || walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
......
......@@ -287,8 +287,8 @@ WalSndHandshake(void)
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
/* Send a CopyOutResponse message, and start streaming */
pq_beginmessage(&buf, 'H');
/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W');
pq_sendbyte(&buf, 0);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
......
......@@ -84,6 +84,9 @@ typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
extern PGDLLIMPORT walrcv_send_type walrcv_send;
typedef void (*walrcv_disconnect_type) (void);
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
......
......@@ -35,6 +35,7 @@ char *const pgresStatus[] = {
"PGRES_TUPLES_OK",
"PGRES_COPY_OUT",
"PGRES_COPY_IN",
"PGRES_COPY_BOTH",
"PGRES_BAD_RESPONSE",
"PGRES_NONFATAL_ERROR",
"PGRES_FATAL_ERROR"
......@@ -174,6 +175,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
case PGRES_TUPLES_OK:
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
/* non-error cases */
break;
default:
......@@ -1591,6 +1593,12 @@ PQgetResult(PGconn *conn)
else
res = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
break;
case PGASYNC_COPY_BOTH:
if (conn->result && conn->result->resultStatus == PGRES_COPY_BOTH)
res = pqPrepareAsyncResult(conn);
else
res = PQmakeEmptyPGresult(conn, PGRES_COPY_BOTH);
break;
default:
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("unexpected asyncStatus: %d\n"),
......@@ -1775,6 +1783,13 @@ PQexecStart(PGconn *conn)
return false;
}
}
else if (resultStatus == PGRES_COPY_BOTH)
{
/* We don't allow PQexec during COPY BOTH */
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("PQexec not allowed during COPY BOTH\n"));
return false;
}
/* check for loss of connection, too */
if (conn->status == CONNECTION_BAD)
return false;
......@@ -1798,7 +1813,7 @@ PQexecFinish(PGconn *conn)
* than one --- but merge error messages if we get more than one error
* result.
*
* We have to stop if we see copy in/out, however. We will resume parsing
* We have to stop if we see copy in/out/both, however. We will resume parsing
* after application performs the data transfer.
*
* Also stop if the connection is lost (else we'll loop infinitely).
......@@ -1827,6 +1842,7 @@ PQexecFinish(PGconn *conn)
lastResult = result;
if (result->resultStatus == PGRES_COPY_IN ||
result->resultStatus == PGRES_COPY_OUT ||
result->resultStatus == PGRES_COPY_BOTH ||
conn->status == CONNECTION_BAD)
break;
}
......@@ -2000,7 +2016,7 @@ PQnotifies(PGconn *conn)
}
/*
* PQputCopyData - send some data to the backend during COPY IN
* PQputCopyData - send some data to the backend during COPY IN or COPY BOTH
*
* Returns 1 if successful, 0 if data could not be sent (only possible
* in nonblock mode), or -1 if an error occurs.
......@@ -2010,7 +2026,8 @@ PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
{
if (!conn)
return -1;
if (conn->asyncStatus != PGASYNC_COPY_IN)
if (conn->asyncStatus != PGASYNC_COPY_IN &&
conn->asyncStatus != PGASYNC_COPY_BOTH)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
......@@ -2148,6 +2165,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
/*
* PQgetCopyData - read a row of data from the backend during COPY OUT
* or COPY BOTH
*
* If successful, sets *buffer to point to a malloc'd row of data, and
* returns row length (always > 0) as result.
......@@ -2161,7 +2179,8 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
*buffer = NULL; /* for all failure cases */
if (!conn)
return -2;
if (conn->asyncStatus != PGASYNC_COPY_OUT)
if (conn->asyncStatus != PGASYNC_COPY_OUT &&
conn->asyncStatus != PGASYNC_COPY_BOTH)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
......
......@@ -541,6 +541,10 @@ pqParseInput2(PGconn *conn)
case 'H': /* Start Copy Out */
conn->asyncStatus = PGASYNC_COPY_OUT;
break;
/*
* Don't need to process CopyBothResponse here because
* it never arrives from the server during protocol 2.0.
*/
default:
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext(
......
......@@ -358,6 +358,12 @@ pqParseInput3(PGconn *conn)
conn->asyncStatus = PGASYNC_COPY_OUT;
conn->copy_already_done = 0;
break;
case 'W': /* Start Copy Both */
if (getCopyStart(conn, PGRES_COPY_BOTH))
return;
conn->asyncStatus = PGASYNC_COPY_BOTH;
conn->copy_already_done = 0;
break;
case 'd': /* Copy Data */
/*
......@@ -1196,7 +1202,8 @@ getNotify(PGconn *conn)
}
/*
* getCopyStart - process CopyInResponse or CopyOutResponse message
* getCopyStart - process CopyInResponse, CopyOutResponse or
* CopyBothResponse message
*
* parseInput already read the message type and length.
*/
......@@ -1367,6 +1374,7 @@ getCopyDataMessage(PGconn *conn)
/*
* PQgetCopyData - read a row of data from the backend during COPY OUT
* or COPY BOTH
*
* If successful, sets *buffer to point to a malloc'd row of data, and
* returns row length (always > 0) as result.
......@@ -1390,10 +1398,10 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
if (msgLength < 0)
{
/*
* On end-of-copy, exit COPY_OUT mode and let caller read status
* with PQgetResult(). The normal case is that it's Copy Done,
* but we let parseInput read that. If error, we expect the state
* was already changed.
* On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller
* read status with PQgetResult(). The normal case is that it's
* Copy Done, but we let parseInput read that. If error, we expect
* the state was already changed.
*/
if (msgLength == -1)
conn->asyncStatus = PGASYNC_BUSY;
......
......@@ -85,6 +85,7 @@ typedef enum
* contains the result tuples */
PGRES_COPY_OUT, /* Copy Out data transfer in progress */
PGRES_COPY_IN, /* Copy In data transfer in progress */
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGRES_BAD_RESPONSE, /* an unexpected response was recv'd from the
* backend */
PGRES_NONFATAL_ERROR, /* notice or warning message */
......
......@@ -218,7 +218,8 @@ typedef enum
PGASYNC_BUSY, /* query in progress */
PGASYNC_READY, /* result ready for PQgetResult */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT /* Copy Out data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
} PGAsyncStatusType;
/* PGQueryClass tracks which query protocol we are now executing */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment