Commit 2fd2effc authored by Robert Haas's avatar Robert Haas

Improve server code to read files as part of a base backup.

Don't use fread(), since that doesn't necessarily set errno. We could
use read() instead, but it's even better to use pg_pread(), which
allows us to avoid some extra calls to seek to the desired location in
the file.

Also, advertise a wait event while reading from a file, as we do for
most other places where we're reading data from files.

Patch by me, reviewed by Hamid Akhtar.

Discussion: http://postgr.es/m/CA+TgmobBw-3573vMosGj06r72ajHsYeKtksT_oTxH8XvTL7DxA@mail.gmail.com
parent 453e0e3f
...@@ -1193,6 +1193,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ...@@ -1193,6 +1193,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
</thead> </thead>
<tbody> <tbody>
<row>
<entry><literal>BaseBackupRead</literal></entry>
<entry>Waiting for base backup to read from a file.</entry>
</row>
<row> <row>
<entry><literal>BufFileRead</literal></entry> <entry><literal>BufFileRead</literal></entry>
<entry>Waiting for a read from a buffered file.</entry> <entry>Waiting for a read from a buffered file.</entry>
......
...@@ -3931,6 +3931,9 @@ pgstat_get_wait_io(WaitEventIO w) ...@@ -3931,6 +3931,9 @@ pgstat_get_wait_io(WaitEventIO w)
switch (w) switch (w)
{ {
case WAIT_EVENT_BASEBACKUP_READ:
event_name = "BaseBackupRead";
break;
case WAIT_EVENT_BUFFILE_READ: case WAIT_EVENT_BUFFILE_READ:
event_name = "BufFileRead"; event_name = "BufFileRead";
break; break;
......
...@@ -81,6 +81,8 @@ static int compareWalFileNames(const ListCell *a, const ListCell *b); ...@@ -81,6 +81,8 @@ static int compareWalFileNames(const ListCell *a, const ListCell *b);
static void throttle(size_t increment); static void throttle(size_t increment);
static void update_basebackup_progress(int64 delta); static void update_basebackup_progress(int64 delta);
static bool is_checksummed_file(const char *fullpath, const char *filename); static bool is_checksummed_file(const char *fullpath, const char *filename);
static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset,
const char *filename, bool partial_read_ok);
/* Was the backup currently in-progress initiated in recovery mode? */ /* Was the backup currently in-progress initiated in recovery mode? */
static bool backup_started_in_recovery = false; static bool backup_started_in_recovery = false;
...@@ -98,18 +100,6 @@ static char *statrelpath = NULL; ...@@ -98,18 +100,6 @@ static char *statrelpath = NULL;
*/ */
#define THROTTLING_FREQUENCY 8 #define THROTTLING_FREQUENCY 8
/*
* Checks whether we encountered any error in fread(). fread() doesn't give
* any clue what has happened, so we check with ferror(). Also, neither
* fread() nor ferror() set errno, so we just throw a generic error.
*/
#define CHECK_FREAD_ERROR(fp, filename) \
do { \
if (ferror(fp)) \
ereport(ERROR, \
(errmsg("could not read from file \"%s\"", filename))); \
} while (0)
/* The actual number of bytes, transfer of which may cause sleep. */ /* The actual number of bytes, transfer of which may cause sleep. */
static uint64 throttling_sample; static uint64 throttling_sample;
...@@ -600,7 +590,7 @@ perform_base_backup(basebackup_options *opt) ...@@ -600,7 +590,7 @@ perform_base_backup(basebackup_options *opt)
foreach(lc, walFileList) foreach(lc, walFileList)
{ {
char *walFileName = (char *) lfirst(lc); char *walFileName = (char *) lfirst(lc);
FILE *fp; int fd;
char buf[TAR_SEND_SIZE]; char buf[TAR_SEND_SIZE];
size_t cnt; size_t cnt;
pgoff_t len = 0; pgoff_t len = 0;
...@@ -608,8 +598,8 @@ perform_base_backup(basebackup_options *opt) ...@@ -608,8 +598,8 @@ perform_base_backup(basebackup_options *opt)
snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName); snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName);
XLogFromFileName(walFileName, &tli, &segno, wal_segment_size); XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
fp = AllocateFile(pathbuf, "rb"); fd = OpenTransientFile(pathbuf, O_RDONLY | PG_BINARY);
if (fp == NULL) if (fd < 0)
{ {
int save_errno = errno; int save_errno = errno;
...@@ -626,7 +616,7 @@ perform_base_backup(basebackup_options *opt) ...@@ -626,7 +616,7 @@ perform_base_backup(basebackup_options *opt)
errmsg("could not open file \"%s\": %m", pathbuf))); errmsg("could not open file \"%s\": %m", pathbuf)));
} }
if (fstat(fileno(fp), &statbuf) != 0) if (fstat(fd, &statbuf) != 0)
ereport(ERROR, ereport(ERROR,
(errcode_for_file_access(), (errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m", errmsg("could not stat file \"%s\": %m",
...@@ -642,9 +632,10 @@ perform_base_backup(basebackup_options *opt) ...@@ -642,9 +632,10 @@ perform_base_backup(basebackup_options *opt)
/* send the WAL file itself */ /* send the WAL file itself */
_tarWriteHeader(pathbuf, NULL, &statbuf, false); _tarWriteHeader(pathbuf, NULL, &statbuf, false);
while ((cnt = fread(buf, 1, while ((cnt = basebackup_read_file(fd, buf,
Min(sizeof(buf), wal_segment_size - len), Min(sizeof(buf),
fp)) > 0) wal_segment_size - len),
len, pathbuf, true)) > 0)
{ {
CheckXLogRemoved(segno, tli); CheckXLogRemoved(segno, tli);
/* Send the chunk as a CopyData message */ /* Send the chunk as a CopyData message */
...@@ -660,8 +651,6 @@ perform_base_backup(basebackup_options *opt) ...@@ -660,8 +651,6 @@ perform_base_backup(basebackup_options *opt)
break; break;
} }
CHECK_FREAD_ERROR(fp, pathbuf);
if (len != wal_segment_size) if (len != wal_segment_size)
{ {
CheckXLogRemoved(segno, tli); CheckXLogRemoved(segno, tli);
...@@ -676,7 +665,7 @@ perform_base_backup(basebackup_options *opt) ...@@ -676,7 +665,7 @@ perform_base_backup(basebackup_options *opt)
*/ */
Assert(wal_segment_size % TAR_BLOCK_SIZE == 0); Assert(wal_segment_size % TAR_BLOCK_SIZE == 0);
FreeFile(fp); CloseTransientFile(fd);
/* /*
* Mark file as archived, otherwise files can get archived again * Mark file as archived, otherwise files can get archived again
...@@ -1575,7 +1564,7 @@ sendFile(const char *readfilename, const char *tarfilename, ...@@ -1575,7 +1564,7 @@ sendFile(const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid, struct stat *statbuf, bool missing_ok, Oid dboid,
backup_manifest_info *manifest, const char *spcoid) backup_manifest_info *manifest, const char *spcoid)
{ {
FILE *fp; int fd;
BlockNumber blkno = 0; BlockNumber blkno = 0;
bool block_retry = false; bool block_retry = false;
char buf[TAR_SEND_SIZE]; char buf[TAR_SEND_SIZE];
...@@ -1594,8 +1583,8 @@ sendFile(const char *readfilename, const char *tarfilename, ...@@ -1594,8 +1583,8 @@ sendFile(const char *readfilename, const char *tarfilename,
pg_checksum_init(&checksum_ctx, manifest->checksum_type); pg_checksum_init(&checksum_ctx, manifest->checksum_type);
fp = AllocateFile(readfilename, "rb"); fd = OpenTransientFile(readfilename, O_RDONLY | PG_BINARY);
if (fp == NULL) if (fd < 0)
{ {
if (errno == ENOENT && missing_ok) if (errno == ENOENT && missing_ok)
return false; return false;
...@@ -1637,8 +1626,27 @@ sendFile(const char *readfilename, const char *tarfilename, ...@@ -1637,8 +1626,27 @@ sendFile(const char *readfilename, const char *tarfilename,
} }
} }
while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0) /*
* Loop until we read the amount of data the caller told us to expect. The
* file could be longer, if it was extended while we were sending it, but
* for a base backup we can ignore such extended data. It will be restored
* from WAL.
*/
while (len < statbuf->st_size)
{ {
/* Try to read some more data. */
cnt = basebackup_read_file(fd, buf,
Min(sizeof(buf), statbuf->st_size - len),
len, readfilename, true);
/*
* If we hit end-of-file, a concurrent truncation must have occurred.
* That's not an error condition, because WAL replay will fix things
* up.
*/
if (cnt == 0)
break;
/* /*
* The checksums are verified at block level, so we iterate over the * The checksums are verified at block level, so we iterate over the
* buffer in chunks of BLCKSZ, after making sure that * buffer in chunks of BLCKSZ, after making sure that
...@@ -1689,16 +1697,15 @@ sendFile(const char *readfilename, const char *tarfilename, ...@@ -1689,16 +1697,15 @@ sendFile(const char *readfilename, const char *tarfilename,
*/ */
if (block_retry == false) if (block_retry == false)
{ {
/* Reread the failed block */ int reread_cnt;
if (fseek(fp, -(cnt - BLCKSZ * i), SEEK_CUR) == -1)
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not fseek in file \"%s\": %m",
readfilename)));
}
if (fread(buf + BLCKSZ * i, 1, BLCKSZ, fp) != BLCKSZ) /* Reread the failed block */
reread_cnt =
basebackup_read_file(fd, buf + BLCKSZ * i,
BLCKSZ, len + BLCKSZ * i,
readfilename,
false);
if (reread_cnt == 0)
{ {
/* /*
* If we hit end-of-file, a concurrent * If we hit end-of-file, a concurrent
...@@ -1708,24 +1715,8 @@ sendFile(const char *readfilename, const char *tarfilename, ...@@ -1708,24 +1715,8 @@ sendFile(const char *readfilename, const char *tarfilename,
* code that handles that case. (We must fix * code that handles that case. (We must fix
* up cnt first, though.) * up cnt first, though.)
*/ */
if (feof(fp)) cnt = BLCKSZ * i;
{ break;
cnt = BLCKSZ * i;
break;
}
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not reread block %d of file \"%s\": %m",
blkno, readfilename)));
}
if (fseek(fp, cnt - BLCKSZ * i - BLCKSZ, SEEK_CUR) == -1)
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not fseek in file \"%s\": %m",
readfilename)));
} }
/* Set flag so we know a retry was attempted */ /* Set flag so we know a retry was attempted */
...@@ -1768,20 +1759,8 @@ sendFile(const char *readfilename, const char *tarfilename, ...@@ -1768,20 +1759,8 @@ sendFile(const char *readfilename, const char *tarfilename,
len += cnt; len += cnt;
throttle(cnt); throttle(cnt);
if (feof(fp) || len >= statbuf->st_size)
{
/*
* Reached end of file. The file could be longer, if it was
* extended while we were sending it, but for a base backup we can
* ignore such extended data. It will be restored from WAL.
*/
break;
}
} }
CHECK_FREAD_ERROR(fp, readfilename);
/* If the file was truncated while we were sending it, pad it with zeros */ /* If the file was truncated while we were sending it, pad it with zeros */
if (len < statbuf->st_size) if (len < statbuf->st_size)
{ {
...@@ -1810,7 +1789,7 @@ sendFile(const char *readfilename, const char *tarfilename, ...@@ -1810,7 +1789,7 @@ sendFile(const char *readfilename, const char *tarfilename,
update_basebackup_progress(pad); update_basebackup_progress(pad);
} }
FreeFile(fp); CloseTransientFile(fd);
if (checksum_failures > 1) if (checksum_failures > 1)
{ {
...@@ -1996,3 +1975,35 @@ update_basebackup_progress(int64 delta) ...@@ -1996,3 +1975,35 @@ update_basebackup_progress(int64 delta)
pgstat_progress_update_multi_param(nparam, index, val); pgstat_progress_update_multi_param(nparam, index, val);
} }
/*
* Read some data from a file, setting a wait event and reporting any error
* encountered.
*
* If partial_read_ok is false, also report an error if the number of bytes
* read is not equal to the number of bytes requested.
*
* Returns the number of bytes read.
*/
static int
basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset,
const char *filename, bool partial_read_ok)
{
int rc;
pgstat_report_wait_start(WAIT_EVENT_BASEBACKUP_READ);
rc = pg_pread(fd, buf, nbytes, offset);
pgstat_report_wait_end();
if (rc < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", filename)));
if (!partial_read_ok && rc > 0 && rc != nbytes)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": read %d of %zu",
filename, rc, nbytes)));
return rc;
}
...@@ -913,7 +913,8 @@ typedef enum ...@@ -913,7 +913,8 @@ typedef enum
*/ */
typedef enum typedef enum
{ {
WAIT_EVENT_BUFFILE_READ = PG_WAIT_IO, WAIT_EVENT_BASEBACKUP_READ = PG_WAIT_IO,
WAIT_EVENT_BUFFILE_READ,
WAIT_EVENT_BUFFILE_WRITE, WAIT_EVENT_BUFFILE_WRITE,
WAIT_EVENT_CONTROL_FILE_READ, WAIT_EVENT_CONTROL_FILE_READ,
WAIT_EVENT_CONTROL_FILE_SYNC, WAIT_EVENT_CONTROL_FILE_SYNC,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment