Commit f009591d authored by Tom Lane's avatar Tom Lane

Cope with data-offset-less archive files during out-of-order restores.

pg_dump produces custom-format archive files that lack data offsets
when it is unable to seek its output.  Up to now that's been a hazard
for pg_restore.  But if pg_restore is able to seek in the archive
file, there is no reason to throw up our hands when asked to restore
data blocks out of order.  Instead, whenever we are searching for a
data block, record the locations of the blocks we passed over (that
is, fill in the missing data-offset fields in our in-memory copy of
the TOC data).  Then, when we hit a case that requires going
backwards, we can just seek back.

Also track the furthest point that we've searched to, and seek back
to there when beginning a search for a new data block.  This avoids
possible O(N^2) time consumption, by ensuring that each data block
is examined at most twice.  (On Unix systems, that's at most twice
per parallel-restore job; but since Windows uses threads here, the
threads can share block location knowledge, reducing the amount of
duplicated work.)

We can also improve the code a bit by using fseeko() to skip over
data blocks during the search.

This is all of some use even in simple restores, but it's really
significant for parallel pg_restore.  In that case, we require
seekability of the input already, and we will very probably need
to do out-of-order restores.

Back-patch to v12, as this fixes a regression introduced by commit
548e5097.  Before that, parallel restore avoided requesting
out-of-order restores, so it would work on a data-offset-less
archive.  Now it will again.

Ideally this patch would include some test coverage, but there are
other open bugs that need to be fixed before we can extend our
coverage of parallel restore very much.  Plan to revisit that later.

David Gilman and Tom Lane; reviewed by Justin Pryzby

Discussion: https://postgr.es/m/CALBH9DDuJ+scZc4MEvw5uO-=vRyR2=QF9+Yh=3hPEnKHWfS81A@mail.gmail.com
parent a8d0732a
...@@ -246,12 +246,14 @@ PostgreSQL documentation ...@@ -246,12 +246,14 @@ PostgreSQL documentation
<term><option>--jobs=<replaceable class="parameter">number-of-jobs</replaceable></option></term> <term><option>--jobs=<replaceable class="parameter">number-of-jobs</replaceable></option></term>
<listitem> <listitem>
<para> <para>
Run the most time-consuming parts Run the most time-consuming steps
of <application>pg_restore</application> &mdash; those which load data, of <application>pg_restore</application> &mdash; those that load data,
create indexes, or create constraints &mdash; using multiple create indexes, or create constraints &mdash; concurrently, using up
concurrent jobs. This option can dramatically reduce the time to <replaceable class="parameter">number-of-jobs</replaceable>
concurrent sessions. This option can dramatically reduce the time
to restore a large database to a server running on a to restore a large database to a server running on a
multiprocessor machine. multiprocessor machine. This option is ignored when emitting a script
rather than connecting directly to a database server.
</para> </para>
<para> <para>
...@@ -274,8 +276,7 @@ PostgreSQL documentation ...@@ -274,8 +276,7 @@ PostgreSQL documentation
Only the custom and directory archive formats are supported Only the custom and directory archive formats are supported
with this option. with this option.
The input must be a regular file or directory (not, for example, a The input must be a regular file or directory (not, for example, a
pipe). This option is ignored when emitting a script rather pipe or standard input). Also, multiple
than connecting directly to a database server. Also, multiple
jobs cannot be used together with the jobs cannot be used together with the
option <option>--single-transaction</option>. option <option>--single-transaction</option>.
</para> </para>
......
...@@ -70,6 +70,8 @@ typedef struct ...@@ -70,6 +70,8 @@ typedef struct
{ {
CompressorState *cs; CompressorState *cs;
int hasSeek; int hasSeek;
/* lastFilePos is used only when reading, and may be invalid if !hasSeek */
pgoff_t lastFilePos; /* position after last data block we've read */
} lclContext; } lclContext;
typedef struct typedef struct
...@@ -181,8 +183,13 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) ...@@ -181,8 +183,13 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
ReadHead(AH); ReadHead(AH);
ReadToc(AH); ReadToc(AH);
}
/*
* Remember location of first data block (i.e., the point after TOC)
* in case we have to search for desired data blocks.
*/
ctx->lastFilePos = _getFilePos(AH, ctx);
}
} }
/* /*
...@@ -418,13 +425,62 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te) ...@@ -418,13 +425,62 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
{ {
/* /*
* We cannot seek directly to the desired block. Instead, skip over * We cannot seek directly to the desired block. Instead, skip over
* block headers until we find the one we want. This could fail if we * block headers until we find the one we want. Remember the
* are asked to restore items out-of-order. * positions of skipped-over blocks, so that if we later decide we
* need to read one, we'll be able to seek to it.
*
* When our input file is seekable, we can do the search starting from
* the point after the last data block we scanned in previous
* iterations of this function.
*/ */
if (ctx->hasSeek)
{
if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
fatal("error during file seek: %m");
}
for (;;)
{
pgoff_t thisBlkPos = _getFilePos(AH, ctx);
_readBlockHeader(AH, &blkType, &id); _readBlockHeader(AH, &blkType, &id);
while (blkType != EOF && id != te->dumpId) if (blkType == EOF || id == te->dumpId)
break;
/* Remember the block position, if we got one */
if (thisBlkPos >= 0)
{
TocEntry *otherte = getTocEntryByDumpId(AH, id);
if (otherte && otherte->formatData)
{
lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
/*
* Note: on Windows, multiple threads might access/update
* the same lclTocEntry concurrently, but that should be
* safe as long as we update dataPos before dataState.
* Ideally, we'd use pg_write_barrier() to enforce that,
* but the needed infrastructure doesn't exist in frontend
* code. But Windows only runs on machines with strong
* store ordering, so it should be okay for now.
*/
if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
{ {
othertctx->dataPos = thisBlkPos;
othertctx->dataState = K_OFFSET_POS_SET;
}
else if (othertctx->dataPos != thisBlkPos ||
othertctx->dataState != K_OFFSET_POS_SET)
{
/* sanity check */
pg_log_warning("data block %d has wrong seek position",
id);
}
}
}
switch (blkType) switch (blkType)
{ {
case BLK_DATA: case BLK_DATA:
...@@ -440,7 +496,6 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te) ...@@ -440,7 +496,6 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
blkType); blkType);
break; break;
} }
_readBlockHeader(AH, &blkType, &id);
} }
} }
else else
...@@ -452,20 +507,18 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te) ...@@ -452,20 +507,18 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
_readBlockHeader(AH, &blkType, &id); _readBlockHeader(AH, &blkType, &id);
} }
/* Produce suitable failure message if we fell off end of file */ /*
* If we reached EOF without finding the block we want, then either it
* doesn't exist, or it does but we lack the ability to seek back to it.
*/
if (blkType == EOF) if (blkType == EOF)
{ {
if (tctx->dataState == K_OFFSET_POS_NOT_SET) if (!ctx->hasSeek)
fatal("could not find block ID %d in archive -- "
"possibly due to out-of-order restore request, "
"which cannot be handled due to lack of data offsets in archive",
te->dumpId);
else if (!ctx->hasSeek)
fatal("could not find block ID %d in archive -- " fatal("could not find block ID %d in archive -- "
"possibly due to out-of-order restore request, " "possibly due to out-of-order restore request, "
"which cannot be handled due to non-seekable input file", "which cannot be handled due to non-seekable input file",
te->dumpId); te->dumpId);
else /* huh, the dataPos led us to EOF? */ else
fatal("could not find block ID %d in archive -- " fatal("could not find block ID %d in archive -- "
"possibly corrupt archive", "possibly corrupt archive",
te->dumpId); te->dumpId);
...@@ -491,6 +544,20 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te) ...@@ -491,6 +544,20 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
blkType); blkType);
break; break;
} }
/*
* If our input file is seekable but lacks data offsets, update our
* knowledge of where to start future searches from. (Note that we did
* not update the current TE's dataState/dataPos. We could have, but
* there is no point since it will not be visited again.)
*/
if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
{
pgoff_t curPos = _getFilePos(AH, ctx);
if (curPos > ctx->lastFilePos)
ctx->lastFilePos = curPos;
}
} }
/* /*
...@@ -548,6 +615,7 @@ _skipBlobs(ArchiveHandle *AH) ...@@ -548,6 +615,7 @@ _skipBlobs(ArchiveHandle *AH)
static void static void
_skipData(ArchiveHandle *AH) _skipData(ArchiveHandle *AH)
{ {
lclContext *ctx = (lclContext *) AH->formatData;
size_t blkLen; size_t blkLen;
char *buf = NULL; char *buf = NULL;
int buflen = 0; int buflen = 0;
...@@ -555,6 +623,13 @@ _skipData(ArchiveHandle *AH) ...@@ -555,6 +623,13 @@ _skipData(ArchiveHandle *AH)
blkLen = ReadInt(AH); blkLen = ReadInt(AH);
while (blkLen != 0) while (blkLen != 0)
{
if (ctx->hasSeek)
{
if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
fatal("error during file seek: %m");
}
else
{ {
if (blkLen > buflen) if (blkLen > buflen)
{ {
...@@ -570,6 +645,7 @@ _skipData(ArchiveHandle *AH) ...@@ -570,6 +645,7 @@ _skipData(ArchiveHandle *AH)
else else
fatal("could not read from input file: %m"); fatal("could not read from input file: %m");
} }
}
blkLen = ReadInt(AH); blkLen = ReadInt(AH);
} }
...@@ -804,6 +880,9 @@ _Clone(ArchiveHandle *AH) ...@@ -804,6 +880,9 @@ _Clone(ArchiveHandle *AH)
{ {
lclContext *ctx = (lclContext *) AH->formatData; lclContext *ctx = (lclContext *) AH->formatData;
/*
* Each thread must have private lclContext working state.
*/
AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext)); AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
memcpy(AH->formatData, ctx, sizeof(lclContext)); memcpy(AH->formatData, ctx, sizeof(lclContext));
ctx = (lclContext *) AH->formatData; ctx = (lclContext *) AH->formatData;
...@@ -813,10 +892,13 @@ _Clone(ArchiveHandle *AH) ...@@ -813,10 +892,13 @@ _Clone(ArchiveHandle *AH)
fatal("compressor active"); fatal("compressor active");
/* /*
* We intentionally do not clone TOC-entry-local state: it's useful to
* share knowledge about where the data blocks are across threads.
* _PrintTocData has to be careful about the order of operations on that
* state, though.
*
* Note: we do not make a local lo_buf because we expect at most one BLOBS * Note: we do not make a local lo_buf because we expect at most one BLOBS
* entry per archive, so no parallelism is possible. Likewise, * entry per archive, so no parallelism is possible.
* TOC-entry-local state isn't an issue because any one TOC entry is
* touched by just one worker child.
*/ */
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment