Commit cdc71695 authored by Thomas Munro's avatar Thomas Munro

Use MinimalTuple for tuple queues.

This representation saves 8 bytes per tuple compared to HeapTuple, and
avoids the need to allocate, copy and free on the receiving side.

Gather can emit the returned MinimalTuple directly, but GatherMerge now
needs to make an explicit copy because it buffers multiple tuples at a
time.  That should be no worse than before.
Reviewed-by: default avatarSoumyadeep Chakraborty <soumyadeep2007@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2B8T_ggoUTAE-U%3DA%2BOcPc4%3DB0nPPHcSfffuQhvXXjML6w%40mail.gmail.com
parent d2bddc25
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
static TupleTableSlot *ExecGather(PlanState *pstate); static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate); static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate); static MinimalTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node); static void ExecShutdownGatherWorkers(GatherState *node);
...@@ -120,7 +120,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags) ...@@ -120,7 +120,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
* Initialize funnel slot to same tuple descriptor as outer plan. * Initialize funnel slot to same tuple descriptor as outer plan.
*/ */
gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc, gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
&TTSOpsHeapTuple); &TTSOpsMinimalTuple);
/* /*
* Gather doesn't support checking a qual (it's always more efficient to * Gather doesn't support checking a qual (it's always more efficient to
...@@ -266,7 +266,7 @@ gather_getnext(GatherState *gatherstate) ...@@ -266,7 +266,7 @@ gather_getnext(GatherState *gatherstate)
PlanState *outerPlan = outerPlanState(gatherstate); PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot; TupleTableSlot *outerTupleSlot;
TupleTableSlot *fslot = gatherstate->funnel_slot; TupleTableSlot *fslot = gatherstate->funnel_slot;
HeapTuple tup; MinimalTuple tup;
while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally) while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
{ {
...@@ -278,9 +278,9 @@ gather_getnext(GatherState *gatherstate) ...@@ -278,9 +278,9 @@ gather_getnext(GatherState *gatherstate)
if (HeapTupleIsValid(tup)) if (HeapTupleIsValid(tup))
{ {
ExecStoreHeapTuple(tup, /* tuple to store */ ExecStoreMinimalTuple(tup, /* tuple to store */
fslot, /* slot to store the tuple */ fslot, /* slot to store the tuple */
true); /* pfree tuple when done with it */ false); /* don't pfree tuple */
return fslot; return fslot;
} }
} }
...@@ -308,7 +308,7 @@ gather_getnext(GatherState *gatherstate) ...@@ -308,7 +308,7 @@ gather_getnext(GatherState *gatherstate)
/* /*
* Attempt to read a tuple from one of our parallel workers. * Attempt to read a tuple from one of our parallel workers.
*/ */
static HeapTuple static MinimalTuple
gather_readnext(GatherState *gatherstate) gather_readnext(GatherState *gatherstate)
{ {
int nvisited = 0; int nvisited = 0;
...@@ -316,7 +316,7 @@ gather_readnext(GatherState *gatherstate) ...@@ -316,7 +316,7 @@ gather_readnext(GatherState *gatherstate)
for (;;) for (;;)
{ {
TupleQueueReader *reader; TupleQueueReader *reader;
HeapTuple tup; MinimalTuple tup;
bool readerdone; bool readerdone;
/* Check for async events, particularly messages from workers. */ /* Check for async events, particularly messages from workers. */
......
...@@ -45,7 +45,7 @@ ...@@ -45,7 +45,7 @@
*/ */
typedef struct GMReaderTupleBuffer typedef struct GMReaderTupleBuffer
{ {
HeapTuple *tuple; /* array of length MAX_TUPLE_STORE */ MinimalTuple *tuple; /* array of length MAX_TUPLE_STORE */
int nTuples; /* number of tuples currently stored */ int nTuples; /* number of tuples currently stored */
int readCounter; /* index of next tuple to extract */ int readCounter; /* index of next tuple to extract */
bool done; /* true if reader is known exhausted */ bool done; /* true if reader is known exhausted */
...@@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer ...@@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer
static TupleTableSlot *ExecGatherMerge(PlanState *pstate); static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
static int32 heap_compare_slots(Datum a, Datum b, void *arg); static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state); static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
bool nowait, bool *done); bool nowait, bool *done);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node); static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
static void gather_merge_setup(GatherMergeState *gm_state); static void gather_merge_setup(GatherMergeState *gm_state);
static void gather_merge_init(GatherMergeState *gm_state); static void gather_merge_init(GatherMergeState *gm_state);
...@@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state) ...@@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state)
{ {
/* Allocate the tuple array with length MAX_TUPLE_STORE */ /* Allocate the tuple array with length MAX_TUPLE_STORE */
gm_state->gm_tuple_buffers[i].tuple = gm_state->gm_tuple_buffers[i].tuple =
(HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE); (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);
/* Initialize tuple slot for worker */ /* Initialize tuple slot for worker */
gm_state->gm_slots[i + 1] = gm_state->gm_slots[i + 1] =
ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc, ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
&TTSOpsHeapTuple); &TTSOpsMinimalTuple);
} }
/* Allocate the resources for the merge */ /* Allocate the resources for the merge */
...@@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state) ...@@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state)
GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i]; GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
while (tuple_buffer->readCounter < tuple_buffer->nTuples) while (tuple_buffer->readCounter < tuple_buffer->nTuples)
heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]); pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
ExecClearTuple(gm_state->gm_slots[i + 1]); ExecClearTuple(gm_state->gm_slots[i + 1]);
} }
...@@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader) ...@@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
/* Try to fill additional slots in the array. */ /* Try to fill additional slots in the array. */
for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++) for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
{ {
HeapTuple tuple; MinimalTuple tuple;
tuple = gm_readnext_tuple(gm_state, tuple = gm_readnext_tuple(gm_state,
reader, reader,
true, true,
&tuple_buffer->done); &tuple_buffer->done);
if (!HeapTupleIsValid(tuple)) if (!tuple)
break; break;
tuple_buffer->tuple[i] = tuple; tuple_buffer->tuple[i] = tuple;
tuple_buffer->nTuples++; tuple_buffer->nTuples++;
...@@ -637,7 +637,7 @@ static bool ...@@ -637,7 +637,7 @@ static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{ {
GMReaderTupleBuffer *tuple_buffer; GMReaderTupleBuffer *tuple_buffer;
HeapTuple tup; MinimalTuple tup;
/* /*
* If we're being asked to generate a tuple from the leader, then we just * If we're being asked to generate a tuple from the leader, then we just
...@@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) ...@@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
reader, reader,
nowait, nowait,
&tuple_buffer->done); &tuple_buffer->done);
if (!HeapTupleIsValid(tup)) if (!tup)
return false; return false;
/* /*
...@@ -697,13 +697,13 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) ...@@ -697,13 +697,13 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
load_tuple_array(gm_state, reader); load_tuple_array(gm_state, reader);
} }
Assert(HeapTupleIsValid(tup)); Assert(tup);
/* Build the TupleTableSlot for the given tuple */ /* Build the TupleTableSlot for the given tuple */
ExecStoreHeapTuple(tup, /* tuple to store */ ExecStoreMinimalTuple(tup, /* tuple to store */
gm_state->gm_slots[reader], /* slot in which to store gm_state->gm_slots[reader], /* slot in which to store
* the tuple */ * the tuple */
true); /* pfree tuple when done with it */ true); /* pfree tuple when done with it */
return true; return true;
} }
...@@ -711,12 +711,12 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) ...@@ -711,12 +711,12 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
/* /*
* Attempt to read a tuple from given worker. * Attempt to read a tuple from given worker.
*/ */
static HeapTuple static MinimalTuple
gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
bool *done) bool *done)
{ {
TupleQueueReader *reader; TupleQueueReader *reader;
HeapTuple tup; MinimalTuple tup;
/* Check for async events, particularly messages from workers. */ /* Check for async events, particularly messages from workers. */
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
...@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, ...@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
reader = gm_state->reader[nreader - 1]; reader = gm_state->reader[nreader - 1];
tup = TupleQueueReaderNext(reader, nowait, done); tup = TupleQueueReaderNext(reader, nowait, done);
return tup; /*
* Since we'll be buffering these across multiple calls, we need to make a
* copy.
*/
return tup ? heap_copy_minimal_tuple(tup) : NULL;
} }
/* /*
......
...@@ -54,16 +54,16 @@ static bool ...@@ -54,16 +54,16 @@ static bool
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{ {
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
HeapTuple tuple; MinimalTuple tuple;
shm_mq_result result; shm_mq_result result;
bool should_free; bool should_free;
/* Send the tuple itself. */ /* Send the tuple itself. */
tuple = ExecFetchSlotHeapTuple(slot, true, &should_free); tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false); result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
if (should_free) if (should_free)
heap_freetuple(tuple); pfree(tuple);
/* Check for failure. */ /* Check for failure. */
if (result == SHM_MQ_DETACHED) if (result == SHM_MQ_DETACHED)
...@@ -164,18 +164,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader) ...@@ -164,18 +164,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
* nowait = true and no tuple is ready to return. *done, if not NULL, * nowait = true and no tuple is ready to return. *done, if not NULL,
* is set to true when there are no remaining tuples and otherwise to false. * is set to true when there are no remaining tuples and otherwise to false.
* *
* The returned tuple, if any, is allocated in CurrentMemoryContext. * The returned tuple, if any, is either in shared memory or a private buffer
* Note that this routine must not leak memory! (We used to allow that, * and should not be freed. The pointer is invalid after the next call to
* but not any more.) * TupleQueueReaderNext().
* *
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
* accumulate bytes from a partially-read message, so it's useful to call * accumulate bytes from a partially-read message, so it's useful to call
* this with nowait = true even if nothing is returned. * this with nowait = true even if nothing is returned.
*/ */
HeapTuple MinimalTuple
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
{ {
HeapTupleData htup; MinimalTuple tuple;
shm_mq_result result; shm_mq_result result;
Size nbytes; Size nbytes;
void *data; void *data;
...@@ -200,13 +200,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) ...@@ -200,13 +200,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Assert(result == SHM_MQ_SUCCESS); Assert(result == SHM_MQ_SUCCESS);
/* /*
* Set up a dummy HeapTupleData pointing to the data from the shm_mq * Return a pointer to the queue memory directly (which had better be
* (which had better be sufficiently aligned). * sufficiently aligned).
*/ */
ItemPointerSetInvalid(&htup.t_self); tuple = (MinimalTuple) data;
htup.t_tableOid = InvalidOid; Assert(tuple->t_len == nbytes);
htup.t_len = nbytes;
htup.t_data = data;
return heap_copytuple(&htup); return tuple;
} }
...@@ -1730,8 +1730,10 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path) ...@@ -1730,8 +1730,10 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
List *tlist; List *tlist;
/* /*
* Although the Gather node can project, we prefer to push down such work * Push projection down to the child node. That way, the projection work
* to its child node, so demand an exact tlist from the child. * is parallelized, and there can be no system columns in the result (they
* can't travel through a tuple queue because it uses MinimalTuple
* representation).
*/ */
subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST); subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
...@@ -1766,7 +1768,7 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path) ...@@ -1766,7 +1768,7 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
List *pathkeys = best_path->path.pathkeys; List *pathkeys = best_path->path.pathkeys;
List *tlist = build_path_tlist(root, &best_path->path); List *tlist = build_path_tlist(root, &best_path->path);
/* As with Gather, it's best to project away columns in the workers. */ /* As with Gather, project away columns in the workers. */
subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST); subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
/* Create a shell for a GatherMerge plan. */ /* Create a shell for a GatherMerge plan. */
......
...@@ -26,7 +26,7 @@ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle); ...@@ -26,7 +26,7 @@ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
/* Use these to receive tuples from a shm_mq. */ /* Use these to receive tuples from a shm_mq. */
extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle); extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle);
extern void DestroyTupleQueueReader(TupleQueueReader *reader); extern void DestroyTupleQueueReader(TupleQueueReader *reader);
extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader, extern MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader,
bool nowait, bool *done); bool nowait, bool *done);
#endif /* TQUEUE_H */ #endif /* TQUEUE_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment