Commit 7bb0e974 authored by Robert Haas's avatar Robert Haas

Extend shm_mq API with new functions shm_mq_sendv, shm_mq_set_handle.

shm_mq_sendv sends a message to the queue assembled from multiple
locations.  This is expected to be used by forthcoming patches to
allow frontend/backend protocol messages to be sent via shm_mq, but
might be useful for other purposes as well.

shm_mq_set_handle associates a BackgroundWorkerHandle with an
already-existing shm_mq_handle.  This solves a timing problem when
creating a shm_mq to communicate with a newly-launched background
worker: if you attach to the queue first, and the background worker
fails to start, you might block forever trying to do I/O on the queue;
but if you start the background worker first, but then die before
attaching to the queue, the background worrker might block forever
trying to do I/O on the queue.  This lets you attach before starting
the worker (so that the worker is protected) and then associate the
BackgroundWorkerHandle later (so that you are also protected).

Patch by me, reviewed by Stephen Frost.
parent df630b0d
...@@ -139,7 +139,7 @@ struct shm_mq_handle ...@@ -139,7 +139,7 @@ struct shm_mq_handle
}; };
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
void *data, bool nowait, Size *bytes_written); const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
bool nowait, Size *nbytesp, void **datap); bool nowait, Size *nbytesp, void **datap);
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr, static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
...@@ -300,8 +300,34 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) ...@@ -300,8 +300,34 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
return mqh; return mqh;
} }
/*
* Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
* been passed to shm_mq_attach.
*/
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
Assert(mqh->mqh_handle == NULL);
mqh->mqh_handle = handle;
}
/* /*
* Write a message into a shared message queue. * Write a message into a shared message queue.
*/
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
{
shm_mq_iovec iov;
iov.data = data;
iov.len = nbytes;
return shm_mq_sendv(mqh, &iov, 1, nowait);
}
/*
* Write a message into a shared message queue, gathered from multiple
* addresses.
* *
* When nowait = false, we'll wait on our process latch when the ring buffer * When nowait = false, we'll wait on our process latch when the ring buffer
* fills up, and then continue writing once the receiver has drained some data. * fills up, and then continue writing once the receiver has drained some data.
...@@ -315,14 +341,22 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) ...@@ -315,14 +341,22 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
* the length or payload will corrupt the queue.) * the length or payload will corrupt the queue.)
*/ */
shm_mq_result shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait) shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{ {
shm_mq_result res; shm_mq_result res;
shm_mq *mq = mqh->mqh_queue; shm_mq *mq = mqh->mqh_queue;
Size nbytes = 0;
Size bytes_written; Size bytes_written;
int i;
int which_iov = 0;
Size offset;
Assert(mq->mq_sender == MyProc); Assert(mq->mq_sender == MyProc);
/* Compute total size of write. */
for (i = 0; i < iovcnt; ++i)
nbytes += iov[i].len;
/* Try to write, or finish writing, the length word into the buffer. */ /* Try to write, or finish writing, the length word into the buffer. */
while (!mqh->mqh_length_word_complete) while (!mqh->mqh_length_word_complete)
{ {
...@@ -348,18 +382,80 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait) ...@@ -348,18 +382,80 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
/* Write the actual data bytes into the buffer. */ /* Write the actual data bytes into the buffer. */
Assert(mqh->mqh_partial_bytes <= nbytes); Assert(mqh->mqh_partial_bytes <= nbytes);
res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes, offset = mqh->mqh_partial_bytes;
((char *) data) + mqh->mqh_partial_bytes, do
nowait, &bytes_written); {
if (res == SHM_MQ_WOULD_BLOCK) Size chunksize;
mqh->mqh_partial_bytes += bytes_written;
/* Figure out which bytes need to be sent next. */
if (offset >= iov[which_iov].len)
{
offset -= iov[which_iov].len;
++which_iov;
if (which_iov >= iovcnt)
break;
continue;
}
/*
* We want to avoid copying the data if at all possible, but every
* chunk of bytes we write into the queue has to be MAXALIGN'd,
* except the last. Thus, if a chunk other than the last one ends
* on a non-MAXALIGN'd boundary, we have to combine the tail end of
* its data with data from one or more following chunks until we
* either reach the last chunk or accumulate a number of bytes which
* is MAXALIGN'd.
*/
if (which_iov + 1 < iovcnt &&
offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
{
char tmpbuf[MAXIMUM_ALIGNOF];
int j = 0;
for (;;)
{
if (offset < iov[which_iov].len)
{
tmpbuf[j] = iov[which_iov].data[offset];
j++;
offset++;
if (j == MAXIMUM_ALIGNOF)
break;
}
else else
{ {
mqh->mqh_partial_bytes = 0; offset -= iov[which_iov].len;
mqh->mqh_length_word_complete = false; which_iov++;
if (which_iov >= iovcnt)
break;
}
} }
res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
mqh->mqh_partial_bytes += bytes_written;
if (res != SHM_MQ_SUCCESS)
return res;
continue;
}
/*
* If this is the last chunk, we can write all the data, even if it
* isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
* MAXALIGN_DOWN the write size.
*/
chunksize = iov[which_iov].len - offset;
if (which_iov + 1 < iovcnt)
chunksize = MAXALIGN_DOWN(chunksize);
res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
nowait, &bytes_written);
mqh->mqh_partial_bytes += bytes_written;
offset += bytes_written;
if (res != SHM_MQ_SUCCESS) if (res != SHM_MQ_SUCCESS)
return res; return res;
} while (mqh->mqh_partial_bytes < nbytes);
/* Reset for next message. */
mqh->mqh_partial_bytes = 0;
mqh->mqh_length_word_complete = false;
/* Notify receiver of the newly-written data, and return. */ /* Notify receiver of the newly-written data, and return. */
return shm_mq_notify_receiver(mq); return shm_mq_notify_receiver(mq);
...@@ -653,8 +749,8 @@ shm_mq_detach(shm_mq *mq) ...@@ -653,8 +749,8 @@ shm_mq_detach(shm_mq *mq)
* Write bytes into a shared message queue. * Write bytes into a shared message queue.
*/ */
static shm_mq_result static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait, shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
Size *bytes_written) bool nowait, Size *bytes_written)
{ {
shm_mq *mq = mqh->mqh_queue; shm_mq *mq = mqh->mqh_queue;
Size sent = 0; Size sent = 0;
......
...@@ -25,6 +25,13 @@ typedef struct shm_mq shm_mq; ...@@ -25,6 +25,13 @@ typedef struct shm_mq shm_mq;
struct shm_mq_handle; struct shm_mq_handle;
typedef struct shm_mq_handle shm_mq_handle; typedef struct shm_mq_handle shm_mq_handle;
/* Descriptors for a single write spanning multiple locations. */
typedef struct
{
const char *data;
Size len;
} shm_mq_iovec;
/* Possible results of a send or receive operation. */ /* Possible results of a send or receive operation. */
typedef enum typedef enum
{ {
...@@ -52,12 +59,17 @@ extern PGPROC *shm_mq_get_sender(shm_mq *); ...@@ -52,12 +59,17 @@ extern PGPROC *shm_mq_get_sender(shm_mq *);
extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg, extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
BackgroundWorkerHandle *handle); BackgroundWorkerHandle *handle);
/* Associate worker handle with shm_mq. */
extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
/* Break connection. */ /* Break connection. */
extern void shm_mq_detach(shm_mq *); extern void shm_mq_detach(shm_mq *);
/* Send or receive messages. */ /* Send or receive messages. */
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh, extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
Size nbytes, void *data, bool nowait); Size nbytes, const void *data, bool nowait);
extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
shm_mq_iovec *iov, int iovcnt, bool nowait);
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
Size *nbytesp, void **datap, bool nowait); Size *nbytesp, void **datap, bool nowait);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment