Commit 548e5097 authored by Tom Lane

Improve parallel scheduling logic in pg_dump/pg_restore.

Previously, the way this worked was that a parallel pg_dump would
re-order the TABLE_DATA items in the dump's TOC into decreasing size
order, and separately re-order (some of) the INDEX items into decreasing
size order.  Then pg_dump would dump the items in that order.  Later,
parallel pg_restore just followed the TOC order.  This method had lots
of deficiencies:

* TOC ordering randomly differed between parallel and non-parallel
dumps, and was hard to predict in the former case, causing problems
for building stable pg_dump test cases.

* Parallel restore only followed a well-chosen order if the dump had
been done in parallel; in particular, this never happened for restore
from custom-format dumps.

* The best order for restore isn't necessarily the same as for dump,
and it's not really static either because of locking considerations.

* TABLE_DATA and INDEX items aren't the only things that might take a lot
of work during restore.  Scheduling was particularly stupid for the BLOBS
item, which might require lots of work during dump as well as restore,
but was left to the end in either case.

This patch removes the logic that changed the TOC order, fixing the
test instability problem.  Instead, we sort the parallelizable items
just before processing them during a parallel dump.  Independently
of that, parallel restore prioritizes the ready-to-execute tasks
based on the size of the underlying table.  In the case of dependent
tasks such as index, constraint, or foreign key creation, the largest
relevant table is used as the metric for estimating the task length.
(This is pretty crude, but it should be enough to avoid the case we
want to avoid, which is ending the run with just a few large tasks
such that we can't make use of all N workers.)
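In rough terms, the restore-side prioritization means that whenever a worker
becomes free, it is handed the ready-to-run TOC item with the largest
dataLength estimate.  A minimal sketch of that selection step is below; the
helper name and the plain "ready" array are hypothetical illustrations, not
the actual pg_backup_archiver.c code, which maintains its own ready list.

	/*
	 * Hypothetical helper: pop the ready item with the largest size
	 * estimate.  "ready" holds items whose dependencies are all satisfied.
	 */
	static TocEntry *
	pop_largest_ready_item(TocEntry **ready, int *nready)
	{
		pgoff_t		bestLen = -1;
		int			best = -1;
		int			i;
		TocEntry   *te;

		for (i = 0; i < *nready; i++)
		{
			if (ready[i]->dataLength > bestLen)
			{
				best = i;
				bestLen = ready[i]->dataLength;
			}
		}
		if (best < 0)
			return NULL;		/* nothing is ready to run */
		te = ready[best];
		/* close the gap by moving the last array element into this slot */
		ready[best] = ready[--(*nready)];
		return te;
	}

Dependent items such as index, constraint, or foreign key creation carry no
dataLength of their own in the archive, so, per the description above, they
would be assigned the size of the largest table they depend on before being
scheduled this way.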

Patch by me, responding to a complaint from Peter Eisentraut,
who also reviewed the patch.

Discussion: https://postgr.es/m/5137fe12-d0a2-4971-61b6-eb4e7e8875f8@2ndquadrant.com
parent 20bef2c3
@@ -252,18 +252,6 @@ extern void ConnectDatabase(Archive *AH,
extern void DisconnectDatabase(Archive *AHX);
extern PGconn *GetConnection(Archive *AHX);
/* Called to add a TOC entry */
extern void ArchiveEntry(Archive *AHX,
CatalogId catalogId, DumpId dumpId,
const char *tag,
const char *namespace, const char *tablespace,
const char *owner, bool withOids,
const char *desc, teSection section,
const char *defn,
const char *dropStmt, const char *copyStmt,
const DumpId *deps, int nDeps,
DataDumperPtr dumpFn, void *dumpArg);
/* Called to write *data* to the archive */
extern void WriteData(Archive *AH, const void *data, size_t dLen);
@@ -162,12 +162,12 @@ typedef int (*WriteBytePtrType) (ArchiveHandle *AH, const int i);
typedef int (*ReadBytePtrType) (ArchiveHandle *AH);
typedef void (*WriteBufPtrType) (ArchiveHandle *AH, const void *c, size_t len);
typedef void (*ReadBufPtrType) (ArchiveHandle *AH, void *buf, size_t len);
typedef void (*SaveArchivePtrType) (ArchiveHandle *AH);
typedef void (*WriteExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
typedef void (*ReadExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
typedef void (*PrintExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
typedef void (*PrintTocDataPtrType) (ArchiveHandle *AH, TocEntry *te);
typedef void (*PrepParallelRestorePtrType) (ArchiveHandle *AH);
typedef void (*ClonePtrType) (ArchiveHandle *AH);
typedef void (*DeClonePtrType) (ArchiveHandle *AH);
@@ -297,6 +297,7 @@ struct _archiveHandle
WorkerJobDumpPtrType WorkerJobDumpPtr;
WorkerJobRestorePtrType WorkerJobRestorePtr;
PrepParallelRestorePtrType PrepParallelRestorePtr;
ClonePtrType ClonePtr; /* Clone format-specific fields */
DeClonePtrType DeClonePtr; /* Clean up cloned fields */
@@ -387,12 +388,13 @@ struct _tocEntry
void *formatData; /* TOC Entry data specific to file format */
/* working state while dumping/restoring */
pgoff_t dataLength; /* item's data size; 0 if none or unknown */
teReqs reqs; /* do we need schema and/or data of object */
bool created; /* set for DATA member if TABLE was created */
/* working state (needed only for parallel restore) */
struct _tocEntry *par_prev; /* list links for pending/ready items; */
struct _tocEntry *par_next; /* these are NULL if not in either list */
struct _tocEntry *pending_prev; /* list links for pending-items list; */
struct _tocEntry *pending_next; /* NULL if not in that list */
int depCount; /* number of dependencies not yet restored */
DumpId *revDeps; /* dumpIds of objects depending on this one */
int nRevDeps; /* number of such dependencies */
@@ -405,6 +407,18 @@ extern void on_exit_close_archive(Archive *AHX);
extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) pg_attribute_printf(3, 4);
/* Called to add a TOC entry */
extern TocEntry *ArchiveEntry(Archive *AHX,
CatalogId catalogId, DumpId dumpId,
const char *tag,
const char *namespace, const char *tablespace,
const char *owner, bool withOids,
const char *desc, teSection section,
const char *defn,
const char *dropStmt, const char *copyStmt,
const DumpId *deps, int nDeps,
DataDumperPtr dumpFn, void *dumpArg);
extern void WriteTOC(ArchiveHandle *AH);
extern void ReadTOC(ArchiveHandle *AH);
extern void WriteHead(ArchiveHandle *AH);
@@ -59,6 +59,8 @@ static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, bool drop);
static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
@@ -129,6 +131,8 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
AH->StartBlobPtr = _StartBlob;
AH->EndBlobPtr = _EndBlob;
AH->EndBlobsPtr = _EndBlobs;
AH->PrepParallelRestorePtr = _PrepParallelRestore;
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
@@ -775,6 +779,66 @@ _ReopenArchive(ArchiveHandle *AH)
strerror(errno));
}
/*
* Prepare for parallel restore.
*
* The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
* TOC entries' dataLength fields with appropriate values to guide the
* ordering of restore jobs. The source of said data is format-dependent,
* as is the exact meaning of the values.
*
* A format module might also choose to do other setup here.
*/
static void
_PrepParallelRestore(ArchiveHandle *AH)
{
lclContext *ctx = (lclContext *) AH->formatData;
TocEntry *prev_te = NULL;
lclTocEntry *prev_tctx = NULL;
TocEntry *te;
/*
* Knowing that the data items were dumped out in TOC order, we can
* reconstruct the length of each item as the delta to the start offset of
* the next data item.
*/
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
lclTocEntry *tctx = (lclTocEntry *) te->formatData;
/*
* Ignore entries without a known data offset; if we were unable to
* seek to rewrite the TOC when creating the archive, this'll be all
* of them, and we'll end up with no size estimates.
*/
if (tctx->dataState != K_OFFSET_POS_SET)
continue;
/* Compute previous data item's length */
if (prev_te)
{
if (tctx->dataPos > prev_tctx->dataPos)
prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
}
prev_te = te;
prev_tctx = tctx;
}
/* If OK to seek, we can determine the length of the last item */
if (prev_te && ctx->hasSeek)
{
pgoff_t endpos;
if (fseeko(AH->FH, 0, SEEK_END) != 0)
exit_horribly(modulename, "error during file seek: %s\n",
strerror(errno));
endpos = ftello(AH->FH);
if (endpos > prev_tctx->dataPos)
prev_te->dataLength = endpos - prev_tctx->dataPos;
}
}
/*
* Clone format-specific fields during parallel restoration.
*/
@@ -87,6 +87,7 @@ static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH);
static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
@@ -132,6 +133,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
AH->EndBlobPtr = _EndBlob;
AH->EndBlobsPtr = _EndBlobs;
AH->PrepParallelRestorePtr = _PrepParallelRestore;
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
@@ -240,13 +242,13 @@ _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
char fn[MAXPGPATH];
tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
if (te->dataDumper)
if (strcmp(te->desc, "BLOBS") == 0)
tctx->filename = pg_strdup("blobs.toc");
else if (te->dataDumper)
{
snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
tctx->filename = pg_strdup(fn);
}
else if (strcmp(te->desc, "BLOBS") == 0)
tctx->filename = pg_strdup("blobs.toc");
else
tctx->filename = NULL;
@@ -726,6 +728,68 @@ setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
strcat(buf, relativeFilename);
}
/*
* Prepare for parallel restore.
*
* The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
* TOC entries' dataLength fields with appropriate values to guide the
* ordering of restore jobs. The source of said data is format-dependent,
* as is the exact meaning of the values.
*
* A format module might also choose to do other setup here.
*/
static void
_PrepParallelRestore(ArchiveHandle *AH)
{
TocEntry *te;
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
lclTocEntry *tctx = (lclTocEntry *) te->formatData;
char fname[MAXPGPATH];
struct stat st;
/*
* A dumpable object has set tctx->filename, any other object has not.
* (see _ArchiveEntry).
*/
if (tctx->filename == NULL)
continue;
/* We may ignore items not due to be restored */
if ((te->reqs & REQ_DATA) == 0)
continue;
/*
* Stat the file and, if successful, put its size in dataLength. When
* using compression, the physical file size might not be a very good
* guide to the amount of work involved in restoring the file, but we
* only need an approximate indicator of that.
*/
setFilePath(AH, fname, tctx->filename);
if (stat(fname, &st) == 0)
te->dataLength = st.st_size;
else
{
/* It might be compressed */
strlcat(fname, ".gz", sizeof(fname));
if (stat(fname, &st) == 0)
te->dataLength = st.st_size;
}
/*
* If this is the BLOBS entry, what we stat'd was blobs.toc, which
* most likely is a lot smaller than the actual blob data. We don't
* have a cheap way to estimate how much smaller, but fortunately it
* doesn't matter too much as long as we get the blobs processed
* reasonably early. Arbitrarily scale up by a factor of 1K.
*/
if (strcmp(te->desc, "BLOBS") == 0)
te->dataLength *= 1024;
}
}
/*
* Clone format-specific fields during parallel restoration.
*/
@@ -54,6 +54,7 @@
#include "catalog/pg_trigger_d.h"
#include "catalog/pg_type_d.h"
#include "libpq/libpq-fs.h"
#include "storage/block.h"
#include "dumputils.h"
#include "parallel.h"
@@ -845,10 +846,6 @@ main(int argc, char **argv)
*/
sortDumpableObjectsByTypeName(dobjs, numObjs);
/* If we do a parallel dump, we want the largest tables to go first */
if (archiveFormat == archDirectory && numWorkers > 1)
sortDataAndIndexObjectsBySize(dobjs, numObjs);
sortDumpableObjects(dobjs, numObjs,
boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
@@ -2156,13 +2153,28 @@ dumpTableData(Archive *fout, TableDataInfo *tdinfo)
* See comments for BuildArchiveDependencies.
*/
if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
NULL, tbinfo->rolname,
false, "TABLE DATA", SECTION_DATA,
"", "", copyStmt,
&(tbinfo->dobj.dumpId), 1,
dumpFn, tdinfo);
{
TocEntry *te;
te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
NULL, tbinfo->rolname,
false, "TABLE DATA", SECTION_DATA,
"", "", copyStmt,
&(tbinfo->dobj.dumpId), 1,
dumpFn, tdinfo);
/*
* Set the TocEntry's dataLength in case we are doing a parallel dump
* and want to order dump jobs by table size. We choose to measure
* dataLength in table pages during dump, so no scaling is needed.
* However, relpages is declared as "integer" in pg_class, and hence
* also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
* Cast so that we get the right interpretation of table sizes
* exceeding INT_MAX pages.
*/
te->dataLength = (BlockNumber) tbinfo->relpages;
}
destroyPQExpBuffer(copyBuf);
destroyPQExpBuffer(clistBuf);
@@ -6759,8 +6771,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
i_conoid,
i_condef,
i_tablespace,
i_indreloptions,
i_relpages;
i_indreloptions;
int ntups;
for (i = 0; i < numTables; i++)
@@ -6807,7 +6818,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"i.indnkeyatts AS indnkeyatts, "
"i.indnatts AS indnatts, "
"i.indkey, i.indisclustered, "
"i.indisreplident, t.relpages, "
"i.indisreplident, "
"c.contype, c.conname, "
"c.condeferrable, c.condeferred, "
"c.tableoid AS contableoid, "
@@ -6844,7 +6855,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"i.indnatts AS indnkeyatts, "
"i.indnatts AS indnatts, "
"i.indkey, i.indisclustered, "
"i.indisreplident, t.relpages, "
"i.indisreplident, "
"c.contype, c.conname, "
"c.condeferrable, c.condeferred, "
"c.tableoid AS contableoid, "
@@ -6877,7 +6888,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"i.indnatts AS indnkeyatts, "
"i.indnatts AS indnatts, "
"i.indkey, i.indisclustered, "
"false AS indisreplident, t.relpages, "
"false AS indisreplident, "
"c.contype, c.conname, "
"c.condeferrable, c.condeferred, "
"c.tableoid AS contableoid, "
@@ -6906,7 +6917,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"i.indnatts AS indnkeyatts, "
"i.indnatts AS indnatts, "
"i.indkey, i.indisclustered, "
"false AS indisreplident, t.relpages, "
"false AS indisreplident, "
"c.contype, c.conname, "
"c.condeferrable, c.condeferred, "
"c.tableoid AS contableoid, "
@@ -6938,7 +6949,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"t.relnatts AS indnkeyatts, "
"t.relnatts AS indnatts, "
"i.indkey, i.indisclustered, "
"false AS indisreplident, t.relpages, "
"false AS indisreplident, "
"c.contype, c.conname, "
"c.condeferrable, c.condeferred, "
"c.tableoid AS contableoid, "
@@ -6974,7 +6985,6 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
i_indkey = PQfnumber(res, "indkey");
i_indisclustered = PQfnumber(res, "indisclustered");
i_indisreplident = PQfnumber(res, "indisreplident");
i_relpages = PQfnumber(res, "relpages");
i_contype = PQfnumber(res, "contype");
i_conname = PQfnumber(res, "conname");
i_condeferrable = PQfnumber(res, "condeferrable");
@@ -7013,7 +7023,6 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
indxinfo[j].indisclustered = (PQgetvalue(res, j, i_indisclustered)[0] == 't');
indxinfo[j].indisreplident = (PQgetvalue(res, j, i_indisreplident)[0] == 't');
indxinfo[j].parentidx = atooid(PQgetvalue(res, j, i_parentidx));
indxinfo[j].relpages = atoi(PQgetvalue(res, j, i_relpages));
contype = *(PQgetvalue(res, j, i_contype));
if (contype == 'p' || contype == 'u' || contype == 'x')
@@ -8206,6 +8215,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
"'' AS attfdwoptions,\n");
if (fout->remoteVersion >= 90100)
{
/*
* Since we only want to dump COLLATE clauses for attributes whose
* collation is different from their type's default, we use a CASE
@@ -8214,6 +8224,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
appendPQExpBuffer(q,
"CASE WHEN a.attcollation <> t.typcollation "
"THEN a.attcollation ELSE 0 END AS attcollation,\n");
}
else
appendPQExpBuffer(q,
"0 AS attcollation,\n");
@@ -8225,8 +8236,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
appendPQExpBuffer(q,
"'' AS attoptions\n");
/* need left join here to not fail on dropped columns ... */
appendPQExpBuffer(q,
/* need left join here to not fail on dropped columns ... */
"FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
"ON a.atttypid = t.oid\n"
"WHERE a.attrelid = '%u'::pg_catalog.oid "
@@ -9772,12 +9783,31 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
break;
case DO_BLOB_DATA:
if (dobj->dump & DUMP_COMPONENT_DATA)
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
dobj->name, NULL, NULL, "",
false, "BLOBS", SECTION_DATA,
"", "", NULL,
NULL, 0,
dumpBlobs, NULL);
{
TocEntry *te;
te = ArchiveEntry(fout, dobj->catId, dobj->dumpId,
dobj->name, NULL, NULL, "",
false, "BLOBS", SECTION_DATA,
"", "", NULL,
NULL, 0,
dumpBlobs, NULL);
/*
* Set the TocEntry's dataLength in case we are doing a
* parallel dump and want to order dump jobs by table size.
* (We need some size estimate for every TocEntry with a
* DataDumper function.) We don't currently have any cheap
* way to estimate the size of blobs, but it doesn't matter;
* let's just set the size to a large value so parallel dumps
* will launch this job first. If there's lots of blobs, we
* win, and if there aren't, we don't lose much. (If you want
* to improve on this, really what you should be thinking
* about is allowing blob dumping to be parallelized, not just
* getting a smarter estimate for the single TOC entry.)
*/
te->dataLength = MaxBlockNumber;
}
break;
case DO_POLICY:
dumpPolicy(fout, (PolicyInfo *) dobj);
@@ -370,7 +370,6 @@ typedef struct _indxInfo
Oid parentidx; /* if partitioned, parent index OID */
/* if there is an associated constraint object, its dumpId: */
DumpId indexconstraint;
int relpages; /* relpages of the underlying table */
} IndxInfo;
typedef struct _indexAttachInfo
@@ -677,7 +676,6 @@ extern void parseOidArray(const char *str, Oid *array, int arraysize);
extern void sortDumpableObjects(DumpableObject **objs, int numObjs,
DumpId preBoundaryId, DumpId postBoundaryId);
extern void sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs);
extern void sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs);
/*
* version specific routines
@@ -35,10 +35,6 @@ static const char *modulename = gettext_noop("sorter");
* pg_dump.c; that is, PRE_DATA objects must sort before DO_PRE_DATA_BOUNDARY,
* POST_DATA objects must sort after DO_POST_DATA_BOUNDARY, and DATA objects
* must sort between them.
*
* Note: sortDataAndIndexObjectsBySize wants to have all DO_TABLE_DATA and
* DO_INDEX objects in contiguous chunks, so do not reuse the values for those
* for other object types.
*/
static const int dbObjectTypePriority[] =
{
@@ -111,96 +107,6 @@ static void repairDependencyLoop(DumpableObject **loop,
static void describeDumpableObject(DumpableObject *obj,
char *buf, int bufsize);
static int DOSizeCompare(const void *p1, const void *p2);
static int
findFirstEqualType(DumpableObjectType type, DumpableObject **objs, int numObjs)
{
int i;
for (i = 0; i < numObjs; i++)
if (objs[i]->objType == type)
return i;
return -1;
}
static int
findFirstDifferentType(DumpableObjectType type, DumpableObject **objs, int numObjs, int start)
{
int i;
for (i = start; i < numObjs; i++)
if (objs[i]->objType != type)
return i;
return numObjs - 1;
}
/*
* When we do a parallel dump, we want to start with the largest items first.
*
* Say we have the objects in this order:
* ....DDDDD....III....
*
* with D = Table data, I = Index, . = other object
*
* This sorting function now takes each of the D or I blocks and sorts them
* according to their size.
*/
void
sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs)
{
int startIdx,
endIdx;
void *startPtr;
if (numObjs <= 1)
return;
startIdx = findFirstEqualType(DO_TABLE_DATA, objs, numObjs);
if (startIdx >= 0)
{
endIdx = findFirstDifferentType(DO_TABLE_DATA, objs, numObjs, startIdx);
startPtr = objs + startIdx;
qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
DOSizeCompare);
}
startIdx = findFirstEqualType(DO_INDEX, objs, numObjs);
if (startIdx >= 0)
{
endIdx = findFirstDifferentType(DO_INDEX, objs, numObjs, startIdx);
startPtr = objs + startIdx;
qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
DOSizeCompare);
}
}
static int
DOSizeCompare(const void *p1, const void *p2)
{
DumpableObject *obj1 = *(DumpableObject **) p1;
DumpableObject *obj2 = *(DumpableObject **) p2;
int obj1_size = 0;
int obj2_size = 0;
if (obj1->objType == DO_TABLE_DATA)
obj1_size = ((TableDataInfo *) obj1)->tdtable->relpages;
if (obj1->objType == DO_INDEX)
obj1_size = ((IndxInfo *) obj1)->relpages;
if (obj2->objType == DO_TABLE_DATA)
obj2_size = ((TableDataInfo *) obj2)->tdtable->relpages;
if (obj2->objType == DO_INDEX)
obj2_size = ((IndxInfo *) obj2)->relpages;
/* we want to see the biggest item go first */
if (obj1_size > obj2_size)
return -1;
if (obj2_size > obj1_size)
return 1;
return 0;
}
/*
* Sort the given objects into a type/name-based ordering