Commit 0c013e08 authored by Andres Freund's avatar Andres Freund

Refactor replication connection code of various pg_basebackup utilities.

Move some more code to manage replication connection command to
streamutil.c. A later patch will introduce replication slot via
pg_receivexlog and this avoid duplicating relevant code between
pg_receivexlog and pg_recvlogical.

Author: Michael Paquier, with some editing by me.
parent fdf81c9a
...@@ -1569,8 +1569,8 @@ BaseBackup(void) ...@@ -1569,8 +1569,8 @@ BaseBackup(void)
{ {
PGresult *res; PGresult *res;
char *sysidentifier; char *sysidentifier;
uint32 latesttli; TimeLineID latesttli;
uint32 starttli; TimeLineID starttli;
char *basebkp; char *basebkp;
char escaped_label[MAXPGPATH]; char escaped_label[MAXPGPATH];
char *maxrate_clause = NULL; char *maxrate_clause = NULL;
...@@ -1624,23 +1624,8 @@ BaseBackup(void) ...@@ -1624,23 +1624,8 @@ BaseBackup(void)
/* /*
* Run IDENTIFY_SYSTEM so we can get the timeline * Run IDENTIFY_SYSTEM so we can get the timeline
*/ */
res = PQexec(conn, "IDENTIFY_SYSTEM"); if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
}
if (PQntuples(res) != 1 || PQnfields(res) < 3)
{
fprintf(stderr,
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 3);
disconnect_and_exit(1);
}
sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
latesttli = atoi(PQgetvalue(res, 0, 1));
PQclear(res);
/* /*
* Start the actual backup * Start the actual backup
......
...@@ -253,13 +253,8 @@ FindStreamingStart(uint32 *tli) ...@@ -253,13 +253,8 @@ FindStreamingStart(uint32 *tli)
static void static void
StreamLog(void) StreamLog(void)
{ {
PGresult *res; XLogRecPtr startpos, serverpos;
XLogRecPtr startpos; TimeLineID starttli, servertli;
uint32 starttli;
XLogRecPtr serverpos;
uint32 servertli;
uint32 hi,
lo;
/* /*
* Connect in replication mode to the server * Connect in replication mode to the server
...@@ -280,33 +275,12 @@ StreamLog(void) ...@@ -280,33 +275,12 @@ StreamLog(void)
} }
/* /*
* Run IDENTIFY_SYSTEM so we can get the timeline and current xlog * Identify server, obtaining start LSN position and current timeline ID
* position. * at the same time, necessary if not valid data can be found in the
* existing output directory.
*/ */
res = PQexec(conn, "IDENTIFY_SYSTEM"); if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL))
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
disconnect_and_exit(1);
}
if (PQntuples(res) != 1 || PQnfields(res) < 3)
{
fprintf(stderr,
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 3);
disconnect_and_exit(1); disconnect_and_exit(1);
}
servertli = atoi(PQgetvalue(res, 0, 1));
if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
_("%s: could not parse transaction log location \"%s\"\n"),
progname, PQgetvalue(res, 0, 2));
disconnect_and_exit(1);
}
serverpos = ((uint64) hi) << 32 | lo;
PQclear(res);
/* /*
* Figure out where to start streaming. * Figure out where to start streaming.
......
...@@ -596,7 +596,6 @@ sighup_handler(int signum) ...@@ -596,7 +596,6 @@ sighup_handler(int signum)
int int
main(int argc, char **argv) main(int argc, char **argv)
{ {
PGresult *res;
static struct option long_options[] = { static struct option long_options[] = {
/* general options */ /* general options */
{"file", required_argument, NULL, 'f'}, {"file", required_argument, NULL, 'f'},
...@@ -628,6 +627,7 @@ main(int argc, char **argv) ...@@ -628,6 +627,7 @@ main(int argc, char **argv)
int option_index; int option_index;
uint32 hi, uint32 hi,
lo; lo;
char *db_name;
progname = get_progname(argv[0]); progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical")); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical"));
...@@ -834,124 +834,62 @@ main(int argc, char **argv) ...@@ -834,124 +834,62 @@ main(int argc, char **argv)
#endif #endif
/* /*
* don't really need this but it actually helps to get more precise error * Obtain a connection to server. This is not really necessary but it
* messages about authentication, required GUCs and such without starting * helps to get more precise error messages about authentification,
* to loop around connection attempts lateron. * required GUC parameters and such.
*/ */
{
conn = GetConnection(); conn = GetConnection();
if (!conn) if (!conn)
/* Error message already written in GetConnection() */ /* Error message already written in GetConnection() */
exit(1); exit(1);
/* /*
* Run IDENTIFY_SYSTEM so we can get the timeline and current xlog * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
* position. * replication connection.
*/ */
res = PQexec(conn, "IDENTIFY_SYSTEM"); if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
}
if (PQntuples(res) != 1 || PQnfields(res) < 4) if (db_name == NULL)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), _("%s: failed to establish database specific replication connection\n"),
progname, PQntuples(res), PQnfields(res), 1, 4); progname);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
PQclear(res);
}
/* /* Drop a replication slot. */
* drop a replication slot
*/
if (do_drop_slot) if (do_drop_slot)
{ {
char query[256];
if (verbose) if (verbose)
fprintf(stderr, fprintf(stderr,
_("%s: dropping replication slot \"%s\"\n"), _("%s: dropping replication slot \"%s\"\n"),
progname, replication_slot); progname, replication_slot);
snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"", if (!DropReplicationSlot(conn, replication_slot))
replication_slot);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, query, PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
if (PQntuples(res) != 0 || PQnfields(res) != 0) /* Create a replication slot. */
{
fprintf(stderr,
_("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, replication_slot, PQntuples(res), PQnfields(res), 0, 0);
disconnect_and_exit(1);
}
PQclear(res);
disconnect_and_exit(0);
}
/*
* create a replication slot
*/
if (do_create_slot) if (do_create_slot)
{ {
char query[256];
if (verbose) if (verbose)
fprintf(stderr, fprintf(stderr,
_("%s: creating replication slot \"%s\"\n"), _("%s: creating replication slot \"%s\"\n"),
progname, replication_slot); progname, replication_slot);
snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", if (!CreateReplicationSlot(conn, replication_slot, plugin,
replication_slot, plugin); &startpos, false))
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, query, PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
if (PQntuples(res) != 1 || PQnfields(res) != 4)
{
fprintf(stderr,
_("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, replication_slot, PQntuples(res), PQnfields(res), 1, 4);
disconnect_and_exit(1);
}
if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
_("%s: could not parse transaction log location \"%s\"\n"),
progname, PQgetvalue(res, 0, 1));
disconnect_and_exit(1);
}
startpos = ((uint64) hi) << 32 | lo;
replication_slot = strdup(PQgetvalue(res, 0, 0));
PQclear(res);
}
if (!do_start_slot) if (!do_start_slot)
disconnect_and_exit(0); disconnect_and_exit(0);
/* Stream loop */
while (true) while (true)
{ {
StreamLog(); StreamLogicalLog();
if (time_to_abort) if (time_to_abort)
{ {
/* /*
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "receivelog.h" #include "receivelog.h"
#include "streamutil.h" #include "streamutil.h"
#include "pqexpbuffer.h"
#include "common/fe_memutils.h" #include "common/fe_memutils.h"
#include "datatype/timestamp.h" #include "datatype/timestamp.h"
...@@ -227,11 +228,183 @@ GetConnection(void) ...@@ -227,11 +228,183 @@ GetConnection(void)
return tmpconn; return tmpconn;
} }
/*
* Run IDENTIFY_SYSTEM through a given connection and give back to caller
* some result information if requested:
* - Start LSN position
* - Current timeline ID
* - System identifier
* - Plugin name
*/
bool
RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
XLogRecPtr *startpos, char **db_name)
{
PGresult *res;
uint32 hi, lo;
/* Check connection existence */
Assert(conn != NULL);
res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
return false;
}
if (PQntuples(res) != 1 || PQnfields(res) < 3)
{
fprintf(stderr,
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 3);
return false;
}
/* Get system identifier */
if (sysid != NULL)
*sysid = pg_strdup(PQgetvalue(res, 0, 0));
/* Get timeline ID to start streaming from */
if (starttli != NULL)
*starttli = atoi(PQgetvalue(res, 0, 1));
/* Get LSN start position if necessary */
if (startpos != NULL)
{
if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
_("%s: could not parse transaction log location \"%s\"\n"),
progname, PQgetvalue(res, 0, 2));
return false;
}
*startpos = ((uint64) hi) << 32 | lo;
}
/* Get database name, only available in 9.4 and newer versions */
if (db_name != NULL)
{
if (PQnfields(res) < 4)
fprintf(stderr,
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 4);
if (PQgetisnull(res, 0, 3))
*db_name = NULL;
else
*db_name = pg_strdup(PQgetvalue(res, 0, 3));
}
PQclear(res);
return true;
}
/*
* Create a replication slot for the given connection. This function
* returns true in case of success as well as the start position
* obtained after the slot creation.
*/
bool
CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
XLogRecPtr *startpos, bool is_physical)
{
PQExpBuffer query;
PGresult *res;
query = createPQExpBuffer();
Assert((is_physical && plugin == NULL) ||
(!is_physical && plugin != NULL));
Assert(slot_name != NULL);
/* Build query */
if (is_physical)
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
slot_name);
else
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
slot_name, plugin);
res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, query->data, PQerrorMessage(conn));
return false;
}
if (PQntuples(res) != 1 || PQnfields(res) != 4)
{
fprintf(stderr,
_("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, slot_name,
PQntuples(res), PQnfields(res), 1, 4);
return false;
}
/* Get LSN start position if necessary */
if (startpos != NULL)
{
uint32 hi, lo;
if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
_("%s: could not parse transaction log location \"%s\"\n"),
progname, PQgetvalue(res, 0, 1));
return false;
}
*startpos = ((uint64) hi) << 32 | lo;
}
PQclear(res);
return true;
}
/*
* Drop a replication slot for the given connection. This function
* returns true in case of success.
*/
bool
DropReplicationSlot(PGconn *conn, const char *slot_name)
{
PQExpBuffer query;
PGresult *res;
Assert(slot_name != NULL);
query = createPQExpBuffer();
/* Build query */
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
slot_name);
res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, query->data, PQerrorMessage(conn));
return false;
}
if (PQntuples(res) != 0 || PQnfields(res) != 0)
{
fprintf(stderr,
_("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, slot_name,
PQntuples(res), PQnfields(res), 0, 0);
return false;
}
PQclear(res);
return true;
}
/* /*
* Frontend version of GetCurrentTimestamp(), since we are not linked with * Frontend version of GetCurrentTimestamp(), since we are not linked with
* backend code. The protocol always uses integer timestamps, regardless of * backend code. The replication protocol always uses integer timestamps,
* server setting. * regardless of the server setting.
*/ */
int64 int64
feGetCurrentTimestamp(void) feGetCurrentTimestamp(void)
......
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
#include "libpq-fe.h" #include "libpq-fe.h"
#include "access/xlogdefs.h"
extern const char *progname; extern const char *progname;
extern char *connection_string; extern char *connection_string;
extern char *dbhost; extern char *dbhost;
...@@ -28,6 +30,15 @@ extern PGconn *conn; ...@@ -28,6 +30,15 @@ extern PGconn *conn;
extern PGconn *GetConnection(void); extern PGconn *GetConnection(void);
/* Replication commands */
extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
const char *plugin, XLogRecPtr *startpos,
bool is_physical);
extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
extern bool RunIdentifySystem(PGconn *conn, char **sysid,
TimeLineID *starttli,
XLogRecPtr *startpos,
char **db_name);
extern int64 feGetCurrentTimestamp(void); extern int64 feGetCurrentTimestamp(void);
extern void feTimestampDifference(int64 start_time, int64 stop_time, extern void feTimestampDifference(int64 start_time, int64 stop_time,
long *secs, int *microsecs); long *secs, int *microsecs);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment