Commit aeb57af8 authored by Thomas Munro's avatar Thomas Munro

pgbench: Synchronize client threads.

Wait until all pgbench threads are connected before benchmarking begins.
This fixes a problem where some connections could take a very long time
to be established because of lock contention from earlier connections,
making results unstable and bogus with high connection counts.

Author: Andres Freund <andres@anarazel.de>
Author: Fabien COELHO <coelho@cri.ensmp.fr>
Reviewed-by: default avatarMarina Polyakova <m.polyakova@postgrespro.ru>
Reviewed-by: default avatarKyotaro Horiguchi <horikyota.ntt@gmail.com>
Reviewed-by: default avatarHayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: default avatarDavid Rowley <dgrowleyml@gmail.com>
Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de
parent 44bf3d50
...@@ -126,9 +126,16 @@ typedef struct socket_set ...@@ -126,9 +126,16 @@ typedef struct socket_set
#define THREAD_JOIN(handle) \ #define THREAD_JOIN(handle) \
(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \ (WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO()) GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
#define THREAD_BARRIER_T SYNCHRONIZATION_BARRIER
#define THREAD_BARRIER_INIT(barrier, n) \
(InitializeSynchronizationBarrier((barrier), (n), 0) ? 0 : GETERRNO())
#define THREAD_BARRIER_WAIT(barrier) \
EnterSynchronizationBarrier((barrier), \
SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY)
#define THREAD_BARRIER_DESTROY(barrier)
#elif defined(ENABLE_THREAD_SAFETY) #elif defined(ENABLE_THREAD_SAFETY)
/* Use POSIX threads */ /* Use POSIX threads */
#include <pthread.h> #include "port/pg_pthread.h"
#define THREAD_T pthread_t #define THREAD_T pthread_t
#define THREAD_FUNC_RETURN_TYPE void * #define THREAD_FUNC_RETURN_TYPE void *
#define THREAD_FUNC_RETURN return NULL #define THREAD_FUNC_RETURN return NULL
...@@ -136,11 +143,20 @@ typedef struct socket_set ...@@ -136,11 +143,20 @@ typedef struct socket_set
pthread_create((handle), NULL, (function), (arg)) pthread_create((handle), NULL, (function), (arg))
#define THREAD_JOIN(handle) \ #define THREAD_JOIN(handle) \
pthread_join((handle), NULL) pthread_join((handle), NULL)
#define THREAD_BARRIER_T pthread_barrier_t
#define THREAD_BARRIER_INIT(barrier, n) \
pthread_barrier_init((barrier), NULL, (n))
#define THREAD_BARRIER_WAIT(barrier) pthread_barrier_wait((barrier))
#define THREAD_BARRIER_DESTROY(barrier) pthread_barrier_destroy((barrier))
#else #else
/* No threads implementation, use none (-j 1) */ /* No threads implementation, use none (-j 1) */
#define THREAD_T void * #define THREAD_T void *
#define THREAD_FUNC_RETURN_TYPE void * #define THREAD_FUNC_RETURN_TYPE void *
#define THREAD_FUNC_RETURN return NULL #define THREAD_FUNC_RETURN return NULL
#define THREAD_BARRIER_T int
#define THREAD_BARRIER_INIT(barrier, n) (*(barrier) = 0)
#define THREAD_BARRIER_WAIT(barrier)
#define THREAD_BARRIER_DESTROY(barrier)
#endif #endif
...@@ -326,6 +342,9 @@ typedef struct RandomState ...@@ -326,6 +342,9 @@ typedef struct RandomState
/* Various random sequences are initialized from this one. */ /* Various random sequences are initialized from this one. */
static RandomState base_random_sequence; static RandomState base_random_sequence;
/* Synchronization barrier for start and connection */
static THREAD_BARRIER_T barrier;
/* /*
* Connection state machine states. * Connection state machine states.
*/ */
...@@ -6121,6 +6140,10 @@ main(int argc, char **argv) ...@@ -6121,6 +6140,10 @@ main(int argc, char **argv)
if (duration > 0) if (duration > 0)
setalarm(duration); setalarm(duration);
errno = THREAD_BARRIER_INIT(&barrier, nthreads);
if (errno != 0)
pg_log_fatal("could not initialize barrier: %m");
#ifdef ENABLE_THREAD_SAFETY #ifdef ENABLE_THREAD_SAFETY
/* start all threads but thread 0 which is executed directly later */ /* start all threads but thread 0 which is executed directly later */
for (i = 1; i < nthreads; i++) for (i = 1; i < nthreads; i++)
...@@ -6191,6 +6214,8 @@ main(int argc, char **argv) ...@@ -6191,6 +6214,8 @@ main(int argc, char **argv)
printResults(&stats, pg_time_now() - bench_start, conn_total_duration, printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
bench_start - start_time, latency_late); bench_start - start_time, latency_late);
THREAD_BARRIER_DESTROY(&barrier);
if (exit_code != 0) if (exit_code != 0)
pg_log_fatal("Run was aborted; the above results are incomplete."); pg_log_fatal("Run was aborted; the above results are incomplete.");
...@@ -6237,6 +6262,8 @@ threadRun(void *arg) ...@@ -6237,6 +6262,8 @@ threadRun(void *arg)
state[i].state = CSTATE_CHOOSE_SCRIPT; state[i].state = CSTATE_CHOOSE_SCRIPT;
/* READY */ /* READY */
THREAD_BARRIER_WAIT(&barrier);
thread_start = pg_time_now(); thread_start = pg_time_now();
thread->started_time = thread_start; thread->started_time = thread_start;
last_report = thread_start; last_report = thread_start;
...@@ -6249,7 +6276,18 @@ threadRun(void *arg) ...@@ -6249,7 +6276,18 @@ threadRun(void *arg)
for (int i = 0; i < nstate; i++) for (int i = 0; i < nstate; i++)
{ {
if ((state[i].con = doConnect()) == NULL) if ((state[i].con = doConnect()) == NULL)
{
/*
* On connection failure, we meet the barrier here in place of
* GO before proceeding to the "done" path which will cleanup,
* so as to avoid locking the process.
*
* It is unclear whether it is worth doing anything rather than
* coldly exiting with an error message.
*/
THREAD_BARRIER_WAIT(&barrier);
goto done; goto done;
}
} }
/* compute connection delay */ /* compute connection delay */
...@@ -6261,6 +6299,8 @@ threadRun(void *arg) ...@@ -6261,6 +6299,8 @@ threadRun(void *arg)
thread->conn_duration = 0; thread->conn_duration = 0;
} }
/* GO */
THREAD_BARRIER_WAIT(&barrier);
start = pg_time_now(); start = pg_time_now();
thread->bench_start = start; thread->bench_start = start;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment