Commit 6b65a7fe authored by Andres Freund's avatar Andres Freund

Remove TupleDesc remapping logic from tqueue.c.

With the introduction of a shared memory record typmod registry, it is no
longer necessary to remap record typmods when sending tuples between backends
so most of tqueue.c can be removed.

Author: Thomas Munro
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/CAEepm=0ZtQ-SpsgCyzzYpsXS6e=kZWqk3g5Ygn3MDV7A8dabUA@mail.gmail.com
parent cc5f8136
...@@ -608,14 +608,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, ...@@ -608,14 +608,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
/* /*
* Set up tuple queue readers to read the results of a parallel subplan. * Set up tuple queue readers to read the results of a parallel subplan.
* All the workers are expected to return tuples matching tupDesc.
* *
* This is separate from ExecInitParallelPlan() because we can launch the * This is separate from ExecInitParallelPlan() because we can launch the
* worker processes and let them start doing something before we do this. * worker processes and let them start doing something before we do this.
*/ */
void void
ExecParallelCreateReaders(ParallelExecutorInfo *pei, ExecParallelCreateReaders(ParallelExecutorInfo *pei)
TupleDesc tupDesc)
{ {
int nworkers = pei->pcxt->nworkers_launched; int nworkers = pei->pcxt->nworkers_launched;
int i; int i;
...@@ -631,8 +629,7 @@ ExecParallelCreateReaders(ParallelExecutorInfo *pei, ...@@ -631,8 +629,7 @@ ExecParallelCreateReaders(ParallelExecutorInfo *pei,
{ {
shm_mq_set_handle(pei->tqueue[i], shm_mq_set_handle(pei->tqueue[i],
pei->pcxt->worker[i].bgwhandle); pei->pcxt->worker[i].bgwhandle);
pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i], pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
tupDesc);
} }
} }
} }
......
...@@ -176,8 +176,7 @@ ExecGather(PlanState *pstate) ...@@ -176,8 +176,7 @@ ExecGather(PlanState *pstate)
/* Set up tuple queue readers to read the results. */ /* Set up tuple queue readers to read the results. */
if (pcxt->nworkers_launched > 0) if (pcxt->nworkers_launched > 0)
{ {
ExecParallelCreateReaders(node->pei, ExecParallelCreateReaders(node->pei);
fslot->tts_tupleDescriptor);
/* Make a working array showing the active readers */ /* Make a working array showing the active readers */
node->nreaders = pcxt->nworkers_launched; node->nreaders = pcxt->nworkers_launched;
node->reader = (TupleQueueReader **) node->reader = (TupleQueueReader **)
......
...@@ -217,7 +217,7 @@ ExecGatherMerge(PlanState *pstate) ...@@ -217,7 +217,7 @@ ExecGatherMerge(PlanState *pstate)
/* Set up tuple queue readers to read the results. */ /* Set up tuple queue readers to read the results. */
if (pcxt->nworkers_launched > 0) if (pcxt->nworkers_launched > 0)
{ {
ExecParallelCreateReaders(node->pei, node->tupDesc); ExecParallelCreateReaders(node->pei);
/* Make a working array showing the active readers */ /* Make a working array showing the active readers */
node->nreaders = pcxt->nworkers_launched; node->nreaders = pcxt->nworkers_launched;
node->reader = (TupleQueueReader **) node->reader = (TupleQueueReader **)
......
This diff is collapsed.
...@@ -36,8 +36,7 @@ typedef struct ParallelExecutorInfo ...@@ -36,8 +36,7 @@ typedef struct ParallelExecutorInfo
extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, int nworkers, int64 tuples_needed); EState *estate, int nworkers, int64 tuples_needed);
extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei, extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei);
TupleDesc tupDesc);
extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
extern void ExecParallelReinitialize(PlanState *planstate, extern void ExecParallelReinitialize(PlanState *planstate,
......
...@@ -24,8 +24,7 @@ typedef struct TupleQueueReader TupleQueueReader; ...@@ -24,8 +24,7 @@ typedef struct TupleQueueReader TupleQueueReader;
extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle); extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
/* Use these to receive tuples from a shm_mq. */ /* Use these to receive tuples from a shm_mq. */
extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle, extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle);
TupleDesc tupledesc);
extern void DestroyTupleQueueReader(TupleQueueReader *reader); extern void DestroyTupleQueueReader(TupleQueueReader *reader);
extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader, extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
bool nowait, bool *done); bool nowait, bool *done);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment