Commit 7bae0284 authored by Tom Lane's avatar Tom Lane

Avoid transaction-commit race condition while receiving a NOTIFY message.

Use TransactionIdIsInProgress, then TransactionIdDidCommit, to distinguish
whether a NOTIFY message's originating transaction is in progress,
committed, or aborted.  The previous coding could accept a message from a
transaction that was still in-progress according to the PGPROC array;
if the client were fast enough at starting a new transaction, it might fail
to see table rows added/updated by the message-sending transaction.  Which
of course would usually be the point of receiving the message.  We noted
this type of race condition long ago in tqual.c, but async.c overlooked it.

The race condition probably cannot occur unless there are multiple NOTIFY
senders in action, since an individual backend doesn't send NOTIFY signals
until well after it's done committing.  But if two senders commit in close
succession, it's certainly possible that we could see the second sender's
message within the race condition window while responding to the signal
from the first one.

Per bug #9557 from Marko Tiikkaja.  This patch is slightly more invasive
than what he proposed, since it removes the now-redundant
TransactionIdDidAbort call.

Back-patch to 9.0, where the current NOTIFY implementation was introduced.
parent 16ff08b7
...@@ -126,6 +126,7 @@ ...@@ -126,6 +126,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "storage/procarray.h"
#include "storage/procsignal.h" #include "storage/procsignal.h"
#include "storage/sinval.h" #include "storage/sinval.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
...@@ -1978,7 +1979,27 @@ asyncQueueProcessPageEntries(QueuePosition *current, ...@@ -1978,7 +1979,27 @@ asyncQueueProcessPageEntries(QueuePosition *current,
/* Ignore messages destined for other databases */ /* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId) if (qe->dboid == MyDatabaseId)
{ {
if (TransactionIdDidCommit(qe->xid)) if (TransactionIdIsInProgress(qe->xid))
{
/*
* The source transaction is still in progress, so we can't
* process this message yet. Break out of the loop, but first
* back up *current so we will reprocess the message next
* time. (Note: it is unlikely but not impossible for
* TransactionIdDidCommit to fail, so we can't really avoid
* this advance-then-back-up behavior when dealing with an
* uncommitted message.)
*
* Note that we must test TransactionIdIsInProgress before we
* test TransactionIdDidCommit, else we might return a message
* from a transaction that is not yet visible to snapshots;
* compare the comments at the head of tqual.c.
*/
*current = thisentry;
reachedStop = true;
break;
}
else if (TransactionIdDidCommit(qe->xid))
{ {
/* qe->data is the null-terminated channel name */ /* qe->data is the null-terminated channel name */
char *channel = qe->data; char *channel = qe->data;
...@@ -1991,27 +2012,12 @@ asyncQueueProcessPageEntries(QueuePosition *current, ...@@ -1991,27 +2012,12 @@ asyncQueueProcessPageEntries(QueuePosition *current,
NotifyMyFrontEnd(channel, payload, qe->srcPid); NotifyMyFrontEnd(channel, payload, qe->srcPid);
} }
} }
else if (TransactionIdDidAbort(qe->xid))
{
/*
* If the source transaction aborted, we just ignore its
* notifications.
*/
}
else else
{ {
/* /*
* The transaction has neither committed nor aborted so far, * The source transaction aborted or crashed, so we just
* so we can't process its message yet. Break out of the * ignore its notifications.
* loop, but first back up *current so we will reprocess the
* message next time. (Note: it is unlikely but not
* impossible for TransactionIdDidCommit to fail, so we can't
* really avoid this advance-then-back-up behavior when
* dealing with an uncommitted message.)
*/ */
*current = thisentry;
reachedStop = true;
break;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment