Commit 3bac77c4 authored by Alvaro Herrera's avatar Alvaro Herrera

Rework the pgbench state machine code for clarity

This commit continues the code improvements started by commit
12788ae4.  With this commit, state machine transitions are better
contained in the routine that was called doCustom() and is now called
advanceConnectionState -- the resulting code is easier to reason about,
since there are no state changes occuring in the outer layer.

This change is prompted by future patches to add more features to
pgbench, which will need to effect some more surgery to this code.

Fabien's original had all the machine state changes inside one routine,
but I (Álvaro) thought that a subroutine to handle command execution is
more straightforward to review, so this commit does not match Fabien's
submission closely.  If something is broken, it's probably my fault.

Author: Fabien Coelho, Álvaro Herrera
Reviewed-by: Kirk Jamison
Discussion: https://postgr.es/m/alpine.DEB.2.21.1808111104320.1705@lancre
parent 03e10b96
...@@ -216,7 +216,7 @@ bool progress_timestamp = false; /* progress report with Unix time */ ...@@ -216,7 +216,7 @@ bool progress_timestamp = false; /* progress report with Unix time */
int nclients = 1; /* number of clients */ int nclients = 1; /* number of clients */
int nthreads = 1; /* number of threads */ int nthreads = 1; /* number of threads */
bool is_connect; /* establish connection for each transaction */ bool is_connect; /* establish connection for each transaction */
bool is_latencies; /* report per-command latencies */ bool report_per_command; /* report per-command latencies */
int main_pid; /* main process id used in log filename */ int main_pid; /* main process id used in log filename */
char *pghost = ""; char *pghost = "";
...@@ -294,26 +294,33 @@ typedef enum ...@@ -294,26 +294,33 @@ typedef enum
{ {
/* /*
* The client must first choose a script to execute. Once chosen, it can * The client must first choose a script to execute. Once chosen, it can
* either be throttled (state CSTATE_START_THROTTLE under --rate) or start * either be throttled (state CSTATE_PREPARE_THROTTLE under --rate), start
* right away (state CSTATE_START_TX). * right away (state CSTATE_START_TX) or not start at all if the timer was
* exceeded (state CSTATE_FINISHED).
*/ */
CSTATE_CHOOSE_SCRIPT, CSTATE_CHOOSE_SCRIPT,
/* /*
* In CSTATE_START_THROTTLE state, we calculate when to begin the next * CSTATE_START_TX performs start-of-transaction processing. Establishes
* transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state * a new connection for the transaction in --connect mode, records the
* sleeps until that moment. (If throttling is not enabled, doCustom() * transaction start time, and proceed to the first command.
* falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.) *
* Note: once a script is started, it will either error or run till its
* end, where it may be interrupted. It is not interrupted while running,
* so pgbench --time is to be understood as tx are allowed to start in
* that time, and will finish when their work is completed.
*/ */
CSTATE_START_THROTTLE, CSTATE_START_TX,
CSTATE_THROTTLE,
/* /*
* CSTATE_START_TX performs start-of-transaction processing. Establishes * In CSTATE_PREPARE_THROTTLE state, we calculate when to begin the next
* a new connection for the transaction, in --connect mode, and records * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
* the transaction start time. * sleeps until that moment, then advances to CSTATE_START_TX, or
* CSTATE_FINISHED if the next transaction would start beyond the end of
* the run.
*/ */
CSTATE_START_TX, CSTATE_PREPARE_THROTTLE,
CSTATE_THROTTLE,
/* /*
* We loop through these states, to process each command in the script: * We loop through these states, to process each command in the script:
...@@ -322,10 +329,8 @@ typedef enum ...@@ -322,10 +329,8 @@ typedef enum
* command, the command is sent to the server, and we move to * command, the command is sent to the server, and we move to
* CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set, * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
* and we enter the CSTATE_SLEEP state to wait for it to expire. Other * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
* meta-commands are executed immediately. * meta-commands are executed immediately. If the command about to start
* * is actually beyond the end of the script, advance to CSTATE_END_TX.
* CSTATE_SKIP_COMMAND for conditional branches which are not executed,
* quickly skip commands that do not need any evaluation.
* *
* CSTATE_WAIT_RESULT waits until we get a result set back from the server * CSTATE_WAIT_RESULT waits until we get a result set back from the server
* for the current command. * for the current command.
...@@ -334,19 +339,25 @@ typedef enum ...@@ -334,19 +339,25 @@ typedef enum
* *
* CSTATE_END_COMMAND records the end-of-command timestamp, increments the * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
* command counter, and loops back to CSTATE_START_COMMAND state. * command counter, and loops back to CSTATE_START_COMMAND state.
*
* CSTATE_SKIP_COMMAND is used by conditional branches which are not
* executed. It quickly skip commands that do not need any evaluation.
* This state can move forward several commands, till there is something
* to do or the end of the script.
*/ */
CSTATE_START_COMMAND, CSTATE_START_COMMAND,
CSTATE_SKIP_COMMAND,
CSTATE_WAIT_RESULT, CSTATE_WAIT_RESULT,
CSTATE_SLEEP, CSTATE_SLEEP,
CSTATE_END_COMMAND, CSTATE_END_COMMAND,
CSTATE_SKIP_COMMAND,
/* /*
* CSTATE_END_TX performs end-of-transaction processing. Calculates * CSTATE_END_TX performs end-of-transaction processing. It calculates
* latency, and logs the transaction. In --connect mode, closes the * latency, and logs the transaction. In --connect mode, it closes the
* current connection. Chooses the next script to execute and starts over * current connection.
* in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no *
* more work to do. * Then either starts over in CSTATE_CHOOSE_SCRIPT, or enters
* CSTATE_FINISHED if we have no more work to do.
*/ */
CSTATE_END_TX, CSTATE_END_TX,
...@@ -567,7 +578,10 @@ static void setNullValue(PgBenchValue *pv); ...@@ -567,7 +578,10 @@ static void setNullValue(PgBenchValue *pv);
static void setBoolValue(PgBenchValue *pv, bool bval); static void setBoolValue(PgBenchValue *pv, bool bval);
static void setIntValue(PgBenchValue *pv, int64 ival); static void setIntValue(PgBenchValue *pv, int64 ival);
static void setDoubleValue(PgBenchValue *pv, double dval); static void setDoubleValue(PgBenchValue *pv, double dval);
static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *); static bool evaluateExpr(TState *thread, CState *st, PgBenchExpr *expr,
PgBenchValue *retval);
static instr_time doExecuteCommand(TState *thread, CState *st,
instr_time now);
static void doLog(TState *thread, CState *st, static void doLog(TState *thread, CState *st,
StatsData *agg, bool skipped, double latency, double lag); StatsData *agg, bool skipped, double latency, double lag);
static void processXactStats(TState *thread, CState *st, instr_time *now, static void processXactStats(TState *thread, CState *st, instr_time *now,
...@@ -2820,16 +2834,12 @@ evaluateSleep(CState *st, int argc, char **argv, int *usecs) ...@@ -2820,16 +2834,12 @@ evaluateSleep(CState *st, int argc, char **argv, int *usecs)
} }
/* /*
* Advance the state machine of a connection, if possible. * Advance the state machine of a connection.
*/ */
static void static void
doCustom(TState *thread, CState *st, StatsData *agg) advanceConnectionState(TState *thread, CState *st, StatsData *agg)
{ {
PGresult *res;
Command *command;
instr_time now; instr_time now;
bool end_tx_processed = false;
int64 wait;
/* /*
* gettimeofday() isn't free, so we get the current timestamp lazily the * gettimeofday() isn't free, so we get the current timestamp lazily the
...@@ -2843,39 +2853,80 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -2843,39 +2853,80 @@ doCustom(TState *thread, CState *st, StatsData *agg)
/* /*
* Loop in the state machine, until we have to wait for a result from the * Loop in the state machine, until we have to wait for a result from the
* server (or have to sleep, for throttling or for \sleep). * server or have to sleep for throttling or \sleep.
* *
* Note: In the switch-statement below, 'break' will loop back here, * Note: In the switch-statement below, 'break' will loop back here,
* meaning "continue in the state machine". Return is used to return to * meaning "continue in the state machine". Return is used to return to
* the caller. * the caller, giving the thread the opportunity to advance another
* client.
*/ */
for (;;) for (;;)
{ {
PGresult *res;
switch (st->state) switch (st->state)
{ {
/* /* Select transaction (script) to run. */
* Select transaction to run.
*/
case CSTATE_CHOOSE_SCRIPT: case CSTATE_CHOOSE_SCRIPT:
st->use_file = chooseScript(thread); st->use_file = chooseScript(thread);
Assert(conditional_stack_empty(st->cstack));
if (debug) if (debug)
fprintf(stderr, "client %d executing script \"%s\"\n", st->id, fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
sql_script[st->use_file].desc); sql_script[st->use_file].desc);
if (throttle_delay > 0) /*
st->state = CSTATE_START_THROTTLE; * If time is over, we're done; otherwise, get ready to start
else * a new transaction, or to get throttled if that's requested.
st->state = CSTATE_START_TX; */
/* check consistency */ st->state = timer_exceeded ? CSTATE_FINISHED :
Assert(conditional_stack_empty(st->cstack)); throttle_delay > 0 ? CSTATE_PREPARE_THROTTLE : CSTATE_START_TX;
break;
/* Start new transaction (script) */
case CSTATE_START_TX:
/* establish connection if needed, i.e. under --connect */
if (st->con == NULL)
{
instr_time start;
INSTR_TIME_SET_CURRENT_LAZY(now);
start = now;
if ((st->con = doConnect()) == NULL)
{
fprintf(stderr, "client %d aborted while establishing connection\n",
st->id);
st->state = CSTATE_ABORTED;
break;
}
INSTR_TIME_SET_CURRENT(now);
INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
/* Reset session-local state */
memset(st->prepared, 0, sizeof(st->prepared));
}
/* record transaction start time */
INSTR_TIME_SET_CURRENT_LAZY(now);
st->txn_begin = now;
/*
* When not throttling, this is also the transaction's
* scheduled start time.
*/
if (!throttle_delay)
st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
/* Begin with the first command */
st->state = CSTATE_START_COMMAND;
st->command = 0;
break; break;
/* /*
* Handle throttling once per transaction by sleeping. * Handle throttling once per transaction by sleeping.
*/ */
case CSTATE_START_THROTTLE: case CSTATE_PREPARE_THROTTLE:
/* /*
* Generate a delay such that the series of delays will * Generate a delay such that the series of delays will
...@@ -2887,46 +2938,39 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -2887,46 +2938,39 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* away. * away.
*/ */
Assert(throttle_delay > 0); Assert(throttle_delay > 0);
wait = getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
thread->throttle_trigger += wait; thread->throttle_trigger +=
getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
st->txn_scheduled = thread->throttle_trigger; st->txn_scheduled = thread->throttle_trigger;
/*
* stop client if next transaction is beyond pgbench end of
* execution
*/
if (duration > 0 && st->txn_scheduled > end_time)
{
st->state = CSTATE_FINISHED;
break;
}
/* /*
* If --latency-limit is used, and this slot is already late * If --latency-limit is used, and this slot is already late
* so that the transaction will miss the latency limit even if * so that the transaction will miss the latency limit even if
* it completed immediately, we skip this time slot and * it completed immediately, skip this time slot and schedule
* iterate till the next slot that isn't late yet. But don't * to continue running on the next slot that isn't late yet.
* iterate beyond the -t limit, if one is given. * But don't iterate beyond the -t limit, if one is given.
*/ */
if (latency_limit) if (latency_limit)
{ {
int64 now_us; int64 now_us;
if (INSTR_TIME_IS_ZERO(now)) INSTR_TIME_SET_CURRENT_LAZY(now);
INSTR_TIME_SET_CURRENT(now);
now_us = INSTR_TIME_GET_MICROSEC(now); now_us = INSTR_TIME_GET_MICROSEC(now);
while (thread->throttle_trigger < now_us - latency_limit && while (thread->throttle_trigger < now_us - latency_limit &&
(nxacts <= 0 || st->cnt < nxacts)) (nxacts <= 0 || st->cnt < nxacts))
{ {
processXactStats(thread, st, &now, true, agg); processXactStats(thread, st, &now, true, agg);
/* next rendez-vous */ /* next rendez-vous */
wait = getPoissonRand(&thread->ts_throttle_rs, thread->throttle_trigger +=
throttle_delay); getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger; st->txn_scheduled = thread->throttle_trigger;
} }
/* stop client if -t exceeded */
/*
* stop client if -t was exceeded in the previous skip
* loop
*/
if (nxacts > 0 && st->cnt >= nxacts) if (nxacts > 0 && st->cnt >= nxacts)
{ {
st->state = CSTATE_FINISHED; st->state = CSTATE_FINISHED;
...@@ -2934,282 +2978,55 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -2934,282 +2978,55 @@ doCustom(TState *thread, CState *st, StatsData *agg)
} }
} }
st->state = CSTATE_THROTTLE;
if (debug)
fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
st->id, wait);
break;
/* /*
* Wait until it's time to start next transaction. * stop client if next transaction is beyond pgbench end of
* execution; otherwise, throttle it.
*/ */
case CSTATE_THROTTLE: st->state = end_time > 0 && st->txn_scheduled > end_time ?
if (INSTR_TIME_IS_ZERO(now)) CSTATE_FINISHED : CSTATE_THROTTLE;
INSTR_TIME_SET_CURRENT(now);
if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
return; /* Still sleeping, nothing to do here */
/* Else done sleeping, start the transaction */
st->state = CSTATE_START_TX;
break; break;
/* Start new transaction */
case CSTATE_START_TX:
/*
* Establish connection on first call, or if is_connect is
* true.
*/
if (st->con == NULL)
{
instr_time start;
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
start = now;
if ((st->con = doConnect()) == NULL)
{
fprintf(stderr, "client %d aborted while establishing connection\n",
st->id);
st->state = CSTATE_ABORTED;
break;
}
INSTR_TIME_SET_CURRENT(now);
INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
/* Reset session-local state */
memset(st->prepared, 0, sizeof(st->prepared));
}
/* /*
* Record transaction start time under logging, progress or * Wait until it's time to start next transaction.
* throttling.
*/ */
if (use_log || progress || throttle_delay || latency_limit || case CSTATE_THROTTLE:
per_script_stats) INSTR_TIME_SET_CURRENT_LAZY(now);
{
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
st->txn_begin = now;
/* if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
* When not throttling, this is also the transaction's return; /* still sleeping, nothing to do here */
* scheduled start time.
*/
if (!throttle_delay)
st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
}
/* Begin with the first command */ /* done sleeping, but don't start transaction if we're done */
st->command = 0; st->state = timer_exceeded ? CSTATE_FINISHED : CSTATE_START_TX;
st->state = CSTATE_START_COMMAND;
break; break;
/* /*
* Send a command to server (or execute a meta-command) * Send a command to server (or execute a meta-command)
*/ */
case CSTATE_START_COMMAND: case CSTATE_START_COMMAND:
command = sql_script[st->use_file].commands[st->command]; /* Transition to script end processing if done */
if (sql_script[st->use_file].commands[st->command] == NULL)
/*
* If we reached the end of the script, move to end-of-xact
* processing.
*/
if (command == NULL)
{ {
st->state = CSTATE_END_TX; st->state = CSTATE_END_TX;
break; break;
} }
/* /* record begin time of next command, and initiate it */
* Record statement start time if per-command latencies are if (report_per_command)
* requested
*/
if (is_latencies)
{ {
if (INSTR_TIME_IS_ZERO(now)) INSTR_TIME_SET_CURRENT_LAZY(now);
INSTR_TIME_SET_CURRENT(now);
st->stmt_begin = now; st->stmt_begin = now;
} }
now = doExecuteCommand(thread, st, now);
if (command->type == SQL_COMMAND) /*
{ * We're now waiting for an SQL command to complete, or
if (!sendCommand(st, command)) * finished processing a metacommand, or need to sleep, or
{ * something bad happened.
commandFailed(st, "SQL", "SQL command send failed"); */
st->state = CSTATE_ABORTED; Assert(st->state == CSTATE_WAIT_RESULT ||
} st->state == CSTATE_END_COMMAND ||
else st->state == CSTATE_SLEEP ||
st->state = CSTATE_WAIT_RESULT; st->state == CSTATE_ABORTED);
}
else if (command->type == META_COMMAND)
{
int argc = command->argc,
i;
char **argv = command->argv;
if (debug)
{
fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
for (i = 1; i < argc; i++)
fprintf(stderr, " %s", argv[i]);
fprintf(stderr, "\n");
}
if (command->meta == META_SLEEP)
{
/*
* A \sleep doesn't execute anything, we just get the
* delay from the argument, and enter the CSTATE_SLEEP
* state. (The per-command latency will be recorded
* in CSTATE_SLEEP state, not here, after the delay
* has elapsed.)
*/
int usec;
if (!evaluateSleep(st, argc, argv, &usec))
{
commandFailed(st, "sleep", "execution of meta-command failed");
st->state = CSTATE_ABORTED;
break;
}
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
st->state = CSTATE_SLEEP;
break;
}
else if (command->meta == META_SET ||
command->meta == META_IF ||
command->meta == META_ELIF)
{
/* backslash commands with an expression to evaluate */
PgBenchExpr *expr = command->expr;
PgBenchValue result;
if (command->meta == META_ELIF &&
conditional_stack_peek(st->cstack) == IFSTATE_TRUE)
{
/*
* elif after executed block, skip eval and wait
* for endif
*/
conditional_stack_poke(st->cstack, IFSTATE_IGNORED);
goto move_to_end_command;
}
if (!evaluateExpr(thread, st, expr, &result))
{
commandFailed(st, argv[0], "evaluation of meta-command failed");
st->state = CSTATE_ABORTED;
break;
}
if (command->meta == META_SET)
{
if (!putVariableValue(st, argv[0], argv[1], &result))
{
commandFailed(st, "set", "assignment of meta-command failed");
st->state = CSTATE_ABORTED;
break;
}
}
else /* if and elif evaluated cases */
{
bool cond = valueTruth(&result);
/* execute or not depending on evaluated condition */
if (command->meta == META_IF)
{
conditional_stack_push(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
}
else /* elif */
{
/*
* we should get here only if the "elif"
* needed evaluation
*/
Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE);
conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
}
}
}
else if (command->meta == META_ELSE)
{
switch (conditional_stack_peek(st->cstack))
{
case IFSTATE_TRUE:
conditional_stack_poke(st->cstack, IFSTATE_ELSE_FALSE);
break;
case IFSTATE_FALSE: /* inconsistent if active */
case IFSTATE_IGNORED: /* inconsistent if active */
case IFSTATE_NONE: /* else without if */
case IFSTATE_ELSE_TRUE: /* else after else */
case IFSTATE_ELSE_FALSE: /* else after else */
default:
/* dead code if conditional check is ok */
Assert(false);
}
goto move_to_end_command;
}
else if (command->meta == META_ENDIF)
{
Assert(!conditional_stack_empty(st->cstack));
conditional_stack_pop(st->cstack);
goto move_to_end_command;
}
else if (command->meta == META_SETSHELL)
{
bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
if (timer_exceeded) /* timeout */
{
st->state = CSTATE_FINISHED;
break;
}
else if (!ret) /* on error */
{
commandFailed(st, "setshell", "execution of meta-command failed");
st->state = CSTATE_ABORTED;
break;
}
else
{
/* succeeded */
}
}
else if (command->meta == META_SHELL)
{
bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
if (timer_exceeded) /* timeout */
{
st->state = CSTATE_FINISHED;
break;
}
else if (!ret) /* on error */
{
commandFailed(st, "shell", "execution of meta-command failed");
st->state = CSTATE_ABORTED;
break;
}
else
{
/* succeeded */
}
}
move_to_end_command:
/*
* executing the expression or shell command might take a
* non-negligible amount of time, so reset 'now'
*/
INSTR_TIME_SET_ZERO(now);
st->state = CSTATE_END_COMMAND;
}
break; break;
/* /*
...@@ -3220,6 +3037,8 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3220,6 +3037,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
/* quickly skip commands until something to do... */ /* quickly skip commands until something to do... */
while (true) while (true)
{ {
Command *command;
command = sql_script[st->use_file].commands[st->command]; command = sql_script[st->use_file].commands[st->command];
/* cannot reach end of script in that state */ /* cannot reach end of script in that state */
...@@ -3238,7 +3057,8 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3238,7 +3057,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
switch (conditional_stack_peek(st->cstack)) switch (conditional_stack_peek(st->cstack))
{ {
case IFSTATE_FALSE: case IFSTATE_FALSE:
if (command->meta == META_IF || command->meta == META_ELIF) if (command->meta == META_IF ||
command->meta == META_ELIF)
{ {
/* we must evaluate the condition */ /* we must evaluate the condition */
st->state = CSTATE_START_COMMAND; st->state = CSTATE_START_COMMAND;
...@@ -3246,7 +3066,8 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3246,7 +3066,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
else if (command->meta == META_ELSE) else if (command->meta == META_ELSE)
{ {
/* we must execute next command */ /* we must execute next command */
conditional_stack_poke(st->cstack, IFSTATE_ELSE_TRUE); conditional_stack_poke(st->cstack,
IFSTATE_ELSE_TRUE);
st->state = CSTATE_START_COMMAND; st->state = CSTATE_START_COMMAND;
st->command++; st->command++;
} }
...@@ -3299,6 +3120,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3299,6 +3120,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
} }
if (st->state != CSTATE_SKIP_COMMAND) if (st->state != CSTATE_SKIP_COMMAND)
/* out of quick skip command loop */
break; break;
} }
break; break;
...@@ -3307,11 +3129,11 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3307,11 +3129,11 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* Wait for the current SQL command to complete * Wait for the current SQL command to complete
*/ */
case CSTATE_WAIT_RESULT: case CSTATE_WAIT_RESULT:
command = sql_script[st->use_file].commands[st->command];
if (debug) if (debug)
fprintf(stderr, "client %d receiving\n", st->id); fprintf(stderr, "client %d receiving\n", st->id);
if (!PQconsumeInput(st->con)) if (!PQconsumeInput(st->con))
{ /* there's something wrong */ {
/* there's something wrong */
commandFailed(st, "SQL", "perhaps the backend died while processing"); commandFailed(st, "SQL", "perhaps the backend died while processing");
st->state = CSTATE_ABORTED; st->state = CSTATE_ABORTED;
break; break;
...@@ -3319,9 +3141,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3319,9 +3141,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
if (PQisBusy(st->con)) if (PQisBusy(st->con))
return; /* don't have the whole result yet */ return; /* don't have the whole result yet */
/* /* Read and discard the query result */
* Read and discard the query result;
*/
res = PQgetResult(st->con); res = PQgetResult(st->con);
switch (PQresultStatus(res)) switch (PQresultStatus(res))
{ {
...@@ -3348,10 +3168,9 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3348,10 +3168,9 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* instead of CSTATE_START_TX. * instead of CSTATE_START_TX.
*/ */
case CSTATE_SLEEP: case CSTATE_SLEEP:
if (INSTR_TIME_IS_ZERO(now)) INSTR_TIME_SET_CURRENT_LAZY(now);
INSTR_TIME_SET_CURRENT(now);
if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until) if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
return; /* Still sleeping, nothing to do here */ return; /* still sleeping, nothing to do here */
/* Else done sleeping. */ /* Else done sleeping. */
st->state = CSTATE_END_COMMAND; st->state = CSTATE_END_COMMAND;
break; break;
...@@ -3366,13 +3185,14 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3366,13 +3185,14 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* in thread-local data structure, if per-command latencies * in thread-local data structure, if per-command latencies
* are requested. * are requested.
*/ */
if (is_latencies) if (report_per_command)
{ {
if (INSTR_TIME_IS_ZERO(now)) Command *command;
INSTR_TIME_SET_CURRENT(now);
INSTR_TIME_SET_CURRENT_LAZY(now);
command = sql_script[st->use_file].commands[st->command];
/* XXX could use a mutex here, but we choose not to */ /* XXX could use a mutex here, but we choose not to */
command = sql_script[st->use_file].commands[st->command];
addToSimpleStats(&command->stats, addToSimpleStats(&command->stats,
INSTR_TIME_GET_DOUBLE(now) - INSTR_TIME_GET_DOUBLE(now) -
INSTR_TIME_GET_DOUBLE(st->stmt_begin)); INSTR_TIME_GET_DOUBLE(st->stmt_begin));
...@@ -3385,19 +3205,18 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3385,19 +3205,18 @@ doCustom(TState *thread, CState *st, StatsData *agg)
break; break;
/* /*
* End of transaction. * End of transaction (end of script, really).
*/ */
case CSTATE_END_TX: case CSTATE_END_TX:
/* transaction finished: calculate latency and do log */ /* transaction finished: calculate latency and do log */
processXactStats(thread, st, &now, false, agg); processXactStats(thread, st, &now, false, agg);
/* conditional stack must be empty */ /*
if (!conditional_stack_empty(st->cstack)) * missing \endif... cannot happen if CheckConditional was
{ * okay
fprintf(stderr, "end of script reached within a conditional, missing \\endif\n"); */
exit(1); Assert(conditional_stack_empty(st->cstack));
}
if (is_connect) if (is_connect)
{ {
...@@ -3407,31 +3226,20 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3407,31 +3226,20 @@ doCustom(TState *thread, CState *st, StatsData *agg)
if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
{ {
/* exit success */ /* script completed */
st->state = CSTATE_FINISHED; st->state = CSTATE_FINISHED;
break; break;
} }
/* /* next transaction (script) */
* No transaction is underway anymore.
*/
st->state = CSTATE_CHOOSE_SCRIPT; st->state = CSTATE_CHOOSE_SCRIPT;
/* /*
* If we paced through all commands in the script in this * Ensure that we always return on this point, so as to
* loop, without returning to the caller even once, do it now. * avoid an infinite loop if the script only contains meta
* This gives the thread a chance to process other * commands.
* connections, and to do progress reporting. This can
* currently only happen if the script consists entirely of
* meta-commands.
*/ */
if (end_tx_processed) return;
return;
else
{
end_tx_processed = true;
break;
}
/* /*
* Final states. Close the connection if it's still open. * Final states. Close the connection if it's still open.
...@@ -3444,6 +3252,182 @@ doCustom(TState *thread, CState *st, StatsData *agg) ...@@ -3444,6 +3252,182 @@ doCustom(TState *thread, CState *st, StatsData *agg)
} }
} }
/*
* Subroutine for advanceConnectionState -- execute or initiate the current
* command, and transition to next state appropriately.
*
* Returns an updated timestamp from 'now', used to update 'now' at callsite.
*/
static instr_time
doExecuteCommand(TState *thread, CState *st, instr_time now)
{
Command *command = sql_script[st->use_file].commands[st->command];
/* execute the command */
if (command->type == SQL_COMMAND)
{
if (!sendCommand(st, command))
{
commandFailed(st, "SQL", "SQL command send failed");
st->state = CSTATE_ABORTED;
}
else
st->state = CSTATE_WAIT_RESULT;
}
else if (command->type == META_COMMAND)
{
int argc = command->argc;
char **argv = command->argv;
if (debug)
{
fprintf(stderr, "client %d executing \\%s",
st->id, argv[0]);
for (int i = 1; i < argc; i++)
fprintf(stderr, " %s", argv[i]);
fprintf(stderr, "\n");
}
if (command->meta == META_SLEEP)
{
int usec;
/*
* A \sleep doesn't execute anything, we just get the delay from
* the argument, and enter the CSTATE_SLEEP state. (The
* per-command latency will be recorded in CSTATE_SLEEP state, not
* here, after the delay has elapsed.)
*/
if (!evaluateSleep(st, argc, argv, &usec))
{
commandFailed(st, "sleep", "execution of meta-command failed");
st->state = CSTATE_ABORTED;
return now;
}
INSTR_TIME_SET_CURRENT_LAZY(now);
st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
st->state = CSTATE_SLEEP;
return now;
}
else if (command->meta == META_SET)
{
PgBenchExpr *expr = command->expr;
PgBenchValue result;
if (!evaluateExpr(thread, st, expr, &result))
{
commandFailed(st, argv[0], "evaluation of meta-command failed");
st->state = CSTATE_ABORTED;
return now;
}
if (!putVariableValue(st, argv[0], argv[1], &result))
{
commandFailed(st, "set", "assignment of meta-command failed");
st->state = CSTATE_ABORTED;
return now;
}
}
else if (command->meta == META_IF)
{
/* backslash commands with an expression to evaluate */
PgBenchExpr *expr = command->expr;
PgBenchValue result;
bool cond;
if (!evaluateExpr(thread, st, expr, &result))
{
commandFailed(st, argv[0], "evaluation of meta-command failed");
st->state = CSTATE_ABORTED;
return now;
}
cond = valueTruth(&result);
conditional_stack_push(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
}
else if (command->meta == META_ELIF)
{
/* backslash commands with an expression to evaluate */
PgBenchExpr *expr = command->expr;
PgBenchValue result;
bool cond;
if (conditional_stack_peek(st->cstack) == IFSTATE_TRUE)
{
/*
* elif after executed block, skip eval and wait for endif.
*/
conditional_stack_poke(st->cstack, IFSTATE_IGNORED);
st->state = CSTATE_END_COMMAND;
return now;
}
if (!evaluateExpr(thread, st, expr, &result))
{
commandFailed(st, argv[0], "evaluation of meta-command failed");
st->state = CSTATE_ABORTED;
return now;
}
cond = valueTruth(&result);
Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE);
conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
}
else if (command->meta == META_ELSE)
{
switch (conditional_stack_peek(st->cstack))
{
case IFSTATE_TRUE:
conditional_stack_poke(st->cstack, IFSTATE_ELSE_FALSE);
break;
case IFSTATE_FALSE: /* inconsistent if active */
case IFSTATE_IGNORED: /* inconsistent if active */
case IFSTATE_NONE: /* else without if */
case IFSTATE_ELSE_TRUE: /* else after else */
case IFSTATE_ELSE_FALSE: /* else after else */
default:
/* dead code if conditional check is ok */
Assert(false);
}
}
else if (command->meta == META_ENDIF)
{
Assert(!conditional_stack_empty(st->cstack));
conditional_stack_pop(st->cstack);
}
else if (command->meta == META_SETSHELL)
{
if (!runShellCommand(st, argv[1], argv + 2, argc - 2))
{
commandFailed(st, "setshell", "execution of meta-command failed");
st->state = CSTATE_ABORTED;
return now;
}
}
else if (command->meta == META_SHELL)
{
if (!runShellCommand(st, NULL, argv + 1, argc - 1))
{
commandFailed(st, "shell", "execution of meta-command failed");
st->state = CSTATE_ABORTED;
return now;
}
}
/*
* executing the expression or shell command might have taken a
* non-negligible amount of time, so reset 'now'
*/
INSTR_TIME_SET_ZERO(now);
st->state = CSTATE_END_COMMAND;
}
return now;
}
/* /*
* Print log entry after completing one transaction. * Print log entry after completing one transaction.
* *
...@@ -3544,8 +3528,7 @@ processXactStats(TState *thread, CState *st, instr_time *now, ...@@ -3544,8 +3528,7 @@ processXactStats(TState *thread, CState *st, instr_time *now,
if (detailed && !skipped) if (detailed && !skipped)
{ {
if (INSTR_TIME_IS_ZERO(*now)) INSTR_TIME_SET_CURRENT_LAZY(*now);
INSTR_TIME_SET_CURRENT(*now);
/* compute latency & lag */ /* compute latency & lag */
latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled; latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
...@@ -4789,7 +4772,7 @@ printResults(TState *threads, StatsData *total, instr_time total_time, ...@@ -4789,7 +4772,7 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
printf("tps = %f (excluding connections establishing)\n", tps_exclude); printf("tps = %f (excluding connections establishing)\n", tps_exclude);
/* Report per-script/command statistics */ /* Report per-script/command statistics */
if (per_script_stats || is_latencies) if (per_script_stats || report_per_command)
{ {
int i; int i;
...@@ -4818,7 +4801,7 @@ printResults(TState *threads, StatsData *total, instr_time total_time, ...@@ -4818,7 +4801,7 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
} }
/* Report per-command latencies */ /* Report per-command latencies */
if (is_latencies) if (report_per_command)
{ {
Command **commands; Command **commands;
...@@ -5095,7 +5078,7 @@ main(int argc, char **argv) ...@@ -5095,7 +5078,7 @@ main(int argc, char **argv)
break; break;
case 'r': case 'r':
benchmarking_option_set = true; benchmarking_option_set = true;
is_latencies = true; report_per_command = true;
break; break;
case 's': case 's':
scale_given = true; scale_given = true;
...@@ -5809,7 +5792,7 @@ threadRun(void *arg) ...@@ -5809,7 +5792,7 @@ threadRun(void *arg)
if (!is_connect) if (!is_connect)
{ {
/* make connections to the database */ /* make connections to the database before starting */
for (i = 0; i < nstate; i++) for (i = 0; i < nstate; i++)
{ {
if ((state[i].con = doConnect()) == NULL) if ((state[i].con = doConnect()) == NULL)
...@@ -5845,14 +5828,7 @@ threadRun(void *arg) ...@@ -5845,14 +5828,7 @@ threadRun(void *arg)
{ {
CState *st = &state[i]; CState *st = &state[i];
if (st->state == CSTATE_THROTTLE && timer_exceeded) if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{
/* interrupt client that has not started a transaction */
st->state = CSTATE_FINISHED;
finishCon(st);
remains--;
}
else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{ {
/* a nap from the script, or under throttling */ /* a nap from the script, or under throttling */
int64 this_usec; int64 this_usec;
...@@ -5971,7 +5947,7 @@ threadRun(void *arg) ...@@ -5971,7 +5947,7 @@ threadRun(void *arg)
if (st->state == CSTATE_WAIT_RESULT) if (st->state == CSTATE_WAIT_RESULT)
{ {
/* don't call doCustom unless data is available */ /* don't call advanceConnectionState unless data is available */
int sock = PQsocket(st->con); int sock = PQsocket(st->con);
if (sock < 0) if (sock < 0)
...@@ -5991,9 +5967,12 @@ threadRun(void *arg) ...@@ -5991,9 +5967,12 @@ threadRun(void *arg)
continue; continue;
} }
doCustom(thread, st, &aggs); advanceConnectionState(thread, st, &aggs);
/* If doCustom changed client to finished state, reduce remains */ /*
* If advanceConnectionState changed client to finished state,
* that's one less client that remains.
*/
if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED) if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
remains--; remains--;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment