Commit cd89965a authored by Tom Lane's avatar Tom Lane

Fix bogus when-to-deregister-from-listener-array logic.

Since a backend adds itself to the global listener array during
Exec_ListenPreCommit, it's inappropriate for it to remove itself during
Exec_UnlistenCommit or Exec_UnlistenAllCommit --- that leads to failure
when committing a transaction that did UNLISTEN then LISTEN, since we end
up not registered though we should be.  (This leads to missing later
notifications, or to Assert failures in assert-enabled builds.)  Instead
deal with deregistering at the bottom of AtCommit_Notify, when we know the
final state of the listenChannels list.

Also, simplify the representation of registration status by replacing the
transient backendHasExecutedInitialListen flag with an amRegisteredListener
flag.

Per report from Greg Sabino Mullane.  Back-patch to 9.0, where the problem
was introduced during the LISTEN/NOTIFY rewrite.
parent fdf9e211
...@@ -350,12 +350,12 @@ static volatile sig_atomic_t notifyInterruptOccurred = 0; ...@@ -350,12 +350,12 @@ static volatile sig_atomic_t notifyInterruptOccurred = 0;
/* True if we've registered an on_shmem_exit cleanup */ /* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false; static bool unlistenExitRegistered = false;
/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;
/* has this backend sent notifications in the current transaction? */ /* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false; static bool backendHasSentNotifications = false;
/* has this backend executed its first LISTEN in the current transaction? */
static bool backendHasExecutedInitialListen = false;
/* GUC parameter */ /* GUC parameter */
bool Trace_notify = false; bool Trace_notify = false;
...@@ -724,6 +724,7 @@ static void ...@@ -724,6 +724,7 @@ static void
Async_UnlistenOnExit(int code, Datum arg) Async_UnlistenOnExit(int code, Datum arg)
{ {
Exec_UnlistenAllCommit(); Exec_UnlistenAllCommit();
asyncQueueUnregister();
} }
/* /*
...@@ -768,8 +769,6 @@ PreCommit_Notify(void) ...@@ -768,8 +769,6 @@ PreCommit_Notify(void)
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "PreCommit_Notify"); elog(DEBUG1, "PreCommit_Notify");
Assert(backendHasExecutedInitialListen == false);
/* Preflight for any pending listen/unlisten actions */ /* Preflight for any pending listen/unlisten actions */
foreach(p, pendingActions) foreach(p, pendingActions)
{ {
...@@ -892,11 +891,9 @@ AtCommit_Notify(void) ...@@ -892,11 +891,9 @@ AtCommit_Notify(void)
} }
} }
/* /* If no longer listening to anything, get out of listener array */
* If we did an initial LISTEN, listenChannels now has the entry, so we no if (amRegisteredListener && listenChannels == NIL)
* longer need or want the flag to be set. asyncQueueUnregister();
*/
backendHasExecutedInitialListen = false;
/* And clean up */ /* And clean up */
ClearPendingActionsAndNotifies(); ClearPendingActionsAndNotifies();
...@@ -914,19 +911,12 @@ Exec_ListenPreCommit(void) ...@@ -914,19 +911,12 @@ Exec_ListenPreCommit(void)
* Nothing to do if we are already listening to something, nor if we * Nothing to do if we are already listening to something, nor if we
* already ran this routine in this transaction. * already ran this routine in this transaction.
*/ */
if (listenChannels != NIL || backendHasExecutedInitialListen) if (amRegisteredListener)
return; return;
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
/*
* We need this variable to detect an aborted initial LISTEN. In that case
* we would set up our pointer but not listen on any channel. This flag
* gets cleared in AtCommit_Notify or AtAbort_Notify().
*/
backendHasExecutedInitialListen = true;
/* /*
* Before registering, make sure we will unlisten before dying. (Note: * Before registering, make sure we will unlisten before dying. (Note:
* this action does not get undone if we abort later.) * this action does not get undone if we abort later.)
...@@ -950,6 +940,9 @@ Exec_ListenPreCommit(void) ...@@ -950,6 +940,9 @@ Exec_ListenPreCommit(void)
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid; QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
LWLockRelease(AsyncQueueLock); LWLockRelease(AsyncQueueLock);
/* Now we are listed in the global array, so remember we're listening */
amRegisteredListener = true;
/* /*
* Try to move our pointer forward as far as possible. This will skip over * Try to move our pointer forward as far as possible. This will skip over
* already-committed notifications. Still, we could get notifications that * already-committed notifications. Still, we could get notifications that
...@@ -1022,10 +1015,6 @@ Exec_UnlistenCommit(const char *channel) ...@@ -1022,10 +1015,6 @@ Exec_UnlistenCommit(const char *channel)
* We do not complain about unlistening something not being listened; * We do not complain about unlistening something not being listened;
* should we? * should we?
*/ */
/* If no longer listening to anything, get out of listener array */
if (listenChannels == NIL)
asyncQueueUnregister();
} }
/* /*
...@@ -1041,8 +1030,6 @@ Exec_UnlistenAllCommit(void) ...@@ -1041,8 +1030,6 @@ Exec_UnlistenAllCommit(void)
list_free_deep(listenChannels); list_free_deep(listenChannels);
listenChannels = NIL; listenChannels = NIL;
asyncQueueUnregister();
} }
/* /*
...@@ -1160,6 +1147,9 @@ asyncQueueUnregister(void) ...@@ -1160,6 +1147,9 @@ asyncQueueUnregister(void)
Assert(listenChannels == NIL); /* else caller error */ Assert(listenChannels == NIL); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
return;
LWLockAcquire(AsyncQueueLock, LW_SHARED); LWLockAcquire(AsyncQueueLock, LW_SHARED);
/* check if entry is valid and oldest ... */ /* check if entry is valid and oldest ... */
advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) && advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
...@@ -1168,6 +1158,9 @@ asyncQueueUnregister(void) ...@@ -1168,6 +1158,9 @@ asyncQueueUnregister(void)
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
LWLockRelease(AsyncQueueLock); LWLockRelease(AsyncQueueLock);
/* mark ourselves as no longer listed in the global array */
amRegisteredListener = false;
/* If we were the laziest backend, try to advance the tail pointer */ /* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail) if (advanceTail)
asyncQueueAdvanceTail(); asyncQueueAdvanceTail();
...@@ -1524,21 +1517,12 @@ void ...@@ -1524,21 +1517,12 @@ void
AtAbort_Notify(void) AtAbort_Notify(void)
{ {
/* /*
* If we LISTEN but then roll back the transaction we have set our pointer * If we LISTEN but then roll back the transaction after PreCommit_Notify,
* but have not made any entry in listenChannels. In that case, remove our * we have registered as a listener but have not made any entry in
* pointer again. * listenChannels. In that case, deregister again.
*/ */
if (backendHasExecutedInitialListen) if (amRegisteredListener && listenChannels == NIL)
{ asyncQueueUnregister();
/*
* Checking listenChannels should be redundant but it can't hurt doing
* it for safety reasons.
*/
if (listenChannels == NIL)
asyncQueueUnregister();
backendHasExecutedInitialListen = false;
}
/* And clean up */ /* And clean up */
ClearPendingActionsAndNotifies(); ClearPendingActionsAndNotifies();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment