Commit b2ccb5f4 authored by Robert Haas's avatar Robert Haas

shm_mq: Fix failure to notice a dead counterparty when nowait is used.

The shm_mq mechanism was intended to optionally notice when the process
on the other end of the queue fails to attach to the queue.  It does
this by allowing the user to pass a BackgroundWorkerHandle; if the
background worker in question is launched and dies without attaching
to the queue, then we know it never will.  This logic works OK in
blocking mode, but when called with nowait = true we fail to notice
that this has happened due to an asymmetry in the logic.  Repair.

Reported off-list by Rushabh Lathia.  Patch by me.
parent 31ba62ce
...@@ -142,6 +142,8 @@ static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, ...@@ -142,6 +142,8 @@ static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
const void *data, bool nowait, Size *bytes_written); const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
bool nowait, Size *nbytesp, void **datap); bool nowait, Size *nbytesp, void **datap);
static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr, static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
BackgroundWorkerHandle *handle); BackgroundWorkerHandle *handle);
static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached); static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
...@@ -499,6 +501,8 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait) ...@@ -499,6 +501,8 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{ {
if (nowait) if (nowait)
{ {
if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
return SHM_MQ_DETACHED;
if (shm_mq_get_sender(mq) == NULL) if (shm_mq_get_sender(mq) == NULL)
return SHM_MQ_WOULD_BLOCK; return SHM_MQ_WOULD_BLOCK;
} }
...@@ -794,6 +798,11 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, ...@@ -794,6 +798,11 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
*/ */
if (nowait) if (nowait)
{ {
if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
{
*bytes_written = sent;
return SHM_MQ_DETACHED;
}
if (shm_mq_get_receiver(mq) == NULL) if (shm_mq_get_receiver(mq) == NULL)
{ {
*bytes_written = sent; *bytes_written = sent;
...@@ -947,6 +956,45 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, ...@@ -947,6 +956,45 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
} }
} }
/*
* Test whether a counterparty who may not even be alive yet is definitely gone.
*/
static bool
shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
{
bool detached;
pid_t pid;
/* Acquire the lock just long enough to check the pointer. */
SpinLockAcquire(&mq->mq_mutex);
detached = mq->mq_detached;
SpinLockRelease(&mq->mq_mutex);
/* If the queue has been detached, counterparty is definitely gone. */
if (detached)
return true;
/* If there's a handle, check worker status. */
if (handle != NULL)
{
BgwHandleStatus status;
/* Check for unexpected worker death. */
status = GetBackgroundWorkerPid(handle, &pid);
if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
{
/* Mark it detached, just to make it official. */
SpinLockAcquire(&mq->mq_mutex);
mq->mq_detached = true;
SpinLockRelease(&mq->mq_mutex);
return true;
}
}
/* Counterparty is not definitively gone. */
return false;
}
/* /*
* This is used when a process is waiting for its counterpart to attach to the * This is used when a process is waiting for its counterpart to attach to the
* queue. We exit when the other process attaches as expected, or, if * queue. We exit when the other process attaches as expected, or, if
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment