Commit 84f0ea3f authored by Heikki Linnakangas's avatar Heikki Linnakangas

Refactor pgbench log-writing code to a separate function.

The doCustom function was incredibly long, this makes it a little bit more
readable.
parent 5a6c168c
...@@ -347,6 +347,9 @@ static char *select_only = { ...@@ -347,6 +347,9 @@ static char *select_only = {
static void setalarm(int seconds); static void setalarm(int seconds);
static void *threadRun(void *arg); static void *threadRun(void *arg);
static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
AggVals *agg);
static void static void
usage(void) usage(void)
{ {
...@@ -1016,6 +1019,16 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa ...@@ -1016,6 +1019,16 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
PGresult *res; PGresult *res;
Command **commands; Command **commands;
bool trans_needs_throttle = false; bool trans_needs_throttle = false;
instr_time now;
/*
* gettimeofday() isn't free, so we get the current timestamp lazily the
* first time it's needed, and reuse the same value throughout this
* function after that. This also ensures that e.g. the calculated latency
* reported in the log file and in the totals are the same. Zero means
* "not set yet".
*/
INSTR_TIME_SET_ZERO(now);
top: top:
commands = sql_files[st->use_file]; commands = sql_files[st->use_file];
...@@ -1049,9 +1062,9 @@ top: ...@@ -1049,9 +1062,9 @@ top:
if (st->sleeping) if (st->sleeping)
{ /* are we sleeping? */ { /* are we sleeping? */
instr_time now;
int64 now_us; int64 now_us;
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now); INSTR_TIME_SET_CURRENT(now);
now_us = INSTR_TIME_GET_MICROSEC(now); now_us = INSTR_TIME_GET_MICROSEC(now);
if (st->txn_scheduled <= now_us) if (st->txn_scheduled <= now_us)
...@@ -1074,11 +1087,6 @@ top: ...@@ -1074,11 +1087,6 @@ top:
if (st->listen) if (st->listen)
{ /* are we receiver? */ { /* are we receiver? */
instr_time now;
bool now_valid = false;
INSTR_TIME_SET_ZERO(now); /* initialize to keep compiler quiet */
if (commands[st->state]->type == SQL_COMMAND) if (commands[st->state]->type == SQL_COMMAND)
{ {
if (debug) if (debug)
...@@ -1100,181 +1108,40 @@ top: ...@@ -1100,181 +1108,40 @@ top:
{ {
int cnum = commands[st->state]->command_num; int cnum = commands[st->state]->command_num;
if (!now_valid) if (INSTR_TIME_IS_ZERO(now))
{
INSTR_TIME_SET_CURRENT(now); INSTR_TIME_SET_CURRENT(now);
now_valid = true;
}
INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum], INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
now, st->stmt_begin); now, st->stmt_begin);
thread->exec_count[cnum]++; thread->exec_count[cnum]++;
} }
/* transaction finished: record latency under progress or throttling */ /* transaction finished: calculate latency and log the transaction */
if ((progress || throttle_delay) && commands[st->state + 1] == NULL) if (commands[st->state + 1] == NULL)
{
/* only calculate latency if an option is used that needs it */
if (progress || throttle_delay)
{ {
int64 latency; int64 latency;
if (!now_valid) if (INSTR_TIME_IS_ZERO(now))
{
INSTR_TIME_SET_CURRENT(now); INSTR_TIME_SET_CURRENT(now);
now_valid = true;
}
latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled; latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
st->txn_latencies += latency; st->txn_latencies += latency;
/* /*
* XXX In a long benchmark run of high-latency transactions, this * XXX In a long benchmark run of high-latency transactions,
* int64 addition eventually overflows. For example, 100 threads * this int64 addition eventually overflows. For example, 100
* running 10s transactions will overflow it in 2.56 hours. With * threads running 10s transactions will overflow it in 2.56
* a more-typical OLTP workload of .1s transactions, overflow * hours. With a more-typical OLTP workload of .1s
* would take 256 hours. * transactions, overflow would take 256 hours.
*/ */
st->txn_sqlats += latency * latency; st->txn_sqlats += latency * latency;
} }
/* /* record the time it took in the log */
* if transaction finished, record the time it took in the log if (logfile)
*/ doLog(thread, st, logfile, &now, agg);
if (logfile && commands[st->state + 1] == NULL)
{
double lag;
double latency;
/*
* write the log entry if this row belongs to the random sample,
* or no sampling rate was given which means log everything.
*/
if (sample_rate == 0.0 ||
pg_erand48(thread->random_state) <= sample_rate)
{
if (!now_valid)
{
INSTR_TIME_SET_CURRENT(now);
now_valid = true;
}
latency = (double) (INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled);
lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
/* should we aggregate the results or not? */
if (agg_interval > 0)
{
/*
* are we still in the same interval? if yes, accumulate
* the values (print them otherwise)
*/
if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(now))
{
agg->cnt += 1;
agg->sum_latency += latency;
agg->sum2_latency += latency * latency;
/* first in this aggregation interval */
if ((agg->cnt == 1) || (latency < agg->min_latency))
agg->min_latency = latency;
if ((agg->cnt == 1) || (latency > agg->max_latency))
agg->max_latency = latency;
/* and the same for schedule lag */
if (throttle_delay)
{
agg->sum_lag += lag;
agg->sum2_lag += lag * lag;
if ((agg->cnt == 1) || (lag < agg->min_lag))
agg->min_lag = lag;
if ((agg->cnt == 1) || (lag > agg->max_lag))
agg->max_lag = lag;
}
}
else
{
/*
* Loop until we reach the interval of the current
* transaction (and print all the empty intervals in
* between).
*/
while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(now))
{
/*
* This is a non-Windows branch (thanks to the
* ifdef in usage), so we don't need to handle
* this in a special way (see below).
*/
fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
agg->start_time,
agg->cnt,
agg->sum_latency,
agg->sum2_latency,
agg->min_latency,
agg->max_latency);
if (throttle_delay)
fprintf(logfile, " %.0f %.0f %.0f %.0f",
agg->sum_lag,
agg->sum2_lag,
agg->min_lag,
agg->max_lag);
fputc('\n', logfile);
/* move to the next inteval */
agg->start_time = agg->start_time + agg_interval;
/* reset for "no transaction" intervals */
agg->cnt = 0;
agg->min_latency = 0;
agg->max_latency = 0;
agg->sum_latency = 0;
agg->sum2_latency = 0;
agg->min_lag = 0;
agg->max_lag = 0;
agg->sum_lag = 0;
agg->sum2_lag = 0;
}
/*
* and now update the reset values (include the
* current)
*/
agg->cnt = 1;
agg->min_latency = latency;
agg->max_latency = latency;
agg->sum_latency = latency;
agg->sum2_latency = latency * latency;
agg->min_lag = lag;
agg->max_lag = lag;
agg->sum_lag = lag;
agg->sum2_lag = lag * lag;
}
}
else
{
/* no, print raw transactions */
#ifndef WIN32
/*
* This is more than we really ought to know about
* instr_time
*/
fprintf(logfile, "%d %d %.0f %d %ld %ld",
st->id, st->cnt, latency, st->use_file,
(long) now.tv_sec, (long) now.tv_usec);
#else
/*
* On Windows, instr_time doesn't provide a timestamp
* anyway
*/
fprintf(logfile, "%d %d %.0f %d 0 0",
st->id, st->cnt, latency, st->use_file);
#endif
if (throttle_delay)
fprintf(logfile, " %.0f", lag);
fputc('\n', logfile);
}
}
} }
if (commands[st->state]->type == SQL_COMMAND) if (commands[st->state]->type == SQL_COMMAND)
...@@ -1734,6 +1601,137 @@ top: ...@@ -1734,6 +1601,137 @@ top:
return true; return true;
} }
/*
* print log entry after completing one transaction.
*/
static void
doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
{
double lag;
double latency;
/*
* Skip the log entry if sampling is enabled and this row doesn't belong
* to the random sample.
*/
if (sample_rate != 0.0 &&
pg_erand48(thread->random_state) > sample_rate)
return;
if (INSTR_TIME_IS_ZERO(*now))
INSTR_TIME_SET_CURRENT(*now);
latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
/* should we aggregate the results or not? */
if (agg_interval > 0)
{
/*
* Are we still in the same interval? If yes, accumulate the values
* (print them otherwise)
*/
if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
{
agg->cnt += 1;
agg->sum_latency += latency;
agg->sum2_latency += latency * latency;
/* first in this aggregation interval */
if ((agg->cnt == 1) || (latency < agg->min_latency))
agg->min_latency = latency;
if ((agg->cnt == 1) || (latency > agg->max_latency))
agg->max_latency = latency;
/* and the same for schedule lag */
if (throttle_delay)
{
agg->sum_lag += lag;
agg->sum2_lag += lag * lag;
if ((agg->cnt == 1) || (lag < agg->min_lag))
agg->min_lag = lag;
if ((agg->cnt == 1) || (lag > agg->max_lag))
agg->max_lag = lag;
}
}
else
{
/*
* Loop until we reach the interval of the current transaction
* (and print all the empty intervals in between).
*/
while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
{
/*
* This is a non-Windows branch (thanks to the
* ifdef in usage), so we don't need to handle
* this in a special way (see below).
*/
fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
agg->start_time,
agg->cnt,
agg->sum_latency,
agg->sum2_latency,
agg->min_latency,
agg->max_latency);
if (throttle_delay)
fprintf(logfile, " %.0f %.0f %.0f %.0f",
agg->sum_lag,
agg->sum2_lag,
agg->min_lag,
agg->max_lag);
fputc('\n', logfile);
/* move to the next inteval */
agg->start_time = agg->start_time + agg_interval;
/* reset for "no transaction" intervals */
agg->cnt = 0;
agg->min_latency = 0;
agg->max_latency = 0;
agg->sum_latency = 0;
agg->sum2_latency = 0;
agg->min_lag = 0;
agg->max_lag = 0;
agg->sum_lag = 0;
agg->sum2_lag = 0;
}
/* reset the values to include only the current transaction. */
agg->cnt = 1;
agg->min_latency = latency;
agg->max_latency = latency;
agg->sum_latency = latency;
agg->sum2_latency = latency * latency;
agg->min_lag = lag;
agg->max_lag = lag;
agg->sum_lag = lag;
agg->sum2_lag = lag * lag;
}
}
else
{
/* no, print raw transactions */
#ifndef WIN32
/* This is more than we really ought to know about instr_time */
fprintf(logfile, "%d %d %.0f %d %ld %ld",
st->id, st->cnt, latency, st->use_file,
(long) now->tv_sec, (long) now->tv_usec);
#else
/* On Windows, instr_time doesn't provide a timestamp anyway */
fprintf(logfile, "%d %d %.0f %d 0 0",
st->id, st->cnt, latency, st->use_file);
#endif
if (throttle_delay)
fprintf(logfile, " %.0f", lag);
fputc('\n', logfile);
}
}
/* discard connections */ /* discard connections */
static void static void
disconnect_all(CState *state, int length) disconnect_all(CState *state, int length)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment