Commit f033f6d2 authored by Tom Lane

Modify parallel pg_restore to track pending and ready items by means of
two new lists, rather than repeatedly rescanning the main TOC list.
This avoids a potential O(N^2) slowdown, although you'd need a *lot*
of tables to make that really significant; and it might simplify future
improvements in the scheduling algorithm by making the set of ready
items more easily inspectable.  The original thought that it would
in itself result in a more efficient job dispatch order doesn't seem
to have been borne out in testing, but it seems worth doing anyway.
parent 05f43650
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.174 2009/08/04 21:56:08 tgl Exp $ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.175 2009/08/07 22:48:34 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -59,12 +59,14 @@ ...@@ -59,12 +59,14 @@
#define thandle HANDLE #define thandle HANDLE
#endif #endif
/* Arguments needed for a worker child */
typedef struct _restore_args typedef struct _restore_args
{ {
ArchiveHandle *AH; ArchiveHandle *AH;
TocEntry *te; TocEntry *te;
} RestoreArgs; } RestoreArgs;
/* State for each parallel activity slot */
typedef struct _parallel_slot typedef struct _parallel_slot
{ {
thandle child_id; thandle child_id;
...@@ -117,11 +119,15 @@ static thandle spawn_restore(RestoreArgs *args); ...@@ -117,11 +119,15 @@ static thandle spawn_restore(RestoreArgs *args);
static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status); static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
static bool work_in_progress(ParallelSlot *slots, int n_slots); static bool work_in_progress(ParallelSlot *slots, int n_slots);
static int get_next_slot(ParallelSlot *slots, int n_slots); static int get_next_slot(ParallelSlot *slots, int n_slots);
static void par_list_header_init(TocEntry *l);
static void par_list_append(TocEntry *l, TocEntry *te);
static void par_list_remove(TocEntry *te);
static TocEntry *get_next_work_item(ArchiveHandle *AH, static TocEntry *get_next_work_item(ArchiveHandle *AH,
TocEntry **first_unprocessed, TocEntry *ready_list,
ParallelSlot *slots, int n_slots); ParallelSlot *slots, int n_slots);
static parallel_restore_result parallel_restore(RestoreArgs *args); static parallel_restore_result parallel_restore(RestoreArgs *args);
static void mark_work_done(ArchiveHandle *AH, thandle worker, int status, static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
thandle worker, int status,
ParallelSlot *slots, int n_slots); ParallelSlot *slots, int n_slots);
static void fix_dependencies(ArchiveHandle *AH); static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
...@@ -129,7 +135,8 @@ static void repoint_table_dependencies(ArchiveHandle *AH, ...@@ -129,7 +135,8 @@ static void repoint_table_dependencies(ArchiveHandle *AH,
DumpId tableId, DumpId tableDataId); DumpId tableId, DumpId tableDataId);
static void identify_locking_dependencies(TocEntry *te, static void identify_locking_dependencies(TocEntry *te,
TocEntry **tocsByDumpId); TocEntry **tocsByDumpId);
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te); static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
TocEntry *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
static ArchiveHandle *CloneArchive(ArchiveHandle *AH); static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
...@@ -3069,7 +3076,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ...@@ -3069,7 +3076,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
ParallelSlot *slots; ParallelSlot *slots;
int work_status; int work_status;
int next_slot; int next_slot;
TocEntry *first_unprocessed = AH->toc->next; TocEntry pending_list;
TocEntry ready_list;
TocEntry *next_work_item; TocEntry *next_work_item;
thandle ret_child; thandle ret_child;
TocEntry *te; TocEntry *te;
...@@ -3091,8 +3099,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ...@@ -3091,8 +3099,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
* faster in a single connection because we avoid all the connection and * faster in a single connection because we avoid all the connection and
* setup overhead. * setup overhead.
*/ */
while ((next_work_item = get_next_work_item(AH, &first_unprocessed, for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
NULL, 0)) != NULL)
{ {
if (next_work_item->section == SECTION_DATA || if (next_work_item->section == SECTION_DATA ||
next_work_item->section == SECTION_POST_DATA) next_work_item->section == SECTION_POST_DATA)
...@@ -3104,8 +3111,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ...@@ -3104,8 +3111,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
(void) restore_toc_entry(AH, next_work_item, ropt, false); (void) restore_toc_entry(AH, next_work_item, ropt, false);
next_work_item->restored = true; /* there should be no touch of ready_list here, so pass NULL */
reduce_dependencies(AH, next_work_item); reduce_dependencies(AH, next_work_item, NULL);
} }
/* /*
...@@ -3128,6 +3135,25 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ...@@ -3128,6 +3135,25 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
AH->currTablespace = NULL; AH->currTablespace = NULL;
AH->currWithOids = -1; AH->currWithOids = -1;
/*
* Initialize the lists of pending and ready items. After this setup,
* the pending list is everything that needs to be done but is blocked
* by one or more dependencies, while the ready list contains items that
* have no remaining dependencies. Note: we don't yet filter out entries
* that aren't going to be restored. They might participate in
* dependency chains connecting entries that should be restored, so we
* treat them as live until we actually process them.
*/
par_list_header_init(&pending_list);
par_list_header_init(&ready_list);
for (; next_work_item != AH->toc; next_work_item = next_work_item->next)
{
if (next_work_item->depCount > 0)
par_list_append(&pending_list, next_work_item);
else
par_list_append(&ready_list, next_work_item);
}
/* /*
* main parent loop * main parent loop
* *
...@@ -3137,7 +3163,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ...@@ -3137,7 +3163,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
ahlog(AH, 1, "entering main parallel loop\n"); ahlog(AH, 1, "entering main parallel loop\n");
while ((next_work_item = get_next_work_item(AH, &first_unprocessed, while ((next_work_item = get_next_work_item(AH, &ready_list,
slots, n_slots)) != NULL || slots, n_slots)) != NULL ||
work_in_progress(slots, n_slots)) work_in_progress(slots, n_slots))
{ {
...@@ -3153,8 +3179,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ...@@ -3153,8 +3179,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
next_work_item->dumpId, next_work_item->dumpId,
next_work_item->desc, next_work_item->tag); next_work_item->desc, next_work_item->tag);
next_work_item->restored = true; par_list_remove(next_work_item);
reduce_dependencies(AH, next_work_item); reduce_dependencies(AH, next_work_item, &ready_list);
continue; continue;
} }
...@@ -3169,7 +3195,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ...@@ -3169,7 +3195,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
next_work_item->dumpId, next_work_item->dumpId,
next_work_item->desc, next_work_item->tag); next_work_item->desc, next_work_item->tag);
next_work_item->restored = true; par_list_remove(next_work_item);
/* this memory is dealloced in mark_work_done() */ /* this memory is dealloced in mark_work_done() */
args = malloc(sizeof(RestoreArgs)); args = malloc(sizeof(RestoreArgs));
...@@ -3196,7 +3222,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ...@@ -3196,7 +3222,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
if (WIFEXITED(work_status)) if (WIFEXITED(work_status))
{ {
mark_work_done(AH, ret_child, WEXITSTATUS(work_status), mark_work_done(AH, &ready_list,
ret_child, WEXITSTATUS(work_status),
slots, n_slots); slots, n_slots);
} }
else else
...@@ -3222,15 +3249,12 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ...@@ -3222,15 +3249,12 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
* dependencies, or some other pathological condition. If so, do it in the * dependencies, or some other pathological condition. If so, do it in the
* single parent connection. * single parent connection.
*/ */
for (te = AH->toc->next; te != AH->toc; te = te->next) for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
{
if (!te->restored)
{ {
ahlog(AH, 1, "processing missed item %d %s %s\n", ahlog(AH, 1, "processing missed item %d %s %s\n",
te->dumpId, te->desc, te->tag); te->dumpId, te->desc, te->tag);
(void) restore_toc_entry(AH, te, ropt, false); (void) restore_toc_entry(AH, te, ropt, false);
} }
}
/* The ACLs will be handled back in RestoreArchive. */ /* The ACLs will be handled back in RestoreArchive. */
} }
...@@ -3372,25 +3396,57 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2) ...@@ -3372,25 +3396,57 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2)
} }
/*
 * Set up an empty parallel-processing list.
 *
 * A parallel-processing list is circular and is headed by a dummy TocEntry,
 * the same arrangement the main TOC list uses.  It is threaded through the
 * separate par_prev/par_next links, which lets one entry belong to the main
 * TOC list and to a parallel-processing list at the same time.
 *
 * An empty list is a header whose two links both point back at itself.
 */
static void
par_list_header_init(TocEntry *l)
{
	l->par_next = l;
	l->par_prev = l;
}
/*
 * Link te in as the last element of the parallel-processing list whose
 * header is l.
 */
static void
par_list_append(TocEntry *l, TocEntry *te)
{
	/* Current last element; this is the header itself when the list is empty */
	TocEntry   *tail = l->par_prev;

	te->par_prev = tail;
	tail->par_next = te;
	l->par_prev = te;
	te->par_next = l;
}
/*
 * Unlink te from the parallel-processing list it currently belongs to.
 *
 * The neighbours on either side are joined to each other, and te's own links
 * are reset to NULL, which is how an entry is marked as being in no list.
 */
static void
par_list_remove(TocEntry *te)
{
	TocEntry   *before = te->par_prev;
	TocEntry   *after = te->par_next;

	before->par_next = after;
	after->par_prev = before;

	te->par_prev = te->par_next = NULL;
}
/* /*
* Find the next work item (if any) that is capable of being run now. * Find the next work item (if any) that is capable of being run now.
* *
* To qualify, the item must have no remaining dependencies * To qualify, the item must have no remaining dependencies
* and no requirement for locks that is incompatible with * and no requirements for locks that are incompatible with
* items currently running. * items currently running. Items in the ready_list are known to have
* no remaining dependencies, but we have to check for lock conflicts.
* *
* first_unprocessed is state data that tracks the location of the first * Note that the returned item has *not* been removed from ready_list.
* TocEntry that's not marked 'restored'. This avoids O(N^2) search time * The caller must do that after successfully dispatching the item.
* with long TOC lists. (Even though the constant is pretty small, it'd
* get us eventually.)
* *
* pref_non_data is for an alternative selection algorithm that gives * pref_non_data is for an alternative selection algorithm that gives
* preference to non-data items if there is already a data load running. * preference to non-data items if there is already a data load running.
* It is currently disabled. * It is currently disabled.
*/ */
static TocEntry * static TocEntry *
get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed, get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
ParallelSlot *slots, int n_slots) ParallelSlot *slots, int n_slots)
{ {
bool pref_non_data = false; /* or get from AH->ropt */ bool pref_non_data = false; /* or get from AH->ropt */
...@@ -3415,26 +3471,12 @@ get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed, ...@@ -3415,26 +3471,12 @@ get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
} }
/* /*
* Advance first_unprocessed if possible. * Search the ready_list until we find a suitable item.
*/
for (te = *first_unprocessed; te != AH->toc; te = te->next)
{
if (!te->restored)
break;
}
*first_unprocessed = te;
/*
* Search from first_unprocessed until we find an available item.
*/ */
for (; te != AH->toc; te = te->next) for (te = ready_list->par_next; te != ready_list; te = te->par_next)
{ {
bool conflicts = false; bool conflicts = false;
/* Ignore if already done or still waiting on dependencies */
if (te->restored || te->depCount > 0)
continue;
/* /*
* Check to see if the item would need exclusive lock on something * Check to see if the item would need exclusive lock on something
* that a currently running item also needs lock on, or vice versa. If * that a currently running item also needs lock on, or vice versa. If
...@@ -3546,7 +3588,8 @@ parallel_restore(RestoreArgs *args) ...@@ -3546,7 +3588,8 @@ parallel_restore(RestoreArgs *args)
* update status, and reduce the dependency count of any dependent items. * update status, and reduce the dependency count of any dependent items.
*/ */
static void static void
mark_work_done(ArchiveHandle *AH, thandle worker, int status, mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
thandle worker, int status,
ParallelSlot *slots, int n_slots) ParallelSlot *slots, int n_slots)
{ {
TocEntry *te = NULL; TocEntry *te = NULL;
...@@ -3585,7 +3628,7 @@ mark_work_done(ArchiveHandle *AH, thandle worker, int status, ...@@ -3585,7 +3628,7 @@ mark_work_done(ArchiveHandle *AH, thandle worker, int status,
die_horribly(AH, modulename, "worker process failed: exit code %d\n", die_horribly(AH, modulename, "worker process failed: exit code %d\n",
status); status);
reduce_dependencies(AH, te); reduce_dependencies(AH, te, ready_list);
} }
...@@ -3610,13 +3653,16 @@ fix_dependencies(ArchiveHandle *AH) ...@@ -3610,13 +3653,16 @@ fix_dependencies(ArchiveHandle *AH)
* indexes the TOC entries by dump ID, rather than searching the TOC list * indexes the TOC entries by dump ID, rather than searching the TOC list
* repeatedly. Entries for dump IDs not present in the TOC will be NULL. * repeatedly. Entries for dump IDs not present in the TOC will be NULL.
* *
* Also, initialize the depCount fields. * Also, initialize the depCount fields, and make sure all the TOC items
* are marked as not being in any parallel-processing list.
*/ */
tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *)); tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *));
for (te = AH->toc->next; te != AH->toc; te = te->next) for (te = AH->toc->next; te != AH->toc; te = te->next)
{ {
tocsByDumpId[te->dumpId - 1] = te; tocsByDumpId[te->dumpId - 1] = te;
te->depCount = te->nDeps; te->depCount = te->nDeps;
te->par_prev = NULL;
te->par_next = NULL;
} }
/* /*
...@@ -3785,10 +3831,11 @@ identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId) ...@@ -3785,10 +3831,11 @@ identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
/* /*
* Remove the specified TOC entry from the depCounts of items that depend on * Remove the specified TOC entry from the depCounts of items that depend on
* it, thereby possibly making them ready-to-run. * it, thereby possibly making them ready-to-run. Any pending item that
* becomes ready should be moved to the ready list.
*/ */
static void static void
reduce_dependencies(ArchiveHandle *AH, TocEntry *te) reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
{ {
DumpId target = te->dumpId; DumpId target = te->dumpId;
int i; int i;
...@@ -3805,7 +3852,16 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te) ...@@ -3805,7 +3852,16 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
for (i = 0; i < te->nDeps; i++) for (i = 0; i < te->nDeps; i++)
{ {
if (te->dependencies[i] == target) if (te->dependencies[i] == target)
{
te->depCount--; te->depCount--;
if (te->depCount == 0 && te->par_prev != NULL)
{
/* It must be in the pending list, so remove it ... */
par_list_remove(te);
/* ... and add to ready_list */
par_list_append(ready_list, te);
}
}
} }
} }
} }
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.81 2009/08/04 21:56:09 tgl Exp $ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.82 2009/08/07 22:48:34 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -314,7 +314,8 @@ typedef struct _tocEntry ...@@ -314,7 +314,8 @@ typedef struct _tocEntry
void *formatData; /* TOC Entry data specific to file format */ void *formatData; /* TOC Entry data specific to file format */
/* working state (needed only for parallel restore) */ /* working state (needed only for parallel restore) */
bool restored; /* item is in progress or done */ struct _tocEntry *par_prev; /* list links for pending/ready items; */
struct _tocEntry *par_next; /* these are NULL if not in either list */
bool created; /* set for DATA member if TABLE was created */ bool created; /* set for DATA member if TABLE was created */
int depCount; /* number of dependencies not yet restored */ int depCount; /* number of dependencies not yet restored */
DumpId *lockDeps; /* dumpIds of objects this one needs lock on */ DumpId *lockDeps; /* dumpIds of objects this one needs lock on */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment