Commit 033eb158 authored by Tom Lane's avatar Tom Lane

Fix LISTEN/NOTIFY race condition reported by Laurent Birtz, by postponing

pg_listener modifications commanded by LISTEN and UNLISTEN until the end
of the current transaction.  This allows us to hold the ExclusiveLock on
pg_listener until after commit, with no greater risk of deadlock than there
was before.  Aside from fixing the race condition, this gets rid of a
truly ugly kludge that was there before, namely having to ignore
HeapTupleBeingUpdated failures during NOTIFY.  There is a small potential
incompatibility, which is that if a transaction issues LISTEN or UNLISTEN
and then looks into pg_listener before committing, it won't see any resulting
row insertion or deletion, where before it would have.  It seems unlikely
that anyone would be depending on that, though.

This patch also disallows LISTEN and UNLISTEN inside a prepared transaction.
That case had some pretty undesirable properties already, such as possibly
allowing pg_listener entries to be made for PIDs no longer present, so
disallowing it seems like a better idea than trying to maintain the behavior.
parent 1c228fa5
<!-- <!--
$PostgreSQL: pgsql/doc/src/sgml/ref/prepare_transaction.sgml,v 1.6 2007/09/11 00:06:41 tgl Exp $ $PostgreSQL: pgsql/doc/src/sgml/ref/prepare_transaction.sgml,v 1.7 2008/03/12 20:11:45 tgl Exp $
PostgreSQL documentation PostgreSQL documentation
--> -->
...@@ -88,8 +88,10 @@ PREPARE TRANSACTION <replaceable class="PARAMETER">transaction_id</replaceable> ...@@ -88,8 +88,10 @@ PREPARE TRANSACTION <replaceable class="PARAMETER">transaction_id</replaceable>
<para> <para>
It is not currently allowed to <command>PREPARE</> a transaction that It is not currently allowed to <command>PREPARE</> a transaction that
has executed any operations involving temporary tables or has executed any operations involving temporary tables,
created any cursors <literal>WITH HOLD</>. Those features are too tightly created any cursors <literal>WITH HOLD</>, or executed
<command>LISTEN</> or <command>UNLISTEN</>.
Those features are too tightly
tied to the current session to be useful in a transaction to be prepared. tied to the current session to be useful in a transaction to be prepared.
</para> </para>
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.138 2008/01/01 19:45:48 momjian Exp $ * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.139 2008/03/12 20:11:46 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -52,6 +52,16 @@ ...@@ -52,6 +52,16 @@
* transaction, since by assumption it is only called from outside any * transaction, since by assumption it is only called from outside any
* transaction. * transaction.
* *
* Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
* of pending actions. If we reach transaction commit, the changes are
* applied to pg_listener just before executing any pending NOTIFYs. This
* method is necessary because to avoid race conditions, we must hold lock
* on pg_listener from when we insert a new listener tuple until we commit.
* To do that and not create undue hazard of deadlock, we don't want to
* touch pg_listener until we are otherwise done with the transaction;
* in particular it'd be uncool to still be taking user-commanded locks
* while holding the pg_listener lock.
*
* Although we grab ExclusiveLock on pg_listener for any operation, * Although we grab ExclusiveLock on pg_listener for any operation,
* the lock is never held very long, so it shouldn't cause too much of * the lock is never held very long, so it shouldn't cause too much of
* a performance problem. (Previously we used AccessExclusiveLock, but * a performance problem. (Previously we used AccessExclusiveLock, but
...@@ -75,7 +85,6 @@ ...@@ -75,7 +85,6 @@
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
#include <netinet/in.h>
#include "access/heapam.h" #include "access/heapam.h"
#include "access/twophase_rmgr.h" #include "access/twophase_rmgr.h"
...@@ -88,11 +97,39 @@ ...@@ -88,11 +97,39 @@
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/sinval.h" #include "storage/sinval.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/ps_status.h" #include "utils/ps_status.h"
/*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
* all actions requested in the current transaction. As explained above,
* we don't actually modify pg_listener until we reach transaction commit.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
* successful subtransactions attach their lists to their parent's list.
* Failed subtransactions simply discard their lists.
*/
typedef enum
{
LISTEN_LISTEN,
LISTEN_UNLISTEN,
LISTEN_UNLISTEN_ALL
} ListenActionKind;
typedef struct
{
ListenActionKind action;
char condname[1]; /* actually, as long as needed */
} ListenAction;
static List *pendingActions = NIL; /* list of ListenAction */
static List *upperPendingActions = NIL; /* list of upper-xact lists */
/* /*
* State for outbound notifies consists of a list of all relnames NOTIFYed * State for outbound notifies consists of a list of all relnames NOTIFYed
* in the current transaction. We do not actually perform a NOTIFY until * in the current transaction. We do not actually perform a NOTIFY until
...@@ -103,8 +140,13 @@ ...@@ -103,8 +140,13 @@
* subtransaction has its own list in its own CurTransactionContext, but * subtransaction has its own list in its own CurTransactionContext, but
* successful subtransactions attach their lists to their parent's list. * successful subtransactions attach their lists to their parent's list.
* Failed subtransactions simply discard their lists. * Failed subtransactions simply discard their lists.
*
* Note: the action and notify lists do not interact within a transaction.
* In particular, if a transaction does NOTIFY and then LISTEN on the same
* condition name, it will get a self-notify at commit. This is a bit odd
* but is consistent with our historical behavior.
*/ */
static List *pendingNotifies = NIL; static List *pendingNotifies = NIL; /* list of C strings */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
...@@ -118,8 +160,8 @@ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ ...@@ -118,8 +160,8 @@ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
* does not grok "volatile", you'd be best advised to compile this file * does not grok "volatile", you'd be best advised to compile this file
* with all optimization turned off. * with all optimization turned off.
*/ */
static volatile int notifyInterruptEnabled = 0; static volatile sig_atomic_t notifyInterruptEnabled = 0;
static volatile int notifyInterruptOccurred = 0; static volatile sig_atomic_t notifyInterruptOccurred = 0;
/* True if we've registered an on_shmem_exit cleanup */ /* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false; static bool unlistenExitRegistered = false;
...@@ -127,15 +169,19 @@ static bool unlistenExitRegistered = false; ...@@ -127,15 +169,19 @@ static bool unlistenExitRegistered = false;
bool Trace_notify = false; bool Trace_notify = false;
static void queue_listen(ListenActionKind action, const char *condname);
static void Async_UnlistenOnExit(int code, Datum arg); static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_Listen(Relation lRel, const char *relname);
static void Exec_Unlisten(Relation lRel, const char *relname);
static void Exec_UnlistenAll(Relation lRel);
static void Send_Notify(Relation lRel);
static void ProcessIncomingNotify(void); static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(char *relname, int32 listenerPID); static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
static bool AsyncExistsPendingNotify(const char *relname); static bool AsyncExistsPendingNotify(const char *relname);
static void ClearPendingNotifies(void); static void ClearPendingActionsAndNotifies(void);
/* /*
*--------------------------------------------------------------
* Async_Notify * Async_Notify
* *
* This is executed by the SQL notify command. * This is executed by the SQL notify command.
...@@ -143,8 +189,6 @@ static void ClearPendingNotifies(void); ...@@ -143,8 +189,6 @@ static void ClearPendingNotifies(void);
* Adds the relation to the list of pending notifies. * Adds the relation to the list of pending notifies.
* Actual notification happens during transaction commit. * Actual notification happens during transaction commit.
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*
*--------------------------------------------------------------
*/ */
void void
Async_Notify(const char *relname) Async_Notify(const char *relname)
...@@ -163,6 +207,12 @@ Async_Notify(const char *relname) ...@@ -163,6 +207,12 @@ Async_Notify(const char *relname)
oldcontext = MemoryContextSwitchTo(CurTransactionContext); oldcontext = MemoryContextSwitchTo(CurTransactionContext);
/*
* Ordering of the list isn't important. We choose to put new
* entries on the front, as this might make duplicate-elimination
* a tad faster when the same condition is signaled many times in
* a row.
*/
pendingNotifies = lcons(pstrdup(relname), pendingNotifies); pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
...@@ -170,34 +220,245 @@ Async_Notify(const char *relname) ...@@ -170,34 +220,245 @@ Async_Notify(const char *relname)
} }
/* /*
*-------------------------------------------------------------- * queue_listen
* Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
* Actual update of pg_listener happens during transaction commit.
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
static void
queue_listen(ListenActionKind action, const char *condname)
{
MemoryContext oldcontext;
ListenAction *actrec;
/*
* Unlike Async_Notify, we don't try to collapse out duplicates.
* It would be too complicated to ensure we get the right interactions
* of conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that
* there would be any performance benefit anyway in sane applications.
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
/* space for terminating null is included in sizeof(ListenAction) */
actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname));
actrec->action = action;
strcpy(actrec->condname, condname);
pendingActions = lappend(pendingActions, actrec);
MemoryContextSwitchTo(oldcontext);
}
/*
* Async_Listen * Async_Listen
* *
* This is executed by the SQL listen command. * This is executed by the SQL listen command.
*/
void
Async_Listen(const char *relname)
{
if (Trace_notify)
elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
queue_listen(LISTEN_LISTEN, relname);
}
/*
* Async_Unlisten
* *
* Register the current backend as listening on the specified * This is executed by the SQL unlisten command.
* relation. */
void
Async_Unlisten(const char *relname)
{
/* Handle specially the `unlisten "*"' command */
if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
{
Async_UnlistenAll();
}
else
{
if (Trace_notify)
elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
queue_listen(LISTEN_UNLISTEN, relname);
}
}
/*
* Async_UnlistenAll
* *
* Side effects: * This is invoked by UNLISTEN "*" command, and also at backend exit.
* pg_listener is updated. */
void
Async_UnlistenAll(void)
{
if (Trace_notify)
elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
queue_listen(LISTEN_UNLISTEN_ALL, "");
}
/*
* Async_UnlistenOnExit
*
* Clean up the pg_listener table at backend exit.
* *
*-------------------------------------------------------------- * This is executed if we have done any LISTENs in this backend.
* It might not be necessary anymore, if the user UNLISTENed everything,
* but we don't try to detect that case.
*/
static void
Async_UnlistenOnExit(int code, Datum arg)
{
/*
* We need to start/commit a transaction for the unlisten, but if there is
* already an active transaction we had better abort that one first.
* Otherwise we'd end up committing changes that probably ought to be
* discarded.
*/
AbortOutOfAnyTransaction();
/* Now we can do the unlisten */
StartTransactionCommand();
Async_UnlistenAll();
CommitTransactionCommand();
}
/*
* AtPrepare_Notify
*
* This is called at the prepare phase of a two-phase
* transaction. Save the state for possible commit later.
*/ */
void void
Async_Listen(const char *relname) AtPrepare_Notify(void)
{
ListCell *p;
/* It's not sensible to have any pending LISTEN/UNLISTEN actions */
if (pendingActions)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN")));
/* We can deal with pending NOTIFY though */
foreach(p, pendingNotifies)
{
const char *relname = (const char *) lfirst(p);
RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
relname, strlen(relname) + 1);
}
/*
* We can clear the state immediately, rather than needing a separate
* PostPrepare call, because if the transaction fails we'd just discard
* the state anyway.
*/
ClearPendingActionsAndNotifies();
}
/*
* AtCommit_Notify
*
* This is called at transaction commit.
*
* If there are pending LISTEN/UNLISTEN actions, insert or delete
* tuples in pg_listener accordingly.
*
* If there are outbound notify requests in the pendingNotifies list,
* scan pg_listener for matching tuples, and either signal the other
* backend or send a message to our own frontend.
*
* NOTE: we are still inside the current transaction, therefore can
* piggyback on its committing of changes.
*/
void
AtCommit_Notify(void)
{ {
Relation lRel; Relation lRel;
ListCell *p;
if (pendingActions == NIL && pendingNotifies == NIL)
return; /* no relevant statements in this xact */
/*
* NOTIFY is disabled if not normal processing mode. This test used to be
* in xact.c, but it seems cleaner to do it here.
*/
if (!IsNormalProcessingMode())
{
ClearPendingActionsAndNotifies();
return;
}
if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify");
/* Acquire ExclusiveLock on pg_listener */
lRel = heap_open(ListenerRelationId, ExclusiveLock);
/* Perform any pending listen/unlisten actions */
foreach(p, pendingActions)
{
ListenAction *actrec = (ListenAction *) lfirst(p);
switch (actrec->action)
{
case LISTEN_LISTEN:
Exec_Listen(lRel, actrec->condname);
break;
case LISTEN_UNLISTEN:
Exec_Unlisten(lRel, actrec->condname);
break;
case LISTEN_UNLISTEN_ALL:
Exec_UnlistenAll(lRel);
break;
}
/* We must CCI after each action in case of conflicting actions */
CommandCounterIncrement();
}
/* Perform any pending notifies */
if (pendingNotifies)
Send_Notify(lRel);
/*
* We do NOT release the lock on pg_listener here; we need to hold it
* until end of transaction (which is about to happen, anyway) to ensure
* that notified backends see our tuple updates when they look. Else they
* might disregard the signal, which would make the application programmer
* very unhappy. Also, this prevents race conditions when we have just
* inserted a listening tuple.
*/
heap_close(lRel, NoLock);
ClearPendingActionsAndNotifies();
if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify: done");
}
/*
* Exec_Listen --- subroutine for AtCommit_Notify
*
* Register the current backend as listening on the specified relation.
*/
static void
Exec_Listen(Relation lRel, const char *relname)
{
HeapScanDesc scan; HeapScanDesc scan;
HeapTuple tuple; HeapTuple tuple;
Datum values[Natts_pg_listener]; Datum values[Natts_pg_listener];
char nulls[Natts_pg_listener]; char nulls[Natts_pg_listener];
int i; NameData condname;
bool alreadyListener = false; bool alreadyListener = false;
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
lRel = heap_open(ListenerRelationId, ExclusiveLock);
/* Detect whether we are already listening on this relname */ /* Detect whether we are already listening on this relname */
scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
...@@ -216,27 +477,20 @@ Async_Listen(const char *relname) ...@@ -216,27 +477,20 @@ Async_Listen(const char *relname)
heap_endscan(scan); heap_endscan(scan);
if (alreadyListener) if (alreadyListener)
{
heap_close(lRel, ExclusiveLock);
return; return;
}
/* /*
* OK to insert a new tuple * OK to insert a new tuple
*/ */
memset(nulls, ' ', sizeof(nulls));
for (i = 0; i < Natts_pg_listener; i++) namestrcpy(&condname, relname);
{ values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
nulls[i] = ' '; values[Anum_pg_listener_pid - 1] = Int32GetDatum(MyProcPid);
values[i] = PointerGetDatum(NULL); values[Anum_pg_listener_notify - 1] = Int32GetDatum(0); /* no notifies pending */
}
i = 0;
values[i++] = (Datum) relname;
values[i++] = (Datum) MyProcPid;
values[i++] = (Datum) 0; /* no notifies pending */
tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls); tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
simple_heap_insert(lRel, tuple); simple_heap_insert(lRel, tuple);
#ifdef NOT_USED /* currently there are no indexes */ #ifdef NOT_USED /* currently there are no indexes */
...@@ -245,8 +499,6 @@ Async_Listen(const char *relname) ...@@ -245,8 +499,6 @@ Async_Listen(const char *relname)
heap_freetuple(tuple); heap_freetuple(tuple);
heap_close(lRel, ExclusiveLock);
/* /*
* now that we are listening, make sure we will unlisten before dying. * now that we are listening, make sure we will unlisten before dying.
*/ */
...@@ -258,37 +510,19 @@ Async_Listen(const char *relname) ...@@ -258,37 +510,19 @@ Async_Listen(const char *relname)
} }
/* /*
*-------------------------------------------------------------- * Exec_Unlisten --- subroutine for AtCommit_Notify
* Async_Unlisten
*
* This is executed by the SQL unlisten command.
* *
* Remove the current backend from the list of listening backends * Remove the current backend from the list of listening backends
* for the specified relation. * for the specified relation.
*
* Side effects:
* pg_listener is updated.
*
*--------------------------------------------------------------
*/ */
void static void
Async_Unlisten(const char *relname) Exec_Unlisten(Relation lRel, const char *relname)
{ {
Relation lRel;
HeapScanDesc scan; HeapScanDesc scan;
HeapTuple tuple; HeapTuple tuple;
/* Handle specially the `unlisten "*"' command */
if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
{
Async_UnlistenAll();
return;
}
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);
lRel = heap_open(ListenerRelationId, ExclusiveLock);
scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
...@@ -310,8 +544,6 @@ Async_Unlisten(const char *relname) ...@@ -310,8 +544,6 @@ Async_Unlisten(const char *relname)
} }
heap_endscan(scan); heap_endscan(scan);
heap_close(lRel, ExclusiveLock);
/* /*
* We do not complain about unlistening something not being listened; * We do not complain about unlistening something not being listened;
* should we? * should we?
...@@ -319,35 +551,19 @@ Async_Unlisten(const char *relname) ...@@ -319,35 +551,19 @@ Async_Unlisten(const char *relname)
} }
/* /*
*-------------------------------------------------------------- * Exec_UnlistenAll --- subroutine for AtCommit_Notify
* Async_UnlistenAll
*
* Unlisten all relations for this backend.
*
* This is invoked by UNLISTEN "*" command, and also at backend exit.
*
* Results:
* XXX
*
* Side effects:
* pg_listener is updated.
* *
*-------------------------------------------------------------- * Update pg_listener to unlisten all relations for this backend.
*/ */
void static void
Async_UnlistenAll(void) Exec_UnlistenAll(Relation lRel)
{ {
Relation lRel;
TupleDesc tdesc;
HeapScanDesc scan; HeapScanDesc scan;
HeapTuple lTuple; HeapTuple lTuple;
ScanKeyData key[1]; ScanKeyData key[1];
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "Async_UnlistenAll"); elog(DEBUG1, "Exec_UnlistenAll");
lRel = heap_open(ListenerRelationId, ExclusiveLock);
tdesc = RelationGetDescr(lRel);
/* Find and delete all entries with my listenerPID */ /* Find and delete all entries with my listenerPID */
ScanKeyInit(&key[0], ScanKeyInit(&key[0],
...@@ -360,100 +576,18 @@ Async_UnlistenAll(void) ...@@ -360,100 +576,18 @@ Async_UnlistenAll(void)
simple_heap_delete(lRel, &lTuple->t_self); simple_heap_delete(lRel, &lTuple->t_self);
heap_endscan(scan); heap_endscan(scan);
heap_close(lRel, ExclusiveLock);
} }
/* /*
*-------------------------------------------------------------- * Send_Notify --- subroutine for AtCommit_Notify
* Async_UnlistenOnExit
*
* Clean up the pg_listener table at backend exit.
*
* This is executed if we have done any LISTENs in this backend.
* It might not be necessary anymore, if the user UNLISTENed everything,
* but we don't try to detect that case.
*
* Results:
* XXX
*
* Side effects:
* pg_listener is updated if necessary.
* *
*-------------------------------------------------------------- * Scan pg_listener for tuples matching our pending notifies, and
* either signal the other backend or send a message to our own frontend.
*/ */
static void static void
Async_UnlistenOnExit(int code, Datum arg) Send_Notify(Relation lRel)
{
/*
* We need to start/commit a transaction for the unlisten, but if there is
* already an active transaction we had better abort that one first.
* Otherwise we'd end up committing changes that probably ought to be
* discarded.
*/
AbortOutOfAnyTransaction();
/* Now we can do the unlisten */
StartTransactionCommand();
Async_UnlistenAll();
CommitTransactionCommand();
}
/*
*--------------------------------------------------------------
* AtPrepare_Notify
*
* This is called at the prepare phase of a two-phase
* transaction. Save the state for possible commit later.
*--------------------------------------------------------------
*/
void
AtPrepare_Notify(void)
{
ListCell *p;
foreach(p, pendingNotifies)
{
const char *relname = (const char *) lfirst(p);
RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
relname, strlen(relname) + 1);
}
/*
* We can clear the state immediately, rather than needing a separate
* PostPrepare call, because if the transaction fails we'd just discard
* the state anyway.
*/
ClearPendingNotifies();
}
/*
*--------------------------------------------------------------
* AtCommit_Notify
*
* This is called at transaction commit.
*
* If there are outbound notify requests in the pendingNotifies list,
* scan pg_listener for matching tuples, and either signal the other
* backend or send a message to our own frontend.
*
* NOTE: we are still inside the current transaction, therefore can
* piggyback on its committing of changes.
*
* Results:
* XXX
*
* Side effects:
* Tuples in pg_listener that have matching relnames and other peoples'
* listenerPIDs are updated with a nonzero notification field.
*
*--------------------------------------------------------------
*/
void
AtCommit_Notify(void)
{ {
Relation lRel; TupleDesc tdesc = RelationGetDescr(lRel);
TupleDesc tdesc;
HeapScanDesc scan; HeapScanDesc scan;
HeapTuple lTuple, HeapTuple lTuple,
rTuple; rTuple;
...@@ -461,22 +595,6 @@ AtCommit_Notify(void) ...@@ -461,22 +595,6 @@ AtCommit_Notify(void)
char repl[Natts_pg_listener], char repl[Natts_pg_listener],
nulls[Natts_pg_listener]; nulls[Natts_pg_listener];
if (pendingNotifies == NIL)
return; /* no NOTIFY statements in this transaction */
/*
* NOTIFY is disabled if not normal processing mode. This test used to be
* in xact.c, but it seems cleaner to do it here.
*/
if (!IsNormalProcessingMode())
{
ClearPendingNotifies();
return;
}
if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify");
/* preset data to update notify column to MyProcPid */ /* preset data to update notify column to MyProcPid */
nulls[0] = nulls[1] = nulls[2] = ' '; nulls[0] = nulls[1] = nulls[2] = ' ';
repl[0] = repl[1] = repl[2] = ' '; repl[0] = repl[1] = repl[2] = ' ';
...@@ -484,8 +602,6 @@ AtCommit_Notify(void) ...@@ -484,8 +602,6 @@ AtCommit_Notify(void)
value[0] = value[1] = value[2] = (Datum) 0; value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid); value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
lRel = heap_open(ListenerRelationId, ExclusiveLock);
tdesc = RelationGetDescr(lRel);
scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
...@@ -505,7 +621,6 @@ AtCommit_Notify(void) ...@@ -505,7 +621,6 @@ AtCommit_Notify(void)
* could lose an outside notify, which'd be bad for applications * could lose an outside notify, which'd be bad for applications
* that ignore self-notify messages. * that ignore self-notify messages.
*/ */
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify: notifying self"); elog(DEBUG1, "AtCommit_Notify: notifying self");
...@@ -538,99 +653,32 @@ AtCommit_Notify(void) ...@@ -538,99 +653,32 @@ AtCommit_Notify(void)
} }
else if (listener->notification == 0) else if (listener->notification == 0)
{ {
HTSU_Result result; /* Rewrite the tuple with my PID in notification column */
ItemPointerData update_ctid; rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl);
TransactionId update_xmax; simple_heap_update(lRel, &lTuple->t_self, rTuple);
rTuple = heap_modifytuple(lTuple, tdesc,
value, nulls, repl);
/*
* We cannot use simple_heap_update here because the tuple
* could have been modified by an uncommitted transaction;
* specifically, since UNLISTEN releases exclusive lock on the
* table before commit, the other guy could already have tried
* to unlisten. There are no other cases where we should be
* able to see an uncommitted update or delete. Therefore, our
* response to a HeapTupleBeingUpdated result is just to
* ignore it. We do *not* wait for the other guy to commit
* --- that would risk deadlock, and we don't want to block
* while holding the table lock anyway for performance
* reasons. We also ignore HeapTupleUpdated, which could occur
* if the other guy commits between our heap_getnext and
* heap_update calls.
*/
result = heap_update(lRel, &lTuple->t_self, rTuple,
&update_ctid, &update_xmax,
GetCurrentCommandId(true),
InvalidSnapshot,
false /* no wait for commit */ );
switch (result)
{
case HeapTupleSelfUpdated:
/* Tuple was already updated in current command? */
elog(ERROR, "tuple already updated by self");
break;
case HeapTupleMayBeUpdated:
/* done successfully */
#ifdef NOT_USED /* currently there are no indexes */ #ifdef NOT_USED /* currently there are no indexes */
CatalogUpdateIndexes(lRel, rTuple); CatalogUpdateIndexes(lRel, rTuple);
#endif #endif
break;
case HeapTupleBeingUpdated:
/* ignore uncommitted tuples */
break;
case HeapTupleUpdated:
/* ignore just-committed tuples */
break;
default:
elog(ERROR, "unrecognized heap_update status: %u",
result);
break;
}
} }
} }
} }
heap_endscan(scan); heap_endscan(scan);
/*
* We do NOT release the lock on pg_listener here; we need to hold it
* until end of transaction (which is about to happen, anyway) to ensure
* that notified backends see our tuple updates when they look. Else they
* might disregard the signal, which would make the application programmer
* very unhappy.
*/
heap_close(lRel, NoLock);
ClearPendingNotifies();
if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify: done");
} }
/* /*
*--------------------------------------------------------------
* AtAbort_Notify * AtAbort_Notify
* *
* This is called at transaction abort. * This is called at transaction abort.
* *
* Gets rid of pending outbound notifies that we would have executed * Gets rid of pending actions and outbound notifies that we would have
* if the transaction got committed. * executed if the transaction got committed.
*
* Results:
* XXX
*
*--------------------------------------------------------------
*/ */
void void
AtAbort_Notify(void) AtAbort_Notify(void)
{ {
ClearPendingNotifies(); ClearPendingActionsAndNotifies();
} }
/* /*
...@@ -646,6 +694,13 @@ AtSubStart_Notify(void) ...@@ -646,6 +694,13 @@ AtSubStart_Notify(void)
/* Keep the list-of-lists in TopTransactionContext for simplicity */ /* Keep the list-of-lists in TopTransactionContext for simplicity */
old_cxt = MemoryContextSwitchTo(TopTransactionContext); old_cxt = MemoryContextSwitchTo(TopTransactionContext);
upperPendingActions = lcons(pendingActions, upperPendingActions);
Assert(list_length(upperPendingActions) ==
GetCurrentTransactionNestLevel() - 1);
pendingActions = NIL;
upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies); upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
Assert(list_length(upperPendingNotifies) == Assert(list_length(upperPendingNotifies) ==
...@@ -659,13 +714,25 @@ AtSubStart_Notify(void) ...@@ -659,13 +714,25 @@ AtSubStart_Notify(void)
/* /*
* AtSubCommit_Notify() --- Take care of subtransaction commit. * AtSubCommit_Notify() --- Take care of subtransaction commit.
* *
* Reassign all items in the pending notifies list to the parent transaction. * Reassign all items in the pending lists to the parent transaction.
*/ */
void void
AtSubCommit_Notify(void) AtSubCommit_Notify(void)
{ {
List *parentPendingActions;
List *parentPendingNotifies; List *parentPendingNotifies;
parentPendingActions = (List *) linitial(upperPendingActions);
upperPendingActions = list_delete_first(upperPendingActions);
Assert(list_length(upperPendingActions) ==
GetCurrentTransactionNestLevel() - 2);
/*
* Mustn't try to eliminate duplicates here --- see queue_listen()
*/
pendingActions = list_concat(parentPendingActions, pendingActions);
parentPendingNotifies = (List *) linitial(upperPendingNotifies); parentPendingNotifies = (List *) linitial(upperPendingNotifies);
upperPendingNotifies = list_delete_first(upperPendingNotifies); upperPendingNotifies = list_delete_first(upperPendingNotifies);
...@@ -687,7 +754,7 @@ AtSubAbort_Notify(void) ...@@ -687,7 +754,7 @@ AtSubAbort_Notify(void)
int my_level = GetCurrentTransactionNestLevel(); int my_level = GetCurrentTransactionNestLevel();
/* /*
* All we have to do is pop the stack --- the notifies made in this * All we have to do is pop the stack --- the actions/notifies made in this
* subxact are no longer interesting, and the space will be freed when * subxact are no longer interesting, and the space will be freed when
* CurTransactionContext is recycled. * CurTransactionContext is recycled.
* *
...@@ -696,6 +763,12 @@ AtSubAbort_Notify(void) ...@@ -696,6 +763,12 @@ AtSubAbort_Notify(void)
* GetCurrentTransactionNestLevel as the indicator of how far we need to * GetCurrentTransactionNestLevel as the indicator of how far we need to
* prune the list. * prune the list.
*/ */
while (list_length(upperPendingActions) > my_level - 2)
{
pendingActions = (List *) linitial(upperPendingActions);
upperPendingActions = list_delete_first(upperPendingActions);
}
while (list_length(upperPendingNotifies) > my_level - 2) while (list_length(upperPendingNotifies) > my_level - 2)
{ {
pendingNotifies = (List *) linitial(upperPendingNotifies); pendingNotifies = (List *) linitial(upperPendingNotifies);
...@@ -704,7 +777,6 @@ AtSubAbort_Notify(void) ...@@ -704,7 +777,6 @@ AtSubAbort_Notify(void)
} }
/* /*
*--------------------------------------------------------------
* NotifyInterruptHandler * NotifyInterruptHandler
* *
* This is the signal handler for SIGUSR2. * This is the signal handler for SIGUSR2.
...@@ -712,13 +784,6 @@ AtSubAbort_Notify(void) ...@@ -712,13 +784,6 @@ AtSubAbort_Notify(void)
* If we are idle (notifyInterruptEnabled is set), we can safely invoke * If we are idle (notifyInterruptEnabled is set), we can safely invoke
* ProcessIncomingNotify directly. Otherwise, just set a flag * ProcessIncomingNotify directly. Otherwise, just set a flag
* to do it later. * to do it later.
*
* Results:
* none
*
* Side effects:
* per above
*--------------------------------------------------------------
*/ */
void void
NotifyInterruptHandler(SIGNAL_ARGS) NotifyInterruptHandler(SIGNAL_ARGS)
...@@ -794,7 +859,6 @@ NotifyInterruptHandler(SIGNAL_ARGS) ...@@ -794,7 +859,6 @@ NotifyInterruptHandler(SIGNAL_ARGS)
} }
/* /*
* --------------------------------------------------------------
* EnableNotifyInterrupt * EnableNotifyInterrupt
* *
* This is called by the PostgresMain main loop just before waiting * This is called by the PostgresMain main loop just before waiting
...@@ -804,7 +868,6 @@ NotifyInterruptHandler(SIGNAL_ARGS) ...@@ -804,7 +868,6 @@ NotifyInterruptHandler(SIGNAL_ARGS)
* *
* NOTE: the signal handler starts out disabled, and stays so until * NOTE: the signal handler starts out disabled, and stays so until
* PostgresMain calls this the first time. * PostgresMain calls this the first time.
* --------------------------------------------------------------
*/ */
void void
EnableNotifyInterrupt(void) EnableNotifyInterrupt(void)
...@@ -853,7 +916,6 @@ EnableNotifyInterrupt(void) ...@@ -853,7 +916,6 @@ EnableNotifyInterrupt(void)
} }
/* /*
* --------------------------------------------------------------
* DisableNotifyInterrupt * DisableNotifyInterrupt
* *
* This is called by the PostgresMain main loop just after receiving * This is called by the PostgresMain main loop just after receiving
...@@ -863,7 +925,6 @@ EnableNotifyInterrupt(void) ...@@ -863,7 +925,6 @@ EnableNotifyInterrupt(void)
* The SIGUSR1 signal handler also needs to call this, so as to * The SIGUSR1 signal handler also needs to call this, so as to
* prevent conflicts if one signal interrupts the other. So we * prevent conflicts if one signal interrupts the other. So we
* must return the previous state of the flag. * must return the previous state of the flag.
* --------------------------------------------------------------
*/ */
bool bool
DisableNotifyInterrupt(void) DisableNotifyInterrupt(void)
...@@ -876,7 +937,6 @@ DisableNotifyInterrupt(void) ...@@ -876,7 +937,6 @@ DisableNotifyInterrupt(void)
} }
/* /*
* --------------------------------------------------------------
* ProcessIncomingNotify * ProcessIncomingNotify
* *
* Deal with arriving NOTIFYs from other backends. * Deal with arriving NOTIFYs from other backends.
...@@ -886,7 +946,6 @@ DisableNotifyInterrupt(void) ...@@ -886,7 +946,6 @@ DisableNotifyInterrupt(void)
* and clear the notification field in pg_listener until next time. * and clear the notification field in pg_listener until next time.
* *
* NOTE: since we are outside any transaction, we must create our own. * NOTE: since we are outside any transaction, we must create our own.
* --------------------------------------------------------------
*/ */
static void static void
ProcessIncomingNotify(void) ProcessIncomingNotify(void)
...@@ -949,9 +1008,6 @@ ProcessIncomingNotify(void) ...@@ -949,9 +1008,6 @@ ProcessIncomingNotify(void)
/* /*
* Rewrite the tuple with 0 in notification column. * Rewrite the tuple with 0 in notification column.
*
* simple_heap_update is safe here because no one else would have
* tried to UNLISTEN us, so there can be no uncommitted changes.
*/ */
rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl); rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl);
simple_heap_update(lRel, &lTuple->t_self, rTuple); simple_heap_update(lRel, &lTuple->t_self, rTuple);
...@@ -1035,17 +1091,18 @@ AsyncExistsPendingNotify(const char *relname) ...@@ -1035,17 +1091,18 @@ AsyncExistsPendingNotify(const char *relname)
return false; return false;
} }
/* Clear the pendingNotifies list. */ /* Clear the pendingActions and pendingNotifies lists. */
static void static void
ClearPendingNotifies(void) ClearPendingActionsAndNotifies(void)
{ {
/* /*
* We used to have to explicitly deallocate the list members and nodes, * We used to have to explicitly deallocate the list members and nodes,
* because they were malloc'd. Now, since we know they are palloc'd in * because they were malloc'd. Now, since we know they are palloc'd in
* CurTransactionContext, we need not do that --- they'll go away * CurTransactionContext, we need not do that --- they'll go away
* automatically at transaction exit. We need only reset the list head * automatically at transaction exit. We need only reset the list head
* pointer. * pointers.
*/ */
pendingActions = NIL;
pendingNotifies = NIL; pendingNotifies = NIL;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment