Commit 0109ab27 authored by Tom Lane's avatar Tom Lane

Make struct ParallelSlot private within pg_dump/parallel.c.

The only field of this struct that other files have any need to touch
is the pointer to the TocEntry a worker is working on.  (Well,
pg_backup_archiver.c is actually looking at workerStatus too, but that
can be finessed by specifying that the TocEntry pointer is NULL for a
non-busy worker.)

Hence, move out the TocEntry pointers to a separate array within
struct ParallelState, and then we can make struct ParallelSlot private.

I noted the possibility of this previously, but hadn't got round to
actually doing it.

Discussion: <1188.1464544443@sss.pgh.pa.us>
parent fb03d08a
...@@ -45,6 +45,8 @@ ...@@ -45,6 +45,8 @@
* WRKR_IDLE: it's waiting for a command * WRKR_IDLE: it's waiting for a command
* WRKR_WORKING: it's working on a command * WRKR_WORKING: it's working on a command
* WRKR_TERMINATED: process ended * WRKR_TERMINATED: process ended
* The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
* state, and must be NULL in other states.
*/ */
#include "postgres_fe.h" #include "postgres_fe.h"
...@@ -71,6 +73,45 @@ ...@@ -71,6 +73,45 @@
#define NO_SLOT (-1) /* Failure result for GetIdleWorker() */ #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
/* Worker process statuses */
typedef enum
{
WRKR_IDLE,
WRKR_WORKING,
WRKR_TERMINATED
} T_WorkerStatus;
/*
* Private per-parallel-worker state (typedef for this is in parallel.h).
*
* Much of this is valid only in the master process (or, on Windows, should
* be touched only by the master thread). But the AH field should be touched
* only by workers. The pipe descriptors are valid everywhere.
*/
struct ParallelSlot
{
T_WorkerStatus workerStatus; /* see enum above */
/* These fields are valid if workerStatus == WRKR_WORKING: */
ParallelCompletionPtr callback; /* function to call on completion */
void *callback_data; /* passthru data for it */
ArchiveHandle *AH; /* Archive data worker is using */
int pipeRead; /* master's end of the pipes */
int pipeWrite;
int pipeRevRead; /* child's end of the pipes */
int pipeRevWrite;
/* Child process/thread identity info: */
#ifdef WIN32
uintptr_t hThread;
unsigned int threadId;
#else
pid_t pid;
#endif
};
#ifdef WIN32 #ifdef WIN32
/* /*
...@@ -475,9 +516,10 @@ WaitForTerminatingWorkers(ParallelState *pstate) ...@@ -475,9 +516,10 @@ WaitForTerminatingWorkers(ParallelState *pstate)
} }
#endif /* WIN32 */ #endif /* WIN32 */
/* On all platforms, update workerStatus as well */ /* On all platforms, update workerStatus and te[] as well */
Assert(j < pstate->numWorkers); Assert(j < pstate->numWorkers);
slot->workerStatus = WRKR_TERMINATED; slot->workerStatus = WRKR_TERMINATED;
pstate->te[j] = NULL;
} }
} }
...@@ -870,20 +912,22 @@ ParallelBackupStart(ArchiveHandle *AH) ...@@ -870,20 +912,22 @@ ParallelBackupStart(ArchiveHandle *AH)
{ {
ParallelState *pstate; ParallelState *pstate;
int i; int i;
const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
Assert(AH->public.numWorkers > 0); Assert(AH->public.numWorkers > 0);
pstate = (ParallelState *) pg_malloc(sizeof(ParallelState)); pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
pstate->numWorkers = AH->public.numWorkers; pstate->numWorkers = AH->public.numWorkers;
pstate->te = NULL;
pstate->parallelSlot = NULL; pstate->parallelSlot = NULL;
if (AH->public.numWorkers == 1) if (AH->public.numWorkers == 1)
return pstate; return pstate;
pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize); pstate->te = (TocEntry **)
memset((void *) pstate->parallelSlot, 0, slotSize); pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
pstate->parallelSlot = (ParallelSlot *)
pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
#ifdef WIN32 #ifdef WIN32
/* Make fmtId() and fmtQualifiedId() use thread-local storage */ /* Make fmtId() and fmtQualifiedId() use thread-local storage */
...@@ -929,9 +973,10 @@ ParallelBackupStart(ArchiveHandle *AH) ...@@ -929,9 +973,10 @@ ParallelBackupStart(ArchiveHandle *AH)
"could not create communication channels: %s\n", "could not create communication channels: %s\n",
strerror(errno)); strerror(errno));
pstate->te[i] = NULL; /* just for safety */
slot->workerStatus = WRKR_IDLE; slot->workerStatus = WRKR_IDLE;
slot->AH = NULL; slot->AH = NULL;
slot->te = NULL;
slot->callback = NULL; slot->callback = NULL;
slot->callback_data = NULL; slot->callback_data = NULL;
...@@ -1062,6 +1107,7 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) ...@@ -1062,6 +1107,7 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
set_cancel_pstate(NULL); set_cancel_pstate(NULL);
/* Release state (mere neatnik-ism, since we're about to terminate) */ /* Release state (mere neatnik-ism, since we're about to terminate) */
free(pstate->te);
free(pstate->parallelSlot); free(pstate->parallelSlot);
free(pstate); free(pstate);
} }
...@@ -1201,9 +1247,9 @@ DispatchJobForTocEntry(ArchiveHandle *AH, ...@@ -1201,9 +1247,9 @@ DispatchJobForTocEntry(ArchiveHandle *AH,
/* Remember worker is busy, and which TocEntry it's working on */ /* Remember worker is busy, and which TocEntry it's working on */
pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
pstate->parallelSlot[worker].te = te;
pstate->parallelSlot[worker].callback = callback; pstate->parallelSlot[worker].callback = callback;
pstate->parallelSlot[worker].callback_data = callback_data; pstate->parallelSlot[worker].callback_data = callback_data;
pstate->te[worker] = te;
} }
/* /*
...@@ -1394,13 +1440,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) ...@@ -1394,13 +1440,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
if (messageStartsWith(msg, "OK ")) if (messageStartsWith(msg, "OK "))
{ {
ParallelSlot *slot = &pstate->parallelSlot[worker]; ParallelSlot *slot = &pstate->parallelSlot[worker];
TocEntry *te = slot->te; TocEntry *te = pstate->te[worker];
int status; int status;
status = parseWorkerResponse(AH, te, msg); status = parseWorkerResponse(AH, te, msg);
slot->callback(AH, te, status, slot->callback_data); slot->callback(AH, te, status, slot->callback_data);
slot->workerStatus = WRKR_IDLE; slot->workerStatus = WRKR_IDLE;
slot->te = NULL; pstate->te[worker] = NULL;
} }
else else
exit_horribly(modulename, exit_horribly(modulename,
......
...@@ -33,51 +33,16 @@ typedef enum ...@@ -33,51 +33,16 @@ typedef enum
WFW_ALL_IDLE WFW_ALL_IDLE
} WFW_WaitOption; } WFW_WaitOption;
/* Worker process statuses */ /* ParallelSlot is an opaque struct known only within parallel.c */
typedef enum typedef struct ParallelSlot ParallelSlot;
{
WRKR_IDLE,
WRKR_WORKING,
WRKR_TERMINATED
} T_WorkerStatus;
/*
* Per-parallel-worker state of parallel.c.
*
* Much of this is valid only in the master process (or, on Windows, should
* be touched only by the master thread). But the AH field should be touched
* only by workers. The pipe descriptors are valid everywhere.
*/
typedef struct ParallelSlot
{
T_WorkerStatus workerStatus; /* see enum above */
/* These fields are valid if workerStatus == WRKR_WORKING: */
TocEntry *te; /* item being worked on */
ParallelCompletionPtr callback; /* function to call on completion */
void *callback_data; /* passthru data for it */
ArchiveHandle *AH; /* Archive data worker is using */
int pipeRead; /* master's end of the pipes */
int pipeWrite;
int pipeRevRead; /* child's end of the pipes */
int pipeRevWrite;
/* Child process/thread identity info: */
#ifdef WIN32
uintptr_t hThread;
unsigned int threadId;
#else
pid_t pid;
#endif
} ParallelSlot;
/* Overall state for parallel.c */ /* Overall state for parallel.c */
typedef struct ParallelState typedef struct ParallelState
{ {
int numWorkers; /* allowed number of workers */ int numWorkers; /* allowed number of workers */
ParallelSlot *parallelSlot; /* array of numWorkers slots */ /* these arrays have numWorkers entries, one per worker: */
TocEntry **te; /* item being worked on, or NULL */
ParallelSlot *parallelSlot; /* private info about each worker */
} ParallelState; } ParallelState;
#ifdef WIN32 #ifdef WIN32
......
...@@ -4027,8 +4027,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, ...@@ -4027,8 +4027,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
for (k = 0; k < pstate->numWorkers; k++) for (k = 0; k < pstate->numWorkers; k++)
{ {
if (pstate->parallelSlot[k].workerStatus == WRKR_WORKING && TocEntry *running_te = pstate->te[k];
pstate->parallelSlot[k].te->section == SECTION_DATA)
if (running_te != NULL &&
running_te->section == SECTION_DATA)
count++; count++;
} }
if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers) if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
...@@ -4049,12 +4051,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, ...@@ -4049,12 +4051,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
*/ */
for (i = 0; i < pstate->numWorkers; i++) for (i = 0; i < pstate->numWorkers; i++)
{ {
TocEntry *running_te; TocEntry *running_te = pstate->te[i];
if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING) if (running_te == NULL)
continue; continue;
running_te = pstate->parallelSlot[i].te;
if (has_lock_conflicts(te, running_te) || if (has_lock_conflicts(te, running_te) ||
has_lock_conflicts(running_te, te)) has_lock_conflicts(running_te, te))
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment