Commit 60e612b6 authored by Tom Lane's avatar Tom Lane

Use ppoll(2), if available, to wait for input in pgbench.

Previously, pgbench always used select(2) for this purpose, but that's
problematic for very high client counts, because select() can't deal
with file descriptor numbers larger than FD_SETSIZE.  It's pretty common
for that to be only 1024 or so, whereas modern OSes can allow many more
open files than that.  Using poll(2) would surmount that problem, but it
creates another one: poll()'s timeout resolution is only 1ms, which is
poor enough to cause problems with --rate specifications approaching or
exceeding 1K TPS.

On platforms that have ppoll(2), which includes Linux and recent
FreeBSD, we can use that to avoid the FD_SETSIZE problem without any
loss of timeout resolution.  Hence, add configure logic to test for
ppoll(), and use it if available.

This patch introduces an abstraction layer into pgbench that could
be extended to support other kernel event-wait APIs such as kevents.
But actually adding such support is a matter for some future patch.

Doug Rady, reviewed by Robert Haas and Fabien Coelho, and whacked around
a good bit more by me

Discussion: https://postgr.es/m/23D017C9-81B7-484D-8490-FD94DEC4DF59@amazon.com
parent 87d9bbca
...@@ -15093,7 +15093,7 @@ fi ...@@ -15093,7 +15093,7 @@ fi
LIBS_including_readline="$LIBS" LIBS_including_readline="$LIBS"
LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'` LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
for ac_func in cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l for ac_func in cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l
do : do :
as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh` as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var" ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
......
...@@ -1562,7 +1562,7 @@ PGAC_FUNC_WCSTOMBS_L ...@@ -1562,7 +1562,7 @@ PGAC_FUNC_WCSTOMBS_L
LIBS_including_readline="$LIBS" LIBS_including_readline="$LIBS"
LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'` LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
AC_CHECK_FUNCS([cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l]) AC_CHECK_FUNCS([cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l])
AC_REPLACE_FUNCS(fseeko) AC_REPLACE_FUNCS(fseeko)
case $host_os in case $host_os in
......
...@@ -28,8 +28,8 @@ ...@@ -28,8 +28,8 @@
*/ */
#ifdef WIN32 #ifdef WIN32
#define FD_SETSIZE 1024 /* set before winsock2.h is included */ #define FD_SETSIZE 1024 /* must set before winsock2.h is included */
#endif /* ! WIN32 */ #endif
#include "postgres_fe.h" #include "postgres_fe.h"
#include "fe_utils/conditional.h" #include "fe_utils/conditional.h"
...@@ -45,12 +45,21 @@ ...@@ -45,12 +45,21 @@
#include <signal.h> #include <signal.h>
#include <time.h> #include <time.h>
#include <sys/time.h> #include <sys/time.h>
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h> /* for getrlimit */
#endif
/* For testing, PGBENCH_USE_SELECT can be defined to force use of that code */
#if defined(HAVE_PPOLL) && !defined(PGBENCH_USE_SELECT)
#define POLL_USING_PPOLL
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#else /* no ppoll(), so use select() */
#define POLL_USING_SELECT
#ifdef HAVE_SYS_SELECT_H #ifdef HAVE_SYS_SELECT_H
#include <sys/select.h> #include <sys/select.h>
#endif #endif
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h> /* for getrlimit */
#endif #endif
#ifndef M_PI #ifndef M_PI
...@@ -70,6 +79,33 @@ ...@@ -70,6 +79,33 @@
#define MM2_MUL_TIMES_8 UINT64CONST(0x35253c9ade8f4ca8) #define MM2_MUL_TIMES_8 UINT64CONST(0x35253c9ade8f4ca8)
#define MM2_ROT 47 #define MM2_ROT 47
/*
* Multi-platform socket set implementations
*/
#ifdef POLL_USING_PPOLL
#define SOCKET_WAIT_METHOD "ppoll"
typedef struct socket_set
{
int maxfds; /* allocated length of pollfds[] array */
int curfds; /* number currently in use */
struct pollfd pollfds[FLEXIBLE_ARRAY_MEMBER];
} socket_set;
#endif /* POLL_USING_PPOLL */
#ifdef POLL_USING_SELECT
#define SOCKET_WAIT_METHOD "select"
typedef struct socket_set
{
int maxfd; /* largest FD currently set in fds */
fd_set fds;
} socket_set;
#endif /* POLL_USING_SELECT */
/* /*
* Multi-platform pthread implementations * Multi-platform pthread implementations
*/ */
...@@ -93,13 +129,6 @@ static int pthread_join(pthread_t th, void **thread_return); ...@@ -93,13 +129,6 @@ static int pthread_join(pthread_t th, void **thread_return);
/******************************************************************** /********************************************************************
* some configurable parameters */ * some configurable parameters */
/* max number of clients allowed */
#ifdef FD_SETSIZE
#define MAXCLIENTS (FD_SETSIZE - 10)
#else
#define MAXCLIENTS 1024
#endif
#define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */ #define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */
#define LOG_STEP_SECONDS 5 /* seconds between log messages */ #define LOG_STEP_SECONDS 5 /* seconds between log messages */
...@@ -523,8 +552,14 @@ static void processXactStats(TState *thread, CState *st, instr_time *now, ...@@ -523,8 +552,14 @@ static void processXactStats(TState *thread, CState *st, instr_time *now,
static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2); static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
static void addScript(ParsedScript script); static void addScript(ParsedScript script);
static void *threadRun(void *arg); static void *threadRun(void *arg);
static void setalarm(int seconds);
static void finishCon(CState *st); static void finishCon(CState *st);
static void setalarm(int seconds);
static socket_set *alloc_socket_set(int count);
static void free_socket_set(socket_set *sa);
static void clear_socket_set(socket_set *sa);
static void add_socket_to_set(socket_set *sa, int fd, int idx);
static int wait_on_socket_set(socket_set *sa, int64 usecs);
static bool socket_has_input(socket_set *sa, int fd, int idx);
/* callback functions for our flex lexer */ /* callback functions for our flex lexer */
...@@ -4903,7 +4938,7 @@ main(int argc, char **argv) ...@@ -4903,7 +4938,7 @@ main(int argc, char **argv)
case 'c': case 'c':
benchmarking_option_set = true; benchmarking_option_set = true;
nclients = atoi(optarg); nclients = atoi(optarg);
if (nclients <= 0 || nclients > MAXCLIENTS) if (nclients <= 0)
{ {
fprintf(stderr, "invalid number of clients: \"%s\"\n", fprintf(stderr, "invalid number of clients: \"%s\"\n",
optarg); optarg);
...@@ -5606,6 +5641,7 @@ threadRun(void *arg) ...@@ -5606,6 +5641,7 @@ threadRun(void *arg)
end; end;
int nstate = thread->nstate; int nstate = thread->nstate;
int remains = nstate; /* number of remaining clients */ int remains = nstate; /* number of remaining clients */
socket_set *sockets = alloc_socket_set(nstate);
int i; int i;
/* for reporting progress: */ /* for reporting progress: */
...@@ -5673,14 +5709,16 @@ threadRun(void *arg) ...@@ -5673,14 +5709,16 @@ threadRun(void *arg)
/* loop till all clients have terminated */ /* loop till all clients have terminated */
while (remains > 0) while (remains > 0)
{ {
fd_set input_mask; int nsocks; /* number of sockets to be waited for */
int maxsock; /* max socket number to be waited for */
int64 min_usec; int64 min_usec;
int64 now_usec = 0; /* set this only if needed */ int64 now_usec = 0; /* set this only if needed */
/* identify which client sockets should be checked for input */ /*
FD_ZERO(&input_mask); * identify which client sockets should be checked for input, and
maxsock = -1; * compute the nearest time (if any) at which we need to wake up.
*/
clear_socket_set(sockets);
nsocks = 0;
min_usec = PG_INT64_MAX; min_usec = PG_INT64_MAX;
for (i = 0; i < nstate; i++) for (i = 0; i < nstate; i++)
{ {
...@@ -5728,9 +5766,7 @@ threadRun(void *arg) ...@@ -5728,9 +5766,7 @@ threadRun(void *arg)
goto done; goto done;
} }
FD_SET(sock, &input_mask); add_socket_to_set(sockets, sock, nsocks++);
if (maxsock < sock)
maxsock = sock;
} }
else if (st->state != CSTATE_ABORTED && else if (st->state != CSTATE_ABORTED &&
st->state != CSTATE_FINISHED) st->state != CSTATE_FINISHED)
...@@ -5764,35 +5800,29 @@ threadRun(void *arg) ...@@ -5764,35 +5800,29 @@ threadRun(void *arg)
/* /*
* If no clients are ready to execute actions, sleep until we receive * If no clients are ready to execute actions, sleep until we receive
* data from the server, or a nap-time specified in the script ends, * data on some client socket or the timeout (if any) elapses.
* or it's time to print a progress report. Update input_mask to show
* which client(s) received data.
*/ */
if (min_usec > 0) if (min_usec > 0)
{ {
int nsocks = 0; /* return from select(2) if called */ int rc = 0;
if (min_usec != PG_INT64_MAX) if (min_usec != PG_INT64_MAX)
{ {
if (maxsock != -1) if (nsocks > 0)
{ {
struct timeval timeout; rc = wait_on_socket_set(sockets, min_usec);
timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000;
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
} }
else /* nothing active, simple sleep */ else /* nothing active, simple sleep */
{ {
pg_usleep(min_usec); pg_usleep(min_usec);
} }
} }
else /* no explicit delay, select without timeout */ else /* no explicit delay, wait without timeout */
{ {
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL); rc = wait_on_socket_set(sockets, 0);
} }
if (nsocks < 0) if (rc < 0)
{ {
if (errno == EINTR) if (errno == EINTR)
{ {
...@@ -5800,19 +5830,20 @@ threadRun(void *arg) ...@@ -5800,19 +5830,20 @@ threadRun(void *arg)
continue; continue;
} }
/* must be something wrong */ /* must be something wrong */
fprintf(stderr, "select() failed: %s\n", strerror(errno)); fprintf(stderr, "%s() failed: %s\n", SOCKET_WAIT_METHOD, strerror(errno));
goto done; goto done;
} }
} }
else else
{ {
/* min_usec == 0, i.e. something needs to be executed */ /* min_usec <= 0, i.e. something needs to be executed now */
/* If we didn't call select(), don't try to read any data */ /* If we didn't wait, don't try to read any data */
FD_ZERO(&input_mask); clear_socket_set(sockets);
} }
/* ok, advance the state machine of each connection */ /* ok, advance the state machine of each connection */
nsocks = 0;
for (i = 0; i < nstate; i++) for (i = 0; i < nstate; i++)
{ {
CState *st = &state[i]; CState *st = &state[i];
...@@ -5829,7 +5860,7 @@ threadRun(void *arg) ...@@ -5829,7 +5860,7 @@ threadRun(void *arg)
goto done; goto done;
} }
if (!FD_ISSET(sock, &input_mask)) if (!socket_has_input(sockets, sock, nsocks++))
continue; continue;
} }
else if (st->state == CSTATE_FINISHED || else if (st->state == CSTATE_FINISHED ||
...@@ -5967,6 +5998,7 @@ done: ...@@ -5967,6 +5998,7 @@ done:
fclose(thread->logfile); fclose(thread->logfile);
thread->logfile = NULL; thread->logfile = NULL;
} }
free_socket_set(sockets);
return NULL; return NULL;
} }
...@@ -6025,8 +6057,185 @@ setalarm(int seconds) ...@@ -6025,8 +6057,185 @@ setalarm(int seconds)
} }
} }
#endif /* WIN32 */
/*
* These functions provide an abstraction layer that hides the syscall
* we use to wait for input on a set of sockets.
*
* Currently there are two implementations, based on ppoll(2) and select(2).
* ppoll() is preferred where available due to its typically higher ceiling
* on the number of usable sockets. We do not use the more-widely-available
* poll(2) because it only offers millisecond timeout resolution, which could
* be problematic with high --rate settings.
*
* Function APIs:
*
* alloc_socket_set: allocate an empty socket set with room for up to
* "count" sockets.
*
* free_socket_set: deallocate a socket set.
*
* clear_socket_set: reset a socket set to empty.
*
* add_socket_to_set: add socket with indicated FD to slot "idx" in the
* socket set. Slots must be filled in order, starting with 0.
*
* wait_on_socket_set: wait for input on any socket in set, or for timeout
* to expire. timeout is measured in microseconds; 0 means wait forever.
* Returns result code of underlying syscall (>=0 if OK, else see errno).
*
* socket_has_input: after waiting, call this to see if given socket has
* input. fd and idx parameters should match some previous call to
* add_socket_to_set.
*
* Note that wait_on_socket_set destructively modifies the state of the
* socket set. After checking for input, caller must apply clear_socket_set
* and add_socket_to_set again before waiting again.
*/
#ifdef POLL_USING_PPOLL
static socket_set *
alloc_socket_set(int count)
{
socket_set *sa;
sa = (socket_set *) pg_malloc0(offsetof(socket_set, pollfds) +
sizeof(struct pollfd) * count);
sa->maxfds = count;
sa->curfds = 0;
return sa;
}
static void
free_socket_set(socket_set *sa)
{
pg_free(sa);
}
static void
clear_socket_set(socket_set *sa)
{
sa->curfds = 0;
}
static void
add_socket_to_set(socket_set *sa, int fd, int idx)
{
Assert(idx < sa->maxfds && idx == sa->curfds);
sa->pollfds[idx].fd = fd;
sa->pollfds[idx].events = POLLIN;
sa->pollfds[idx].revents = 0;
sa->curfds++;
}
static int
wait_on_socket_set(socket_set *sa, int64 usecs)
{
if (usecs > 0)
{
struct timespec timeout;
timeout.tv_sec = usecs / 1000000;
timeout.tv_nsec = (usecs % 1000000) * 1000;
return ppoll(sa->pollfds, sa->curfds, &timeout, NULL);
}
else
{
return ppoll(sa->pollfds, sa->curfds, NULL, NULL);
}
}
static bool
socket_has_input(socket_set *sa, int fd, int idx)
{
/*
* In some cases, threadRun will apply clear_socket_set and then try to
* apply socket_has_input anyway with arguments that it used before that,
* or might've used before that except that it exited its setup loop
* early. Hence, if the socket set is empty, silently return false
* regardless of the parameters. If it's not empty, we can Assert that
* the parameters match a previous call.
*/
if (sa->curfds == 0)
return false;
Assert(idx < sa->curfds && sa->pollfds[idx].fd == fd);
return (sa->pollfds[idx].revents & POLLIN) != 0;
}
#endif /* POLL_USING_PPOLL */
#ifdef POLL_USING_SELECT
static socket_set *
alloc_socket_set(int count)
{
return (socket_set *) pg_malloc0(sizeof(socket_set));
}
static void
free_socket_set(socket_set *sa)
{
pg_free(sa);
}
static void
clear_socket_set(socket_set *sa)
{
FD_ZERO(&sa->fds);
sa->maxfd = -1;
}
static void
add_socket_to_set(socket_set *sa, int fd, int idx)
{
if (fd < 0 || fd >= FD_SETSIZE)
{
/*
* Doing a hard exit here is a bit grotty, but it doesn't seem worth
* complicating the API to make it less grotty.
*/
fprintf(stderr, "too many client connections for select()\n");
exit(1);
}
FD_SET(fd, &sa->fds);
if (fd > sa->maxfd)
sa->maxfd = fd;
}
static int
wait_on_socket_set(socket_set *sa, int64 usecs)
{
if (usecs > 0)
{
struct timeval timeout;
timeout.tv_sec = usecs / 1000000;
timeout.tv_usec = usecs % 1000000;
return select(sa->maxfd + 1, &sa->fds, NULL, NULL, &timeout);
}
else
{
return select(sa->maxfd + 1, &sa->fds, NULL, NULL, NULL);
}
}
static bool
socket_has_input(socket_set *sa, int fd, int idx)
{
return (FD_ISSET(fd, &sa->fds) != 0);
}
#endif /* POLL_USING_SELECT */
/* partial pthread implementation for Windows */ /* partial pthread implementation for Windows */
#ifdef WIN32
typedef struct win32_pthread typedef struct win32_pthread
{ {
HANDLE handle; HANDLE handle;
......
...@@ -443,6 +443,9 @@ ...@@ -443,6 +443,9 @@
/* Define to 1 if the assembler supports PPC's LWARX mutex hint bit. */ /* Define to 1 if the assembler supports PPC's LWARX mutex hint bit. */
#undef HAVE_PPC_LWARX_MUTEX_HINT #undef HAVE_PPC_LWARX_MUTEX_HINT
/* Define to 1 if you have the `ppoll' function. */
#undef HAVE_PPOLL
/* Define to 1 if you have the `pstat' function. */ /* Define to 1 if you have the `pstat' function. */
#undef HAVE_PSTAT #undef HAVE_PSTAT
......
...@@ -327,6 +327,9 @@ ...@@ -327,6 +327,9 @@
/* Define to 1 if you have the `posix_fallocate' function. */ /* Define to 1 if you have the `posix_fallocate' function. */
/* #undef HAVE_POSIX_FALLOCATE */ /* #undef HAVE_POSIX_FALLOCATE */
/* Define to 1 if you have the `ppoll' function. */
/* #undef HAVE_PPOLL */
/* Define to 1 if you have the `pstat' function. */ /* Define to 1 if you have the `pstat' function. */
/* #undef HAVE_PSTAT */ /* #undef HAVE_PSTAT */
......
...@@ -6,6 +6,7 @@ if test x"$PREFERRED_SEMAPHORES" = x"" ; then ...@@ -6,6 +6,7 @@ if test x"$PREFERRED_SEMAPHORES" = x"" ; then
fi fi
# Force _GNU_SOURCE on; plperl is broken with Perl 5.8.0 otherwise # Force _GNU_SOURCE on; plperl is broken with Perl 5.8.0 otherwise
# This is also required for ppoll(2), and perhaps other things
CPPFLAGS="$CPPFLAGS -D_GNU_SOURCE" CPPFLAGS="$CPPFLAGS -D_GNU_SOURCE"
# If --enable-profiling is specified, we need -DLINUX_PROFILE # If --enable-profiling is specified, we need -DLINUX_PROFILE
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment