Commit 7082e614 authored by Andres Freund's avatar Andres Freund

Provide DSM segment to ExecXXXInitializeWorker functions.

Previously, executor nodes running in parallel worker processes didn't
have access to the dsm_segment object used for parallel execution.  In
order to support resource management based on DSM segment lifetime,
they need that.  So create a ParallelWorkerContext object to hold it
and pass it to all InitializeWorker functions.

Author: Thomas Munro
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
parent 09a77744
...@@ -1122,7 +1122,7 @@ ExecParallelReportInstrumentation(PlanState *planstate, ...@@ -1122,7 +1122,7 @@ ExecParallelReportInstrumentation(PlanState *planstate,
* is allocated and initialized by executor; that is, after ExecutorStart(). * is allocated and initialized by executor; that is, after ExecutorStart().
*/ */
static bool static bool
ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
{ {
if (planstate == NULL) if (planstate == NULL)
return false; return false;
...@@ -1131,40 +1131,44 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) ...@@ -1131,40 +1131,44 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
{ {
case T_SeqScanState: case T_SeqScanState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc); ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
break; break;
case T_IndexScanState: case T_IndexScanState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecIndexScanInitializeWorker((IndexScanState *) planstate, toc); ExecIndexScanInitializeWorker((IndexScanState *) planstate,
pwcxt);
break; break;
case T_IndexOnlyScanState: case T_IndexOnlyScanState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate, toc); ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
pwcxt);
break; break;
case T_ForeignScanState: case T_ForeignScanState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecForeignScanInitializeWorker((ForeignScanState *) planstate, ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
toc); pwcxt);
break; break;
case T_CustomScanState: case T_CustomScanState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecCustomScanInitializeWorker((CustomScanState *) planstate, ExecCustomScanInitializeWorker((CustomScanState *) planstate,
toc); pwcxt);
break; break;
case T_BitmapHeapScanState: case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc); ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
pwcxt);
break; break;
case T_SortState: case T_SortState:
/* even when not parallel-aware */ /* even when not parallel-aware */
ExecSortInitializeWorker((SortState *) planstate, toc); ExecSortInitializeWorker((SortState *) planstate, pwcxt);
break; break;
default: default:
break; break;
} }
return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc); return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
pwcxt);
} }
/* /*
...@@ -1194,6 +1198,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ...@@ -1194,6 +1198,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
int instrument_options = 0; int instrument_options = 0;
void *area_space; void *area_space;
dsa_area *area; dsa_area *area;
ParallelWorkerContext pwcxt;
/* Get fixed-size state. */ /* Get fixed-size state. */
fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false); fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
...@@ -1231,7 +1236,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ...@@ -1231,7 +1236,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
RestoreParamExecParams(paramexec_space, queryDesc->estate); RestoreParamExecParams(paramexec_space, queryDesc->estate);
} }
ExecParallelInitializeWorker(queryDesc->planstate, toc); pwcxt.toc = toc;
pwcxt.seg = seg;
ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
/* Pass down any tuple bound */ /* Pass down any tuple bound */
ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate); ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
......
...@@ -1102,12 +1102,13 @@ ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, ...@@ -1102,12 +1102,13 @@ ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
void void
ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc) ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
ParallelWorkerContext *pwcxt)
{ {
ParallelBitmapHeapState *pstate; ParallelBitmapHeapState *pstate;
Snapshot snapshot; Snapshot snapshot;
pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); pstate = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
node->pstate = pstate; node->pstate = pstate;
snapshot = RestoreSnapshot(pstate->phs_snapshot_data); snapshot = RestoreSnapshot(pstate->phs_snapshot_data);
......
...@@ -210,7 +210,8 @@ ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt) ...@@ -210,7 +210,8 @@ ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
} }
void void
ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) ExecCustomScanInitializeWorker(CustomScanState *node,
ParallelWorkerContext *pwcxt)
{ {
const CustomExecMethods *methods = node->methods; const CustomExecMethods *methods = node->methods;
...@@ -219,8 +220,8 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) ...@@ -219,8 +220,8 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
int plan_node_id = node->ss.ps.plan->plan_node_id; int plan_node_id = node->ss.ps.plan->plan_node_id;
void *coordinate; void *coordinate;
coordinate = shm_toc_lookup(toc, plan_node_id, false); coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
methods->InitializeWorkerCustomScan(node, toc, coordinate); methods->InitializeWorkerCustomScan(node, pwcxt->toc, coordinate);
} }
} }
......
...@@ -359,7 +359,8 @@ ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) ...@@ -359,7 +359,8 @@ ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
void void
ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt)
{ {
FdwRoutine *fdwroutine = node->fdwroutine; FdwRoutine *fdwroutine = node->fdwroutine;
...@@ -368,8 +369,8 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) ...@@ -368,8 +369,8 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
int plan_node_id = node->ss.ps.plan->plan_node_id; int plan_node_id = node->ss.ps.plan->plan_node_id;
void *coordinate; void *coordinate;
coordinate = shm_toc_lookup(toc, plan_node_id, false); coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate); fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
} }
} }
......
...@@ -678,11 +678,12 @@ ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, ...@@ -678,11 +678,12 @@ ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
void void
ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, shm_toc *toc) ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
ParallelWorkerContext *pwcxt)
{ {
ParallelIndexScanDesc piscan; ParallelIndexScanDesc piscan;
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
node->ioss_ScanDesc = node->ioss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation, index_beginscan_parallel(node->ss.ss_currentRelation,
node->ioss_RelationDesc, node->ioss_RelationDesc,
......
...@@ -1716,11 +1716,12 @@ ExecIndexScanReInitializeDSM(IndexScanState *node, ...@@ -1716,11 +1716,12 @@ ExecIndexScanReInitializeDSM(IndexScanState *node,
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
void void
ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc) ExecIndexScanInitializeWorker(IndexScanState *node,
ParallelWorkerContext *pwcxt)
{ {
ParallelIndexScanDesc piscan; ParallelIndexScanDesc piscan;
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
node->iss_ScanDesc = node->iss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation, index_beginscan_parallel(node->ss.ss_currentRelation,
node->iss_RelationDesc, node->iss_RelationDesc,
......
...@@ -348,11 +348,12 @@ ExecSeqScanReInitializeDSM(SeqScanState *node, ...@@ -348,11 +348,12 @@ ExecSeqScanReInitializeDSM(SeqScanState *node,
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
void void
ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc) ExecSeqScanInitializeWorker(SeqScanState *node,
ParallelWorkerContext *pwcxt)
{ {
ParallelHeapScanDesc pscan; ParallelHeapScanDesc pscan;
pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
node->ss.ss_currentScanDesc = node->ss.ss_currentScanDesc =
heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
} }
...@@ -420,10 +420,10 @@ ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt) ...@@ -420,10 +420,10 @@ ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt)
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
void void
ExecSortInitializeWorker(SortState *node, shm_toc *toc) ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt)
{ {
node->shared_info = node->shared_info =
shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, true); shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
node->am_worker = true; node->am_worker = true;
} }
......
...@@ -45,6 +45,12 @@ typedef struct ParallelContext ...@@ -45,6 +45,12 @@ typedef struct ParallelContext
ParallelWorkerInfo *worker; ParallelWorkerInfo *worker;
} ParallelContext; } ParallelContext;
typedef struct ParallelWorkerContext
{
dsm_segment *seg;
shm_toc *toc;
} ParallelWorkerContext;
extern volatile bool ParallelMessagePending; extern volatile bool ParallelMessagePending;
extern int ParallelWorkerNumber; extern int ParallelWorkerNumber;
extern bool InitializingParallelWorker; extern bool InitializingParallelWorker;
......
...@@ -27,6 +27,6 @@ extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, ...@@ -27,6 +27,6 @@ extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
shm_toc *toc); ParallelWorkerContext *pwcxt);
#endif /* NODEBITMAPHEAPSCAN_H */ #endif /* NODEBITMAPHEAPSCAN_H */
...@@ -37,7 +37,7 @@ extern void ExecCustomScanInitializeDSM(CustomScanState *node, ...@@ -37,7 +37,7 @@ extern void ExecCustomScanInitializeDSM(CustomScanState *node,
extern void ExecCustomScanReInitializeDSM(CustomScanState *node, extern void ExecCustomScanReInitializeDSM(CustomScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecCustomScanInitializeWorker(CustomScanState *node, extern void ExecCustomScanInitializeWorker(CustomScanState *node,
shm_toc *toc); ParallelWorkerContext *pwcxt);
extern void ExecShutdownCustomScan(CustomScanState *node); extern void ExecShutdownCustomScan(CustomScanState *node);
#endif /* NODECUSTOM_H */ #endif /* NODECUSTOM_H */
...@@ -28,7 +28,7 @@ extern void ExecForeignScanInitializeDSM(ForeignScanState *node, ...@@ -28,7 +28,7 @@ extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
extern void ExecForeignScanReInitializeDSM(ForeignScanState *node, extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecForeignScanInitializeWorker(ForeignScanState *node, extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
shm_toc *toc); ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node); extern void ExecShutdownForeignScan(ForeignScanState *node);
#endif /* NODEFOREIGNSCAN_H */ #endif /* NODEFOREIGNSCAN_H */
...@@ -31,6 +31,6 @@ extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, ...@@ -31,6 +31,6 @@ extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
shm_toc *toc); ParallelWorkerContext *pwcxt);
#endif /* NODEINDEXONLYSCAN_H */ #endif /* NODEINDEXONLYSCAN_H */
...@@ -25,7 +25,8 @@ extern void ExecReScanIndexScan(IndexScanState *node); ...@@ -25,7 +25,8 @@ extern void ExecReScanIndexScan(IndexScanState *node);
extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc); extern void ExecIndexScanInitializeWorker(IndexScanState *node,
ParallelWorkerContext *pwcxt);
/* /*
* These routines are exported to share code with nodeIndexonlyscan.c and * These routines are exported to share code with nodeIndexonlyscan.c and
......
...@@ -25,6 +25,7 @@ extern void ExecReScanSeqScan(SeqScanState *node); ...@@ -25,6 +25,7 @@ extern void ExecReScanSeqScan(SeqScanState *node);
extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt);
extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc); extern void ExecSeqScanInitializeWorker(SeqScanState *node,
ParallelWorkerContext *pwcxt);
#endif /* NODESEQSCAN_H */ #endif /* NODESEQSCAN_H */
...@@ -27,7 +27,7 @@ extern void ExecReScanSort(SortState *node); ...@@ -27,7 +27,7 @@ extern void ExecReScanSort(SortState *node);
extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt); extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt);
extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt); extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt);
extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt); extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt);
extern void ExecSortInitializeWorker(SortState *node, shm_toc *toc); extern void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt);
extern void ExecSortRetrieveInstrumentation(SortState *node); extern void ExecSortRetrieveInstrumentation(SortState *node);
#endif /* NODESORT_H */ #endif /* NODESORT_H */
...@@ -1534,6 +1534,7 @@ ParallelHeapScanDesc ...@@ -1534,6 +1534,7 @@ ParallelHeapScanDesc
ParallelIndexScanDesc ParallelIndexScanDesc
ParallelSlot ParallelSlot
ParallelState ParallelState
ParallelWorkerContext
ParallelWorkerInfo ParallelWorkerInfo
Param Param
ParamExecData ParamExecData
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment