Commit bb684c82 authored by Etsuro Fujita's avatar Etsuro Fujita

Minor code cleanup in asynchronous execution support.

This is cleanup for commit 27e1f145:

* ExecAppendAsyncEventWait(), which was modified a bit further by commit
  a8af856d, duplicated the same nevents calculation.  Simplify the code
  a little bit to avoid the duplication.  Update comments there.
* Add an assertion to ExecAppendAsyncRequest().
* Update a comment about merging the async_capable options from input
  relations in merge_fdw_options(), per complaint from Kyotaro Horiguchi.
* Add a comment for fetch_more_data_begin().

Author: Etsuro Fujita
Discussion: https://postgr.es/m/CAPmGK1637W30Wx3MnrReewhafn6F_0J76mrJGoFXFnpPq4QfvA%40mail.gmail.com
parent d479d002
...@@ -5835,7 +5835,10 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, ...@@ -5835,7 +5835,10 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
/* /*
* We'll prefer to consider this join async-capable if any table from * We'll prefer to consider this join async-capable if any table from
* either side of the join is considered async-capable. * either side of the join is considered async-capable. This would be
* reasonable because in that case the foreign server would have its
* own resources to scan that table asynchronously, and the join could
* also be computed asynchronously using the resources.
*/ */
fpinfo->async_capable = fpinfo_o->async_capable || fpinfo->async_capable = fpinfo_o->async_capable ||
fpinfo_i->async_capable; fpinfo_i->async_capable;
...@@ -6893,6 +6896,9 @@ produce_tuple_asynchronously(AsyncRequest *areq, bool fetch) ...@@ -6893,6 +6896,9 @@ produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
/* /*
* Begin an asynchronous data fetch. * Begin an asynchronous data fetch.
* *
* Note: this function assumes there is no currently-in-progress asynchronous
* data fetch.
*
* Note: fetch_more_data must be called to fetch the result. * Note: fetch_more_data must be called to fetch the result.
*/ */
static void static void
......
...@@ -952,7 +952,10 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) ...@@ -952,7 +952,10 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
/* Nothing to do if there are no async subplans needing a new request. */ /* Nothing to do if there are no async subplans needing a new request. */
if (bms_is_empty(node->as_needrequest)) if (bms_is_empty(node->as_needrequest))
{
Assert(node->as_nasyncresults == 0);
return false; return false;
}
/* /*
* If there are any asynchronously-generated results that have not yet * If there are any asynchronously-generated results that have not yet
...@@ -998,17 +1001,16 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) ...@@ -998,17 +1001,16 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
static void static void
ExecAppendAsyncEventWait(AppendState *node) ExecAppendAsyncEventWait(AppendState *node)
{ {
int nevents = node->as_nasyncplans + 1;
long timeout = node->as_syncdone ? -1 : 0; long timeout = node->as_syncdone ? -1 : 0;
WaitEvent occurred_event[EVENT_BUFFER_SIZE]; WaitEvent occurred_event[EVENT_BUFFER_SIZE];
int noccurred; int noccurred;
int nevents;
int i; int i;
/* We should never be called when there are no valid async subplans. */ /* We should never be called when there are no valid async subplans. */
Assert(node->as_nasyncremain > 0); Assert(node->as_nasyncremain > 0);
node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents);
node->as_nasyncplans + 1);
AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL); NULL, NULL);
...@@ -1022,8 +1024,14 @@ ExecAppendAsyncEventWait(AppendState *node) ...@@ -1022,8 +1024,14 @@ ExecAppendAsyncEventWait(AppendState *node)
ExecAsyncConfigureWait(areq); ExecAsyncConfigureWait(areq);
} }
/* Wait for at least one event to occur. */ /* We wait on at most EVENT_BUFFER_SIZE events. */
nevents = Min(node->as_nasyncplans + 1, EVENT_BUFFER_SIZE); if (nevents > EVENT_BUFFER_SIZE)
nevents = EVENT_BUFFER_SIZE;
/*
* If the timeout is -1, wait until at least one event occurs. If the
* timeout is 0, poll for events, but do not wait at all.
*/
noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
nevents, WAIT_EVENT_APPEND_READY); nevents, WAIT_EVENT_APPEND_READY);
FreeWaitEventSet(node->as_eventset); FreeWaitEventSet(node->as_eventset);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment