Commit 3bd261ca authored by Robert Haas's avatar Robert Haas

Improve shm_mq portability around MAXIMUM_ALIGNOF and sizeof(Size).

Revise the original decision to expose a uint64-based interface and
use Size everywhere possible.  Avoid assuming that MAXIMUM_ALIGNOF is
8, or making any assumption about the relationship between that value
and sizeof(Size).  If MAXIMUM_ALIGNOF is bigger, we'll now insert
padding after the length word; if it's smaller, we are now prepared
to read and write the length word in chunks.

Per discussion with Tom Lane.
parent 19f2d6cd
...@@ -72,7 +72,7 @@ struct shm_mq ...@@ -72,7 +72,7 @@ struct shm_mq
PGPROC *mq_sender; PGPROC *mq_sender;
uint64 mq_bytes_read; uint64 mq_bytes_read;
uint64 mq_bytes_written; uint64 mq_bytes_written;
uint64 mq_ring_size; Size mq_ring_size;
bool mq_detached; bool mq_detached;
uint8 mq_ring_offset; uint8 mq_ring_offset;
char mq_ring[FLEXIBLE_ARRAY_MEMBER]; char mq_ring[FLEXIBLE_ARRAY_MEMBER];
...@@ -103,15 +103,16 @@ struct shm_mq ...@@ -103,15 +103,16 @@ struct shm_mq
* locally by copying the chunks into a backend-local buffer. mqh_buffer is * locally by copying the chunks into a backend-local buffer. mqh_buffer is
* the buffer, and mqh_buflen is the number of bytes allocated for it. * the buffer, and mqh_buflen is the number of bytes allocated for it.
* *
* mqh_partial_message_bytes, mqh_expected_bytes, and mqh_did_length_word * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_length_word_complete
* are used to track the state of non-blocking operations. When the caller * are used to track the state of non-blocking operations. When the caller
* attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
* are expected to retry the call at a later time with the same argument; * are expected to retry the call at a later time with the same argument;
* we need to retain enough state to pick up where we left off. * we need to retain enough state to pick up where we left off.
* mqh_did_length_word tracks whether we read or wrote the length word, * mqh_length_word_complete tracks whether we are done sending or receiving
* mqh_partial_message_bytes tracks the number of payload bytes read or * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
* written, and mqh_expected_bytes - which is used only for reads - tracks * the number of bytes read or written for either the length word or the
* the expected total size of the payload. * message itself, and mqh_expected_bytes - which is used only for reads -
* tracks the expected total size of the payload.
* *
* mqh_counterparty_attached tracks whether we know the counterparty to have * mqh_counterparty_attached tracks whether we know the counterparty to have
* attached to the queue at some previous point. This lets us avoid some * attached to the queue at some previous point. This lets us avoid some
...@@ -128,25 +129,25 @@ struct shm_mq_handle ...@@ -128,25 +129,25 @@ struct shm_mq_handle
dsm_segment *mqh_segment; dsm_segment *mqh_segment;
BackgroundWorkerHandle *mqh_handle; BackgroundWorkerHandle *mqh_handle;
char *mqh_buffer; char *mqh_buffer;
uint64 mqh_buflen; Size mqh_buflen;
uint64 mqh_consume_pending; Size mqh_consume_pending;
uint64 mqh_partial_message_bytes; Size mqh_partial_bytes;
uint64 mqh_expected_bytes; Size mqh_expected_bytes;
bool mqh_did_length_word; bool mqh_length_word_complete;
bool mqh_counterparty_attached; bool mqh_counterparty_attached;
MemoryContext mqh_context; MemoryContext mqh_context;
}; };
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, uint64 nbytes, static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
void *data, bool nowait, uint64 *bytes_written); void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
bool nowait, uint64 *nbytesp, void **datap); bool nowait, Size *nbytesp, void **datap);
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr, static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
BackgroundWorkerHandle *handle); BackgroundWorkerHandle *handle);
static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached); static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n); static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached); static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n); static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq); static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg); static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
...@@ -163,7 +164,7 @@ shm_mq * ...@@ -163,7 +164,7 @@ shm_mq *
shm_mq_create(void *address, Size size) shm_mq_create(void *address, Size size)
{ {
shm_mq *mq = address; shm_mq *mq = address;
uint64 data_offset = MAXALIGN(offsetof(shm_mq, mq_ring)); Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
/* If the size isn't MAXALIGN'd, just discard the odd bytes. */ /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
size = MAXALIGN_DOWN(size); size = MAXALIGN_DOWN(size);
...@@ -289,8 +290,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) ...@@ -289,8 +290,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
mqh->mqh_buflen = 0; mqh->mqh_buflen = 0;
mqh->mqh_consume_pending = 0; mqh->mqh_consume_pending = 0;
mqh->mqh_context = CurrentMemoryContext; mqh->mqh_context = CurrentMemoryContext;
mqh->mqh_partial_message_bytes = 0; mqh->mqh_partial_bytes = 0;
mqh->mqh_did_length_word = false; mqh->mqh_length_word_complete = false;
mqh->mqh_counterparty_attached = false; mqh->mqh_counterparty_attached = false;
if (seg != NULL) if (seg != NULL)
...@@ -314,41 +315,48 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) ...@@ -314,41 +315,48 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
* the length or payload will corrupt the queue.) * the length or payload will corrupt the queue.)
*/ */
shm_mq_result shm_mq_result
shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait) shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
{ {
shm_mq_result res; shm_mq_result res;
shm_mq *mq = mqh->mqh_queue; shm_mq *mq = mqh->mqh_queue;
uint64 bytes_written; Size bytes_written;
Assert(mq->mq_sender == MyProc); Assert(mq->mq_sender == MyProc);
/* Write the message length into the buffer. */ /* Try to write, or finish writing, the length word into the buffer. */
if (!mqh->mqh_did_length_word) while (!mqh->mqh_length_word_complete)
{ {
res = shm_mq_send_bytes(mqh, sizeof(uint64), &nbytes, nowait, Assert(mqh->mqh_partial_bytes < sizeof(Size));
&bytes_written); res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
((char *) &nbytes) + mqh->mqh_partial_bytes,
nowait, &bytes_written);
mqh->mqh_partial_bytes += bytes_written;
if (res != SHM_MQ_SUCCESS) if (res != SHM_MQ_SUCCESS)
return res; return res;
/* if (mqh->mqh_partial_bytes >= sizeof(Size))
* We're sure to have sent the length in full, since we always {
* write a MAXALIGN'd chunk. Assert(mqh->mqh_partial_bytes == sizeof(Size));
*/
Assert(bytes_written == MAXALIGN64(sizeof(uint64))); mqh->mqh_partial_bytes = 0;
mqh->mqh_did_length_word = true; mqh->mqh_length_word_complete = true;
}
/* Length word can't be split unless bigger than required alignment. */
Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
} }
/* Write the actual data bytes into the buffer. */ /* Write the actual data bytes into the buffer. */
Assert(mqh->mqh_partial_message_bytes <= nbytes); Assert(mqh->mqh_partial_bytes <= nbytes);
res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_message_bytes, res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes,
((char *) data) + mqh->mqh_partial_message_bytes, ((char *) data) + mqh->mqh_partial_bytes,
nowait, &bytes_written); nowait, &bytes_written);
if (res == SHM_MQ_WOULD_BLOCK) if (res == SHM_MQ_WOULD_BLOCK)
mqh->mqh_partial_message_bytes += bytes_written; mqh->mqh_partial_bytes += bytes_written;
else else
{ {
mqh->mqh_partial_message_bytes = 0; mqh->mqh_partial_bytes = 0;
mqh->mqh_did_length_word = false; mqh->mqh_length_word_complete = false;
} }
if (res != SHM_MQ_SUCCESS) if (res != SHM_MQ_SUCCESS)
return res; return res;
...@@ -380,13 +388,12 @@ shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait) ...@@ -380,13 +388,12 @@ shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
* function again after the process latch has been set. * function again after the process latch has been set.
*/ */
shm_mq_result shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{ {
shm_mq *mq = mqh->mqh_queue; shm_mq *mq = mqh->mqh_queue;
shm_mq_result res; shm_mq_result res;
uint64 rb = 0; Size rb = 0;
uint64 nbytes; Size nbytes;
uint64 needed;
void *rawdata; void *rawdata;
Assert(mq->mq_receiver == MyProc); Assert(mq->mq_receiver == MyProc);
...@@ -414,44 +421,91 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) ...@@ -414,44 +421,91 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
mqh->mqh_consume_pending = 0; mqh->mqh_consume_pending = 0;
} }
/* Determine the message length. */ /* Try to read, or finish reading, the length word from the buffer. */
if (mqh->mqh_did_length_word) while (!mqh->mqh_length_word_complete)
{
/* We've partially received a message; recall expected length. */
nbytes = mqh->mqh_expected_bytes;
}
else
{ {
/* Try to receive the message length word. */ /* Try to receive the message length word. */
res = shm_mq_receive_bytes(mq, sizeof(uint64), nowait, &rb, &rawdata); Assert(mqh->mqh_partial_bytes < sizeof(Size));
res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
nowait, &rb, &rawdata);
if (res != SHM_MQ_SUCCESS) if (res != SHM_MQ_SUCCESS)
return res; return res;
Assert(rb >= sizeof(uint64));
memcpy(&nbytes, rawdata, sizeof(uint64)); /*
mqh->mqh_expected_bytes = nbytes; * Hopefully, we'll receive the entire message length word at once.
* But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
* multiple reads.
*/
if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
{
Size needed;
nbytes = * (Size *) rawdata;
/* If we've already got the whole message, we're done. */ /* If we've already got the whole message, we're done. */
needed = MAXALIGN64(sizeof(uint64)) + MAXALIGN64(nbytes); needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
if (rb >= needed) if (rb >= needed)
{ {
/* /*
* Technically, we could consume the message length information at * Technically, we could consume the message length information
* this point, but the extra write to shared memory wouldn't be * at this point, but the extra write to shared memory wouldn't
* free and in most cases we would reap no benefit. * be free and in most cases we would reap no benefit.
*/ */
mqh->mqh_consume_pending = needed; mqh->mqh_consume_pending = needed;
*nbytesp = nbytes; *nbytesp = nbytes;
*datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64)); *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
return SHM_MQ_SUCCESS; return SHM_MQ_SUCCESS;
} }
/* Consume the length word. */ /*
shm_mq_inc_bytes_read(mq, MAXALIGN64(sizeof(uint64))); * We don't have the whole message, but we at least have the whole
mqh->mqh_did_length_word = true; * length word.
rb -= MAXALIGN64(sizeof(uint64)); */
mqh->mqh_expected_bytes = nbytes;
mqh->mqh_length_word_complete = true;
shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
rb -= MAXALIGN(sizeof(Size));
} }
else
{
Size lengthbytes;
/* Can't be split unless bigger than required alignment. */
Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
if (mqh->mqh_partial_message_bytes == 0) /* Message word is split; need buffer to reassemble. */
if (mqh->mqh_buffer == NULL)
{
mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
MQH_INITIAL_BUFSIZE);
mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
}
Assert(mqh->mqh_buflen >= sizeof(Size));
/* Copy and consume partial length word. */
if (mqh->mqh_partial_bytes + rb > sizeof(Size))
lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
else
lengthbytes = rb - mqh->mqh_partial_bytes;
memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
lengthbytes);
mqh->mqh_partial_bytes += lengthbytes;
shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
rb -= lengthbytes;
/* If we now have the whole word, we're ready to read payload. */
if (mqh->mqh_partial_bytes >= sizeof(Size))
{
Assert(mqh->mqh_partial_bytes == sizeof(Size));
mqh->mqh_expected_bytes = * (Size *) mqh->mqh_buffer;
mqh->mqh_length_word_complete = true;
mqh->mqh_partial_bytes = 0;
}
}
}
nbytes = mqh->mqh_expected_bytes;
if (mqh->mqh_partial_bytes == 0)
{ {
/* /*
* Try to obtain the whole message in a single chunk. If this works, * Try to obtain the whole message in a single chunk. If this works,
...@@ -463,8 +517,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) ...@@ -463,8 +517,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
return res; return res;
if (rb >= nbytes) if (rb >= nbytes)
{ {
mqh->mqh_did_length_word = false; mqh->mqh_length_word_complete = false;
mqh->mqh_consume_pending = MAXALIGN64(nbytes); mqh->mqh_consume_pending = MAXALIGN(nbytes);
*nbytesp = nbytes; *nbytesp = nbytes;
*datap = rawdata; *datap = rawdata;
return SHM_MQ_SUCCESS; return SHM_MQ_SUCCESS;
...@@ -477,7 +531,7 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) ...@@ -477,7 +531,7 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
*/ */
if (mqh->mqh_buflen < nbytes) if (mqh->mqh_buflen < nbytes)
{ {
uint64 newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE); Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
while (newbuflen < nbytes) while (newbuflen < nbytes)
newbuflen *= 2; newbuflen *= 2;
...@@ -496,12 +550,12 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) ...@@ -496,12 +550,12 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
/* Loop until we've copied the entire message. */ /* Loop until we've copied the entire message. */
for (;;) for (;;)
{ {
uint64 still_needed; Size still_needed;
/* Copy as much as we can. */ /* Copy as much as we can. */
Assert(mqh->mqh_partial_message_bytes + rb <= nbytes); Assert(mqh->mqh_partial_bytes + rb <= nbytes);
memcpy(&mqh->mqh_buffer[mqh->mqh_partial_message_bytes], rawdata, rb); memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
mqh->mqh_partial_message_bytes += rb; mqh->mqh_partial_bytes += rb;
/* /*
* Update count of bytes read, with alignment padding. Note * Update count of bytes read, with alignment padding. Note
...@@ -509,16 +563,15 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) ...@@ -509,16 +563,15 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
* end of a message, because the buffer size is a multiple of * end of a message, because the buffer size is a multiple of
* MAXIMUM_ALIGNOF, and each read and write is as well. * MAXIMUM_ALIGNOF, and each read and write is as well.
*/ */
Assert(mqh->mqh_partial_message_bytes == nbytes || Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
rb == MAXALIGN64(rb)); shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
shm_mq_inc_bytes_read(mq, MAXALIGN64(rb));
/* If we got all the data, exit the loop. */ /* If we got all the data, exit the loop. */
if (mqh->mqh_partial_message_bytes >= nbytes) if (mqh->mqh_partial_bytes >= nbytes)
break; break;
/* Wait for some more data. */ /* Wait for some more data. */
still_needed = nbytes - mqh->mqh_partial_message_bytes; still_needed = nbytes - mqh->mqh_partial_bytes;
res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata); res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
if (res != SHM_MQ_SUCCESS) if (res != SHM_MQ_SUCCESS)
return res; return res;
...@@ -529,8 +582,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) ...@@ -529,8 +582,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
/* Return the complete message, and reset for next message. */ /* Return the complete message, and reset for next message. */
*nbytesp = nbytes; *nbytesp = nbytes;
*datap = mqh->mqh_buffer; *datap = mqh->mqh_buffer;
mqh->mqh_did_length_word = false; mqh->mqh_length_word_complete = false;
mqh->mqh_partial_message_bytes = 0; mqh->mqh_partial_bytes = 0;
return SHM_MQ_SUCCESS; return SHM_MQ_SUCCESS;
} }
...@@ -598,14 +651,14 @@ shm_mq_detach(shm_mq *mq) ...@@ -598,14 +651,14 @@ shm_mq_detach(shm_mq *mq)
* Write bytes into a shared message queue. * Write bytes into a shared message queue.
*/ */
static shm_mq_result static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait,
uint64 *bytes_written) Size *bytes_written)
{ {
shm_mq *mq = mqh->mqh_queue; shm_mq *mq = mqh->mqh_queue;
uint64 sent = 0; Size sent = 0;
uint64 used; uint64 used;
uint64 ringsize = mq->mq_ring_size; Size ringsize = mq->mq_ring_size;
uint64 available; Size available;
while (sent < nbytes) while (sent < nbytes)
{ {
...@@ -651,7 +704,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, ...@@ -651,7 +704,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
res = shm_mq_notify_receiver(mq); res = shm_mq_notify_receiver(mq);
if (res != SHM_MQ_SUCCESS) if (res != SHM_MQ_SUCCESS)
{ {
*bytes_written = res; *bytes_written = sent;
return res; return res;
} }
...@@ -679,8 +732,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, ...@@ -679,8 +732,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
} }
else else
{ {
uint64 offset = mq->mq_bytes_written % ringsize; Size offset = mq->mq_bytes_written % (uint64) ringsize;
uint64 sendnow = Min(available, ringsize - offset); Size sendnow = Min(available, ringsize - offset);
/* Write as much data as we can via a single memcpy(). */ /* Write as much data as we can via a single memcpy(). */
memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
...@@ -693,8 +746,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, ...@@ -693,8 +746,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
* end of a run of bytes, because the buffer size is a multiple of * end of a run of bytes, because the buffer size is a multiple of
* MAXIMUM_ALIGNOF, and each read is as well. * MAXIMUM_ALIGNOF, and each read is as well.
*/ */
Assert(sent == nbytes || sendnow == MAXALIGN64(sendnow)); Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
shm_mq_inc_bytes_written(mq, MAXALIGN64(sendnow)); shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
/* /*
* For efficiency, we don't set the reader's latch here. We'll * For efficiency, we don't set the reader's latch here. We'll
...@@ -717,23 +770,23 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, ...@@ -717,23 +770,23 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
* bytes_needed. * bytes_needed.
*/ */
static shm_mq_result static shm_mq_result
shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, bool nowait, shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
uint64 *nbytesp, void **datap) Size *nbytesp, void **datap)
{ {
Size ringsize = mq->mq_ring_size;
uint64 used; uint64 used;
uint64 ringsize = mq->mq_ring_size;
uint64 written; uint64 written;
for (;;) for (;;)
{ {
uint64 offset; Size offset;
bool detached; bool detached;
/* Get bytes written, so we can compute what's available to read. */ /* Get bytes written, so we can compute what's available to read. */
written = shm_mq_get_bytes_written(mq, &detached); written = shm_mq_get_bytes_written(mq, &detached);
used = written - mq->mq_bytes_read; used = written - mq->mq_bytes_read;
Assert(used <= ringsize); Assert(used <= ringsize);
offset = mq->mq_bytes_read % ringsize; offset = mq->mq_bytes_read % (uint64) ringsize;
/* If we have enough data or buffer has wrapped, we're done. */ /* If we have enough data or buffer has wrapped, we're done. */
if (used >= bytes_needed || offset + used >= ringsize) if (used >= bytes_needed || offset + used >= ringsize)
...@@ -872,7 +925,7 @@ shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached) ...@@ -872,7 +925,7 @@ shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
* Increment the number of bytes read. * Increment the number of bytes read.
*/ */
static void static void
shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n) shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
{ {
PGPROC *sender; PGPROC *sender;
...@@ -907,7 +960,7 @@ shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached) ...@@ -907,7 +960,7 @@ shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
* Increment the number of bytes written. * Increment the number of bytes written.
*/ */
static void static void
shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n) shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
{ {
SpinLockAcquire(&mq->mq_mutex); SpinLockAcquire(&mq->mq_mutex);
mq->mq_bytes_written += n; mq->mq_bytes_written += n;
......
...@@ -57,9 +57,9 @@ extern void shm_mq_detach(shm_mq *); ...@@ -57,9 +57,9 @@ extern void shm_mq_detach(shm_mq *);
/* Send or receive messages. */ /* Send or receive messages. */
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh, extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
uint64 nbytes, void *data, bool nowait); Size nbytes, void *data, bool nowait);
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
uint64 *nbytesp, void **datap, bool nowait); Size *nbytesp, void **datap, bool nowait);
/* Wait for our counterparty to attach to the queue. */ /* Wait for our counterparty to attach to the queue. */
extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh); extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment