Commit c6dbf1fe authored by Robert Haas's avatar Robert Haas

Stop the executor if no more tuples can be sent from worker to leader.

If a Gather node has read as many tuples as it needs (for example, due
to Limit) it may detach the queue connecting it to the worker before
reading all of the worker's tuples.  Rather than let the worker
continue to generate and send all of the results, have it stop after
sending the next tuple.

More could be done here to stop the worker even quicker, but this is
about as well as we can hope to do for 9.6.

This is in response to a problem report from Andreas Seltenreich.
Commit 44339b89 should be actually be
sufficient to fix that example even without this change, but it seems
better to do this, too, since we might otherwise waste quite a large
amount of effort in one or more workers.

Discussion: CAA4eK1KOKGqmz9bGu+Z42qhRwMbm4R5rfnqsLCNqFs9j14jzEA@mail.gmail.com

Amit Kapila
parent 44339b89
...@@ -26,9 +26,9 @@ ...@@ -26,9 +26,9 @@
static void printtup_startup(DestReceiver *self, int operation, static void printtup_startup(DestReceiver *self, int operation,
TupleDesc typeinfo); TupleDesc typeinfo);
static void printtup(TupleTableSlot *slot, DestReceiver *self); static bool printtup(TupleTableSlot *slot, DestReceiver *self);
static void printtup_20(TupleTableSlot *slot, DestReceiver *self); static bool printtup_20(TupleTableSlot *slot, DestReceiver *self);
static void printtup_internal_20(TupleTableSlot *slot, DestReceiver *self); static bool printtup_internal_20(TupleTableSlot *slot, DestReceiver *self);
static void printtup_shutdown(DestReceiver *self); static void printtup_shutdown(DestReceiver *self);
static void printtup_destroy(DestReceiver *self); static void printtup_destroy(DestReceiver *self);
...@@ -299,7 +299,7 @@ printtup_prepare_info(DR_printtup *myState, TupleDesc typeinfo, int numAttrs) ...@@ -299,7 +299,7 @@ printtup_prepare_info(DR_printtup *myState, TupleDesc typeinfo, int numAttrs)
* printtup --- print a tuple in protocol 3.0 * printtup --- print a tuple in protocol 3.0
* ---------------- * ----------------
*/ */
static void static bool
printtup(TupleTableSlot *slot, DestReceiver *self) printtup(TupleTableSlot *slot, DestReceiver *self)
{ {
TupleDesc typeinfo = slot->tts_tupleDescriptor; TupleDesc typeinfo = slot->tts_tupleDescriptor;
...@@ -376,13 +376,15 @@ printtup(TupleTableSlot *slot, DestReceiver *self) ...@@ -376,13 +376,15 @@ printtup(TupleTableSlot *slot, DestReceiver *self)
/* Return to caller's context, and flush row's temporary memory */ /* Return to caller's context, and flush row's temporary memory */
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
MemoryContextReset(myState->tmpcontext); MemoryContextReset(myState->tmpcontext);
return true;
} }
/* ---------------- /* ----------------
* printtup_20 --- print a tuple in protocol 2.0 * printtup_20 --- print a tuple in protocol 2.0
* ---------------- * ----------------
*/ */
static void static bool
printtup_20(TupleTableSlot *slot, DestReceiver *self) printtup_20(TupleTableSlot *slot, DestReceiver *self)
{ {
TupleDesc typeinfo = slot->tts_tupleDescriptor; TupleDesc typeinfo = slot->tts_tupleDescriptor;
...@@ -452,6 +454,8 @@ printtup_20(TupleTableSlot *slot, DestReceiver *self) ...@@ -452,6 +454,8 @@ printtup_20(TupleTableSlot *slot, DestReceiver *self)
/* Return to caller's context, and flush row's temporary memory */ /* Return to caller's context, and flush row's temporary memory */
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
MemoryContextReset(myState->tmpcontext); MemoryContextReset(myState->tmpcontext);
return true;
} }
/* ---------------- /* ----------------
...@@ -528,7 +532,7 @@ debugStartup(DestReceiver *self, int operation, TupleDesc typeinfo) ...@@ -528,7 +532,7 @@ debugStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
* debugtup - print one tuple for an interactive backend * debugtup - print one tuple for an interactive backend
* ---------------- * ----------------
*/ */
void bool
debugtup(TupleTableSlot *slot, DestReceiver *self) debugtup(TupleTableSlot *slot, DestReceiver *self)
{ {
TupleDesc typeinfo = slot->tts_tupleDescriptor; TupleDesc typeinfo = slot->tts_tupleDescriptor;
...@@ -553,6 +557,8 @@ debugtup(TupleTableSlot *slot, DestReceiver *self) ...@@ -553,6 +557,8 @@ debugtup(TupleTableSlot *slot, DestReceiver *self)
printatt((unsigned) i + 1, typeinfo->attrs[i], value); printatt((unsigned) i + 1, typeinfo->attrs[i], value);
} }
printf("\t----\n"); printf("\t----\n");
return true;
} }
/* ---------------- /* ----------------
...@@ -564,7 +570,7 @@ debugtup(TupleTableSlot *slot, DestReceiver *self) ...@@ -564,7 +570,7 @@ debugtup(TupleTableSlot *slot, DestReceiver *self)
* This is largely same as printtup_20, except we use binary formatting. * This is largely same as printtup_20, except we use binary formatting.
* ---------------- * ----------------
*/ */
static void static bool
printtup_internal_20(TupleTableSlot *slot, DestReceiver *self) printtup_internal_20(TupleTableSlot *slot, DestReceiver *self)
{ {
TupleDesc typeinfo = slot->tts_tupleDescriptor; TupleDesc typeinfo = slot->tts_tupleDescriptor;
...@@ -636,4 +642,6 @@ printtup_internal_20(TupleTableSlot *slot, DestReceiver *self) ...@@ -636,4 +642,6 @@ printtup_internal_20(TupleTableSlot *slot, DestReceiver *self)
/* Return to caller's context, and flush row's temporary memory */ /* Return to caller's context, and flush row's temporary memory */
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
MemoryContextReset(myState->tmpcontext); MemoryContextReset(myState->tmpcontext);
return true;
} }
...@@ -4454,7 +4454,7 @@ copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) ...@@ -4454,7 +4454,7 @@ copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
/* /*
* copy_dest_receive --- receive one tuple * copy_dest_receive --- receive one tuple
*/ */
static void static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{ {
DR_copy *myState = (DR_copy *) self; DR_copy *myState = (DR_copy *) self;
...@@ -4466,6 +4466,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) ...@@ -4466,6 +4466,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
/* And send the data */ /* And send the data */
CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull); CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
myState->processed++; myState->processed++;
return true;
} }
/* /*
......
...@@ -62,7 +62,7 @@ typedef struct ...@@ -62,7 +62,7 @@ typedef struct
static ObjectAddress CreateAsReladdr = {InvalidOid, InvalidOid, 0}; static ObjectAddress CreateAsReladdr = {InvalidOid, InvalidOid, 0};
static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static void intorel_receive(TupleTableSlot *slot, DestReceiver *self); static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self);
static void intorel_shutdown(DestReceiver *self); static void intorel_shutdown(DestReceiver *self);
static void intorel_destroy(DestReceiver *self); static void intorel_destroy(DestReceiver *self);
...@@ -482,7 +482,7 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) ...@@ -482,7 +482,7 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
/* /*
* intorel_receive --- receive one tuple * intorel_receive --- receive one tuple
*/ */
static void static bool
intorel_receive(TupleTableSlot *slot, DestReceiver *self) intorel_receive(TupleTableSlot *slot, DestReceiver *self)
{ {
DR_intorel *myState = (DR_intorel *) self; DR_intorel *myState = (DR_intorel *) self;
...@@ -507,6 +507,8 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) ...@@ -507,6 +507,8 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
myState->bistate); myState->bistate);
/* We know this is a newly created relation, so there are no indexes */ /* We know this is a newly created relation, so there are no indexes */
return true;
} }
/* /*
......
...@@ -56,7 +56,7 @@ typedef struct ...@@ -56,7 +56,7 @@ typedef struct
static int matview_maintenance_depth = 0; static int matview_maintenance_depth = 0;
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self); static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self); static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self); static void transientrel_destroy(DestReceiver *self);
static void refresh_matview_datafill(DestReceiver *dest, Query *query, static void refresh_matview_datafill(DestReceiver *dest, Query *query,
...@@ -467,7 +467,7 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) ...@@ -467,7 +467,7 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
/* /*
* transientrel_receive --- receive one tuple * transientrel_receive --- receive one tuple
*/ */
static void static bool
transientrel_receive(TupleTableSlot *slot, DestReceiver *self) transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
{ {
DR_transientrel *myState = (DR_transientrel *) self; DR_transientrel *myState = (DR_transientrel *) self;
...@@ -486,6 +486,8 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) ...@@ -486,6 +486,8 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
myState->bistate); myState->bistate);
/* We know this is a newly created relation, so there are no indexes */ /* We know this is a newly created relation, so there are no indexes */
return true;
} }
/* /*
......
...@@ -1593,7 +1593,15 @@ ExecutePlan(EState *estate, ...@@ -1593,7 +1593,15 @@ ExecutePlan(EState *estate,
* practice, this is probably always the case at this point.) * practice, this is probably always the case at this point.)
*/ */
if (sendTuples) if (sendTuples)
(*dest->receiveSlot) (slot, dest); {
/*
* If we are not able to send the tuple, we assume the destination
* has closed and no more tuples can be sent. If that's the case,
* end the loop.
*/
if (!((*dest->receiveSlot) (slot, dest)))
break;
}
/* /*
* Count tuples processed, if this is a SELECT. (For other operation * Count tuples processed, if this is a SELECT. (For other operation
......
...@@ -1266,7 +1266,7 @@ do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull) ...@@ -1266,7 +1266,7 @@ do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
ExecStoreVirtualTuple(slot); ExecStoreVirtualTuple(slot);
/* send the tuple to the receiver */ /* send the tuple to the receiver */
(*tstate->dest->receiveSlot) (slot, tstate->dest); (void) (*tstate->dest->receiveSlot) (slot, tstate->dest);
/* clean up */ /* clean up */
ExecClearTuple(slot); ExecClearTuple(slot);
......
...@@ -167,7 +167,7 @@ static Datum postquel_get_single_result(TupleTableSlot *slot, ...@@ -167,7 +167,7 @@ static Datum postquel_get_single_result(TupleTableSlot *slot,
static void sql_exec_error_callback(void *arg); static void sql_exec_error_callback(void *arg);
static void ShutdownSQLFunction(Datum arg); static void ShutdownSQLFunction(Datum arg);
static void sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo); static void sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static void sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self); static bool sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
static void sqlfunction_shutdown(DestReceiver *self); static void sqlfunction_shutdown(DestReceiver *self);
static void sqlfunction_destroy(DestReceiver *self); static void sqlfunction_destroy(DestReceiver *self);
...@@ -1904,7 +1904,7 @@ sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo) ...@@ -1904,7 +1904,7 @@ sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
/* /*
* sqlfunction_receive --- receive one tuple * sqlfunction_receive --- receive one tuple
*/ */
static void static bool
sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self) sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
{ {
DR_sqlfunction *myState = (DR_sqlfunction *) self; DR_sqlfunction *myState = (DR_sqlfunction *) self;
...@@ -1914,6 +1914,8 @@ sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self) ...@@ -1914,6 +1914,8 @@ sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
/* Store the filtered tuple into the tuplestore */ /* Store the filtered tuple into the tuplestore */
tuplestore_puttupleslot(myState->tstore, slot); tuplestore_puttupleslot(myState->tstore, slot);
return true;
} }
/* /*
......
...@@ -1774,7 +1774,7 @@ spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) ...@@ -1774,7 +1774,7 @@ spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
* store tuple retrieved by Executor into SPITupleTable * store tuple retrieved by Executor into SPITupleTable
* of current SPI procedure * of current SPI procedure
*/ */
void bool
spi_printtup(TupleTableSlot *slot, DestReceiver *self) spi_printtup(TupleTableSlot *slot, DestReceiver *self)
{ {
SPITupleTable *tuptable; SPITupleTable *tuptable;
...@@ -1809,6 +1809,8 @@ spi_printtup(TupleTableSlot *slot, DestReceiver *self) ...@@ -1809,6 +1809,8 @@ spi_printtup(TupleTableSlot *slot, DestReceiver *self)
(tuptable->free)--; (tuptable->free)--;
MemoryContextSwitchTo(oldcxt); MemoryContextSwitchTo(oldcxt);
return true;
} }
/* /*
......
...@@ -115,12 +115,13 @@ static RemapInfo *BuildRemapInfo(TupleDesc tupledesc); ...@@ -115,12 +115,13 @@ static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
* type over a range type over a range type over an array type over a record, * type over a range type over a range type over an array type over a record,
* or something like that. * or something like that.
*/ */
static void static bool
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{ {
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
TupleDesc tupledesc = slot->tts_tupleDescriptor; TupleDesc tupledesc = slot->tts_tupleDescriptor;
HeapTuple tuple; HeapTuple tuple;
shm_mq_result result;
/* /*
* Test to see whether the tupledesc has changed; if so, set up for the * Test to see whether the tupledesc has changed; if so, set up for the
...@@ -195,7 +196,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) ...@@ -195,7 +196,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
} }
/* Send the tuple itself. */ /* Send the tuple itself. */
shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false); result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
if (result == SHM_MQ_DETACHED)
return false;
else if (result != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("unable to send tuples")));
return true;
} }
/* /*
......
...@@ -37,8 +37,8 @@ typedef struct ...@@ -37,8 +37,8 @@ typedef struct
} TStoreState; } TStoreState;
static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self); static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self); static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
/* /*
...@@ -90,19 +90,21 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) ...@@ -90,19 +90,21 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
* Receive a tuple from the executor and store it in the tuplestore. * Receive a tuple from the executor and store it in the tuplestore.
* This is for the easy case where we don't have to detoast. * This is for the easy case where we don't have to detoast.
*/ */
static void static bool
tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self) tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
{ {
TStoreState *myState = (TStoreState *) self; TStoreState *myState = (TStoreState *) self;
tuplestore_puttupleslot(myState->tstore, slot); tuplestore_puttupleslot(myState->tstore, slot);
return true;
} }
/* /*
* Receive a tuple from the executor and store it in the tuplestore. * Receive a tuple from the executor and store it in the tuplestore.
* This is for the case where we have to detoast any toasted values. * This is for the case where we have to detoast any toasted values.
*/ */
static void static bool
tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self) tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
{ {
TStoreState *myState = (TStoreState *) self; TStoreState *myState = (TStoreState *) self;
...@@ -152,6 +154,8 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self) ...@@ -152,6 +154,8 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
/* And release any temporary detoasted values */ /* And release any temporary detoasted values */
for (i = 0; i < nfree; i++) for (i = 0; i < nfree; i++)
pfree(DatumGetPointer(myState->tofree[i])); pfree(DatumGetPointer(myState->tofree[i]));
return true;
} }
/* /*
......
...@@ -45,9 +45,10 @@ ...@@ -45,9 +45,10 @@
* dummy DestReceiver functions * dummy DestReceiver functions
* ---------------- * ----------------
*/ */
static void static bool
donothingReceive(TupleTableSlot *slot, DestReceiver *self) donothingReceive(TupleTableSlot *slot, DestReceiver *self)
{ {
return true;
} }
static void static void
......
...@@ -1109,7 +1109,13 @@ RunFromStore(Portal portal, ScanDirection direction, uint64 count, ...@@ -1109,7 +1109,13 @@ RunFromStore(Portal portal, ScanDirection direction, uint64 count,
if (!ok) if (!ok)
break; break;
(*dest->receiveSlot) (slot, dest); /*
* If we are not able to send the tuple, we assume the destination
* has closed and no more tuples can be sent. If that's the case,
* end the loop.
*/
if (!((*dest->receiveSlot) (slot, dest)))
break;
ExecClearTuple(slot); ExecClearTuple(slot);
......
...@@ -25,11 +25,11 @@ extern void SendRowDescriptionMessage(TupleDesc typeinfo, List *targetlist, ...@@ -25,11 +25,11 @@ extern void SendRowDescriptionMessage(TupleDesc typeinfo, List *targetlist,
extern void debugStartup(DestReceiver *self, int operation, extern void debugStartup(DestReceiver *self, int operation,
TupleDesc typeinfo); TupleDesc typeinfo);
extern void debugtup(TupleTableSlot *slot, DestReceiver *self); extern bool debugtup(TupleTableSlot *slot, DestReceiver *self);
/* XXX these are really in executor/spi.c */ /* XXX these are really in executor/spi.c */
extern void spi_dest_startup(DestReceiver *self, int operation, extern void spi_dest_startup(DestReceiver *self, int operation,
TupleDesc typeinfo); TupleDesc typeinfo);
extern void spi_printtup(TupleTableSlot *slot, DestReceiver *self); extern bool spi_printtup(TupleTableSlot *slot, DestReceiver *self);
#endif /* PRINTTUP_H */ #endif /* PRINTTUP_H */
...@@ -104,7 +104,9 @@ typedef enum ...@@ -104,7 +104,9 @@ typedef enum
* pointers that the executor must call. * pointers that the executor must call.
* *
* Note: the receiveSlot routine must be passed a slot containing a TupleDesc * Note: the receiveSlot routine must be passed a slot containing a TupleDesc
* identical to the one given to the rStartup routine. * identical to the one given to the rStartup routine. It returns bool where
* a "true" value means "continue processing" and a "false" value means
* "stop early, just as if we'd reached the end of the scan".
* ---------------- * ----------------
*/ */
typedef struct _DestReceiver DestReceiver; typedef struct _DestReceiver DestReceiver;
...@@ -112,7 +114,7 @@ typedef struct _DestReceiver DestReceiver; ...@@ -112,7 +114,7 @@ typedef struct _DestReceiver DestReceiver;
struct _DestReceiver struct _DestReceiver
{ {
/* Called for each tuple to be output: */ /* Called for each tuple to be output: */
void (*receiveSlot) (TupleTableSlot *slot, bool (*receiveSlot) (TupleTableSlot *slot,
DestReceiver *self); DestReceiver *self);
/* Per-executor-run initialization and shutdown: */ /* Per-executor-run initialization and shutdown: */
void (*rStartup) (DestReceiver *self, void (*rStartup) (DestReceiver *self,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment