Commit 78c8c814 authored by Peter Eisentraut's avatar Peter Eisentraut

Refactor libpqwalreceiver

The whole walreceiver API is now wrapped into a struct, like most of our
other loadable module APIs.  The libpq connection is no longer a global
variable in libpqwalreceiver.  Instead, it is encapsulated into a struct
that is passed around the functions.  This allows multiple walreceivers
to run at the same time.

Add some rudimentary support for logical replication connections to
libpqwalreceiver.

These changes are mostly cosmetic and are going to be useful for the
future logical replication patches.

From: Petr Jelinek <petr@2ndquadrant.com>
parent 597a87cc
...@@ -32,59 +32,72 @@ PG_MODULE_MAGIC; ...@@ -32,59 +32,72 @@ PG_MODULE_MAGIC;
void _PG_init(void); void _PG_init(void);
/* Current connection to the primary, if any */ struct WalReceiverConn
static PGconn *streamConn = NULL; {
/* Current connection to the primary, if any */
/* Buffer for currently read records */ PGconn *streamConn;
static char *recvBuf = NULL; /* Used to remember if the connection is logical or physical */
bool logical;
/* Buffer for currently read records */
char *recvBuf;
};
/* Prototypes for interface functions */ /* Prototypes for interface functions */
static void libpqrcv_connect(char *conninfo); static WalReceiverConn *libpqrcv_connect(const char *conninfo,
static char *libpqrcv_get_conninfo(void); bool logical, const char *appname);
static void libpqrcv_identify_system(TimeLineID *primary_tli); static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len); static char *libpqrcv_identify_system(WalReceiverConn *conn,
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, TimeLineID *primary_tli);
char *slotname); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
static void libpqrcv_endstreaming(TimeLineID *next_tli); TimeLineID tli, char **filename,
static int libpqrcv_receive(char **buffer, pgsocket *wait_fd); char **content, int *len);
static void libpqrcv_send(const char *buffer, int nbytes); static bool libpqrcv_startstreaming(WalReceiverConn *conn,
static void libpqrcv_disconnect(void); TimeLineID tli, XLogRecPtr startpoint,
const char *slotname);
static void libpqrcv_endstreaming(WalReceiverConn *conn,
TimeLineID *next_tli);
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
pgsocket *wait_fd);
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
int nbytes);
static void libpqrcv_disconnect(WalReceiverConn *conn);
static WalReceiverFunctionsType PQWalReceiverFunctions = {
libpqrcv_connect,
libpqrcv_get_conninfo,
libpqrcv_identify_system,
libpqrcv_readtimelinehistoryfile,
libpqrcv_startstreaming,
libpqrcv_endstreaming,
libpqrcv_receive,
libpqrcv_send,
libpqrcv_disconnect
};
/* Prototypes for private functions */ /* Prototypes for private functions */
static PGresult *libpqrcv_PQexec(const char *query); static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
/* /*
* Module load callback * Module initialization function
*/ */
void void
_PG_init(void) _PG_init(void)
{ {
/* Tell walreceiver how to reach us */ if (WalReceiverFunctions != NULL)
if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
walrcv_readtimelinehistoryfile != NULL ||
walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
walrcv_receive != NULL || walrcv_send != NULL ||
walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded"); elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect; WalReceiverFunctions = &PQWalReceiverFunctions;
walrcv_get_conninfo = libpqrcv_get_conninfo;
walrcv_identify_system = libpqrcv_identify_system;
walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
walrcv_startstreaming = libpqrcv_startstreaming;
walrcv_endstreaming = libpqrcv_endstreaming;
walrcv_receive = libpqrcv_receive;
walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
} }
/* /*
* Establish the connection to the primary server for XLOG streaming * Establish the connection to the primary server for XLOG streaming
*/ */
static void static WalReceiverConn *
libpqrcv_connect(char *conninfo) libpqrcv_connect(const char *conninfo, bool logical, const char *appname)
{ {
WalReceiverConn *conn;
const char *keys[5]; const char *keys[5];
const char *vals[5]; const char *vals[5];
int i = 0;
/* /*
* We use the expand_dbname parameter to process the connection string (or * We use the expand_dbname parameter to process the connection string (or
...@@ -93,22 +106,29 @@ libpqrcv_connect(char *conninfo) ...@@ -93,22 +106,29 @@ libpqrcv_connect(char *conninfo)
* database name is ignored by the server in replication mode, but specify * database name is ignored by the server in replication mode, but specify
* "replication" for .pgpass lookup. * "replication" for .pgpass lookup.
*/ */
keys[0] = "dbname"; keys[i] = "dbname";
vals[0] = conninfo; vals[i] = conninfo;
keys[1] = "replication"; keys[++i] = "replication";
vals[1] = "true"; vals[i] = logical ? "database" : "true";
keys[2] = "dbname"; if (!logical)
vals[2] = "replication"; {
keys[3] = "fallback_application_name"; keys[++i] = "dbname";
vals[3] = "walreceiver"; vals[i] = "replication";
keys[4] = NULL; }
vals[4] = NULL; keys[++i] = "fallback_application_name";
vals[i] = appname;
streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true); keys[++i] = NULL;
if (PQstatus(streamConn) != CONNECTION_OK) vals[i] = NULL;
conn = palloc0(sizeof(WalReceiverConn));
conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
if (PQstatus(conn->streamConn) != CONNECTION_OK)
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to the primary server: %s", (errmsg("could not connect to the primary server: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
conn->logical = logical;
return conn;
} }
/* /*
...@@ -116,17 +136,17 @@ libpqrcv_connect(char *conninfo) ...@@ -116,17 +136,17 @@ libpqrcv_connect(char *conninfo)
* are obfuscated. * are obfuscated.
*/ */
static char * static char *
libpqrcv_get_conninfo(void) libpqrcv_get_conninfo(WalReceiverConn *conn)
{ {
PQconninfoOption *conn_opts; PQconninfoOption *conn_opts;
PQconninfoOption *conn_opt; PQconninfoOption *conn_opt;
PQExpBufferData buf; PQExpBufferData buf;
char *retval; char *retval;
Assert(streamConn != NULL); Assert(conn->streamConn != NULL);
initPQExpBuffer(&buf); initPQExpBuffer(&buf);
conn_opts = PQconninfo(streamConn); conn_opts = PQconninfo(conn->streamConn);
if (conn_opts == NULL) if (conn_opts == NULL)
ereport(ERROR, ereport(ERROR,
...@@ -164,25 +184,24 @@ libpqrcv_get_conninfo(void) ...@@ -164,25 +184,24 @@ libpqrcv_get_conninfo(void)
* Check that primary's system identifier matches ours, and fetch the current * Check that primary's system identifier matches ours, and fetch the current
* timeline ID of the primary. * timeline ID of the primary.
*/ */
static void static char *
libpqrcv_identify_system(TimeLineID *primary_tli) libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
{ {
PGresult *res; PGresult *res;
char *primary_sysid; char *primary_sysid;
char standby_sysid[32];
/* /*
* Get the system identifier and timeline ID as a DataRow message from the * Get the system identifier and timeline ID as a DataRow message from the
* primary server. * primary server.
*/ */
res = libpqrcv_PQexec("IDENTIFY_SYSTEM"); res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive database system identifier and timeline ID from " (errmsg("could not receive database system identifier and timeline ID from "
"the primary server: %s", "the primary server: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
} }
if (PQnfields(res) < 3 || PQntuples(res) != 1) if (PQnfields(res) < 3 || PQntuples(res) != 1)
{ {
...@@ -195,24 +214,11 @@ libpqrcv_identify_system(TimeLineID *primary_tli) ...@@ -195,24 +214,11 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.", errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
ntuples, nfields, 3, 1))); ntuples, nfields, 3, 1)));
} }
primary_sysid = PQgetvalue(res, 0, 0); primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
*primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
/*
* Confirm that the system identifier of the primary is the same as ours.
*/
snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
GetSystemIdentifier());
if (strcmp(primary_sysid, standby_sysid) != 0)
{
primary_sysid = pstrdup(primary_sysid);
PQclear(res);
ereport(ERROR,
(errmsg("database system identifier differs between the primary and standby"),
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
primary_sysid, standby_sysid)));
}
PQclear(res); PQclear(res);
return primary_sysid;
} }
/* /*
...@@ -226,21 +232,30 @@ libpqrcv_identify_system(TimeLineID *primary_tli) ...@@ -226,21 +232,30 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
* throws an ERROR. * throws an ERROR.
*/ */
static bool static bool
libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname) libpqrcv_startstreaming(WalReceiverConn *conn,
TimeLineID tli, XLogRecPtr startpoint,
const char *slotname)
{ {
char cmd[256]; StringInfoData cmd;
PGresult *res; PGresult *res;
Assert(!conn->logical);
initStringInfo(&cmd);
/* Start streaming from the point requested by startup process */ /* Start streaming from the point requested by startup process */
if (slotname != NULL) if (slotname != NULL)
snprintf(cmd, sizeof(cmd), appendStringInfo(&cmd,
"START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname, "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u",
(uint32) (startpoint >> 32), (uint32) startpoint, tli); slotname,
(uint32) (startpoint >> 32), (uint32) startpoint,
tli);
else else
snprintf(cmd, sizeof(cmd), appendStringInfo(&cmd, "START_REPLICATION %X/%X TIMELINE %u",
"START_REPLICATION %X/%X TIMELINE %u", (uint32) (startpoint >> 32), (uint32) startpoint,
(uint32) (startpoint >> 32), (uint32) startpoint, tli); tli);
res = libpqrcv_PQexec(cmd); res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
if (PQresultStatus(res) == PGRES_COMMAND_OK) if (PQresultStatus(res) == PGRES_COMMAND_OK)
{ {
...@@ -252,7 +267,7 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname) ...@@ -252,7 +267,7 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("could not start WAL streaming: %s", (errmsg("could not start WAL streaming: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
} }
PQclear(res); PQclear(res);
return true; return true;
...@@ -263,14 +278,17 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname) ...@@ -263,14 +278,17 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
* reported by the server, or 0 if it did not report it. * reported by the server, or 0 if it did not report it.
*/ */
static void static void
libpqrcv_endstreaming(TimeLineID *next_tli) libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
{ {
PGresult *res; PGresult *res;
if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn)) if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
PQflush(conn->streamConn))
ereport(ERROR, ereport(ERROR,
(errmsg("could not send end-of-streaming message to primary: %s", (errmsg("could not send end-of-streaming message to primary: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
*next_tli = 0;
/* /*
* After COPY is finished, we should receive a result set indicating the * After COPY is finished, we should receive a result set indicating the
...@@ -282,7 +300,7 @@ libpqrcv_endstreaming(TimeLineID *next_tli) ...@@ -282,7 +300,7 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
* called after receiving CopyDone from the backend - the walreceiver * called after receiving CopyDone from the backend - the walreceiver
* never terminates replication on its own initiative. * never terminates replication on its own initiative.
*/ */
res = PQgetResult(streamConn); res = PQgetResult(conn->streamConn);
if (PQresultStatus(res) == PGRES_TUPLES_OK) if (PQresultStatus(res) == PGRES_TUPLES_OK)
{ {
/* /*
...@@ -296,47 +314,58 @@ libpqrcv_endstreaming(TimeLineID *next_tli) ...@@ -296,47 +314,58 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
PQclear(res); PQclear(res);
/* the result set should be followed by CommandComplete */ /* the result set should be followed by CommandComplete */
res = PQgetResult(streamConn); res = PQgetResult(conn->streamConn);
}
else if (PQresultStatus(res) == PGRES_COPY_OUT)
{
PQclear(res);
/* End the copy */
PQendcopy(conn->streamConn);
/* CommandComplete should follow */
res = PQgetResult(conn->streamConn);
} }
else
*next_tli = 0;
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
ereport(ERROR, ereport(ERROR,
(errmsg("error reading result of streaming command: %s", (errmsg("error reading result of streaming command: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
PQclear(res); PQclear(res);
/* Verify that there are no more results */ /* Verify that there are no more results */
res = PQgetResult(streamConn); res = PQgetResult(conn->streamConn);
if (res != NULL) if (res != NULL)
ereport(ERROR, ereport(ERROR,
(errmsg("unexpected result after CommandComplete: %s", (errmsg("unexpected result after CommandComplete: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
} }
/* /*
* Fetch the timeline history file for 'tli' from primary. * Fetch the timeline history file for 'tli' from primary.
*/ */
static void static void
libpqrcv_readtimelinehistoryfile(TimeLineID tli, libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
char **filename, char **content, int *len) TimeLineID tli, char **filename,
char **content, int *len)
{ {
PGresult *res; PGresult *res;
char cmd[64]; char cmd[64];
Assert(!conn->logical);
/* /*
* Request the primary to send over the history file for given timeline. * Request the primary to send over the history file for given timeline.
*/ */
snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli); snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
res = libpqrcv_PQexec(cmd); res = libpqrcv_PQexec(conn->streamConn, cmd);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive timeline history file from " (errmsg("could not receive timeline history file from "
"the primary server: %s", "the primary server: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
} }
if (PQnfields(res) != 2 || PQntuples(res) != 1) if (PQnfields(res) != 2 || PQntuples(res) != 1)
{ {
...@@ -374,7 +403,7 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli, ...@@ -374,7 +403,7 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
* Queries are always executed on the connection in streamConn. * Queries are always executed on the connection in streamConn.
*/ */
static PGresult * static PGresult *
libpqrcv_PQexec(const char *query) libpqrcv_PQexec(PGconn *streamConn, const char *query)
{ {
PGresult *result = NULL; PGresult *result = NULL;
PGresult *lastResult = NULL; PGresult *lastResult = NULL;
...@@ -455,10 +484,12 @@ libpqrcv_PQexec(const char *query) ...@@ -455,10 +484,12 @@ libpqrcv_PQexec(const char *query)
* Disconnect connection to primary, if any. * Disconnect connection to primary, if any.
*/ */
static void static void
libpqrcv_disconnect(void) libpqrcv_disconnect(WalReceiverConn *conn)
{ {
PQfinish(streamConn); PQfinish(conn->streamConn);
streamConn = NULL; if (conn->recvBuf != NULL)
PQfreemem(conn->recvBuf);
pfree(conn);
} }
/* /*
...@@ -478,30 +509,31 @@ libpqrcv_disconnect(void) ...@@ -478,30 +509,31 @@ libpqrcv_disconnect(void)
* ereports on error. * ereports on error.
*/ */
static int static int
libpqrcv_receive(char **buffer, pgsocket *wait_fd) libpqrcv_receive(WalReceiverConn *conn, char **buffer,
pgsocket *wait_fd)
{ {
int rawlen; int rawlen;
if (recvBuf != NULL) if (conn->recvBuf != NULL)
PQfreemem(recvBuf); PQfreemem(conn->recvBuf);
recvBuf = NULL; conn->recvBuf = NULL;
/* Try to receive a CopyData message */ /* Try to receive a CopyData message */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1); rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
if (rawlen == 0) if (rawlen == 0)
{ {
/* Try consuming some data. */ /* Try consuming some data. */
if (PQconsumeInput(streamConn) == 0) if (PQconsumeInput(conn->streamConn) == 0)
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s", (errmsg("could not receive data from WAL stream: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
/* Now that we've consumed some input, try again */ /* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1); rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
if (rawlen == 0) if (rawlen == 0)
{ {
/* Tell caller to try again when our socket is ready. */ /* Tell caller to try again when our socket is ready. */
*wait_fd = PQsocket(streamConn); *wait_fd = PQsocket(conn->streamConn);
return 0; return 0;
} }
} }
...@@ -509,7 +541,7 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd) ...@@ -509,7 +541,7 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd)
{ {
PGresult *res; PGresult *res;
res = PQgetResult(streamConn); res = PQgetResult(conn->streamConn);
if (PQresultStatus(res) == PGRES_COMMAND_OK || if (PQresultStatus(res) == PGRES_COMMAND_OK ||
PQresultStatus(res) == PGRES_COPY_IN) PQresultStatus(res) == PGRES_COPY_IN)
{ {
...@@ -521,16 +553,16 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd) ...@@ -521,16 +553,16 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd)
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s", (errmsg("could not receive data from WAL stream: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
} }
} }
if (rawlen < -1) if (rawlen < -1)
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s", (errmsg("could not receive data from WAL stream: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
/* Return received messages to caller */ /* Return received messages to caller */
*buffer = recvBuf; *buffer = conn->recvBuf;
return rawlen; return rawlen;
} }
...@@ -540,11 +572,11 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd) ...@@ -540,11 +572,11 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd)
* ereports on error. * ereports on error.
*/ */
static void static void
libpqrcv_send(const char *buffer, int nbytes) libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
{ {
if (PQputCopyData(streamConn, buffer, nbytes) <= 0 || if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
PQflush(streamConn)) PQflush(conn->streamConn))
ereport(ERROR, ereport(ERROR,
(errmsg("could not send data to WAL stream: %s", (errmsg("could not send data to WAL stream: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(conn->streamConn))));
} }
...@@ -74,16 +74,9 @@ int wal_receiver_status_interval; ...@@ -74,16 +74,9 @@ int wal_receiver_status_interval;
int wal_receiver_timeout; int wal_receiver_timeout;
bool hot_standby_feedback; bool hot_standby_feedback;
/* libpqreceiver hooks to these when loaded */ /* libpqwalreceiver connection */
walrcv_connect_type walrcv_connect = NULL; static WalReceiverConn *wrconn = NULL;
walrcv_get_conninfo_type walrcv_get_conninfo = NULL; WalReceiverFunctionsType *WalReceiverFunctions = NULL;
walrcv_identify_system_type walrcv_identify_system = NULL;
walrcv_startstreaming_type walrcv_startstreaming = NULL;
walrcv_endstreaming_type walrcv_endstreaming = NULL;
walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL;
walrcv_receive_type walrcv_receive = NULL;
walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
...@@ -286,14 +279,7 @@ WalReceiverMain(void) ...@@ -286,14 +279,7 @@ WalReceiverMain(void)
/* Load the libpq-specific functions */ /* Load the libpq-specific functions */
load_file("libpqwalreceiver", false); load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || if (WalReceiverFunctions == NULL)
walrcv_get_conninfo == NULL ||
walrcv_startstreaming == NULL ||
walrcv_endstreaming == NULL ||
walrcv_identify_system == NULL ||
walrcv_readtimelinehistoryfile == NULL ||
walrcv_receive == NULL || walrcv_send == NULL ||
walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly"); elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/* /*
...@@ -307,14 +293,14 @@ WalReceiverMain(void) ...@@ -307,14 +293,14 @@ WalReceiverMain(void)
/* Establish the connection to the primary for XLOG streaming */ /* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit(); EnableWalRcvImmediateExit();
walrcv_connect(conninfo); wrconn = walrcv_connect(conninfo, false, "walreceiver");
DisableWalRcvImmediateExit(); DisableWalRcvImmediateExit();
/* /*
* Save user-visible connection string. This clobbers the original * Save user-visible connection string. This clobbers the original
* conninfo, for security. * conninfo, for security.
*/ */
tmp_conninfo = walrcv_get_conninfo(); tmp_conninfo = walrcv_get_conninfo(wrconn);
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
memset(walrcv->conninfo, 0, MAXCONNINFO); memset(walrcv->conninfo, 0, MAXCONNINFO);
if (tmp_conninfo) if (tmp_conninfo)
...@@ -328,12 +314,25 @@ WalReceiverMain(void) ...@@ -328,12 +314,25 @@ WalReceiverMain(void)
first_stream = true; first_stream = true;
for (;;) for (;;)
{ {
char *primary_sysid;
char standby_sysid[32];
/* /*
* Check that we're connected to a valid server using the * Check that we're connected to a valid server using the
* IDENTIFY_SYSTEM replication command, * IDENTIFY_SYSTEM replication command,
*/ */
EnableWalRcvImmediateExit(); EnableWalRcvImmediateExit();
walrcv_identify_system(&primaryTLI); primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
GetSystemIdentifier());
if (strcmp(primary_sysid, standby_sysid) != 0)
{
ereport(ERROR,
(errmsg("database system identifier differs between the primary and standby"),
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
primary_sysid, standby_sysid)));
}
DisableWalRcvImmediateExit(); DisableWalRcvImmediateExit();
/* /*
...@@ -370,7 +369,7 @@ WalReceiverMain(void) ...@@ -370,7 +369,7 @@ WalReceiverMain(void)
* on the new timeline. * on the new timeline.
*/ */
ThisTimeLineID = startpointTLI; ThisTimeLineID = startpointTLI;
if (walrcv_startstreaming(startpointTLI, startpoint, if (walrcv_startstreaming(wrconn, startpointTLI, startpoint,
slotname[0] != '\0' ? slotname : NULL)) slotname[0] != '\0' ? slotname : NULL))
{ {
if (first_stream) if (first_stream)
...@@ -422,7 +421,7 @@ WalReceiverMain(void) ...@@ -422,7 +421,7 @@ WalReceiverMain(void)
} }
/* See if we can read data immediately */ /* See if we can read data immediately */
len = walrcv_receive(&buf, &wait_fd); len = walrcv_receive(wrconn, &buf, &wait_fd);
if (len != 0) if (len != 0)
{ {
/* /*
...@@ -453,7 +452,7 @@ WalReceiverMain(void) ...@@ -453,7 +452,7 @@ WalReceiverMain(void)
endofwal = true; endofwal = true;
break; break;
} }
len = walrcv_receive(&buf, &wait_fd); len = walrcv_receive(wrconn, &buf, &wait_fd);
} }
/* Let the master know that we received some data. */ /* Let the master know that we received some data. */
...@@ -570,7 +569,7 @@ WalReceiverMain(void) ...@@ -570,7 +569,7 @@ WalReceiverMain(void)
* our side, too. * our side, too.
*/ */
EnableWalRcvImmediateExit(); EnableWalRcvImmediateExit();
walrcv_endstreaming(&primaryTLI); walrcv_endstreaming(wrconn, &primaryTLI);
DisableWalRcvImmediateExit(); DisableWalRcvImmediateExit();
/* /*
...@@ -726,7 +725,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) ...@@ -726,7 +725,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
tli))); tli)));
EnableWalRcvImmediateExit(); EnableWalRcvImmediateExit();
walrcv_readtimelinehistoryfile(tli, &fname, &content, &len); walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
DisableWalRcvImmediateExit(); DisableWalRcvImmediateExit();
/* /*
...@@ -778,8 +777,8 @@ WalRcvDie(int code, Datum arg) ...@@ -778,8 +777,8 @@ WalRcvDie(int code, Datum arg)
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
/* Terminate the connection gracefully. */ /* Terminate the connection gracefully. */
if (walrcv_disconnect != NULL) if (wrconn != NULL)
walrcv_disconnect(); walrcv_disconnect(wrconn);
/* Wake up the startup process to notice promptly that we're gone */ /* Wake up the startup process to notice promptly that we're gone */
WakeupRecovery(); WakeupRecovery();
...@@ -1150,7 +1149,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) ...@@ -1150,7 +1149,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
(uint32) (applyPtr >> 32), (uint32) applyPtr, (uint32) (applyPtr >> 32), (uint32) applyPtr,
requestReply ? " (reply requested)" : ""); requestReply ? " (reply requested)" : "");
walrcv_send(reply_message.data, reply_message.len); walrcv_send(wrconn, reply_message.data, reply_message.len);
} }
/* /*
...@@ -1228,7 +1227,7 @@ XLogWalRcvSendHSFeedback(bool immed) ...@@ -1228,7 +1227,7 @@ XLogWalRcvSendHSFeedback(bool immed)
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
pq_sendint(&reply_message, xmin, 4); pq_sendint(&reply_message, xmin, 4);
pq_sendint(&reply_message, nextEpoch, 4); pq_sendint(&reply_message, nextEpoch, 4);
walrcv_send(reply_message.data, reply_message.len); walrcv_send(wrconn, reply_message.data, reply_message.len);
if (TransactionIdIsValid(xmin)) if (TransactionIdIsValid(xmin))
master_has_standby_xmin = true; master_has_standby_xmin = true;
else else
......
...@@ -134,33 +134,64 @@ typedef struct ...@@ -134,33 +134,64 @@ typedef struct
extern WalRcvData *WalRcv; extern WalRcvData *WalRcv;
/* libpqwalreceiver hooks */ struct WalReceiverConn;
typedef void (*walrcv_connect_type) (char *conninfo); typedef struct WalReceiverConn WalReceiverConn;
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
typedef char *(*walrcv_get_conninfo_type) (void);
extern PGDLLIMPORT walrcv_get_conninfo_type walrcv_get_conninfo;
typedef void (*walrcv_identify_system_type) (TimeLineID *primary_tli);
extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;
typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname);
extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli); /* libpqwalreceiver hooks */
extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming; typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical,
const char *appname);
typedef int (*walrcv_receive_type) (char **buffer, pgsocket *wait_fd); typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive; typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
TimeLineID *primary_tli);
typedef void (*walrcv_send_type) (const char *buffer, int nbytes); typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
extern PGDLLIMPORT walrcv_send_type walrcv_send; TimeLineID tli,
char **filename,
typedef void (*walrcv_disconnect_type) (void); char **content, int *size);
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
TimeLineID tli,
XLogRecPtr startpoint,
const char *slotname);
typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
TimeLineID *next_tli);
typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer,
pgsocket *wait_fd);
typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
int nbytes);
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
typedef struct WalReceiverFunctionsType
{
walrcv_connect_fn connect;
walrcv_get_conninfo_fn get_conninfo;
walrcv_identify_system_fn identify_system;
walrcv_readtimelinehistoryfile_fn readtimelinehistoryfile;
walrcv_startstreaming_fn startstreaming;
walrcv_endstreaming_fn endstreaming;
walrcv_receive_fn receive;
walrcv_send_fn send;
walrcv_disconnect_fn disconnect;
} WalReceiverFunctionsType;
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
#define walrcv_connect(conninfo, logical, appname) \
WalReceiverFunctions->connect(conninfo, logical, appname)
#define walrcv_get_conninfo(conn) \
WalReceiverFunctions->get_conninfo(conn)
#define walrcv_identify_system(conn, primary_tli) \
WalReceiverFunctions->identify_system(conn, primary_tli)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
WalReceiverFunctions->readtimelinehistoryfile(conn, tli, filename, content, size)
#define walrcv_startstreaming(conn, tli, startpoint, slotname) \
WalReceiverFunctions->startstreaming(conn, tli, startpoint, slotname)
#define walrcv_endstreaming(conn, next_tli) \
WalReceiverFunctions->endstreaming(conn, next_tli)
#define walrcv_receive(conn, buffer, wait_fd) \
WalReceiverFunctions->receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->send(conn, buffer, nbytes)
#define walrcv_disconnect(conn) \
WalReceiverFunctions->disconnect(conn)
/* prototypes for functions in walreceiver.c */ /* prototypes for functions in walreceiver.c */
extern void WalReceiverMain(void) pg_attribute_noreturn(); extern void WalReceiverMain(void) pg_attribute_noreturn();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment