Commit 3da0dfb4 authored by Tatsuo Ishii

Multi-threaded version of pgbench contributed by ITAGAKI Takahiro,

reviewed by Greg Smith and Josh Williams.

The following is the proposal from ITAGAKI Takahiro:

Pgbench is a famous tool for measuring postgres performance, but nowadays
it does not work well because it cannot use multiple CPUs. The postgres
server, on the other hand, can use multiple CPUs very well, so the
bottleneck of the workload is *in pgbench*.

Multi-threading would be a solution. The attached patch adds a -j
(number of jobs) option to pgbench. If the value N is greater than 1,
pgbench runs with N threads. Connections are divided equally among
the threads (e.g. -c64 -j4 => 4 threads with 16 connections each). It can
run on POSIX platforms with pthread and on Windows with win32 threads.

Here are the results of multi-threaded pgbench runs on Fedora 11 with an Intel
Core i7 (8 logical cores = 4 physical cores * HT). -j8 (8 threads) was the
best: its tps is 4.5 times that of -j1, which is the traditional (single-threaded) result.

$ pgbench -i -s10
$ pgbench -n -S -c64 -j1   =>  tps = 11600.158593
$ pgbench -n -S -c64 -j2   =>  tps = 17947.100954
$ pgbench -n -S -c64 -j4   =>  tps = 26571.124001
$ pgbench -n -S -c64 -j8   =>  tps = 52725.470403
$ pgbench -n -S -c64 -j16  =>  tps = 38976.675319
$ pgbench -n -S -c64 -j32  =>  tps = 28998.499601
$ pgbench -n -S -c64 -j64  =>  tps = 26701.877815

Is it acceptable to use pthread in a contrib module?
If so, I will add the patch to the next commitfest.
parent 90725929
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
* A simple benchmark program for PostgreSQL * A simple benchmark program for PostgreSQL
* Originally written by Tatsuo Ishii and enhanced by many contributors. * Originally written by Tatsuo Ishii and enhanced by many contributors.
* *
* $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.88 2009/07/30 09:28:00 mha Exp $ * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.89 2009/08/03 15:18:14 ishii Exp $
* Copyright (c) 2000-2009, PostgreSQL Global Development Group * Copyright (c) 2000-2009, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED; * ALL RIGHTS RESERVED;
* *
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include "libpq-fe.h" #include "libpq-fe.h"
#include "pqsignal.h" #include "pqsignal.h"
#include "portability/instr_time.h"
#include <ctype.h> #include <ctype.h>
...@@ -58,6 +59,40 @@ ...@@ -58,6 +59,40 @@
#include <sys/resource.h> /* for getrlimit */ #include <sys/resource.h> /* for getrlimit */
#endif #endif
#ifndef INT64_MAX
#define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF)
#endif
/*
* Multi-platform pthread implementations
*/
#ifdef WIN32
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;
static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
static int pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread */
#include <pthread.h>
#else
#include <sys/wait.h>
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
#define pthread_t pg_pthread_t
#define pthread_attr_t pg_pthread_attr_t
#define pthread_create pg_pthread_create
#define pthread_join pg_pthread_join
typedef struct fork_pthread *pthread_t;
typedef int pthread_attr_t;
static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
static int pthread_join(pthread_t th, void **thread_return);
#endif
extern char *optarg; extern char *optarg;
extern int optind; extern int optind;
...@@ -74,7 +109,6 @@ extern int optind; ...@@ -74,7 +109,6 @@ extern int optind;
#define DEFAULT_NXACTS 10 /* default nxacts */ #define DEFAULT_NXACTS 10 /* default nxacts */
int nclients = 1; /* default number of simulated clients */
int nxacts = 0; /* number of transactions per client */ int nxacts = 0; /* number of transactions per client */
int duration = 0; /* duration in seconds */ int duration = 0; /* duration in seconds */
...@@ -102,8 +136,6 @@ FILE *LOGFILE = NULL; ...@@ -102,8 +136,6 @@ FILE *LOGFILE = NULL;
bool use_log; /* log transaction latencies to a file */ bool use_log; /* log transaction latencies to a file */
int remains; /* number of remaining clients */
int is_connect; /* establish connection for each transaction */ int is_connect; /* establish connection for each transaction */
char *pghost = ""; char *pghost = "";
...@@ -138,14 +170,33 @@ typedef struct ...@@ -138,14 +170,33 @@ typedef struct
int listen; /* 0 indicates that an async query has been int listen; /* 0 indicates that an async query has been
* sent */ * sent */
int sleeping; /* 1 indicates that the client is napping */ int sleeping; /* 1 indicates that the client is napping */
struct timeval until; /* napping until */ int64 until; /* napping until (usec) */
Variable *variables; /* array of variable definitions */ Variable *variables; /* array of variable definitions */
int nvariables; int nvariables;
struct timeval txn_begin; /* used for measuring latencies */ instr_time txn_begin; /* used for measuring latencies */
int use_file; /* index in sql_files for this client */ int use_file; /* index in sql_files for this client */
bool prepared[MAX_FILES]; bool prepared[MAX_FILES];
} CState; } CState;
/*
* Thread state and result
*/
typedef struct
{
pthread_t thread; /* thread handle */
CState *state; /* array of CState */
int nstate; /* length of state */
instr_time start_time; /* thread start time */
} TState;
#define INVALID_THREAD ((pthread_t) 0)
typedef struct
{
instr_time conn_time;
int xacts;
} TResult;
/* /*
* queries read from files * queries read from files
*/ */
...@@ -171,8 +222,9 @@ typedef struct ...@@ -171,8 +222,9 @@ typedef struct
char *argv[MAX_ARGS]; /* command list */ char *argv[MAX_ARGS]; /* command list */
} Command; } Command;
Command **sql_files[MAX_FILES]; /* SQL script files */ static Command **sql_files[MAX_FILES]; /* SQL script files */
int num_files; /* number of script files */ static int num_files; /* number of script files */
static int debug = 0; /* debug flag */
/* default scenario */ /* default scenario */
static char *tpc_b = { static char *tpc_b = {
...@@ -215,44 +267,9 @@ static char *select_only = { ...@@ -215,44 +267,9 @@ static char *select_only = {
"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
}; };
/* Connection overhead time */
static struct timeval conn_total_time = {0, 0};
/* Function prototypes */ /* Function prototypes */
static void setalarm(int seconds); static void setalarm(int seconds);
static void* threadRun(void *arg);
/* Calculate total time */
static void
addTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
{
int sec = t1->tv_sec + t2->tv_sec;
int usec = t1->tv_usec + t2->tv_usec;
if (usec >= 1000000)
{
usec -= 1000000;
sec++;
}
result->tv_sec = sec;
result->tv_usec = usec;
}
/* Calculate time difference */
static void
diffTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
{
int sec = t1->tv_sec - t2->tv_sec;
int usec = t1->tv_usec - t2->tv_usec;
if (usec < 0)
{
usec += 1000000;
sec--;
}
result->tv_sec = sec;
result->tv_usec = usec;
}
static void static void
usage(const char *progname) usage(const char *progname)
...@@ -270,6 +287,7 @@ usage(const char *progname) ...@@ -270,6 +287,7 @@ usage(const char *progname)
" -D VARNAME=VALUE\n" " -D VARNAME=VALUE\n"
" define variable for use by custom script\n" " define variable for use by custom script\n"
" -f FILENAME read transaction script from FILENAME\n" " -f FILENAME read transaction script from FILENAME\n"
" -j NUM number of threads (default: 1)\n"
" -l write transaction times to log file\n" " -l write transaction times to log file\n"
" -M {simple|extended|prepared}\n" " -M {simple|extended|prepared}\n"
" protocol for submitting queries to server (default: simple)\n" " protocol for submitting queries to server (default: simple)\n"
...@@ -379,29 +397,6 @@ discard_response(CState *state) ...@@ -379,29 +397,6 @@ discard_response(CState *state)
} while (res); } while (res);
} }
/* check to see if the SQL result was good */
static int
check(CState *state, PGresult *res, int n)
{
CState *st = &state[n];
switch (PQresultStatus(res))
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
/* OK */
break;
default:
fprintf(stderr, "Client %d aborted in state %d: %s",
n, st->state, PQerrorMessage(st->con));
remains--; /* I've aborted */
PQfinish(st->con);
st->con = NULL;
return (-1);
}
return (0); /* OK */
}
static int static int
compareVariables(const void *v1, const void *v2) compareVariables(const void *v1, const void *v2)
{ {
...@@ -598,11 +593,24 @@ preparedStatementName(char *buffer, int file, int state) ...@@ -598,11 +593,24 @@ preparedStatementName(char *buffer, int file, int state)
sprintf(buffer, "P%d_%d", file, state); sprintf(buffer, "P%d_%d", file, state);
} }
static void static bool
doCustom(CState *state, int n, int debug) clientDone(CState *st, bool ok)
{
(void) ok; /* unused */
if (st->con != NULL)
{
PQfinish(st->con);
st->con = NULL;
}
return false; /* always false */
}
/* return false iff client should be disconnected */
static bool
doCustom(CState *st, instr_time *conn_time)
{ {
PGresult *res; PGresult *res;
CState *st = &state[n];
Command **commands; Command **commands;
top: top:
...@@ -610,16 +618,13 @@ top: ...@@ -610,16 +618,13 @@ top:
if (st->sleeping) if (st->sleeping)
{ /* are we sleeping? */ { /* are we sleeping? */
int usec; instr_time now;
struct timeval now;
gettimeofday(&now, NULL); INSTR_TIME_SET_CURRENT(now);
usec = (st->until.tv_sec - now.tv_sec) * 1000000 + if (st->until <= INSTR_TIME_GET_MICROSEC(now))
st->until.tv_usec - now.tv_usec;
if (usec <= 0)
st->sleeping = 0; /* Done sleeping, go ahead with next command */ st->sleeping = 0; /* Done sleeping, go ahead with next command */
else else
return; /* Still sleeping, nothing to do here */ return true; /* Still sleeping, nothing to do here */
} }
if (st->listen) if (st->listen)
...@@ -627,17 +632,14 @@ top: ...@@ -627,17 +632,14 @@ top:
if (commands[st->state]->type == SQL_COMMAND) if (commands[st->state]->type == SQL_COMMAND)
{ {
if (debug) if (debug)
fprintf(stderr, "client %d receiving\n", n); fprintf(stderr, "client %d receiving\n", st->id);
if (!PQconsumeInput(st->con)) if (!PQconsumeInput(st->con))
{ /* there's something wrong */ { /* there's something wrong */
fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state); fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
remains--; /* I've aborted */ return clientDone(st, false);
PQfinish(st->con);
st->con = NULL;
return;
} }
if (PQisBusy(st->con)) if (PQisBusy(st->con))
return; /* don't have the whole result yet */ return true; /* don't have the whole result yet */
} }
/* /*
...@@ -645,25 +647,35 @@ top: ...@@ -645,25 +647,35 @@ top:
*/ */
if (use_log && commands[st->state + 1] == NULL) if (use_log && commands[st->state + 1] == NULL)
{ {
double diff; instr_time diff;
struct timeval now; double sec;
double msec;
double usec;
gettimeofday(&now, NULL); INSTR_TIME_SET_CURRENT(diff);
diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 + INSTR_TIME_SUBTRACT(diff, st->txn_begin);
(int) (now.tv_usec - st->txn_begin.tv_usec); sec = INSTR_TIME_GET_DOUBLE(diff);
msec = INSTR_TIME_GET_MILLISEC(diff);
usec = (double) INSTR_TIME_GET_MICROSEC(diff);
fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n", fprintf(LOGFILE, "%d %d %.0f %d %.0f %.0f\n",
st->id, st->cnt, diff, st->use_file, st->id, st->cnt, usec, st->use_file,
(long) now.tv_sec, (long) now.tv_usec); sec, usec - sec * 1000.0);
} }
if (commands[st->state]->type == SQL_COMMAND) if (commands[st->state]->type == SQL_COMMAND)
{ {
res = PQgetResult(st->con); res = PQgetResult(st->con);
if (check(state, res, n)) switch (PQresultStatus(res))
{ {
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
break; /* OK */
default:
fprintf(stderr, "Client %d aborted in state %d: %s",
st->id, st->state, PQerrorMessage(st->con));
PQclear(res); PQclear(res);
return; return clientDone(st, false);
} }
PQclear(res); PQclear(res);
discard_response(st); discard_response(st);
...@@ -679,15 +691,7 @@ top: ...@@ -679,15 +691,7 @@ top:
++st->cnt; ++st->cnt;
if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
{ return clientDone(st, true); /* exit success */
remains--; /* I've done */
if (st->con != NULL)
{
PQfinish(st->con);
st->con = NULL;
}
return;
}
} }
/* increment state counter */ /* increment state counter */
...@@ -702,27 +706,20 @@ top: ...@@ -702,27 +706,20 @@ top:
if (st->con == NULL) if (st->con == NULL)
{ {
struct timeval t1, instr_time start, end;
t2,
t3;
gettimeofday(&t1, NULL); INSTR_TIME_SET_CURRENT(start);
if ((st->con = doConnect()) == NULL) if ((st->con = doConnect()) == NULL)
{ {
fprintf(stderr, "Client %d aborted in establishing connection.\n", fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
n); return clientDone(st, false);
remains--; /* I've aborted */
PQfinish(st->con);
st->con = NULL;
return;
} }
gettimeofday(&t2, NULL); INSTR_TIME_SET_CURRENT(end);
diffTime(&t2, &t1, &t3); INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
addTime(&conn_total_time, &t3, &conn_total_time);
} }
if (use_log && st->state == 0) if (use_log && st->state == 0)
gettimeofday(&(st->txn_begin), NULL); INSTR_TIME_SET_CURRENT(st->txn_begin);
if (commands[st->state]->type == SQL_COMMAND) if (commands[st->state]->type == SQL_COMMAND)
{ {
...@@ -738,11 +735,11 @@ top: ...@@ -738,11 +735,11 @@ top:
{ {
fprintf(stderr, "out of memory\n"); fprintf(stderr, "out of memory\n");
st->ecnt++; st->ecnt++;
return; return true;
} }
if (debug) if (debug)
fprintf(stderr, "client %d sending %s\n", n, sql); fprintf(stderr, "client %d sending %s\n", st->id, sql);
r = PQsendQuery(st->con, sql); r = PQsendQuery(st->con, sql);
free(sql); free(sql);
} }
...@@ -754,7 +751,7 @@ top: ...@@ -754,7 +751,7 @@ top:
getQueryParams(st, command, params); getQueryParams(st, command, params);
if (debug) if (debug)
fprintf(stderr, "client %d sending %s\n", n, sql); fprintf(stderr, "client %d sending %s\n", st->id, sql);
r = PQsendQueryParams(st->con, sql, command->argc - 1, r = PQsendQueryParams(st->con, sql, command->argc - 1,
NULL, params, NULL, NULL, 0); NULL, params, NULL, NULL, 0);
} }
...@@ -788,7 +785,7 @@ top: ...@@ -788,7 +785,7 @@ top:
preparedStatementName(name, st->use_file, st->state); preparedStatementName(name, st->use_file, st->state);
if (debug) if (debug)
fprintf(stderr, "client %d sending %s\n", n, name); fprintf(stderr, "client %d sending %s\n", st->id, name);
r = PQsendQueryPrepared(st->con, name, command->argc - 1, r = PQsendQueryPrepared(st->con, name, command->argc - 1,
params, NULL, NULL, 0); params, NULL, NULL, 0);
} }
...@@ -798,7 +795,7 @@ top: ...@@ -798,7 +795,7 @@ top:
if (r == 0) if (r == 0)
{ {
if (debug) if (debug)
fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]); fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
st->ecnt++; st->ecnt++;
} }
else else
...@@ -812,7 +809,7 @@ top: ...@@ -812,7 +809,7 @@ top:
if (debug) if (debug)
{ {
fprintf(stderr, "client %d executing \\%s", n, argv[0]); fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
for (i = 1; i < argc; i++) for (i = 1; i < argc; i++)
fprintf(stderr, " %s", argv[i]); fprintf(stderr, " %s", argv[i]);
fprintf(stderr, "\n"); fprintf(stderr, "\n");
...@@ -831,7 +828,7 @@ top: ...@@ -831,7 +828,7 @@ top:
{ {
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
st->ecnt++; st->ecnt++;
return; return true;
} }
min = atoi(var); min = atoi(var);
} }
...@@ -853,7 +850,7 @@ top: ...@@ -853,7 +850,7 @@ top:
{ {
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]); fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
st->ecnt++; st->ecnt++;
return; return true;
} }
max = atoi(var); max = atoi(var);
} }
...@@ -864,7 +861,7 @@ top: ...@@ -864,7 +861,7 @@ top:
{ {
fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max); fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
st->ecnt++; st->ecnt++;
return; return true;
} }
#ifdef DEBUG #ifdef DEBUG
...@@ -876,7 +873,7 @@ top: ...@@ -876,7 +873,7 @@ top:
{ {
fprintf(stderr, "%s: out of memory\n", argv[0]); fprintf(stderr, "%s: out of memory\n", argv[0]);
st->ecnt++; st->ecnt++;
return; return true;
} }
st->listen = 1; st->listen = 1;
...@@ -894,7 +891,7 @@ top: ...@@ -894,7 +891,7 @@ top:
{ {
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
st->ecnt++; st->ecnt++;
return; return true;
} }
ope1 = atoi(var); ope1 = atoi(var);
} }
...@@ -911,7 +908,7 @@ top: ...@@ -911,7 +908,7 @@ top:
{ {
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]); fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
st->ecnt++; st->ecnt++;
return; return true;
} }
ope2 = atoi(var); ope2 = atoi(var);
} }
...@@ -930,7 +927,7 @@ top: ...@@ -930,7 +927,7 @@ top:
{ {
fprintf(stderr, "%s: division by zero\n", argv[0]); fprintf(stderr, "%s: division by zero\n", argv[0]);
st->ecnt++; st->ecnt++;
return; return true;
} }
snprintf(res, sizeof(res), "%d", ope1 / ope2); snprintf(res, sizeof(res), "%d", ope1 / ope2);
} }
...@@ -938,7 +935,7 @@ top: ...@@ -938,7 +935,7 @@ top:
{ {
fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]); fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
st->ecnt++; st->ecnt++;
return; return true;
} }
} }
...@@ -946,7 +943,7 @@ top: ...@@ -946,7 +943,7 @@ top:
{ {
fprintf(stderr, "%s: out of memory\n", argv[0]); fprintf(stderr, "%s: out of memory\n", argv[0]);
st->ecnt++; st->ecnt++;
return; return true;
} }
st->listen = 1; st->listen = 1;
...@@ -955,7 +952,7 @@ top: ...@@ -955,7 +952,7 @@ top:
{ {
char *var; char *var;
int usec; int usec;
struct timeval now; instr_time now;
if (*argv[1] == ':') if (*argv[1] == ':')
{ {
...@@ -963,7 +960,7 @@ top: ...@@ -963,7 +960,7 @@ top:
{ {
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]); fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
st->ecnt++; st->ecnt++;
return; return true;
} }
usec = atoi(var); usec = atoi(var);
} }
...@@ -980,9 +977,8 @@ top: ...@@ -980,9 +977,8 @@ top:
else else
usec *= 1000000; usec *= 1000000;
gettimeofday(&now, NULL); INSTR_TIME_SET_CURRENT(now);
st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000; st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
st->until.tv_usec = (now.tv_usec + usec) % 1000000;
st->sleeping = 1; st->sleeping = 1;
st->listen = 1; st->listen = 1;
...@@ -990,18 +986,23 @@ top: ...@@ -990,18 +986,23 @@ top:
goto top; goto top;
} }
return true;
} }
/* discard connections */ /* discard connections */
static void static void
disconnect_all(CState *state) disconnect_all(CState *state, int length)
{ {
int i; int i;
for (i = 0; i < nclients; i++) for (i = 0; i < length; i++)
{ {
if (state[i].con) if (state[i].con)
{
PQfinish(state[i].con); PQfinish(state[i].con);
state[i].con = NULL;
}
} }
} }
...@@ -1267,6 +1268,24 @@ process_commands(char *buf) ...@@ -1267,6 +1268,24 @@ process_commands(char *buf)
return NULL; return NULL;
} }
/*
* Split argument into number and unit for "sleep 1ms" or so.
* We don't have to terminate the number argument with null
* because it will be parsed with atoi, which ignores trailing
* non-digit characters.
*/
if (my_commands->argv[1][0] != ':')
{
char *c = my_commands->argv[1];
while (isdigit(*c)) { c++; }
if (*c)
{
my_commands->argv[2] = c;
if (my_commands->argc < 3)
my_commands->argc = 3;
}
}
if (my_commands->argc >= 3) if (my_commands->argc >= 3)
{ {
if (pg_strcasecmp(my_commands->argv[2], "us") != 0 && if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
...@@ -1453,25 +1472,18 @@ process_builtin(char *tb) ...@@ -1453,25 +1472,18 @@ process_builtin(char *tb)
/* print out results */ /* print out results */
static void static void
printResults( printResults(int ttype, int normal_xacts, int nclients, int nthreads,
int ttype, CState *state, instr_time total_time, instr_time conn_total_time)
struct timeval * start_time, struct timeval * end_time)
{ {
double t1, double time_include,
t2; tps_include,
int i; tps_exclude;
int normal_xacts = 0;
char *s; char *s;
for (i = 0; i < nclients; i++) time_include = INSTR_TIME_GET_DOUBLE(total_time);
normal_xacts += state[i].cnt; tps_include = normal_xacts / time_include;
tps_exclude = normal_xacts / (time_include -
t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec); (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
t1 = normal_xacts * 1000000.0 / t1;
t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 +
(end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec);
t2 = normal_xacts * 1000000.0 / t2;
if (ttype == 0) if (ttype == 0)
s = "TPC-B (sort of)"; s = "TPC-B (sort of)";
...@@ -1486,6 +1498,7 @@ printResults( ...@@ -1486,6 +1498,7 @@ printResults(
printf("scaling factor: %d\n", scale); printf("scaling factor: %d\n", scale);
printf("query mode: %s\n", QUERYMODE[querymode]); printf("query mode: %s\n", QUERYMODE[querymode]);
printf("number of clients: %d\n", nclients); printf("number of clients: %d\n", nclients);
printf("number of threads: %d\n", nthreads);
if (duration <= 0) if (duration <= 0)
{ {
printf("number of transactions per client: %d\n", nxacts); printf("number of transactions per client: %d\n", nxacts);
...@@ -1498,8 +1511,8 @@ printResults( ...@@ -1498,8 +1511,8 @@ printResults(
printf("number of transactions actually processed: %d\n", printf("number of transactions actually processed: %d\n",
normal_xacts); normal_xacts);
} }
printf("tps = %f (including connections establishing)\n", t1); printf("tps = %f (including connections establishing)\n", tps_include);
printf("tps = %f (excluding connections establishing)\n", t2); printf("tps = %f (excluding connections establishing)\n", tps_exclude);
} }
...@@ -1507,29 +1520,26 @@ int ...@@ -1507,29 +1520,26 @@ int
main(int argc, char **argv) main(int argc, char **argv)
{ {
int c; int c;
int nclients = 1; /* default number of simulated clients */
int nthreads = 1; /* default number of threads */
int is_init_mode = 0; /* initialize mode? */ int is_init_mode = 0; /* initialize mode? */
int is_no_vacuum = 0; /* no vacuum at all before testing? */ int is_no_vacuum = 0; /* no vacuum at all before testing? */
int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */ int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
int debug = 0; /* debug flag */
int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only, int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
* 2: skip update of branches and tellers */ * 2: skip update of branches and tellers */
char *filename = NULL; char *filename = NULL;
bool scale_given = false; bool scale_given = false;
CState *state; /* status of clients */ CState *state; /* status of clients */
TState *threads; /* array of thread */
struct timeval start_time; /* start up time */ instr_time start_time; /* start up time */
struct timeval end_time; /* end time */ instr_time total_time;
instr_time conn_total_time;
int total_xacts;
int i; int i;
fd_set input_mask;
int nsocks; /* return from select(2) */
int maxsock; /* max socket number to be waited */
struct timeval now;
struct timeval timeout;
int min_usec;
#ifdef HAVE_GETRLIMIT #ifdef HAVE_GETRLIMIT
struct rlimit rlim; struct rlimit rlim;
#endif #endif
...@@ -1579,7 +1589,7 @@ main(int argc, char **argv) ...@@ -1579,7 +1589,7 @@ main(int argc, char **argv)
memset(state, 0, sizeof(*state)); memset(state, 0, sizeof(*state));
while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1) while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1)
{ {
switch (c) switch (c)
{ {
...@@ -1632,6 +1642,14 @@ main(int argc, char **argv) ...@@ -1632,6 +1642,14 @@ main(int argc, char **argv)
} }
#endif /* HAVE_GETRLIMIT */ #endif /* HAVE_GETRLIMIT */
break; break;
case 'j': /* jobs */
nthreads = atoi(optarg);
if (nthreads <= 0)
{
fprintf(stderr, "invalid number of threads: %d\n", nthreads);
exit(1);
}
break;
case 'C': case 'C':
is_connect = 1; is_connect = 1;
break; break;
...@@ -1752,7 +1770,11 @@ main(int argc, char **argv) ...@@ -1752,7 +1770,11 @@ main(int argc, char **argv)
if (nxacts <= 0 && duration <= 0) if (nxacts <= 0 && duration <= 0)
nxacts = DEFAULT_NXACTS; nxacts = DEFAULT_NXACTS;
remains = nclients; if (nclients % nthreads != 0)
{
fprintf(stderr, "number of clients (%d) must be a multiple number of threads (%d)\n", nclients, nthreads);
exit(1);
}
if (nclients > 1) if (nclients > 1)
{ {
...@@ -1770,6 +1792,7 @@ main(int argc, char **argv) ...@@ -1770,6 +1792,7 @@ main(int argc, char **argv)
{ {
int j; int j;
state[i].id = i;
for (j = 0; j < state[0].nvariables; j++) for (j = 0; j < state[0].nvariables; j++)
{ {
if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false) if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
...@@ -1879,33 +1902,8 @@ main(int argc, char **argv) ...@@ -1879,33 +1902,8 @@ main(int argc, char **argv)
PQfinish(con); PQfinish(con);
/* set random seed */ /* set random seed */
gettimeofday(&start_time, NULL); INSTR_TIME_SET_CURRENT(start_time);
srandom((unsigned int) start_time.tv_usec); srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
/* get start up time */
gettimeofday(&start_time, NULL);
/* set alarm if duration is specified. */
if (duration > 0)
setalarm(duration);
if (is_connect == 0)
{
struct timeval t,
now;
/* make connections to the database */
for (i = 0; i < nclients; i++)
{
state[i].id = i;
if ((state[i].con = doConnect()) == NULL)
exit(1);
}
/* time after connections set up */
gettimeofday(&now, NULL);
diffTime(&now, &start_time, &t);
addTime(&conn_total_time, &t, &conn_total_time);
}
/* process bultin SQL scripts */ /* process bultin SQL scripts */
switch (ttype) switch (ttype)
...@@ -1929,140 +1927,227 @@ main(int argc, char **argv) ...@@ -1929,140 +1927,227 @@ main(int argc, char **argv)
break; break;
} }
/* send start up queries in async manner */ /* get start up time */
for (i = 0; i < nclients; i++) INSTR_TIME_SET_CURRENT(start_time);
{
Command **commands = sql_files[state[i].use_file]; /* set alarm if duration is specified. */
int prev_ecnt = state[i].ecnt; if (duration > 0)
setalarm(duration);
state[i].use_file = getrand(0, num_files - 1); /* start threads */
doCustom(state, i, debug); threads = (TState *) malloc(sizeof(TState) * nthreads);
for (i = 0; i < nthreads; i++)
{
threads[i].state = &state[nclients / nthreads * i];
threads[i].nstate = nclients / nthreads;
INSTR_TIME_SET_CURRENT(threads[i].start_time);
if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND) /* the first thread (i = 0) is executed by main thread */
if (i > 0)
{ {
fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state); int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
remains--; /* I've aborted */ if (err != 0 || threads[i].thread == INVALID_THREAD)
PQfinish(state[i].con); {
state[i].con = NULL; fprintf(stderr, "cannot create thread: %s\n", strerror(err));
exit(1);
}
}
else
{
threads[i].thread = INVALID_THREAD;
} }
} }
for (;;) /* wait for threads and accumulate results */
total_xacts = 0;
INSTR_TIME_SET_ZERO(conn_total_time);
for (i = 0; i < nthreads; i++)
{
void *ret = NULL;
if (threads[i].thread == INVALID_THREAD)
ret = threadRun(&threads[i]);
else
pthread_join(threads[i].thread, &ret);
if (ret != NULL)
{ {
if (remains <= 0) TResult *r = (TResult *) ret;
{ /* all done ? */ total_xacts += r->xacts;
disconnect_all(state); INSTR_TIME_ADD(conn_total_time, r->conn_time);
free(ret);
}
}
disconnect_all(state, nclients);
/* get end time */ /* get end time */
gettimeofday(&end_time, NULL); INSTR_TIME_SET_CURRENT(total_time);
printResults(ttype, state, &start_time, &end_time); INSTR_TIME_SUBTRACT(total_time, start_time);
printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time);
if (LOGFILE) if (LOGFILE)
fclose(LOGFILE); fclose(LOGFILE);
exit(0);
return 0;
}
static void *
threadRun(void *arg)
{
TState *thread = (TState *) arg;
CState *state = thread->state;
TResult *result;
instr_time start, end;
int nstate = thread->nstate;
int remains = nstate; /* number of remaining clients */
int i;
result = malloc(sizeof(TResult));
INSTR_TIME_SET_ZERO(result->conn_time);
if (is_connect == 0)
{
/* make connections to the database */
for (i = 0; i < nstate; i++)
{
if ((state[i].con = doConnect()) == NULL)
goto done;
}
}
/* time after thread and connections set up */
INSTR_TIME_SET_CURRENT(result->conn_time);
INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
/* send start up queries in async manner */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
Command **commands = sql_files[st->use_file];
int prev_ecnt = st->ecnt;
st->use_file = getrand(0, num_files - 1);
if (!doCustom(st, &result->conn_time))
remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
remains--; /* I've aborted */
PQfinish(st->con);
st->con = NULL;
}
} }
while (remains > 0)
{
fd_set input_mask;
int maxsock; /* max socket number to be waited */
int64 now_usec = 0;
int64 min_usec;
FD_ZERO(&input_mask); FD_ZERO(&input_mask);
maxsock = -1; maxsock = -1;
min_usec = -1; min_usec = INT64_MAX;
for (i = 0; i < nclients; i++) for (i = 0; i < nstate; i++)
{ {
Command **commands = sql_files[state[i].use_file]; CState *st = &state[i];
Command **commands = sql_files[st->use_file];
int sock;
if (state[i].sleeping) if (st->sleeping)
{ {
int this_usec; int this_usec;
int sock = PQsocket(state[i].con);
if (min_usec < 0) if (min_usec == INT64_MAX)
{ {
gettimeofday(&now, NULL); instr_time now;
min_usec = 0; INSTR_TIME_SET_CURRENT(now);
now_usec = INSTR_TIME_GET_MICROSEC(now);
} }
this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 + this_usec = st->until - now_usec;
state[i].until.tv_usec - now.tv_usec; if (min_usec > this_usec)
if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
min_usec = this_usec; min_usec = this_usec;
FD_SET (sock, &input_mask);
if (maxsock < sock)
maxsock = sock;
} }
else if (state[i].con && commands[state[i].state]->type != META_COMMAND) else if (st->con == NULL)
{ {
int sock = PQsocket(state[i].con); continue;
}
else if (commands[st->state]->type == META_COMMAND)
{
min_usec = 0; /* the connection is ready to run */
break;
}
sock = PQsocket(st->con);
if (sock < 0) if (sock < 0)
{ {
disconnect_all(state); fprintf(stderr, "bad socket: %s\n", strerror(errno));
exit(1); goto done;
} }
FD_SET (sock, &input_mask);
FD_SET(sock, &input_mask);
if (maxsock < sock) if (maxsock < sock)
maxsock = sock; maxsock = sock;
} }
}
if (maxsock != -1) if (min_usec > 0 && maxsock != -1)
{ {
if (min_usec >= 0) int nsocks; /* return from select(2) */
if (min_usec != INT64_MAX)
{ {
struct timeval timeout;
timeout.tv_sec = min_usec / 1000000; timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000; timeout.tv_usec = min_usec % 1000000;
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
(fd_set *) NULL, &timeout);
} }
else else
nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
(fd_set *) NULL, (struct timeval *) NULL);
if (nsocks < 0) if (nsocks < 0)
{ {
if (errno == EINTR) if (errno == EINTR)
continue; continue;
/* must be something wrong */ /* must be something wrong */
disconnect_all(state);
fprintf(stderr, "select failed: %s\n", strerror(errno)); fprintf(stderr, "select failed: %s\n", strerror(errno));
exit(1); goto done;
}
#ifdef NOT_USED
else if (nsocks == 0)
{ /* timeout */
fprintf(stderr, "select timeout\n");
for (i = 0; i < nclients; i++)
{
fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
} }
exit(0);
}
#endif
} }
/* ok, backend returns reply */ /* ok, backend returns reply */
for (i = 0; i < nclients; i++) for (i = 0; i < nstate; i++)
{ {
Command **commands = sql_files[state[i].use_file]; CState *st = &state[i];
int prev_ecnt = state[i].ecnt; Command **commands = sql_files[st->use_file];
int prev_ecnt = st->ecnt;
if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask) if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|| commands[state[i].state]->type == META_COMMAND)) || commands[st->state]->type == META_COMMAND))
{ {
doCustom(state, i, debug); if (!doCustom(st, &result->conn_time))
remains--; /* I've aborted */
} }
if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND) if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{ {
fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state); fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
remains--; /* I've aborted */ remains--; /* I've aborted */
PQfinish(state[i].con); PQfinish(st->con);
state[i].con = NULL; st->con = NULL;
} }
} }
} }
done:
INSTR_TIME_SET_CURRENT(start);
disconnect_all(state, nstate);
result->xacts = 0;
for (i = 0; i < nstate; i++)
result->xacts += state[i].cnt;
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
return result;
} }
...@@ -2084,6 +2169,87 @@ setalarm(int seconds) ...@@ -2084,6 +2169,87 @@ setalarm(int seconds)
pqsignal(SIGALRM, handle_sig_alarm); pqsignal(SIGALRM, handle_sig_alarm);
alarm(seconds); alarm(seconds);
} }
#ifndef ENABLE_THREAD_SAFETY
/*
* implements pthread using fork.
*/
/*
 * Handle for a "thread" that is emulated by a forked child process.
 */
typedef struct fork_pthread
{
/* process id returned by fork(); used by pthread_join() to wait for the child */
pid_t pid;
/* descriptors filled in by pipe(): the child writes its result to pipes[1], the parent reads it from pipes[0] */
int pipes[2];
} fork_pthread;
/*
 * Emulate pthread_create() by forking a child process.
 *
 * A pipe is created before the fork so the child can hand its result back
 * to the parent.  In the parent, the new handle is stored in *thread and 0
 * is returned; on failure an errno value is returned and *thread is left
 * untouched.  The child never returns from this function: it runs
 * start_routine(arg), writes sizeof(TResult) bytes of the returned object
 * to the pipe and exits.
 *
 * attr is accepted for signature compatibility only and is ignored.
 */
static int
pthread_create(pthread_t *thread,
			   pthread_attr_t *attr,
			   void * (*start_routine)(void *),
			   void *arg)
{
	fork_pthread *th;
	void	   *ret;
	int			save_errno;

	th = (fork_pthread *) malloc(sizeof(fork_pthread));
	if (th == NULL)
		return ENOMEM;

	if (pipe(th->pipes) < 0)
	{
		/* save errno first: free() is allowed to clobber it */
		save_errno = errno;
		free(th);
		return save_errno;
	}

	th->pid = fork();
	if (th->pid == -1)	/* error */
	{
		save_errno = errno;
		/* nobody will ever use the pipe, so do not leak its descriptors */
		close(th->pipes[0]);
		close(th->pipes[1]);
		free(th);
		return save_errno;
	}
	if (th->pid != 0)	/* parent process */
	{
		/* the parent only reads from the pipe */
		close(th->pipes[1]);
		*thread = th;
		return 0;
	}

	/* child process */
	close(th->pipes[0]);

	/* set alarm again because the child does not inherit timers */
	if (duration > 0)
		setalarm(duration);

	ret = start_routine(arg);

	/*
	 * Send the result to the parent.  If there is no result, or the write
	 * fails, the parent sees a short read and reports a NULL result.
	 */
	if (ret != NULL)
	{
		const char *buf = (const char *) ret;
		size_t		left = sizeof(TResult);

		while (left > 0)
		{
			ssize_t		n = write(th->pipes[1], buf, left);

			if (n < 0)
			{
				if (errno == EINTR)
					continue;	/* interrupted by a signal, try again */
				break;
			}
			buf += n;
			left -= (size_t) n;
		}
	}
	close(th->pipes[1]);
	free(th);
	exit(0);
}
/*
 * Emulate pthread_join() for a fork-based "thread".
 *
 * Waits for the child process to terminate, then, if thread_return is not
 * NULL, reads the child's result from the pipe into a freshly malloc'ed
 * buffer and stores it in *thread_return (NULL if a complete result could
 * not be read).  Returns 0 on success, or the errno value from waitpid()
 * if waiting fails for a reason other than EINTR.
 */
static int
pthread_join(pthread_t th, void **thread_return)
{
	int			wstatus;

	/* reap the child, retrying whenever a signal interrupts the wait */
	for (;;)
	{
		if (waitpid(th->pid, &wstatus, 0) == th->pid)
			break;
		if (errno != EINTR)
			return errno;
	}

	if (thread_return != NULL)
	{
		/* assume result is TResult */
		void	   *buf = malloc(sizeof(TResult));

		if (read(th->pipes[0], buf, sizeof(TResult)) == sizeof(TResult))
			*thread_return = buf;
		else
		{
			/* short read or error: report "no result" to the caller */
			free(buf);
			*thread_return = NULL;
		}
	}

	close(th->pipes[0]);
	free(th);
	return 0;
}
#endif
#else /* WIN32 */ #else /* WIN32 */
static VOID CALLBACK static VOID CALLBACK
...@@ -2110,4 +2276,70 @@ setalarm(int seconds) ...@@ -2110,4 +2276,70 @@ setalarm(int seconds)
} }
} }
/* partial pthread implementation for Windows */
/*
 * Handle for a thread started through the Windows pthread emulation.
 */
typedef struct win32_pthread
{
/* thread handle returned by _beginthreadex(); waited on and closed in pthread_join() */
HANDLE handle;
/* start routine supplied by the caller of pthread_create() */
void *(*routine)(void *);
/* argument passed to routine */
void *arg;
/* value returned by routine; NULL until the thread has stored it */
void *result;
} win32_pthread;
/*
 * Thread entry point handed to _beginthreadex().
 *
 * arg is the win32_pthread describing the thread; the caller's start
 * routine is invoked with its argument and the returned pointer is saved
 * in the handle.  The thread exit code is always 0.
 */
static unsigned __stdcall
win32_pthread_run(void *arg)
{
	win32_pthread *self = (win32_pthread *) arg;
	void	   *(*entry) (void *) = self->routine;
	void	   *outcome;

	outcome = entry(self->arg);
	self->result = outcome;

	return 0;
}
/*
 * Emulate pthread_create() on Windows using _beginthreadex().
 *
 * On success the new handle is stored in *thread and 0 is returned.  On
 * failure an errno value is returned and *thread is left untouched:
 * ENOMEM if the handle cannot be allocated, otherwise the errno observed
 * after _beginthreadex() returned a NULL handle.
 *
 * attr is accepted for signature compatibility only and is ignored.
 */
static int
pthread_create(pthread_t *thread,
			   pthread_attr_t *attr,
			   void * (*start_routine)(void *),
			   void *arg)
{
	int			save_errno;
	win32_pthread *th;

	th = (win32_pthread *) malloc(sizeof(win32_pthread));
	if (th == NULL)
		return ENOMEM;

	th->routine = start_routine;
	th->arg = arg;
	th->result = NULL;

	th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
	if (th->handle == NULL)
	{
		/* save errno first: free() is allowed to clobber it */
		save_errno = errno;
		free(th);
		return save_errno;
	}

	*thread = th;
	return 0;
}
/*
 * Emulate pthread_join() on Windows.
 *
 * Waits without timeout for the thread to finish, optionally hands back
 * the value returned by its start routine, then closes the thread handle
 * and frees the win32_pthread.  Returns 0 on success, EINVAL (also stored
 * in errno) for a NULL handle, or the errno set by _dosmaperr() if the
 * wait fails.
 */
static int
pthread_join(pthread_t th, void **thread_return)
{
	DWORD		waitrc;

	if (th == NULL || th->handle == NULL)
	{
		errno = EINVAL;
		return EINVAL;
	}

	waitrc = WaitForSingleObject(th->handle, INFINITE);
	if (waitrc != WAIT_OBJECT_0)
	{
		/* translate the Windows error code into errno for the caller */
		_dosmaperr(GetLastError());
		return errno;
	}

	if (thread_return != NULL)
		*thread_return = th->result;

	CloseHandle(th->handle);
	free(th);
	return 0;
}
#endif /* WIN32 */ #endif /* WIN32 */
<!-- $PostgreSQL: pgsql/doc/src/sgml/pgbench.sgml,v 1.8 2009/05/07 22:01:18 tgl Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/pgbench.sgml,v 1.9 2009/08/03 15:18:14 ishii Exp $ -->
<sect1 id="pgbench"> <sect1 id="pgbench">
<title>pgbench</title> <title>pgbench</title>
...@@ -171,6 +171,14 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</> ...@@ -171,6 +171,14 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
sessions. Default is 1. sessions. Default is 1.
</entry> </entry>
</row> </row>
<row>
<entry><literal>-j</literal> <replaceable>threads</></entry>
<entry>
Number of worker threads. Clients are divided equally among the
threads, and each client is executed by the thread it is assigned to.
The number of clients must be a multiple of the number of threads. Default is 1.
</entry>
</row>
<row> <row>
<entry><literal>-t</literal> <replaceable>transactions</></entry> <entry><literal>-t</literal> <replaceable>transactions</></entry>
<entry> <entry>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment