Commit 34db06ef authored by Robert Haas's avatar Robert Haas

shm_mq: Reduce spinlock usage.

Previously, mq_bytes_read and mq_bytes_written were protected by the
spinlock, but that turns out to cause pretty serious spinlock
contention on queries which send many tuples through a Gather or
Gather Merge node.  This patches changes things so that we instead
read and write those values using 8-byte atomics.  Since mq_bytes_read
can only be changed by the receiver and mq_bytes_written can only be
changed by the sender, the only purpose of the spinlock is to prevent
reads and writes of these values from being torn on platforms where
8-byte memory access is not atomic, making the conversion fairly
straightforward.

Testing shows that this produces some slowdown if we're using emulated
64-bit atomics, but since they should be available on any platform
where performance is a primary concern, that seems OK.  It's faster,
sometimes a lot faster, on platforms where such atomics are available.

Patch by me, reviewed by Andres Freund, who also suggested the
design.  Also tested by Rafia Sabih.

Discussion: http://postgr.es/m/CA+TgmoYuK0XXxmUNTFT9TSNiBtWnRwasBcHHRCOK9iYmDLQVPg@mail.gmail.com
parent 2b8c94e1
...@@ -31,27 +31,28 @@ ...@@ -31,27 +31,28 @@
* Some notes on synchronization: * Some notes on synchronization:
* *
* mq_receiver and mq_bytes_read can only be changed by the receiver; and * mq_receiver and mq_bytes_read can only be changed by the receiver; and
* mq_sender and mq_bytes_written can only be changed by the sender. However, * mq_sender and mq_bytes_written can only be changed by the sender.
* because most of these fields are 8 bytes and we don't assume that 8 byte * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
* reads and writes are atomic, the spinlock must be taken whenever the field * they cannot change once set, and thus may be read without a lock once this
* is updated, and whenever it is read by a process other than the one allowed * is known to be the case.
* to modify it. But the process that is allowed to modify it is also allowed
* to read it without the lock. On architectures where 8-byte writes are
* atomic, we could replace these spinlocks with memory barriers, but
* testing found no performance benefit, so it seems best to keep things
* simple for now.
* *
* mq_detached can be set by either the sender or the receiver, so the mutex * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
* must be held to read or write it. Memory barriers could be used here as * they are written atomically using 8 byte loads and stores. Memory barriers
* well, if needed. * must be carefully used to synchronize reads and writes of these values with
* reads and writes of the actual data in mq_ring.
*
* mq_detached needs no locking. It can be set by either the sender or the
* receiver, but only ever from false to true, so redundant writes don't
* matter. It is important that if we set mq_detached and then set the
* counterparty's latch, the counterparty must be certain to see the change
* after waking up. Since SetLatch begins with a memory barrier and ResetLatch
* ends with one, this should be OK.
* *
* mq_ring_size and mq_ring_offset never change after initialization, and * mq_ring_size and mq_ring_offset never change after initialization, and
* can therefore be read without the lock. * can therefore be read without the lock.
* *
* Importantly, mq_ring can be safely read and written without a lock. Were * Importantly, mq_ring can be safely read and written without a lock.
* this not the case, we'd have to hold the spinlock for much longer * At any given time, the difference between mq_bytes_read and
* intervals, and performance might suffer. Fortunately, that's not
* necessary. At any given time, the difference between mq_bytes_read and
* mq_bytes_written defines the number of bytes within mq_ring that contain * mq_bytes_written defines the number of bytes within mq_ring that contain
* unread data, and mq_bytes_read defines the position where those bytes * unread data, and mq_bytes_read defines the position where those bytes
* begin. The sender can increase the number of unread bytes at any time, * begin. The sender can increase the number of unread bytes at any time,
...@@ -71,8 +72,8 @@ struct shm_mq ...@@ -71,8 +72,8 @@ struct shm_mq
slock_t mq_mutex; slock_t mq_mutex;
PGPROC *mq_receiver; PGPROC *mq_receiver;
PGPROC *mq_sender; PGPROC *mq_sender;
uint64 mq_bytes_read; pg_atomic_uint64 mq_bytes_read;
uint64 mq_bytes_written; pg_atomic_uint64 mq_bytes_written;
Size mq_ring_size; Size mq_ring_size;
bool mq_detached; bool mq_detached;
uint8 mq_ring_offset; uint8 mq_ring_offset;
...@@ -150,11 +151,8 @@ static bool shm_mq_counterparty_gone(shm_mq *mq, ...@@ -150,11 +151,8 @@ static bool shm_mq_counterparty_gone(shm_mq *mq,
BackgroundWorkerHandle *handle); BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
BackgroundWorkerHandle *handle); BackgroundWorkerHandle *handle);
static uint64 shm_mq_get_bytes_read(shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n); static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
static uint64 shm_mq_get_bytes_written(shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n); static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
static shm_mq_result shm_mq_notify_receiver(shm_mq *mq);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg); static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
/* Minimum queue size is enough for header and at least one chunk of data. */ /* Minimum queue size is enough for header and at least one chunk of data. */
...@@ -182,8 +180,8 @@ shm_mq_create(void *address, Size size) ...@@ -182,8 +180,8 @@ shm_mq_create(void *address, Size size)
SpinLockInit(&mq->mq_mutex); SpinLockInit(&mq->mq_mutex);
mq->mq_receiver = NULL; mq->mq_receiver = NULL;
mq->mq_sender = NULL; mq->mq_sender = NULL;
mq->mq_bytes_read = 0; pg_atomic_init_u64(&mq->mq_bytes_read, 0);
mq->mq_bytes_written = 0; pg_atomic_init_u64(&mq->mq_bytes_written, 0);
mq->mq_ring_size = size - data_offset; mq->mq_ring_size = size - data_offset;
mq->mq_detached = false; mq->mq_detached = false;
mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring); mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
...@@ -348,6 +346,7 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait) ...@@ -348,6 +346,7 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{ {
shm_mq_result res; shm_mq_result res;
shm_mq *mq = mqh->mqh_queue; shm_mq *mq = mqh->mqh_queue;
PGPROC *receiver;
Size nbytes = 0; Size nbytes = 0;
Size bytes_written; Size bytes_written;
int i; int i;
...@@ -488,8 +487,30 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait) ...@@ -488,8 +487,30 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
mqh->mqh_partial_bytes = 0; mqh->mqh_partial_bytes = 0;
mqh->mqh_length_word_complete = false; mqh->mqh_length_word_complete = false;
/* If queue has been detached, let caller know. */
if (mq->mq_detached)
return SHM_MQ_DETACHED;
/*
* If the counterpary is known to have attached, we can read mq_receiver
* without acquiring the spinlock and assume it isn't NULL. Otherwise,
* more caution is needed.
*/
if (mqh->mqh_counterparty_attached)
receiver = mq->mq_receiver;
else
{
SpinLockAcquire(&mq->mq_mutex);
receiver = mq->mq_receiver;
SpinLockRelease(&mq->mq_mutex);
if (receiver == NULL)
return SHM_MQ_SUCCESS;
mqh->mqh_counterparty_attached = true;
}
/* Notify receiver of the newly-written data, and return. */ /* Notify receiver of the newly-written data, and return. */
return shm_mq_notify_receiver(mq); SetLatch(&receiver->procLatch);
return SHM_MQ_SUCCESS;
} }
/* /*
...@@ -843,18 +864,28 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, ...@@ -843,18 +864,28 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
while (sent < nbytes) while (sent < nbytes)
{ {
bool detached;
uint64 rb; uint64 rb;
uint64 wb;
/* Compute number of ring buffer bytes used and available. */ /* Compute number of ring buffer bytes used and available. */
rb = shm_mq_get_bytes_read(mq, &detached); rb = pg_atomic_read_u64(&mq->mq_bytes_read);
Assert(mq->mq_bytes_written >= rb); wb = pg_atomic_read_u64(&mq->mq_bytes_written);
used = mq->mq_bytes_written - rb; Assert(wb >= rb);
used = wb - rb;
Assert(used <= ringsize); Assert(used <= ringsize);
available = Min(ringsize - used, nbytes - sent); available = Min(ringsize - used, nbytes - sent);
/* Bail out if the queue has been detached. */ /*
if (detached) * Bail out if the queue has been detached. Note that we would be in
* trouble if the compiler decided to cache the value of
* mq->mq_detached in a register or on the stack across loop
* iterations. It probably shouldn't do that anyway since we'll
* always return, call an external function that performs a system
* call, or reach a memory barrier at some point later in the loop,
* but just to be sure, insert a compiler barrier here.
*/
pg_compiler_barrier();
if (mq->mq_detached)
{ {
*bytes_written = sent; *bytes_written = sent;
return SHM_MQ_DETACHED; return SHM_MQ_DETACHED;
...@@ -895,15 +926,13 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, ...@@ -895,15 +926,13 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
} }
else if (available == 0) else if (available == 0)
{ {
shm_mq_result res; /*
* Since mq->mqh_counterparty_attached is known to be true at this
/* Let the receiver know that we need them to read some data. */ * point, mq_receiver has been set, and it can't change once set.
res = shm_mq_notify_receiver(mq); * Therefore, we can read it without acquiring the spinlock.
if (res != SHM_MQ_SUCCESS) */
{ Assert(mqh->mqh_counterparty_attached);
*bytes_written = sent; SetLatch(&mq->mq_receiver->procLatch);
return res;
}
/* Skip manipulation of our latch if nowait = true. */ /* Skip manipulation of our latch if nowait = true. */
if (nowait) if (nowait)
...@@ -929,10 +958,20 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, ...@@ -929,10 +958,20 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
} }
else else
{ {
Size offset = mq->mq_bytes_written % (uint64) ringsize; Size offset;
Size sendnow = Min(available, ringsize - offset); Size sendnow;
/* Write as much data as we can via a single memcpy(). */ offset = wb % (uint64) ringsize;
sendnow = Min(available, ringsize - offset);
/*
* Write as much data as we can via a single memcpy(). Make sure
* these writes happen after the read of mq_bytes_read, above.
* This barrier pairs with the one in shm_mq_inc_bytes_read.
* (Since we're separating the read of mq_bytes_read from a
* subsequent write to mq_ring, we need a full barrier here.)
*/
pg_memory_barrier();
memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
(char *) data + sent, sendnow); (char *) data + sent, sendnow);
sent += sendnow; sent += sendnow;
...@@ -978,19 +1017,27 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, ...@@ -978,19 +1017,27 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
for (;;) for (;;)
{ {
Size offset; Size offset;
bool detached; uint64 read;
/* Get bytes written, so we can compute what's available to read. */ /* Get bytes written, so we can compute what's available to read. */
written = shm_mq_get_bytes_written(mq, &detached); written = pg_atomic_read_u64(&mq->mq_bytes_written);
used = written - mq->mq_bytes_read; read = pg_atomic_read_u64(&mq->mq_bytes_read);
used = written - read;
Assert(used <= ringsize); Assert(used <= ringsize);
offset = mq->mq_bytes_read % (uint64) ringsize; offset = read % (uint64) ringsize;
/* If we have enough data or buffer has wrapped, we're done. */ /* If we have enough data or buffer has wrapped, we're done. */
if (used >= bytes_needed || offset + used >= ringsize) if (used >= bytes_needed || offset + used >= ringsize)
{ {
*nbytesp = Min(used, ringsize - offset); *nbytesp = Min(used, ringsize - offset);
*datap = &mq->mq_ring[mq->mq_ring_offset + offset]; *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
/*
* Separate the read of mq_bytes_written, above, from caller's
* attempt to read the data itself. Pairs with the barrier in
* shm_mq_inc_bytes_written.
*/
pg_read_barrier();
return SHM_MQ_SUCCESS; return SHM_MQ_SUCCESS;
} }
...@@ -1002,7 +1049,7 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, ...@@ -1002,7 +1049,7 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
* receiving a message stored in the buffer even after the sender has * receiving a message stored in the buffer even after the sender has
* detached. * detached.
*/ */
if (detached) if (mq->mq_detached)
return SHM_MQ_DETACHED; return SHM_MQ_DETACHED;
/* Skip manipulation of our latch if nowait = true. */ /* Skip manipulation of our latch if nowait = true. */
...@@ -1032,16 +1079,10 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, ...@@ -1032,16 +1079,10 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
static bool static bool
shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle) shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
{ {
bool detached;
pid_t pid; pid_t pid;
/* Acquire the lock just long enough to check the pointer. */
SpinLockAcquire(&mq->mq_mutex);
detached = mq->mq_detached;
SpinLockRelease(&mq->mq_mutex);
/* If the queue has been detached, counterparty is definitely gone. */ /* If the queue has been detached, counterparty is definitely gone. */
if (detached) if (mq->mq_detached)
return true; return true;
/* If there's a handle, check worker status. */ /* If there's a handle, check worker status. */
...@@ -1054,9 +1095,7 @@ shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle) ...@@ -1054,9 +1095,7 @@ shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
{ {
/* Mark it detached, just to make it official. */ /* Mark it detached, just to make it official. */
SpinLockAcquire(&mq->mq_mutex);
mq->mq_detached = true; mq->mq_detached = true;
SpinLockRelease(&mq->mq_mutex);
return true; return true;
} }
} }
...@@ -1085,16 +1124,14 @@ shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle) ...@@ -1085,16 +1124,14 @@ shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
{ {
BgwHandleStatus status; BgwHandleStatus status;
pid_t pid; pid_t pid;
bool detached;
/* Acquire the lock just long enough to check the pointer. */ /* Acquire the lock just long enough to check the pointer. */
SpinLockAcquire(&mq->mq_mutex); SpinLockAcquire(&mq->mq_mutex);
detached = mq->mq_detached;
result = (*ptr != NULL); result = (*ptr != NULL);
SpinLockRelease(&mq->mq_mutex); SpinLockRelease(&mq->mq_mutex);
/* Fail if detached; else succeed if initialized. */ /* Fail if detached; else succeed if initialized. */
if (detached) if (mq->mq_detached)
{ {
result = false; result = false;
break; break;
...@@ -1126,23 +1163,6 @@ shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle) ...@@ -1126,23 +1163,6 @@ shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
return result; return result;
} }
/*
* Get the number of bytes read. The receiver need not use this to access
* the count of bytes read, but the sender must.
*/
static uint64
shm_mq_get_bytes_read(shm_mq *mq, bool *detached)
{
uint64 v;
SpinLockAcquire(&mq->mq_mutex);
v = mq->mq_bytes_read;
*detached = mq->mq_detached;
SpinLockRelease(&mq->mq_mutex);
return v;
}
/* /*
* Increment the number of bytes read. * Increment the number of bytes read.
*/ */
...@@ -1151,63 +1171,50 @@ shm_mq_inc_bytes_read(shm_mq *mq, Size n) ...@@ -1151,63 +1171,50 @@ shm_mq_inc_bytes_read(shm_mq *mq, Size n)
{ {
PGPROC *sender; PGPROC *sender;
SpinLockAcquire(&mq->mq_mutex); /*
mq->mq_bytes_read += n; * Separate prior reads of mq_ring from the increment of mq_bytes_read
* which follows. Pairs with the full barrier in shm_mq_send_bytes(). We
* only need a read barrier here because the increment of mq_bytes_read is
* actually a read followed by a dependent write.
*/
pg_read_barrier();
/*
* There's no need to use pg_atomic_fetch_add_u64 here, because nobody
* else can be changing this value. This method should be cheaper.
*/
pg_atomic_write_u64(&mq->mq_bytes_read,
pg_atomic_read_u64(&mq->mq_bytes_read) + n);
/*
* We shouldn't have any bytes to read without a sender, so we can read
* mq_sender here without a lock. Once it's initialized, it can't change.
*/
sender = mq->mq_sender; sender = mq->mq_sender;
SpinLockRelease(&mq->mq_mutex);
/* We shouldn't have any bytes to read without a sender. */
Assert(sender != NULL); Assert(sender != NULL);
SetLatch(&sender->procLatch); SetLatch(&sender->procLatch);
} }
/*
* Get the number of bytes written. The sender need not use this to access
* the count of bytes written, but the receiver must.
*/
static uint64
shm_mq_get_bytes_written(shm_mq *mq, bool *detached)
{
uint64 v;
SpinLockAcquire(&mq->mq_mutex);
v = mq->mq_bytes_written;
*detached = mq->mq_detached;
SpinLockRelease(&mq->mq_mutex);
return v;
}
/* /*
* Increment the number of bytes written. * Increment the number of bytes written.
*/ */
static void static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n) shm_mq_inc_bytes_written(shm_mq *mq, Size n)
{ {
SpinLockAcquire(&mq->mq_mutex); /*
mq->mq_bytes_written += n; * Separate prior reads of mq_ring from the write of mq_bytes_written
SpinLockRelease(&mq->mq_mutex); * which we're about to do. Pairs with the read barrier found in
} * shm_mq_get_receive_bytes.
*/
/* pg_write_barrier();
* Set receiver's latch, unless queue is detached.
*/ /*
static shm_mq_result * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
shm_mq_notify_receiver(shm_mq *mq) * else can be changing this value. This method avoids taking the bus
{ * lock unnecessarily.
PGPROC *receiver; */
bool detached; pg_atomic_write_u64(&mq->mq_bytes_written,
pg_atomic_read_u64(&mq->mq_bytes_written) + n);
SpinLockAcquire(&mq->mq_mutex);
detached = mq->mq_detached;
receiver = mq->mq_receiver;
SpinLockRelease(&mq->mq_mutex);
if (detached)
return SHM_MQ_DETACHED;
if (receiver)
SetLatch(&receiver->procLatch);
return SHM_MQ_SUCCESS;
} }
/* Shim for on_dsm_callback. */ /* Shim for on_dsm_callback. */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment