Commit 76129e7f authored by Magnus Hagander's avatar Magnus Hagander

Include more status information in walsender results

Add the current xlog insert location to the response of
IDENTIFY_SYSTEM, and adds result sets containing start
and stop location of backups to BASE_BACKUP responses.
parent f001cb38
...@@ -1315,7 +1315,7 @@ The commands accepted in walsender mode are: ...@@ -1315,7 +1315,7 @@ The commands accepted in walsender mode are:
<listitem> <listitem>
<para> <para>
Requests the server to identify itself. Server replies with a result Requests the server to identify itself. Server replies with a result
set of a single row, containing two fields: set of a single row, containing three fields:
</para> </para>
<para> <para>
...@@ -1344,6 +1344,19 @@ The commands accepted in walsender mode are: ...@@ -1344,6 +1344,19 @@ The commands accepted in walsender mode are:
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term>
xlogpos
</term>
<listitem>
<para>
Current xlog write location. Useful to get a known location in the
transaction log where streaming can start.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
</para> </para>
</listitem> </listitem>
...@@ -1520,15 +1533,16 @@ The commands accepted in walsender mode are: ...@@ -1520,15 +1533,16 @@ The commands accepted in walsender mode are:
</variablelist> </variablelist>
</para> </para>
<para> <para>
When the backup is started, the server will first send a header in When the backup is started, the server will first send two
ordinary result set format, followed by one or more CopyResponse ordinary result sets, followed by one or more CopyResponse
results, one for PGDATA and one for each additional tablespace other results.
than <literal>pg_default</> and <literal>pg_global</>. The data in </para>
the CopyResponse results will be a tar format (using ustar00 <para>
extensions) dump of the tablespace contents. The first ordinary result set contains the starting position of the
backup, given in XLogRecPtr format as a single column in a single row.
</para> </para>
<para> <para>
The header is an ordinary resultset with one row for each tablespace. The second ordinary result set has one row for each tablespace.
The fields in this row are: The fields in this row are:
<variablelist> <variablelist>
<varlistentry> <varlistentry>
...@@ -1560,6 +1574,15 @@ The commands accepted in walsender mode are: ...@@ -1560,6 +1574,15 @@ The commands accepted in walsender mode are:
</varlistentry> </varlistentry>
</variablelist> </variablelist>
</para> </para>
<para>
After the second regular result set, one or more CopyResponse results
will be sent, one for PGDATA and one for each additional tablespace other
than <literal>pg_default</> and <literal>pg_global</>. The data in
the CopyResponse results will be a tar format (using ustar00
extensions) dump of the tablespace contents. After the tar data is
complete, a final ordinary result set will be sent.
</para>
<para> <para>
The tar archive for the data directory and each tablespace will contain The tar archive for the data directory and each tablespace will contain
all files in the directories, regardless of whether they are all files in the directories, regardless of whether they are
...@@ -1583,6 +1606,11 @@ The commands accepted in walsender mode are: ...@@ -1583,6 +1606,11 @@ The commands accepted in walsender mode are:
Owner, group and file mode are set if the underlying filesystem on Owner, group and file mode are set if the underlying filesystem on
the server supports it. the server supports it.
</para> </para>
<para>
Once all tablespaces have been sent, a final regular result set will
be sent. This result set contains the end position of the
backup, given in XLogRecPtr format as a single column in a single row.
</para>
</listitem> </listitem>
</varlistentry> </varlistentry>
</variablelist> </variablelist>
......
...@@ -52,6 +52,7 @@ static void SendBackupHeader(List *tablespaces); ...@@ -52,6 +52,7 @@ static void SendBackupHeader(List *tablespaces);
static void base_backup_cleanup(int code, Datum arg); static void base_backup_cleanup(int code, Datum arg);
static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir); static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
static void parse_basebackup_options(List *options, basebackup_options *opt); static void parse_basebackup_options(List *options, basebackup_options *opt);
static void SendXlogRecPtrResult(XLogRecPtr ptr);
/* /*
* Size of each block sent into the tar stream for larger files. * Size of each block sent into the tar stream for larger files.
...@@ -92,6 +93,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) ...@@ -92,6 +93,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
char *labelfile; char *labelfile;
startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile); startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
SendXlogRecPtrResult(startptr);
PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0); PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
{ {
...@@ -239,6 +241,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) ...@@ -239,6 +241,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
/* Send CopyDone message for the last tar file */ /* Send CopyDone message for the last tar file */
pq_putemptymessage('c'); pq_putemptymessage('c');
} }
SendXlogRecPtrResult(endptr);
} }
/* /*
...@@ -431,6 +434,42 @@ SendBackupHeader(List *tablespaces) ...@@ -431,6 +434,42 @@ SendBackupHeader(List *tablespaces)
pq_puttextmessage('C', "SELECT"); pq_puttextmessage('C', "SELECT");
} }
/*
* Send a single resultset containing just a single
* XlogRecPtr record (in text format)
*/
static void
SendXlogRecPtrResult(XLogRecPtr ptr)
{
StringInfoData buf;
char str[MAXFNAMELEN];
snprintf(str, sizeof(str), "%X/%X", ptr.xlogid, ptr.xrecoff);
pq_beginmessage(&buf, 'T'); /* RowDescription */
pq_sendint(&buf, 1, 2); /* 1 field */
/* Field header */
pq_sendstring(&buf, "recptr");
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
/* Data row */
pq_beginmessage(&buf, 'D');
pq_sendint(&buf, 1, 2); /* number of columns */
pq_sendint(&buf, strlen(str), 4); /* length */
pq_sendbytes(&buf, str, strlen(str));
pq_endmessage(&buf);
/* Send a CommandComplete message */
pq_puttextmessage('C', "SELECT");
}
/* /*
* Inject a file with given name and content in the output tar stream. * Inject a file with given name and content in the output tar stream.
*/ */
......
...@@ -258,19 +258,26 @@ IdentifySystem(void) ...@@ -258,19 +258,26 @@ IdentifySystem(void)
StringInfoData buf; StringInfoData buf;
char sysid[32]; char sysid[32];
char tli[11]; char tli[11];
char xpos[MAXFNAMELEN];
XLogRecPtr logptr;
/* /*
* Reply with a result set with one row, two columns. First col is system * Reply with a result set with one row, three columns. First col is system
* ID, and second is timeline ID * ID, second is timeline ID, and third is current xlog location.
*/ */
snprintf(sysid, sizeof(sysid), UINT64_FORMAT, snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
GetSystemIdentifier()); GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
logptr = GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X",
logptr.xlogid, logptr.xrecoff);
/* Send a RowDescription message */ /* Send a RowDescription message */
pq_beginmessage(&buf, 'T'); pq_beginmessage(&buf, 'T');
pq_sendint(&buf, 2, 2); /* 2 fields */ pq_sendint(&buf, 3, 2); /* 3 fields */
/* first field */ /* first field */
pq_sendstring(&buf, "systemid"); /* col name */ pq_sendstring(&buf, "systemid"); /* col name */
...@@ -289,15 +296,27 @@ IdentifySystem(void) ...@@ -289,15 +296,27 @@ IdentifySystem(void)
pq_sendint(&buf, 4, 2); /* typlen */ pq_sendint(&buf, 4, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */ pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */ pq_sendint(&buf, 0, 2); /* format code */
/* third field */
pq_sendstring(&buf, "xlogpos");
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
pq_sendint(&buf, TEXTOID, 4);
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf); pq_endmessage(&buf);
/* Send a DataRow message */ /* Send a DataRow message */
pq_beginmessage(&buf, 'D'); pq_beginmessage(&buf, 'D');
pq_sendint(&buf, 2, 2); /* # of columns */ pq_sendint(&buf, 3, 2); /* # of columns */
pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
pq_sendint(&buf, strlen(tli), 4); /* col2 len */ pq_sendint(&buf, strlen(tli), 4); /* col2 len */
pq_sendbytes(&buf, (char *) tli, strlen(tli)); pq_sendbytes(&buf, (char *) tli, strlen(tli));
pq_sendint(&buf, strlen(xpos), 4); /* col3 len */
pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
pq_endmessage(&buf); pq_endmessage(&buf);
/* Send CommandComplete and ReadyForQuery messages */ /* Send CommandComplete and ReadyForQuery messages */
......
...@@ -742,15 +742,40 @@ static void ...@@ -742,15 +742,40 @@ static void
BaseBackup() BaseBackup()
{ {
PGresult *res; PGresult *res;
uint32 timeline;
char current_path[MAXPGPATH]; char current_path[MAXPGPATH];
char escaped_label[MAXPGPATH]; char escaped_label[MAXPGPATH];
int i; int i;
char xlogstart[64];
char xlogend[64];
/* /*
* Connect in replication mode to the server * Connect in replication mode to the server
*/ */
conn = GetConnection(); conn = GetConnection();
/*
* Run IDENFITY_SYSTEM so we can get the timeline
*/
res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not identify system: %s\n"),
progname, PQerrorMessage(conn));
disconnect_and_exit(1);
}
if (PQntuples(res) != 1)
{
fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
progname, PQntuples(res));
disconnect_and_exit(1);
}
timeline = atoi(PQgetvalue(res, 0, 1));
PQclear(res);
/*
* Start the actual backup
*/
PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i); PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s", snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s",
escaped_label, escaped_label,
...@@ -766,7 +791,7 @@ BaseBackup() ...@@ -766,7 +791,7 @@ BaseBackup()
} }
/* /*
* Get the header * Get the starting xlog position
*/ */
res = PQgetResult(conn); res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
...@@ -775,6 +800,28 @@ BaseBackup() ...@@ -775,6 +800,28 @@ BaseBackup()
progname, PQerrorMessage(conn)); progname, PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
if (PQntuples(res) != 1)
{
fprintf(stderr, _("%s: no start point returned from server.\n"),
progname);
disconnect_and_exit(1);
}
strcpy(xlogstart, PQgetvalue(res, 0, 0));
if (verbose && includewal)
fprintf(stderr, "xlog start point: %s\n", xlogstart);
PQclear(res);
MemSet(xlogend, 0, sizeof(xlogend));
/*
* Get the header
*/
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not get backup header: %s\n"),
progname, PQerrorMessage(conn));
disconnect_and_exit(1);
}
if (PQntuples(res) < 1) if (PQntuples(res) < 1)
{ {
fprintf(stderr, _("%s: no data returned from server.\n"), progname); fprintf(stderr, _("%s: no data returned from server.\n"), progname);
...@@ -828,6 +875,27 @@ BaseBackup() ...@@ -828,6 +875,27 @@ BaseBackup()
} }
PQclear(res); PQclear(res);
/*
* Get the stop position
*/
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not get end xlog position from server.\n"),
progname);
disconnect_and_exit(1);
}
if (PQntuples(res) != 1)
{
fprintf(stderr, _("%s: no end point returned from server.\n"),
progname);
disconnect_and_exit(1);
}
strcpy(xlogend, PQgetvalue(res, 0, 0));
if (verbose && includewal)
fprintf(stderr, "xlog end point: %s\n", xlogend);
PQclear(res);
res = PQgetResult(conn); res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment