Commit 9779bda8 authored by Tom Lane's avatar Tom Lane

Fix newly-introduced issues in pgbench.

The result of FD_ISSET() doesn't necessarily fit in a bool, though
assigning it to one might accidentally work depending on platform and which
socket FD number is being inquired of.  Rewrite to test it with if(),
rather than making any specific assumption about the result width,
to match the way every other such call in PG is written.

Don't break out of the input_mask-filling loop after finding the first
client that we're waiting for results from.  That mostly breaks parallel
query management.

Also, if we choose not to call select(), be sure to clear out any bits
the mask-filling loop might have set, so that we don't accidentally call
doCustom for clients we don't know have input.  Doing so would likely
be harmless, but it's a waste of cycles and doesn't seem to be intended.

Make this_usec wide enough.  (Yeah, the value would usually fit in an
int, but then why are we using int64 everywhere else?)

Minor cosmetic adjustments, mostly comment improvements.

Problems introduced by commit 12788ae4.  The first issue was discovered
by buildfarm testing, the others by code review.
parent fdc9186f
......@@ -299,7 +299,7 @@ typedef enum
*/
CSTATE_ABORTED,
CSTATE_FINISHED
} ConnectionStateEnum;
} ConnectionStateEnum;
/*
* Connection state.
......@@ -4420,43 +4420,43 @@ threadRun(void *arg)
initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
last = aggs;
/* initialize explicitely the state machines */
/* explicitly initialize the state machines */
for (i = 0; i < nstate; i++)
{
state[i].state = CSTATE_CHOOSE_SCRIPT;
}
/* loop till all clients have terminated */
while (remains > 0)
{
fd_set input_mask;
int maxsock; /* max socket number to be waited */
int64 now_usec = 0;
int maxsock; /* max socket number to be waited for */
int64 min_usec;
int64 now_usec = 0; /* set this only if needed */
/* identify which client sockets should be checked for input */
FD_ZERO(&input_mask);
maxsock = -1;
min_usec = PG_INT64_MAX;
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
int sock;
if (st->state == CSTATE_THROTTLE && timer_exceeded)
{
/* interrupt client which has not started a transaction */
/* interrupt client that has not started a transaction */
st->state = CSTATE_FINISHED;
remains--;
PQfinish(st->con);
st->con = NULL;
continue;
remains--;
}
else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{
/* a nap from the script, or under throttling */
int this_usec;
int64 this_usec;
if (min_usec == PG_INT64_MAX)
/* get current time if needed */
if (now_usec == 0)
{
instr_time now;
......@@ -4464,6 +4464,7 @@ threadRun(void *arg)
now_usec = INSTR_TIME_GET_MICROSEC(now);
}
/* min_usec should be the minimum delay across all clients */
this_usec = (st->state == CSTATE_SLEEP ?
st->sleep_until : st->txn_scheduled) - now_usec;
if (min_usec > this_usec)
......@@ -4475,22 +4476,26 @@ threadRun(void *arg)
* waiting for result from server - nothing to do unless the
* socket is readable
*/
sock = PQsocket(st->con);
int sock = PQsocket(st->con);
if (sock < 0)
{
fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
fprintf(stderr, "invalid socket: %s",
PQerrorMessage(st->con));
goto done;
}
FD_SET(sock, &input_mask);
if (maxsock < sock)
maxsock = sock;
break;
}
else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
else if (st->state != CSTATE_ABORTED &&
st->state != CSTATE_FINISHED)
{
/* the connection is ready to run */
/*
* This client thread is ready to do something, so we don't
* want to wait. No need to examine additional clients.
*/
min_usec = 0;
break;
}
......@@ -4515,9 +4520,10 @@ threadRun(void *arg)
}
/*
* Sleep until we receive data from the server, or a nap-time
* specified in the script ends, or it's time to print a progress
* report.
* If no clients are ready to execute actions, sleep until we receive
* data from the server, or a nap-time specified in the script ends,
* or it's time to print a progress report. Update input_mask to show
* which client(s) received data.
*/
if (min_usec > 0 && maxsock != -1)
{
......@@ -4536,21 +4542,29 @@ threadRun(void *arg)
if (nsocks < 0)
{
if (errno == EINTR)
{
/* On EINTR, go back to top of loop */
continue;
}
/* must be something wrong */
fprintf(stderr, "select() failed: %s\n", strerror(errno));
goto done;
}
}
else
{
/* If we didn't call select(), don't try to read any data */
FD_ZERO(&input_mask);
}
/* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
bool ready;
if (st->state == CSTATE_WAIT_RESULT && st->con)
if (st->state == CSTATE_WAIT_RESULT)
{
/* don't call doCustom unless data is available */
int sock = PQsocket(st->con);
if (sock < 0)
......@@ -4560,22 +4574,24 @@ threadRun(void *arg)
goto done;
}
ready = FD_ISSET(sock, &input_mask);
if (!FD_ISSET(sock, &input_mask))
continue;
}
else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
ready = false;
else
ready = true;
if (ready)
else if (st->state == CSTATE_FINISHED ||
st->state == CSTATE_ABORTED)
{
doCustom(thread, st, &aggs);
if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
remains--;
/* this client is done, no need to consider it anymore */
continue;
}
doCustom(thread, st, &aggs);
/* If doCustom changed client to finished state, reduce remains */
if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
remains--;
}
/* progress report by thread 0 for all threads */
/* progress report is made by thread 0 for all threads */
if (progress && thread->tid == 0)
{
instr_time now_time;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment