Commit 12788ae4 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Refactor script execution state machine in pgbench.

The doCustom() function had grown into quite a mess. Rewrite it, in a more
explicit state machine style, for readability.

This also fixes one minor bug: if a script consisted entirely of meta
commands, doCustom() never returned to the caller, so progress reports
with the -P option were not printed. I don't want to backpatch this
refactoring, and the bug is quite insignificant, so only commit this to
master, and leave the bug unfixed in back-branches.

Review and original bug report by Fabien Coelho.

Discussion: <alpine.DEB.2.20.1607090850120.3412@sto>
parent da6c4f6c
...@@ -235,25 +235,95 @@ typedef struct StatsData ...@@ -235,25 +235,95 @@ typedef struct StatsData
} StatsData; } StatsData;
/* /*
* Connection state * Connection state machine states.
*/
typedef enum
{
/*
* The client must first choose a script to execute. Once chosen, it can
* either be throttled (state CSTATE_START_THROTTLE under --rate) or start
* right away (state CSTATE_START_TX).
*/
CSTATE_CHOOSE_SCRIPT,
/*
* In CSTATE_START_THROTTLE state, we calculate when to begin the next
* transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
* sleeps until that moment. (If throttling is not enabled, doCustom()
* falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
*/
CSTATE_START_THROTTLE,
CSTATE_THROTTLE,
/*
* CSTATE_START_TX performs start-of-transaction processing. Establishes
* a new connection for the transaction, in --connect mode, and records
* the transaction start time.
*/
CSTATE_START_TX,
/*
* We loop through these states, to process each command in the script:
*
* CSTATE_START_COMMAND starts the execution of a command. On a SQL
* command, the command is sent to the server, and we move to
* CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
* and we enter the CSTATE_SLEEP state to wait for it to expire. Other
* meta-commands are executed immediately.
*
* CSTATE_WAIT_RESULT waits until we get a result set back from the server
* for the current command.
*
* CSTATE_SLEEP waits until the end of \sleep.
*
* CSTATE_END_COMMAND records the end-of-command timestamp, increments the
* command counter, and loops back to CSTATE_START_COMMAND state.
*/
CSTATE_START_COMMAND,
CSTATE_WAIT_RESULT,
CSTATE_SLEEP,
CSTATE_END_COMMAND,
/*
* CSTATE_END_TX performs end-of-transaction processing. Calculates
* latency, and logs the transaction. In --connect mode, closes the
* current connection. Chooses the next script to execute and starts over
* in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
* more work to do.
*/
CSTATE_END_TX,
/*
* Final states. CSTATE_ABORTED means that the script execution was
* aborted because a command failed, CSTATE_FINISHED means success.
*/
CSTATE_ABORTED,
CSTATE_FINISHED
} ConnectionStateEnum;
/*
* Connection state.
*/ */
typedef struct typedef struct
{ {
PGconn *con; /* connection handle to DB */ PGconn *con; /* connection handle to DB */
int id; /* client No. */ int id; /* client No. */
int state; /* state No. */ ConnectionStateEnum state; /* state machine's current state. */
bool listen; /* whether an async query has been sent */
bool sleeping; /* whether the client is napping */ int use_file; /* index in sql_script for this client */
bool throttling; /* whether nap is for throttling */ int command; /* command number in script */
bool is_throttled; /* whether transaction throttling is done */
/* client variables */
Variable *variables; /* array of variable definitions */ Variable *variables; /* array of variable definitions */
int nvariables; /* number of variables */ int nvariables; /* number of variables */
bool vars_sorted; /* are variables sorted by name? */ bool vars_sorted; /* are variables sorted by name? */
/* various times about current transaction */
int64 txn_scheduled; /* scheduled start time of transaction (usec) */ int64 txn_scheduled; /* scheduled start time of transaction (usec) */
int64 sleep_until; /* scheduled start time of next cmd (usec) */ int64 sleep_until; /* scheduled start time of next cmd (usec) */
instr_time txn_begin; /* used for measuring schedule lag times */ instr_time txn_begin; /* used for measuring schedule lag times */
instr_time stmt_begin; /* used for measuring statement latencies */ instr_time stmt_begin; /* used for measuring statement latencies */
int use_file; /* index in sql_scripts for this client */
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */ bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
/* per client collected stats */ /* per client collected stats */
...@@ -1382,7 +1452,7 @@ evalFunc(TState *thread, CState *st, ...@@ -1382,7 +1452,7 @@ evalFunc(TState *thread, CState *st,
Assert(nargs == 1); Assert(nargs == 1);
fprintf(stderr, "debug(script=%d,command=%d): ", fprintf(stderr, "debug(script=%d,command=%d): ",
st->use_file, st->state + 1); st->use_file, st->command + 1);
if (varg->type == PGBT_INT) if (varg->type == PGBT_INT)
fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival); fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
...@@ -1733,15 +1803,12 @@ preparedStatementName(char *buffer, int file, int state) ...@@ -1733,15 +1803,12 @@ preparedStatementName(char *buffer, int file, int state)
sprintf(buffer, "P%d_%d", file, state); sprintf(buffer, "P%d_%d", file, state);
} }
static bool static void
clientDone(CState *st) commandFailed(CState *st, char *message)
{ {
if (st->con != NULL) fprintf(stderr,
{ "client %d aborted in command %d of script %d; %s\n",
PQfinish(st->con); st->id, st->command, st->use_file, message);
st->con = NULL;
}
return false; /* always false */
} }
/* return a script number with a weighted choice. */ /* return a script number with a weighted choice. */
...@@ -1763,56 +1830,209 @@ chooseScript(TState *thread) ...@@ -1763,56 +1830,209 @@ chooseScript(TState *thread)
return i - 1; return i - 1;
} }
/* return false iff client should be disconnected */ /* Send a SQL command, using the chosen querymode */
static bool static bool
sendCommand(CState *st, Command *command)
{
int r;
if (querymode == QUERY_SIMPLE)
{
char *sql;
sql = pg_strdup(command->argv[0]);
sql = assignVariables(st, sql);
if (debug)
fprintf(stderr, "client %d sending %s\n", st->id, sql);
r = PQsendQuery(st->con, sql);
free(sql);
}
else if (querymode == QUERY_EXTENDED)
{
const char *sql = command->argv[0];
const char *params[MAX_ARGS];
getQueryParams(st, command, params);
if (debug)
fprintf(stderr, "client %d sending %s\n", st->id, sql);
r = PQsendQueryParams(st->con, sql, command->argc - 1,
NULL, params, NULL, NULL, 0);
}
else if (querymode == QUERY_PREPARED)
{
char name[MAX_PREPARE_NAME];
const char *params[MAX_ARGS];
if (!st->prepared[st->use_file])
{
int j;
Command **commands = sql_script[st->use_file].commands;
for (j = 0; commands[j] != NULL; j++)
{
PGresult *res;
char name[MAX_PREPARE_NAME];
if (commands[j]->type != SQL_COMMAND)
continue;
preparedStatementName(name, st->use_file, j);
res = PQprepare(st->con, name,
commands[j]->argv[0], commands[j]->argc - 1, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
fprintf(stderr, "%s", PQerrorMessage(st->con));
PQclear(res);
}
st->prepared[st->use_file] = true;
}
getQueryParams(st, command, params);
preparedStatementName(name, st->use_file, st->command);
if (debug)
fprintf(stderr, "client %d sending %s\n", st->id, name);
r = PQsendQueryPrepared(st->con, name, command->argc - 1,
params, NULL, NULL, 0);
}
else /* unknown sql mode */
r = 0;
if (r == 0)
{
if (debug)
fprintf(stderr, "client %d could not send %s\n",
st->id, command->argv[0]);
st->ecnt++;
return false;
}
else
return true;
}
/*
* Parse the argument to a \sleep command, and return the requested amount
* of delay, in microseconds. Returns true on success, false on error.
*/
static bool
evaluateSleep(CState *st, int argc, char **argv, int *usecs)
{
char *var;
int usec;
if (*argv[1] == ':')
{
if ((var = getVariable(st, argv[1] + 1)) == NULL)
{
fprintf(stderr, "%s: undefined variable \"%s\"\n",
argv[0], argv[1]);
return false;
}
usec = atoi(var);
}
else
usec = atoi(argv[1]);
if (argc > 2)
{
if (pg_strcasecmp(argv[2], "ms") == 0)
usec *= 1000;
else if (pg_strcasecmp(argv[2], "s") == 0)
usec *= 1000000;
}
else
usec *= 1000000;
*usecs = usec;
return true;
}
/*
* Advance the state machine of a connection, if possible.
*/
static void
doCustom(TState *thread, CState *st, StatsData *agg) doCustom(TState *thread, CState *st, StatsData *agg)
{ {
PGresult *res; PGresult *res;
Command **commands; Command *command;
bool trans_needs_throttle = false;
instr_time now; instr_time now;
bool end_tx_processed = false;
int64 wait;
/* /*
* gettimeofday() isn't free, so we get the current timestamp lazily the * gettimeofday() isn't free, so we get the current timestamp lazily the
* first time it's needed, and reuse the same value throughout this * first time it's needed, and reuse the same value throughout this
* function after that. This also ensures that e.g. the calculated latency * function after that. This also ensures that e.g. the calculated
* reported in the log file and in the totals are the same. Zero means * latency reported in the log file and in the totals are the same. Zero
* "not set yet". Reset "now" when we step to the next command with "goto * means "not set yet". Reset "now" when we execute shell commands or
* top", though. * expressions, which might take a non-negligible amount of time, though.
*/ */
top:
INSTR_TIME_SET_ZERO(now); INSTR_TIME_SET_ZERO(now);
commands = sql_script[st->use_file].commands;
/* /*
* Handle throttling once per transaction by sleeping. It is simpler to * Loop in the state machine, until we have to wait for a result from the
* do this here rather than at the end, because so much complicated logic * server (or have to sleep, for throttling or for \sleep).
* happens below when statements finish. *
* Note: In the switch-statement below, 'break' will loop back here,
* meaning "continue in the state machine". Return is used to return to
* the caller.
*/ */
if (throttle_delay && !st->is_throttled) for (;;)
{
switch (st->state)
{ {
/* /*
* Generate a delay such that the series of delays will approximate a * Select transaction to run.
* Poisson distribution centered on the throttle_delay time. */
case CSTATE_CHOOSE_SCRIPT:
st->use_file = chooseScript(thread);
if (debug)
fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
sql_script[st->use_file].desc);
if (throttle_delay > 0)
st->state = CSTATE_START_THROTTLE;
else
st->state = CSTATE_START_TX;
break;
/*
* Handle throttling once per transaction by sleeping.
*/
case CSTATE_START_THROTTLE:
/*
* Generate a delay such that the series of delays will
* approximate a Poisson distribution centered on the
* throttle_delay time.
* *
* If transactions are too slow or a given wait is shorter than a * If transactions are too slow or a given wait is shorter
* transaction, the next transaction will start right away. * than a transaction, the next transaction will start right
* away.
*/ */
int64 wait = getPoissonRand(thread, throttle_delay); Assert(throttle_delay > 0);
wait = getPoissonRand(thread, throttle_delay);
thread->throttle_trigger += wait; thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger; st->txn_scheduled = thread->throttle_trigger;
/* stop client if next transaction is beyond pgbench end of execution */ /*
* stop client if next transaction is beyond pgbench end of
* execution
*/
if (duration > 0 && st->txn_scheduled > end_time) if (duration > 0 && st->txn_scheduled > end_time)
return clientDone(st); {
st->state = CSTATE_FINISHED;
break;
}
/* /*
* If this --latency-limit is used, and this slot is already late so * If this --latency-limit is used, and this slot is already
* that the transaction will miss the latency limit even if it * late so that the transaction will miss the latency limit
* completed immediately, we skip this time slot and iterate till the * even if it completed immediately, we skip this time slot
* next slot that isn't late yet. * and iterate till the next slot that isn't late yet.
*/ */
if (latency_limit) if (latency_limit)
{ {
...@@ -1831,259 +2051,123 @@ top: ...@@ -1831,259 +2051,123 @@ top:
} }
} }
st->sleep_until = st->txn_scheduled; st->state = CSTATE_THROTTLE;
st->sleeping = true;
st->throttling = true;
st->is_throttled = true;
if (debug) if (debug)
fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n", fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
st->id, wait); st->id, wait);
} break;
if (st->sleeping)
{ /* are we sleeping? */
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
return true; /* Still sleeping, nothing to do here */
/* Else done sleeping, go ahead with next command */
st->sleeping = false;
st->throttling = false;
}
if (st->listen)
{ /* are we receiver? */
if (commands[st->state]->type == SQL_COMMAND)
{
if (debug)
fprintf(stderr, "client %d receiving\n", st->id);
if (!PQconsumeInput(st->con))
{ /* there's something wrong */
fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
return clientDone(st);
}
if (PQisBusy(st->con))
return true; /* don't have the whole result yet */
}
/* /*
* command finished: accumulate per-command execution times in * Wait until it's time to start next transaction.
* thread-local data structure, if per-command latencies are requested
*/ */
if (is_latencies) case CSTATE_THROTTLE:
{
if (INSTR_TIME_IS_ZERO(now)) if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now); INSTR_TIME_SET_CURRENT(now);
if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
return; /* Still sleeping, nothing to do here */
/* XXX could use a mutex here, but we choose not to */ /* Else done sleeping, start the transaction */
addToSimpleStats(&commands[st->state]->stats, st->state = CSTATE_START_TX;
INSTR_TIME_GET_DOUBLE(now) - break;
INSTR_TIME_GET_DOUBLE(st->stmt_begin));
}
/* transaction finished: calculate latency and log the transaction */
if (commands[st->state + 1] == NULL)
{
if (progress || throttle_delay || latency_limit ||
per_script_stats || use_log)
processXactStats(thread, st, &now, false, agg);
else
thread->stats.cnt++;
}
if (commands[st->state]->type == SQL_COMMAND)
{
/*
* Read and discard the query result; note this is not included in
* the statement latency numbers.
*/
res = PQgetResult(st->con);
switch (PQresultStatus(res))
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
case PGRES_EMPTY_QUERY:
break; /* OK */
default:
fprintf(stderr, "client %d aborted in state %d: %s",
st->id, st->state, PQerrorMessage(st->con));
PQclear(res);
return clientDone(st);
}
PQclear(res);
discard_response(st);
}
if (commands[st->state + 1] == NULL)
{
if (is_connect)
{
PQfinish(st->con);
st->con = NULL;
}
++st->cnt;
if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
return clientDone(st); /* exit success */
}
/* increment state counter */ /* Start new transaction */
st->state++; case CSTATE_START_TX:
if (commands[st->state] == NULL)
{
st->state = 0;
st->use_file = chooseScript(thread);
commands = sql_script[st->use_file].commands;
if (debug)
fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
sql_script[st->use_file].desc);
st->is_throttled = false;
/* /*
* No transaction is underway anymore, which means there is * Establish connection on first call, or if is_connect is
* nothing to listen to right now. When throttling rate limits * true.
* are active, a sleep will happen next, as the next transaction
* starts. And then in any case the next SQL command will set
* listen back to true.
*/ */
st->listen = false;
trans_needs_throttle = (throttle_delay > 0);
}
}
if (st->con == NULL) if (st->con == NULL)
{ {
instr_time start, instr_time start;
end;
INSTR_TIME_SET_CURRENT(start); if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
start = now;
if ((st->con = doConnect()) == NULL) if ((st->con = doConnect()) == NULL)
{ {
fprintf(stderr, "client %d aborted while establishing connection\n", fprintf(stderr, "client %d aborted while establishing connection\n",
st->id); st->id);
return clientDone(st); st->state = CSTATE_ABORTED;
break;
} }
INSTR_TIME_SET_CURRENT(end); INSTR_TIME_SET_CURRENT(now);
INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start); INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
/* Reset session-local state */ /* Reset session-local state */
st->listen = false;
st->sleeping = false;
st->throttling = false;
st->is_throttled = false;
memset(st->prepared, 0, sizeof(st->prepared)); memset(st->prepared, 0, sizeof(st->prepared));
} }
/* /*
* This ensures that a throttling delay is inserted before proceeding with * Record transaction start time under logging, progress or
* sql commands, after the first transaction. The first transaction * throttling.
* throttling is performed when first entering doCustom.
*/ */
if (trans_needs_throttle) if (use_log || progress || throttle_delay || latency_limit ||
{ per_script_stats)
trans_needs_throttle = false;
goto top;
}
/* Record transaction start time under logging, progress or throttling */
if ((use_log || progress || throttle_delay || latency_limit ||
per_script_stats) && st->state == 0)
{ {
INSTR_TIME_SET_CURRENT(st->txn_begin); if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
st->txn_begin = now;
/* /*
* When not throttling, this is also the transaction's scheduled start * When not throttling, this is also the transaction's
* time. * scheduled start time.
*/ */
if (!throttle_delay) if (!throttle_delay)
st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin); st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
}
/* Record statement start time if per-command latencies are requested */
if (is_latencies)
INSTR_TIME_SET_CURRENT(st->stmt_begin);
if (commands[st->state]->type == SQL_COMMAND)
{
const Command *command = commands[st->state];
int r;
if (querymode == QUERY_SIMPLE)
{
char *sql;
sql = pg_strdup(command->argv[0]);
sql = assignVariables(st, sql);
if (debug)
fprintf(stderr, "client %d sending %s\n", st->id, sql);
r = PQsendQuery(st->con, sql);
free(sql);
}
else if (querymode == QUERY_EXTENDED)
{
const char *sql = command->argv[0];
const char *params[MAX_ARGS];
getQueryParams(st, command, params);
if (debug)
fprintf(stderr, "client %d sending %s\n", st->id, sql);
r = PQsendQueryParams(st->con, sql, command->argc - 1,
NULL, params, NULL, NULL, 0);
} }
else if (querymode == QUERY_PREPARED)
{
char name[MAX_PREPARE_NAME];
const char *params[MAX_ARGS];
if (!st->prepared[st->use_file]) /* Begin with the first command */
{ st->command = 0;
int j; st->state = CSTATE_START_COMMAND;
break;
for (j = 0; commands[j] != NULL; j++) /*
{ * Send a command to server (or execute a meta-command)
PGresult *res; */
char name[MAX_PREPARE_NAME]; case CSTATE_START_COMMAND:
command = sql_script[st->use_file].commands[st->command];
if (commands[j]->type != SQL_COMMAND) /*
continue; * If we reached the end of the script, move to end-of-xact
preparedStatementName(name, st->use_file, j); * processing.
res = PQprepare(st->con, name, */
commands[j]->argv[0], commands[j]->argc - 1, NULL); if (command == NULL)
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "%s", PQerrorMessage(st->con)); st->state = CSTATE_END_TX;
PQclear(res); break;
}
st->prepared[st->use_file] = true;
} }
getQueryParams(st, command, params); /*
preparedStatementName(name, st->use_file, st->state); * Record statement start time if per-command latencies are
* requested
if (debug) */
fprintf(stderr, "client %d sending %s\n", st->id, name); if (is_latencies)
r = PQsendQueryPrepared(st->con, name, command->argc - 1, {
params, NULL, NULL, 0); if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
st->stmt_begin = now;
} }
else /* unknown sql mode */
r = 0;
if (r == 0) if (command->type == SQL_COMMAND)
{ {
if (debug) if (!sendCommand(st, command))
fprintf(stderr, "client %d could not send %s\n", {
st->id, command->argv[0]); /*
st->ecnt++; * Failed. Stay in CSTATE_START_COMMAND state, to
* retry. ??? What the point or retrying? Should
* rather abort?
*/
return;
} }
else else
st->listen = true; /* flags that should be listened */ st->state = CSTATE_WAIT_RESULT;
} }
else if (commands[st->state]->type == META_COMMAND) else if (command->type == META_COMMAND)
{ {
int argc = commands[st->state]->argc, int argc = command->argc,
i; i;
char **argv = commands[st->state]->argv; char **argv = command->argv;
if (debug) if (debug)
{ {
...@@ -2093,95 +2177,248 @@ top: ...@@ -2093,95 +2177,248 @@ top:
fprintf(stderr, "\n"); fprintf(stderr, "\n");
} }
if (pg_strcasecmp(argv[0], "sleep") == 0)
{
/*
* A \sleep doesn't execute anything, we just get the
* delay from the argument, and enter the CSTATE_SLEEP
* state. (The per-command latency will be recorded
* in CSTATE_SLEEP state, not here, after the delay
* has elapsed.)
*/
int usec;
if (!evaluateSleep(st, argc, argv, &usec))
{
commandFailed(st, "execution of meta-command 'sleep' failed");
st->state = CSTATE_ABORTED;
break;
}
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
st->state = CSTATE_SLEEP;
break;
}
else
{
if (pg_strcasecmp(argv[0], "set") == 0) if (pg_strcasecmp(argv[0], "set") == 0)
{ {
PgBenchExpr *expr = commands[st->state]->expr; PgBenchExpr *expr = command->expr;
PgBenchValue result; PgBenchValue result;
if (!evaluateExpr(thread, st, expr, &result)) if (!evaluateExpr(thread, st, expr, &result))
{ {
st->ecnt++; commandFailed(st, "evaluation of meta-command 'set' failed");
return true; st->state = CSTATE_ABORTED;
break;
} }
if (!putVariableNumber(st, argv[0], argv[1], &result)) if (!putVariableNumber(st, argv[0], argv[1], &result))
{ {
st->ecnt++; commandFailed(st, "assignment of meta-command 'set' failed");
return true; st->state = CSTATE_ABORTED;
break;
} }
st->listen = true;
} }
else if (pg_strcasecmp(argv[0], "sleep") == 0) else if (pg_strcasecmp(argv[0], "setshell") == 0)
{ {
char *var; bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
int usec;
instr_time now;
if (*argv[1] == ':') if (timer_exceeded) /* timeout */
{
if ((var = getVariable(st, argv[1] + 1)) == NULL)
{ {
fprintf(stderr, "%s: undefined variable \"%s\"\n", st->state = CSTATE_FINISHED;
argv[0], argv[1]); break;
st->ecnt++;
return true;
} }
usec = atoi(var); else if (!ret) /* on error */
{
commandFailed(st, "execution of meta-command 'setshell' failed");
st->state = CSTATE_ABORTED;
break;
} }
else else
usec = atoi(argv[1]);
if (argc > 2)
{ {
if (pg_strcasecmp(argv[2], "ms") == 0) /* succeeded */
usec *= 1000;
else if (pg_strcasecmp(argv[2], "s") == 0)
usec *= 1000000;
} }
else
usec *= 1000000;
INSTR_TIME_SET_CURRENT(now);
st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
st->sleeping = true;
st->listen = true;
} }
else if (pg_strcasecmp(argv[0], "setshell") == 0) else if (pg_strcasecmp(argv[0], "shell") == 0)
{ {
bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2); bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
if (timer_exceeded) /* timeout */ if (timer_exceeded) /* timeout */
return clientDone(st); {
st->state = CSTATE_FINISHED;
break;
}
else if (!ret) /* on error */ else if (!ret) /* on error */
{ {
st->ecnt++; commandFailed(st, "execution of meta-command 'shell' failed");
return true; st->state = CSTATE_ABORTED;
break;
}
else
{
/* succeeded */
} }
else /* succeeded */
st->listen = true;
} }
else if (pg_strcasecmp(argv[0], "shell") == 0)
/*
* executing the expression or shell command might
* take a non-negligible amount of time, so reset
* 'now'
*/
INSTR_TIME_SET_ZERO(now);
st->state = CSTATE_END_COMMAND;
}
}
break;
/*
* Wait for the current SQL command to complete
*/
case CSTATE_WAIT_RESULT:
command = sql_script[st->use_file].commands[st->command];
if (debug)
fprintf(stderr, "client %d receiving\n", st->id);
if (!PQconsumeInput(st->con))
{ /* there's something wrong */
commandFailed(st, "perhaps the backend died while processing");
st->state = CSTATE_ABORTED;
break;
}
if (PQisBusy(st->con))
return; /* don't have the whole result yet */
/*
* Read and discard the query result;
*/
res = PQgetResult(st->con);
switch (PQresultStatus(res))
{ {
bool ret = runShellCommand(st, NULL, argv + 1, argc - 1); case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
case PGRES_EMPTY_QUERY:
/* OK */
PQclear(res);
discard_response(st);
st->state = CSTATE_END_COMMAND;
break;
default:
commandFailed(st, PQerrorMessage(st->con));
PQclear(res);
st->state = CSTATE_ABORTED;
break;
}
break;
if (timer_exceeded) /* timeout */ /*
return clientDone(st); * Wait until sleep is done. This state is entered after a
else if (!ret) /* on error */ * \sleep metacommand. The behavior is similar to
* CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
* instead of CSTATE_START_TX.
*/
case CSTATE_SLEEP:
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
return; /* Still sleeping, nothing to do here */
/* Else done sleeping. */
st->state = CSTATE_END_COMMAND;
break;
/*
* End of command: record stats and proceed to next command.
*/
case CSTATE_END_COMMAND:
/*
* command completed: accumulate per-command execution times
* in thread-local data structure, if per-command latencies
* are requested.
*/
if (is_latencies)
{ {
st->ecnt++; if (INSTR_TIME_IS_ZERO(now))
return true; INSTR_TIME_SET_CURRENT(now);
/* XXX could use a mutex here, but we choose not to */
command = sql_script[st->use_file].commands[st->command];
addToSimpleStats(&command->stats,
INSTR_TIME_GET_DOUBLE(now) -
INSTR_TIME_GET_DOUBLE(st->stmt_begin));
}
/* Go ahead with next command */
st->command++;
st->state = CSTATE_START_COMMAND;
break;
/*
* End of transaction.
*/
case CSTATE_END_TX:
/*
* transaction finished: calculate latency and log the
* transaction
*/
if (progress || throttle_delay || latency_limit ||
per_script_stats || use_log)
processXactStats(thread, st, &now, false, agg);
else
thread->stats.cnt++;
if (is_connect)
{
PQfinish(st->con);
st->con = NULL;
INSTR_TIME_SET_ZERO(now);
} }
else /* succeeded */
st->listen = true; ++st->cnt;
if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
{
/* exit success */
st->state = CSTATE_FINISHED;
break;
} }
/* after a meta command, immediately proceed with next command */ /*
goto top; * No transaction is underway anymore.
*/
st->state = CSTATE_CHOOSE_SCRIPT;
/*
* If we paced through all commands in the script in this
* loop, without returning to the caller even once, do it now.
* This gives the thread a chance to process other
* connections, and to do progress reporting. This can
* currently only happen if the script consists entirely of
* meta-commands.
*/
if (end_tx_processed)
return;
else
{
end_tx_processed = true;
break;
} }
return true; /*
* Final states. Close the connection if it's still open.
*/
case CSTATE_ABORTED:
case CSTATE_FINISHED:
if (st->con != NULL)
{
PQfinish(st->con);
st->con = NULL;
}
return;
}
}
} }
/* /*
...@@ -4183,29 +4420,10 @@ threadRun(void *arg) ...@@ -4183,29 +4420,10 @@ threadRun(void *arg)
initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time)); initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
last = aggs; last = aggs;
/* send start up queries in async manner */ /* initialize explicitely the state machines */
for (i = 0; i < nstate; i++) for (i = 0; i < nstate; i++)
{ {
CState *st = &state[i]; state[i].state = CSTATE_CHOOSE_SCRIPT;
int prev_ecnt = st->ecnt;
Command **commands;
st->use_file = chooseScript(thread);
commands = sql_script[st->use_file].commands;
if (debug)
fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
sql_script[st->use_file].desc);
if (!doCustom(thread, st, &aggs))
remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
i, st->state);
remains--; /* I've aborted */
PQfinish(st->con);
st->con = NULL;
}
} }
while (remains > 0) while (remains > 0)
...@@ -4222,27 +4440,20 @@ threadRun(void *arg) ...@@ -4222,27 +4440,20 @@ threadRun(void *arg)
for (i = 0; i < nstate; i++) for (i = 0; i < nstate; i++)
{ {
CState *st = &state[i]; CState *st = &state[i];
Command **commands = sql_script[st->use_file].commands;
int sock; int sock;
if (st->con == NULL) if (st->state == CSTATE_THROTTLE && timer_exceeded)
{
continue;
}
else if (st->sleeping)
{
if (st->throttling && timer_exceeded)
{ {
/* interrupt client which has not started a transaction */ /* interrupt client which has not started a transaction */
st->state = CSTATE_FINISHED;
remains--; remains--;
st->sleeping = false;
st->throttling = false;
PQfinish(st->con); PQfinish(st->con);
st->con = NULL; st->con = NULL;
continue; continue;
} }
else /* just a nap from the script */ else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{ {
/* a nap from the script, or under throttling */
int this_usec; int this_usec;
if (min_usec == PG_INT64_MAX) if (min_usec == PG_INT64_MAX)
...@@ -4253,17 +4464,17 @@ threadRun(void *arg) ...@@ -4253,17 +4464,17 @@ threadRun(void *arg)
now_usec = INSTR_TIME_GET_MICROSEC(now); now_usec = INSTR_TIME_GET_MICROSEC(now);
} }
this_usec = st->txn_scheduled - now_usec; this_usec = (st->state == CSTATE_SLEEP ?
st->sleep_until : st->txn_scheduled) - now_usec;
if (min_usec > this_usec) if (min_usec > this_usec)
min_usec = this_usec; min_usec = this_usec;
} }
} else if (st->state == CSTATE_WAIT_RESULT)
else if (commands[st->state]->type == META_COMMAND)
{ {
min_usec = 0; /* the connection is ready to run */ /*
break; * waiting for result from server - nothing to do unless the
} * socket is readable
*/
sock = PQsocket(st->con); sock = PQsocket(st->con);
if (sock < 0) if (sock < 0)
{ {
...@@ -4275,6 +4486,14 @@ threadRun(void *arg) ...@@ -4275,6 +4486,14 @@ threadRun(void *arg)
if (maxsock < sock) if (maxsock < sock)
maxsock = sock; maxsock = sock;
break;
}
else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
{
/* the connection is ready to run */
min_usec = 0;
break;
}
} }
/* also wake up to print the next progress report on time */ /* also wake up to print the next progress report on time */
...@@ -4324,14 +4543,13 @@ threadRun(void *arg) ...@@ -4324,14 +4543,13 @@ threadRun(void *arg)
} }
} }
/* ok, backend returns reply */ /* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++) for (i = 0; i < nstate; i++)
{ {
CState *st = &state[i]; CState *st = &state[i];
Command **commands = sql_script[st->use_file].commands; bool ready;
int prev_ecnt = st->ecnt;
if (st->con) if (st->state == CSTATE_WAIT_RESULT && st->con)
{ {
int sock = PQsocket(st->con); int sock = PQsocket(st->con);
...@@ -4341,21 +4559,19 @@ threadRun(void *arg) ...@@ -4341,21 +4559,19 @@ threadRun(void *arg)
PQerrorMessage(st->con)); PQerrorMessage(st->con));
goto done; goto done;
} }
if (FD_ISSET(sock, &input_mask) ||
commands[st->state]->type == META_COMMAND) ready = FD_ISSET(sock, &input_mask);
{
if (!doCustom(thread, st, &aggs))
remains--; /* I've aborted */
}
} }
else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
ready = false;
else
ready = true;
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) if (ready)
{ {
fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n", doCustom(thread, st, &aggs);
i, st->state); if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
remains--; /* I've aborted */ remains--;
PQfinish(st->con);
st->con = NULL;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment