Commit 431ba7be authored by Robert Haas's avatar Robert Haas

pg_basebackup: Refactor code for reading COPY and tar data.

Add a new function ReceiveCopyData that does just that, taking a
callback as an argument to specify what should be done with each chunk
as it is received. This allows a single copy of the logic to be shared
between ReceiveTarFile and ReceiveAndUnpackTarFile, and eliminates
a few #ifdef conditions based on HAVE_LIBZ.

While this is slightly more code, it's arguably clearer, and
there is a pending patch that introduces additional calls to
ReceiveCopyData.

This commit is not intended to result in any functional change.

Discussion: http://postgr.es/m/CA+TgmoYZDTHbSpwZtW=JDgAhwVAYvmdSrRUjOd+AYdfNNXVBDg@mail.gmail.com
parent 42f36296
...@@ -57,6 +57,40 @@ typedef struct TablespaceList ...@@ -57,6 +57,40 @@ typedef struct TablespaceList
TablespaceListCell *tail; TablespaceListCell *tail;
} TablespaceList; } TablespaceList;
typedef struct WriteTarState
{
int tablespacenum;
char filename[MAXPGPATH];
FILE *tarfile;
char tarhdr[512];
bool basetablespace;
bool in_tarhdr;
bool skip_file;
bool is_recovery_guc_supported;
bool is_postgresql_auto_conf;
bool found_postgresql_auto_conf;
int file_padding_len;
size_t tarhdrsz;
pgoff_t filesz;
#ifdef HAVE_LIBZ
gzFile ztarfile;
#endif
} WriteTarState;
typedef struct UnpackTarState
{
int tablespacenum;
char current_path[MAXPGPATH];
char filename[MAXPGPATH];
const char *mapped_tblspc_path;
pgoff_t current_len_left;
int current_padding;
FILE *file;
} UnpackTarState;
typedef void (*WriteDataCallback) (size_t nbytes, char *buf,
void *callback_data);
/* /*
* pg_xlog has been renamed to pg_wal in version 10. This version number * pg_xlog has been renamed to pg_wal in version 10. This version number
* should be compared with PQserverVersion(). * should be compared with PQserverVersion().
...@@ -142,7 +176,10 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo ...@@ -142,7 +176,10 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo
static void progress_report(int tablespacenum, const char *filename, bool force); static void progress_report(int tablespacenum, const char *filename, bool force);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
void *callback_data);
static void BaseBackup(void); static void BaseBackup(void);
static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
...@@ -873,43 +910,79 @@ parse_max_rate(char *src) ...@@ -873,43 +910,79 @@ parse_max_rate(char *src)
return (int32) result; return (int32) result;
} }
/*
* Read a stream of COPY data and invoke the provided callback for each
* chunk.
*/
static void
ReceiveCopyData(PGconn *conn, WriteDataCallback callback,
void *callback_data)
{
PGresult *res;
/* Get the COPY data stream. */
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
pg_log_error("could not get COPY data stream: %s",
PQerrorMessage(conn));
exit(1);
}
PQclear(res);
/* Loop over chunks until done. */
while (1)
{
int r;
char *copybuf;
r = PQgetCopyData(conn, &copybuf, 0);
if (r == -1)
{
/* End of chunk. */
break;
}
else if (r == -2)
{
pg_log_error("could not read COPY data: %s",
PQerrorMessage(conn));
exit(1);
}
(*callback) (r, copybuf, callback_data);
PQfreemem(copybuf);
}
}
/* /*
* Write a piece of tar data * Write a piece of tar data
*/ */
static void static void
writeTarData( writeTarData(WriteTarState *state, char *buf, int r)
#ifdef HAVE_LIBZ
gzFile ztarfile,
#endif
FILE *tarfile, char *buf, int r, char *current_file)
{ {
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
if (ztarfile != NULL) if (state->ztarfile != NULL)
{ {
if (gzwrite(ztarfile, buf, r) != r) if (gzwrite(state->ztarfile, buf, r) != r)
{ {
pg_log_error("could not write to compressed file \"%s\": %s", pg_log_error("could not write to compressed file \"%s\": %s",
current_file, get_gz_error(ztarfile)); state->filename, get_gz_error(state->ztarfile));
exit(1); exit(1);
} }
} }
else else
#endif #endif
{ {
if (fwrite(buf, r, 1, tarfile) != 1) if (fwrite(buf, r, 1, state->tarfile) != 1)
{ {
pg_log_error("could not write to file \"%s\": %m", current_file); pg_log_error("could not write to file \"%s\": %m",
state->filename);
exit(1); exit(1);
} }
} }
} }
#ifdef HAVE_LIBZ
#define WRITE_TAR_DATA(buf, sz) writeTarData(ztarfile, tarfile, buf, sz, filename)
#else
#define WRITE_TAR_DATA(buf, sz) writeTarData(tarfile, buf, sz, filename)
#endif
/* /*
* Receive a tar format file from the connection to the server, and write * Receive a tar format file from the connection to the server, and write
* the data from this file directly into a tar file. If compression is * the data from this file directly into a tar file. If compression is
...@@ -923,29 +996,19 @@ writeTarData( ...@@ -923,29 +996,19 @@ writeTarData(
static void static void
ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
{ {
char filename[MAXPGPATH]; char zerobuf[1024];
char *copybuf = NULL; WriteTarState state;
FILE *tarfile = NULL;
char tarhdr[512];
bool basetablespace = PQgetisnull(res, rownum, 0);
bool in_tarhdr = true;
bool skip_file = false;
bool is_recovery_guc_supported = true;
bool is_postgresql_auto_conf = false;
bool found_postgresql_auto_conf = false;
int file_padding_len = 0;
size_t tarhdrsz = 0;
pgoff_t filesz = 0;
#ifdef HAVE_LIBZ memset(&state, 0, sizeof(state));
gzFile ztarfile = NULL; state.tablespacenum = rownum;
#endif state.basetablespace = PQgetisnull(res, rownum, 0);
state.in_tarhdr = true;
/* recovery.conf is integrated into postgresql.conf in 12 and newer */ /* recovery.conf is integrated into postgresql.conf in 12 and newer */
if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_RECOVERY_GUC) if (PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC)
is_recovery_guc_supported = false; state.is_recovery_guc_supported = true;
if (basetablespace) if (state.basetablespace)
{ {
/* /*
* Base tablespaces * Base tablespaces
...@@ -959,40 +1022,42 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -959,40 +1022,42 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
if (compresslevel != 0) if (compresslevel != 0)
{ {
ztarfile = gzdopen(dup(fileno(stdout)), "wb"); state.ztarfile = gzdopen(dup(fileno(stdout)), "wb");
if (gzsetparams(ztarfile, compresslevel, if (gzsetparams(state.ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK) Z_DEFAULT_STRATEGY) != Z_OK)
{ {
pg_log_error("could not set compression level %d: %s", pg_log_error("could not set compression level %d: %s",
compresslevel, get_gz_error(ztarfile)); compresslevel, get_gz_error(state.ztarfile));
exit(1); exit(1);
} }
} }
else else
#endif #endif
tarfile = stdout; state.tarfile = stdout;
strcpy(filename, "-"); strcpy(state.filename, "-");
} }
else else
{ {
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
if (compresslevel != 0) if (compresslevel != 0)
{ {
snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir); snprintf(state.filename, sizeof(state.filename),
ztarfile = gzopen(filename, "wb"); "%s/base.tar.gz", basedir);
if (gzsetparams(ztarfile, compresslevel, state.ztarfile = gzopen(state.filename, "wb");
if (gzsetparams(state.ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK) Z_DEFAULT_STRATEGY) != Z_OK)
{ {
pg_log_error("could not set compression level %d: %s", pg_log_error("could not set compression level %d: %s",
compresslevel, get_gz_error(ztarfile)); compresslevel, get_gz_error(state.ztarfile));
exit(1); exit(1);
} }
} }
else else
#endif #endif
{ {
snprintf(filename, sizeof(filename), "%s/base.tar", basedir); snprintf(state.filename, sizeof(state.filename),
tarfile = fopen(filename, "wb"); "%s/base.tar", basedir);
state.tarfile = fopen(state.filename, "wb");
} }
} }
} }
...@@ -1004,34 +1069,35 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1004,34 +1069,35 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
if (compresslevel != 0) if (compresslevel != 0)
{ {
snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir, snprintf(state.filename, sizeof(state.filename),
PQgetvalue(res, rownum, 0)); "%s/%s.tar.gz",
ztarfile = gzopen(filename, "wb"); basedir, PQgetvalue(res, rownum, 0));
if (gzsetparams(ztarfile, compresslevel, state.ztarfile = gzopen(state.filename, "wb");
if (gzsetparams(state.ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK) Z_DEFAULT_STRATEGY) != Z_OK)
{ {
pg_log_error("could not set compression level %d: %s", pg_log_error("could not set compression level %d: %s",
compresslevel, get_gz_error(ztarfile)); compresslevel, get_gz_error(state.ztarfile));
exit(1); exit(1);
} }
} }
else else
#endif #endif
{ {
snprintf(filename, sizeof(filename), "%s/%s.tar", basedir, snprintf(state.filename, sizeof(state.filename), "%s/%s.tar",
PQgetvalue(res, rownum, 0)); basedir, PQgetvalue(res, rownum, 0));
tarfile = fopen(filename, "wb"); state.tarfile = fopen(state.filename, "wb");
} }
} }
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
if (compresslevel != 0) if (compresslevel != 0)
{ {
if (!ztarfile) if (!state.ztarfile)
{ {
/* Compression is in use */ /* Compression is in use */
pg_log_error("could not create compressed file \"%s\": %s", pg_log_error("could not create compressed file \"%s\": %s",
filename, get_gz_error(ztarfile)); state.filename, get_gz_error(state.ztarfile));
exit(1); exit(1);
} }
} }
...@@ -1039,66 +1105,42 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1039,66 +1105,42 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#endif #endif
{ {
/* Either no zlib support, or zlib support but compresslevel = 0 */ /* Either no zlib support, or zlib support but compresslevel = 0 */
if (!tarfile) if (!state.tarfile)
{ {
pg_log_error("could not create file \"%s\": %m", filename); pg_log_error("could not create file \"%s\": %m", state.filename);
exit(1); exit(1);
} }
} }
/* ReceiveCopyData(conn, ReceiveTarCopyChunk, &state);
* Get the COPY data stream
*/
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
pg_log_error("could not get COPY data stream: %s",
PQerrorMessage(conn));
exit(1);
}
while (1)
{
int r;
if (copybuf != NULL)
{
PQfreemem(copybuf);
copybuf = NULL;
}
r = PQgetCopyData(conn, &copybuf, 0);
if (r == -1)
{
/* /*
* End of chunk. If requested, and this is the base tablespace, * End of copy data. If requested, and this is the base tablespace, write
* write configuration file into the tarfile. When done, close the * configuration file into the tarfile. When done, close the file (but not
* file (but not stdout). * stdout).
* *
* Also, write two completely empty blocks at the end of the tar * Also, write two completely empty blocks at the end of the tar file, as
* file, as required by some tar programs. * required by some tar programs.
*/ */
char zerobuf[1024];
MemSet(zerobuf, 0, sizeof(zerobuf)); MemSet(zerobuf, 0, sizeof(zerobuf));
if (basetablespace && writerecoveryconf) if (state.basetablespace && writerecoveryconf)
{ {
char header[512]; char header[512];
/* /*
* If postgresql.auto.conf has not been found in the streamed * If postgresql.auto.conf has not been found in the streamed data,
* data, add recovery configuration to postgresql.auto.conf if * add recovery configuration to postgresql.auto.conf if recovery
* recovery parameters are GUCs. If the instance connected to * parameters are GUCs. If the instance connected to is older than
* is older than 12, create recovery.conf with this data * 12, create recovery.conf with this data otherwise.
* otherwise.
*/ */
if (!found_postgresql_auto_conf || !is_recovery_guc_supported) if (!state.found_postgresql_auto_conf || !state.is_recovery_guc_supported)
{ {
int padding; int padding;
tarCreateHeader(header, tarCreateHeader(header,
is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf", state.is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
NULL, NULL,
recoveryconfcontents->len, recoveryconfcontents->len,
pg_file_create_mode, 04000, 02000, pg_file_create_mode, 04000, 02000,
...@@ -1106,39 +1148,38 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1106,39 +1148,38 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len; padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len;
WRITE_TAR_DATA(header, sizeof(header)); writeTarData(&state, header, sizeof(header));
WRITE_TAR_DATA(recoveryconfcontents->data, writeTarData(&state, recoveryconfcontents->data,
recoveryconfcontents->len); recoveryconfcontents->len);
if (padding) if (padding)
WRITE_TAR_DATA(zerobuf, padding); writeTarData(&state, zerobuf, padding);
} }
/* /*
* standby.signal is supported only if recovery parameters are * standby.signal is supported only if recovery parameters are GUCs.
* GUCs.
*/ */
if (is_recovery_guc_supported) if (state.is_recovery_guc_supported)
{ {
tarCreateHeader(header, "standby.signal", NULL, tarCreateHeader(header, "standby.signal", NULL,
0, /* zero-length file */ 0, /* zero-length file */
pg_file_create_mode, 04000, 02000, pg_file_create_mode, 04000, 02000,
time(NULL)); time(NULL));
WRITE_TAR_DATA(header, sizeof(header)); writeTarData(&state, header, sizeof(header));
WRITE_TAR_DATA(zerobuf, 511); writeTarData(&state, zerobuf, 511);
} }
} }
/* 2 * 512 bytes empty data at end of file */ /* 2 * 512 bytes empty data at end of file */
WRITE_TAR_DATA(zerobuf, sizeof(zerobuf)); writeTarData(&state, zerobuf, sizeof(zerobuf));
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
if (ztarfile != NULL) if (state.ztarfile != NULL)
{ {
if (gzclose(ztarfile) != 0) if (gzclose(state.ztarfile) != 0)
{ {
pg_log_error("could not close compressed file \"%s\": %s", pg_log_error("could not close compressed file \"%s\": %s",
filename, get_gz_error(ztarfile)); state.filename, get_gz_error(state.ztarfile));
exit(1); exit(1);
} }
} }
...@@ -1147,128 +1188,140 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1147,128 +1188,140 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
{ {
if (strcmp(basedir, "-") != 0) if (strcmp(basedir, "-") != 0)
{ {
if (fclose(tarfile) != 0) if (fclose(state.tarfile) != 0)
{ {
pg_log_error("could not close file \"%s\": %m", pg_log_error("could not close file \"%s\": %m",
filename); state.filename);
exit(1); exit(1);
} }
} }
} }
break; progress_report(rownum, state.filename, true);
}
else if (r == -2)
{
pg_log_error("could not read COPY data: %s",
PQerrorMessage(conn));
exit(1);
}
if (!writerecoveryconf || !basetablespace) /*
* Do not sync the resulting tar file yet, all files are synced once at
* the end.
*/
}
/*
* Receive one chunk of tar-format data from the server.
*/
static void
ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data)
{
WriteTarState *state = callback_data;
if (!writerecoveryconf || !state->basetablespace)
{ {
/* /*
* When not writing config file, or when not working on the base * When not writing config file, or when not working on the base
* tablespace, we never have to look for an existing configuration * tablespace, we never have to look for an existing configuration
* file in the stream. * file in the stream.
*/ */
WRITE_TAR_DATA(copybuf, r); writeTarData(state, copybuf, r);
} }
else else
{ {
/* /*
* Look for a config file in the existing tar stream. If it's * Look for a config file in the existing tar stream. If it's there,
* there, we must skip it so we can later overwrite it with our * we must skip it so we can later overwrite it with our own version
* own version of the file. * of the file.
* *
* To do this, we have to process the individual files inside the * To do this, we have to process the individual files inside the TAR
* TAR stream. The stream consists of a header and zero or more * stream. The stream consists of a header and zero or more chunks,
* chunks, all 512 bytes long. The stream from the server is * all 512 bytes long. The stream from the server is broken up into
* broken up into smaller pieces, so we have to track the size of * smaller pieces, so we have to track the size of the files to find
* the files to find the next header structure. * the next header structure.
*/ */
int rr = r; int rr = r;
int pos = 0; int pos = 0;
while (rr > 0) while (rr > 0)
{ {
if (in_tarhdr) if (state->in_tarhdr)
{ {
/* /*
* We're currently reading a header structure inside the * We're currently reading a header structure inside the TAR
* TAR stream, i.e. the file metadata. * stream, i.e. the file metadata.
*/ */
if (tarhdrsz < 512) if (state->tarhdrsz < 512)
{ {
/* /*
* Copy the header structure into tarhdr in case the * Copy the header structure into tarhdr in case the
* header is not aligned to 512 bytes or it's not * header is not aligned to 512 bytes or it's not returned
* returned in whole by the last PQgetCopyData call. * in whole by the last PQgetCopyData call.
*/ */
int hdrleft; int hdrleft;
int bytes2copy; int bytes2copy;
hdrleft = 512 - tarhdrsz; hdrleft = 512 - state->tarhdrsz;
bytes2copy = (rr > hdrleft ? hdrleft : rr); bytes2copy = (rr > hdrleft ? hdrleft : rr);
memcpy(&tarhdr[tarhdrsz], copybuf + pos, bytes2copy); memcpy(&state->tarhdr[state->tarhdrsz], copybuf + pos,
bytes2copy);
rr -= bytes2copy; rr -= bytes2copy;
pos += bytes2copy; pos += bytes2copy;
tarhdrsz += bytes2copy; state->tarhdrsz += bytes2copy;
} }
else else
{ {
/* /*
* We have the complete header structure in tarhdr, * We have the complete header structure in tarhdr, look
* look at the file metadata: we may want append * at the file metadata: we may want append recovery info
* recovery info into postgresql.auto.conf and skip * into postgresql.auto.conf and skip standby.signal file
* standby.signal file if recovery parameters are * if recovery parameters are integrated as GUCs, and
* integrated as GUCs, and recovery.conf otherwise. In * recovery.conf otherwise. In both cases we must
* both cases we must calculate tar padding. * calculate tar padding.
*/ */
if (is_recovery_guc_supported) if (state->is_recovery_guc_supported)
{ {
skip_file = (strcmp(&tarhdr[0], "standby.signal") == 0); state->skip_file =
is_postgresql_auto_conf = (strcmp(&tarhdr[0], "postgresql.auto.conf") == 0); (strcmp(&state->tarhdr[0], "standby.signal") == 0);
state->is_postgresql_auto_conf =
(strcmp(&state->tarhdr[0], "postgresql.auto.conf") == 0);
} }
else else
skip_file = (strcmp(&tarhdr[0], "recovery.conf") == 0); state->skip_file =
(strcmp(&state->tarhdr[0], "recovery.conf") == 0);
filesz = read_tar_number(&tarhdr[124], 12); state->filesz = read_tar_number(&state->tarhdr[124], 12);
file_padding_len = ((filesz + 511) & ~511) - filesz; state->file_padding_len =
((state->filesz + 511) & ~511) - state->filesz;
if (is_recovery_guc_supported && if (state->is_recovery_guc_supported &&
is_postgresql_auto_conf && state->is_postgresql_auto_conf &&
writerecoveryconf) writerecoveryconf)
{ {
/* replace tar header */ /* replace tar header */
char header[512]; char header[512];
tarCreateHeader(header, "postgresql.auto.conf", NULL, tarCreateHeader(header, "postgresql.auto.conf", NULL,
filesz + recoveryconfcontents->len, state->filesz + recoveryconfcontents->len,
pg_file_create_mode, 04000, 02000, pg_file_create_mode, 04000, 02000,
time(NULL)); time(NULL));
WRITE_TAR_DATA(header, sizeof(header)); writeTarData(state, header, sizeof(header));
} }
else else
{ {
/* copy stream with padding */ /* copy stream with padding */
filesz += file_padding_len; state->filesz += state->file_padding_len;
if (!skip_file) if (!state->skip_file)
{ {
/* /*
* If we're not skipping the file, write the * If we're not skipping the file, write the tar
* tar header unmodified. * header unmodified.
*/ */
WRITE_TAR_DATA(tarhdr, 512); writeTarData(state, state->tarhdr, 512);
} }
} }
/* Next part is the file, not the header */ /* Next part is the file, not the header */
in_tarhdr = false; state->in_tarhdr = false;
} }
} }
else else
...@@ -1276,49 +1329,50 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1276,49 +1329,50 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
/* /*
* We're processing a file's contents. * We're processing a file's contents.
*/ */
if (filesz > 0) if (state->filesz > 0)
{ {
/* /*
* We still have data to read (and possibly write). * We still have data to read (and possibly write).
*/ */
int bytes2write; int bytes2write;
bytes2write = (filesz > rr ? rr : filesz); bytes2write = (state->filesz > rr ? rr : state->filesz);
if (!skip_file) if (!state->skip_file)
WRITE_TAR_DATA(copybuf + pos, bytes2write); writeTarData(state, copybuf + pos, bytes2write);
rr -= bytes2write; rr -= bytes2write;
pos += bytes2write; pos += bytes2write;
filesz -= bytes2write; state->filesz -= bytes2write;
} }
else if (is_recovery_guc_supported && else if (state->is_recovery_guc_supported &&
is_postgresql_auto_conf && state->is_postgresql_auto_conf &&
writerecoveryconf) writerecoveryconf)
{ {
/* append recovery config to postgresql.auto.conf */ /* append recovery config to postgresql.auto.conf */
int padding; int padding;
int tailsize; int tailsize;
tailsize = (512 - file_padding_len) + recoveryconfcontents->len; tailsize = (512 - state->file_padding_len) + recoveryconfcontents->len;
padding = ((tailsize + 511) & ~511) - tailsize; padding = ((tailsize + 511) & ~511) - tailsize;
WRITE_TAR_DATA(recoveryconfcontents->data, recoveryconfcontents->len); writeTarData(state, recoveryconfcontents->data,
recoveryconfcontents->len);
if (padding) if (padding)
{ {
char zerobuf[512]; char zerobuf[512];
MemSet(zerobuf, 0, sizeof(zerobuf)); MemSet(zerobuf, 0, sizeof(zerobuf));
WRITE_TAR_DATA(zerobuf, padding); writeTarData(state, zerobuf, padding);
} }
/* skip original file padding */ /* skip original file padding */
is_postgresql_auto_conf = false; state->is_postgresql_auto_conf = false;
skip_file = true; state->skip_file = true;
filesz += file_padding_len; state->filesz += state->file_padding_len;
found_postgresql_auto_conf = true; state->found_postgresql_auto_conf = true;
} }
else else
{ {
...@@ -1326,27 +1380,17 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1326,27 +1380,17 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
* No more data in the current file, the next piece of * No more data in the current file, the next piece of
* data (if any) will be a new file header structure. * data (if any) will be a new file header structure.
*/ */
in_tarhdr = true; state->in_tarhdr = true;
skip_file = false; state->skip_file = false;
is_postgresql_auto_conf = false; state->is_postgresql_auto_conf = false;
tarhdrsz = 0; state->tarhdrsz = 0;
filesz = 0; state->filesz = 0;
} }
} }
} }
} }
totaldone += r; totaldone += r;
progress_report(rownum, filename, false); progress_report(state->tablespacenum, state->filename, false);
} /* while (1) */
progress_report(rownum, filename, true);
if (copybuf != NULL)
PQfreemem(copybuf);
/*
* Do not sync the resulting tar file yet, all files are synced once at
* the end.
*/
} }
...@@ -1384,64 +1428,49 @@ get_tablespace_mapping(const char *dir) ...@@ -1384,64 +1428,49 @@ get_tablespace_mapping(const char *dir)
static void static void
ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
{ {
char current_path[MAXPGPATH]; UnpackTarState state;
char filename[MAXPGPATH];
const char *mapped_tblspc_path;
pgoff_t current_len_left = 0;
int current_padding = 0;
bool basetablespace; bool basetablespace;
char *copybuf = NULL;
FILE *file = NULL; memset(&state, 0, sizeof(state));
state.tablespacenum = rownum;
basetablespace = PQgetisnull(res, rownum, 0); basetablespace = PQgetisnull(res, rownum, 0);
if (basetablespace) if (basetablespace)
strlcpy(current_path, basedir, sizeof(current_path)); strlcpy(state.current_path, basedir, sizeof(state.current_path));
else else
strlcpy(current_path, strlcpy(state.current_path,
get_tablespace_mapping(PQgetvalue(res, rownum, 1)), get_tablespace_mapping(PQgetvalue(res, rownum, 1)),
sizeof(current_path)); sizeof(state.current_path));
/* ReceiveCopyData(conn, ReceiveTarAndUnpackCopyChunk, &state);
* Get the COPY data
*/
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
pg_log_error("could not get COPY data stream: %s",
PQerrorMessage(conn));
exit(1);
}
while (1)
{
int r;
if (copybuf != NULL) if (state.file)
fclose(state.file);
progress_report(rownum, state.filename, true);
if (state.file != NULL)
{ {
PQfreemem(copybuf); pg_log_error("COPY stream ended before last file was finished");
copybuf = NULL; exit(1);
} }
r = PQgetCopyData(conn, &copybuf, 0); if (basetablespace && writerecoveryconf)
WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
if (r == -1)
{
/* /*
* End of chunk * No data is synced here, everything is done for all tablespaces at the
* end.
*/ */
if (file) }
fclose(file);
break; static void
} ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
else if (r == -2) {
{ UnpackTarState *state = callback_data;
pg_log_error("could not read COPY data: %s",
PQerrorMessage(conn));
exit(1);
}
if (file == NULL) if (state->file == NULL)
{ {
#ifndef WIN32 #ifndef WIN32
int filemode; int filemode;
...@@ -1452,12 +1481,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1452,12 +1481,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
*/ */
if (r != 512) if (r != 512)
{ {
pg_log_error("invalid tar block header size: %d", r); pg_log_error("invalid tar block header size: %zu", r);
exit(1); exit(1);
} }
totaldone += 512; totaldone += 512;
current_len_left = read_tar_number(&copybuf[124], 12); state->current_len_left = read_tar_number(&copybuf[124], 12);
#ifndef WIN32 #ifndef WIN32
/* Set permissions on the file */ /* Set permissions on the file */
...@@ -1467,15 +1496,15 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1467,15 +1496,15 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
/* /*
* All files are padded up to 512 bytes * All files are padded up to 512 bytes
*/ */
current_padding = state->current_padding =
((current_len_left + 511) & ~511) - current_len_left; ((state->current_len_left + 511) & ~511) - state->current_len_left;
/* /*
* First part of header is zero terminated filename * First part of header is zero terminated filename
*/ */
snprintf(filename, sizeof(filename), "%s/%s", current_path, snprintf(state->filename, sizeof(state->filename),
copybuf); "%s/%s", state->current_path, copybuf);
if (filename[strlen(filename) - 1] == '/') if (state->filename[strlen(state->filename) - 1] == '/')
{ {
/* /*
* Ends in a slash means directory or symlink to directory * Ends in a slash means directory or symlink to directory
...@@ -1483,34 +1512,34 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1483,34 +1512,34 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
if (copybuf[156] == '5') if (copybuf[156] == '5')
{ {
/* /*
* Directory * Directory. Remove trailing slash first.
*/ */
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ state->filename[strlen(state->filename) - 1] = '\0';
if (mkdir(filename, pg_dir_create_mode) != 0) if (mkdir(state->filename, pg_dir_create_mode) != 0)
{ {
/* /*
* When streaming WAL, pg_wal (or pg_xlog for pre-9.6 * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
* clusters) will have been created by the wal * clusters) will have been created by the wal receiver
* receiver process. Also, when the WAL directory * process. Also, when the WAL directory location was
* location was specified, pg_wal (or pg_xlog) has * specified, pg_wal (or pg_xlog) has already been created
* already been created as a symbolic link before * as a symbolic link before starting the actual backup.
* starting the actual backup. So just ignore creation * So just ignore creation failures on related
* failures on related directories. * directories.
*/ */
if (!((pg_str_endswith(filename, "/pg_wal") || if (!((pg_str_endswith(state->filename, "/pg_wal") ||
pg_str_endswith(filename, "/pg_xlog") || pg_str_endswith(state->filename, "/pg_xlog") ||
pg_str_endswith(filename, "/archive_status")) && pg_str_endswith(state->filename, "/archive_status")) &&
errno == EEXIST)) errno == EEXIST))
{ {
pg_log_error("could not create directory \"%s\": %m", pg_log_error("could not create directory \"%s\": %m",
filename); state->filename);
exit(1); exit(1);
} }
} }
#ifndef WIN32 #ifndef WIN32
if (chmod(filename, (mode_t) filemode)) if (chmod(state->filename, (mode_t) filemode))
pg_log_error("could not set permissions on directory \"%s\": %m", pg_log_error("could not set permissions on directory \"%s\": %m",
filename); state->filename);
#endif #endif
} }
else if (copybuf[156] == '2') else if (copybuf[156] == '2')
...@@ -1521,19 +1550,20 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1521,19 +1550,20 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
* It's most likely a link in pg_tblspc directory, to the * It's most likely a link in pg_tblspc directory, to the
* location of a tablespace. Apply any tablespace mapping * location of a tablespace. Apply any tablespace mapping
* given on the command line (--tablespace-mapping). (We * given on the command line (--tablespace-mapping). (We
* blindly apply the mapping without checking that the * blindly apply the mapping without checking that the link
* link really is inside pg_tblspc. We don't expect there * really is inside pg_tblspc. We don't expect there to be
* to be other symlinks in a data directory, but if there * other symlinks in a data directory, but if there are, you
* are, you can call it an undocumented feature that you * can call it an undocumented feature that you can map them
* can map them too.) * too.)
*/ */
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ state->filename[strlen(state->filename) - 1] = '\0'; /* Remove trailing slash */
mapped_tblspc_path = get_tablespace_mapping(&copybuf[157]); state->mapped_tblspc_path =
if (symlink(mapped_tblspc_path, filename) != 0) get_tablespace_mapping(&copybuf[157]);
if (symlink(state->mapped_tblspc_path, state->filename) != 0)
{ {
pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m", pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
filename, mapped_tblspc_path); state->filename, state->mapped_tblspc_path);
exit(1); exit(1);
} }
} }
...@@ -1543,33 +1573,33 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1543,33 +1573,33 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
copybuf[156]); copybuf[156]);
exit(1); exit(1);
} }
continue; /* directory or link handled */ return; /* directory or link handled */
} }
/* /*
* regular file * regular file
*/ */
file = fopen(filename, "wb"); state->file = fopen(state->filename, "wb");
if (!file) if (!state->file)
{ {
pg_log_error("could not create file \"%s\": %m", filename); pg_log_error("could not create file \"%s\": %m", state->filename);
exit(1); exit(1);
} }
#ifndef WIN32 #ifndef WIN32
if (chmod(filename, (mode_t) filemode)) if (chmod(state->filename, (mode_t) filemode))
pg_log_error("could not set permissions on file \"%s\": %m", pg_log_error("could not set permissions on file \"%s\": %m",
filename); state->filename);
#endif #endif
if (current_len_left == 0) if (state->current_len_left == 0)
{ {
/* /*
* Done with this file, next one will be a new tar header * Done with this file, next one will be a new tar header
*/ */
fclose(file); fclose(state->file);
file = NULL; state->file = NULL;
continue; return;
} }
} /* new file */ } /* new file */
else else
...@@ -1577,61 +1607,40 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1577,61 +1607,40 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
/* /*
* Continuing blocks in existing file * Continuing blocks in existing file
*/ */
if (current_len_left == 0 && r == current_padding) if (state->current_len_left == 0 && r == state->current_padding)
{ {
/* /*
* Received the padding block for this file, ignore it and * Received the padding block for this file, ignore it and close
* close the file, then move on to the next tar header. * the file, then move on to the next tar header.
*/ */
fclose(file); fclose(state->file);
file = NULL; state->file = NULL;
totaldone += r; totaldone += r;
continue; return;
} }
if (fwrite(copybuf, r, 1, file) != 1) if (fwrite(copybuf, r, 1, state->file) != 1)
{ {
pg_log_error("could not write to file \"%s\": %m", filename); pg_log_error("could not write to file \"%s\": %m", state->filename);
exit(1); exit(1);
} }
totaldone += r; totaldone += r;
progress_report(rownum, filename, false); progress_report(state->tablespacenum, state->filename, false);
current_len_left -= r; state->current_len_left -= r;
if (current_len_left == 0 && current_padding == 0) if (state->current_len_left == 0 && state->current_padding == 0)
{ {
/* /*
* Received the last block, and there is no padding to be * Received the last block, and there is no padding to be
* expected. Close the file and move on to the next tar * expected. Close the file and move on to the next tar header.
* header.
*/ */
fclose(file); fclose(state->file);
file = NULL; state->file = NULL;
continue; return;
} }
} /* continuing data in existing file */ } /* continuing data in existing file */
} /* loop over all data blocks */
progress_report(rownum, filename, true);
if (file != NULL)
{
pg_log_error("COPY stream ended before last file was finished");
exit(1);
}
if (copybuf != NULL)
PQfreemem(copybuf);
if (basetablespace && writerecoveryconf)
WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
/*
* No data is synced here, everything is done for all tablespaces at the
* end.
*/
} }
static void static void
BaseBackup(void) BaseBackup(void)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment