Commit c77a29a1 authored by Tom Lane's avatar Tom Lane

Substantial rewrite of async.c to avoid problems with non-reentrant stdio

and possibly other problems.  Minor changes in xact.c and postgres.c's
main loop to support new handling of async NOTIFY.
parent e7e027a6
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.23 1998/09/01 04:27:19 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.24 1998/10/06 02:39:58 tgl Exp $
* *
* NOTES * NOTES
* Transaction aborts can now occur two ways: * Transaction aborts can now occur two ways:
...@@ -901,6 +901,9 @@ CommitTransaction() ...@@ -901,6 +901,9 @@ CommitTransaction()
/* handle commit for large objects [ PA, 7/17/98 ] */ /* handle commit for large objects [ PA, 7/17/98 ] */
_lo_commit(); _lo_commit();
/* NOTIFY commit must also come before lower-level cleanup */
AtCommit_Notify();
CloseSequences(); CloseSequences();
DestroyTempRels(); DestroyTempRels();
AtEOXact_portals(); AtEOXact_portals();
...@@ -916,10 +919,6 @@ CommitTransaction() ...@@ -916,10 +919,6 @@ CommitTransaction()
* ---------------- * ----------------
*/ */
s->state = TRANS_DEFAULT; s->state = TRANS_DEFAULT;
{ /* want this after commit */
if (IsNormalProcessingMode())
Async_NotifyAtCommit();
}
/* /*
* Let others to know about no transaction in progress - vadim * Let others to know about no transaction in progress - vadim
...@@ -967,6 +966,7 @@ AbortTransaction() ...@@ -967,6 +966,7 @@ AbortTransaction()
* do abort processing * do abort processing
* ---------------- * ----------------
*/ */
AtAbort_Notify();
CloseSequences(); CloseSequences();
AtEOXact_portals(); AtEOXact_portals();
RecordTransactionAbort(); RecordTransactionAbort();
...@@ -982,17 +982,6 @@ AbortTransaction() ...@@ -982,17 +982,6 @@ AbortTransaction()
* ---------------- * ----------------
*/ */
s->state = TRANS_DEFAULT; s->state = TRANS_DEFAULT;
{
/*
* We need to do this in case another process notified us while we
* are in the middle of an aborted transaction. We need to notify
* our frontend after we finish the current transaction. -- jw,
* 1/3/94
*/
if (IsNormalProcessingMode())
Async_NotifyAtAbort();
}
} }
/* -------------------------------- /* --------------------------------
...@@ -1455,6 +1444,30 @@ UserAbortTransactionBlock() ...@@ -1455,6 +1444,30 @@ UserAbortTransactionBlock()
s->blockState = TBLOCK_ENDABORT; s->blockState = TBLOCK_ENDABORT;
} }
/* --------------------------------
* AbortOutOfAnyTransaction
*
* This routine is provided for error recovery purposes. It aborts any
* active transaction or transaction block, leaving the system in a known
* idle state.
* --------------------------------
*/
void
AbortOutOfAnyTransaction()
{
TransactionState s = CurrentTransactionState;
/*
* Get out of any low-level transaction
*/
if (s->state != TRANS_DEFAULT)
AbortTransaction();
/*
* Now reset the high-level state
*/
s->blockState = TBLOCK_DEFAULT;
}
bool bool
IsTransactionBlock() IsTransactionBlock()
{ {
......
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* async.c-- * async.c--
* Asynchronous notification * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
*
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.40 1998/09/01 04:27:42 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.41 1998/10/06 02:39:59 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
/* New Async Notification Model:
/*-------------------------------------------------------------------------
* New Async Notification Model:
* 1. Multiple backends on same machine. Multiple backends listening on * 1. Multiple backends on same machine. Multiple backends listening on
* one relation. * one relation. (Note: "listening on a relation" is not really the
* * right way to think about it, since the notify names need not have
* 2. One of the backend does a 'notify <relname>'. For all backends that * anything to do with the names of relations actually in the database.
* are listening to this relation (all notifications take place at the * But this terminology is all over the code and docs, and I don't feel
* end of commit), * like trying to replace it.)
* 2.a If the process is the same as the backend process that issued *
* notification (we are notifying something that we are listening), * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
* signal the corresponding frontend over the comm channel. * ie, each relname/listenerPID pair. The "notification" field of the
* 2.b For all other listening processes, we send kill(SIGUSR2) to wake up * tuple is zero when no NOTIFY is pending for that listener, or the PID
* the listening backend. * of the originating backend when a cross-backend NOTIFY is pending.
* 3. Upon receiving a kill(SIGUSR2) signal from another backend process * (We skip writing to pg_listener when doing a self-NOTIFY, so the
* notifying that one of the relation that we are listening is being * notification field should never be equal to the listenerPID field.)
* notified, we can be in either of two following states: *
* 3.a We are sleeping, wake up and signal our frontend. * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
* 3.b We are in middle of another transaction, wait until the end of * relname to a list of outstanding NOTIFY requests. Actual processing
* of the current transaction and signal our frontend. * happens if and only if we reach transaction commit. At that time (in
* 4. Each frontend receives this notification and processes accordingly. * routine AtCommit_Notify) we scan pg_listener for matching relnames.
* * If the listenerPID in a matching tuple is ours, we just send a notify
* -- jw, 12/28/93 * message to our own front end. If it is not ours, and "notification"
* * is not already nonzero, we set notification to our own PID and send a
* SIGUSR2 signal to the receiving process (indicated by listenerPID).
* BTW: if the signal operation fails, we presume that the listener backend
* crashed without removing this tuple, and remove the tuple for it.
*
* 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
* notify processing immediately if this backend is idle (ie, it is
* waiting for a frontend command and is not within a transaction block).
* Otherwise the handler may only set a flag, which will cause the
* processing to occur just before we next go idle.
*
* 5. Inbound-notify processing consists of scanning pg_listener for tuples
* matching our own listenerPID and having nonzero notification fields.
* For each such tuple, we send a message to our frontend and clear the
* notification field. BTW: this routine has to start/commit its own
* transaction, since by assumption it is only called from outside any
* transaction.
*
* Note that the system's use of pg_listener is confined to very short
* intervals at the end of a transaction that contains NOTIFY statements,
* or during the transaction caused by an inbound SIGUSR2. So the fact that
* pg_listener is a global resource shouldn't cause too much performance
* problem. But application authors ought to be discouraged from doing
* LISTEN or UNLISTEN near the start of a long transaction --- that would
* result in holding the pg_listener write lock for a long time, possibly
* blocking unrelated activity. It could even lead to deadlock against another
* transaction that touches the same user tables and then tries to NOTIFY.
* Probably best to do LISTEN or UNLISTEN outside of transaction blocks.
*
* An application that listens on the same relname it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
* by comparing be_pid in the NOTIFY message to the application's own backend's
* PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
* frontend during startup.) The above design guarantees that notifies from
* other backends will never be missed by ignoring self-notifies. Note,
* however, that we do *not* guarantee that a separate frontend message will
* be sent for every outside NOTIFY. Since there is only room for one
* originating PID in pg_listener, outside notifies occurring at about the
* same time may be collapsed into a single message bearing the PID of the
* first outside backend to perform the NOTIFY.
*-------------------------------------------------------------------------
*/ */
#include <unistd.h> #include <unistd.h>
...@@ -44,90 +85,59 @@ ...@@ -44,90 +85,59 @@
#include "postgres.h" #include "postgres.h"
#include "commands/async.h"
#include "access/heapam.h" #include "access/heapam.h"
#include "access/relscan.h" #include "access/relscan.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/catname.h" #include "catalog/catname.h"
#include "catalog/pg_listener.h" #include "catalog/pg_listener.h"
#include "commands/async.h"
#include "fmgr.h" #include "fmgr.h"
#include "lib/dllist.h" #include "lib/dllist.h"
#include "libpq/libpq.h" #include "libpq/libpq.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "nodes/memnodes.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "tcop/dest.h" #include "tcop/dest.h"
#include "utils/mcxt.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include <utils/trace.h> #include <utils/trace.h>
#include <utils/ps_status.h> #include <utils/ps_status.h>
#define NotifyUnlock pg_options[OPT_NOTIFYUNLOCK] /* stuff that we really ought not be touching directly :-( */
#define NotifyHack pg_options[OPT_NOTIFYHACK]
extern TransactionState CurrentTransactionState; extern TransactionState CurrentTransactionState;
extern CommandDest whereToSendOutput; extern CommandDest whereToSendOutput;
GlobalMemory notifyContext = NULL; /*
* State for outbound notifies consists of a list of all relnames NOTIFYed
static int notifyFrontEndPending = 0; * in the current transaction. We do not actually perform a NOTIFY until
static int notifyIssued = 0; * and unless the transaction commits. pendingNotifies is NULL if no
* NOTIFYs have been done in the current transaction.
*/
static Dllist *pendingNotifies = NULL; static Dllist *pendingNotifies = NULL;
static int AsyncExistsPendingNotify(char *);
static void ClearPendingNotify(void);
static void Async_NotifyFrontEnd(void);
static void Async_NotifyFrontEnd_Aux(void);
void Async_Unlisten(char *relname, int pid);
static void Async_UnlistenOnExit(int code, char *relname);
static void Async_UnlistenAll(void);
/* /*
*-------------------------------------------------------------- * State for inbound notifies consists of two flags: one saying whether
* Async_NotifyHandler -- * the signal handler is currently allowed to call ProcessIncomingNotify
* * directly, and one saying whether the signal has occurred but the handler
* This is the signal handler for SIGUSR2. When the backend * was not allowed to call ProcessIncomingNotify at the time.
* is signaled, the backend can be in two states. *
* 1. If the backend is in the middle of another transaction, * NB: the "volatile" on these declarations is critical! If your compiler
* we set the flag, notifyFrontEndPending, and wait until * does not grok "volatile", you'd be best advised to compile this file
* the end of the transaction to notify the front end. * with all optimization turned off.
* 2. If the backend is not in the middle of another transaction,
* we notify the front end immediately.
*
* -- jw, 12/28/93
* Results:
* none
*
* Side effects:
* none
*/ */
void static volatile int notifyInterruptEnabled = 0;
Async_NotifyHandler(SIGNAL_ARGS) static volatile int notifyInterruptOccurred = 0;
{
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
if ((CurrentTransactionState->state == TRANS_DEFAULT) && /* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
(CurrentTransactionState->blockState == TRANS_DEFAULT)) static int unlistenExitRegistered = 0;
{
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
"waking up sleeping backend process"); static void Async_UnlistenAll(void);
PS_SET_STATUS("async_notify"); static void Async_UnlistenOnExit(void);
Async_NotifyFrontEnd(); static void ProcessIncomingNotify(void);
PS_SET_STATUS("idle"); static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
} static int AsyncExistsPendingNotify(char *relname);
else static void ClearPendingNotifies(void);
{
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
"process in middle of transaction, state=%d, blockstate=%d",
CurrentTransactionState->state,
CurrentTransactionState->blockState);
notifyFrontEndPending = 1;
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
}
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
}
/* /*
*-------------------------------------------------------------- *--------------------------------------------------------------
...@@ -136,253 +146,40 @@ Async_NotifyHandler(SIGNAL_ARGS) ...@@ -136,253 +146,40 @@ Async_NotifyHandler(SIGNAL_ARGS)
* This is executed by the SQL notify command. * This is executed by the SQL notify command.
* *
* Adds the relation to the list of pending notifies. * Adds the relation to the list of pending notifies.
* All notification happens at end of commit. * Actual notification happens during transaction commit.
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*
* All notification of backend processes happens here,
* then each backend notifies its corresponding front end at
* the end of commit.
*
* -- jw, 12/28/93
* *
* Results: * Results:
* XXX * XXX
* *
* Side effects:
* All tuples for relname in pg_listener are updated.
*
*-------------------------------------------------------------- *--------------------------------------------------------------
*/ */
void void
Async_Notify(char *relname) Async_Notify(char *relname)
{ {
HeapTuple lTuple,
rTuple;
Relation lRel;
HeapScanDesc sRel;
TupleDesc tdesc;
ScanKeyData key;
Datum d,
value[3];
bool isnull;
char repl[3],
nulls[3];
char *notifyName; char *notifyName;
TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname); TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
if (!pendingNotifies)
pendingNotifies = DLNewList();
/* /*
* Allocate memory from the global malloc pool because it needs to be * We allocate list memory from the global malloc pool to ensure that
* referenced also when the transaction is finished. DZ - 26-08-1996 * it will live until we want to use it. This is probably not necessary
* any longer, since we will use it before the end of the transaction.
* DLList only knows how to use malloc() anyway, but we could probably
* palloc() the strings...
*/ */
if (!pendingNotifies)
pendingNotifies = DLNewList();
notifyName = strdup(relname); notifyName = strdup(relname);
DLAddHead(pendingNotifies, DLNewElem(notifyName)); DLAddHead(pendingNotifies, DLNewElem(notifyName));
ScanKeyEntryInitialize(&key, 0,
Anum_pg_listener_relname,
F_NAMEEQ,
PointerGetDatum(notifyName));
lRel = heap_openr(ListenerRelationName);
tdesc = RelationGetDescr(lRel);
RelationSetLockForWrite(lRel);
sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
nulls[0] = nulls[1] = nulls[2] = ' ';
repl[0] = repl[1] = repl[2] = ' ';
repl[Anum_pg_listener_notify - 1] = 'r';
value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
{
d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
if (!DatumGetInt32(d))
{
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);
/* notify is really issued only if a tuple has been changed */
notifyIssued = 1;
}
}
heap_endscan(sRel);
/* /*
* Note: if the write lock is unset we can get multiple tuples with * NOTE: we could check to see if pendingNotifies already has an entry
* same oid if other backends notify the same relation. Use this * for relname, and thus avoid making duplicate entries. However, most
* option at your own risk. * apps probably don't notify the same name multiple times per transaction,
* so we'd likely just be wasting cycles to make such a check.
* AsyncExistsPendingNotify() doesn't really care whether the list
* contains duplicates...
*/ */
if (NotifyUnlock)
RelationUnsetLockForWrite(lRel);
heap_close(lRel);
TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
}
/*
*--------------------------------------------------------------
* Async_NotifyAtCommit --
*
* This is called at transaction commit.
*
* Signal our corresponding frontend process on relations that
* were notified. Signal all other backend process that
* are listening also.
*
* -- jw, 12/28/93
*
* Results:
* XXX
*
* Side effects:
* Tuples in pg_listener that has our listenerpid are updated so
* that the notification is 0. We do not want to notify frontend
* more than once.
*
* -- jw, 12/28/93
*
*--------------------------------------------------------------
*/
void
Async_NotifyAtCommit()
{
HeapTuple lTuple;
Relation lRel;
HeapScanDesc sRel;
TupleDesc tdesc;
ScanKeyData key;
Datum d;
bool isnull;
extern TransactionState CurrentTransactionState;
if (!pendingNotifies)
pendingNotifies = DLNewList();
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
if (notifyIssued)
{
/* 'notify <relname>' issued by us */
notifyIssued = 0;
StartTransactionCommand();
TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
ScanKeyEntryInitialize(&key, 0,
Anum_pg_listener_notify,
F_INT4EQ,
Int32GetDatum(1));
lRel = heap_openr(ListenerRelationName);
RelationSetLockForWrite(lRel);
sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
tdesc = RelationGetDescr(lRel);
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
{
d = heap_getattr(lTuple, Anum_pg_listener_relname,
tdesc, &isnull);
if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
{
d = heap_getattr(lTuple, Anum_pg_listener_pid,
tdesc, &isnull);
if (MyProcPid == DatumGetInt32(d))
{
notifyFrontEndPending = 1;
TPRINTF(TRACE_NOTIFY,
"Async_NotifyAtCommit: notifying self");
}
else
{
TPRINTF(TRACE_NOTIFY,
"Async_NotifyAtCommit: notifying pid %d",
DatumGetInt32(d));
#ifdef HAVE_KILL
if (kill(DatumGetInt32(d), SIGUSR2) < 0)
{
if (errno == ESRCH)
heap_delete(lRel, &lTuple->t_ctid);
}
#endif
}
}
}
heap_endscan(sRel);
heap_close(lRel);
/*
* Notify the frontend inside the current transaction while we
* still have a valid write lock on pg_listeners. This avoid
* waiting until all other backends have finished with
* pg_listener.
*/
if (notifyFrontEndPending)
{
/* The aux version is called inside transaction */
Async_NotifyFrontEnd_Aux();
}
TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit: done");
CommitTransactionCommand();
}
else
{
/*
* No notifies issued by us. If notifyFrontEndPending has been
* set by Async_NotifyHandler notify the frontend of pending
* notifies from other backends.
*/
if (notifyFrontEndPending)
Async_NotifyFrontEnd();
}
ClearPendingNotify();
}
}
/*
*--------------------------------------------------------------
* Async_NotifyAtAbort --
*
* This is called at transaction commit.
*
* Gets rid of pending notifies. List elements are automatically
* freed through memory context.
*
*
* Results:
* XXX
*
* Side effects:
* XXX
*
*--------------------------------------------------------------
*/
void
Async_NotifyAtAbort()
{
if (pendingNotifies)
{
ClearPendingNotify();
DLFreeList(pendingNotifies);
}
pendingNotifies = DLNewList();
notifyIssued = 0;
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
/* don't forget to notify front end */
if (notifyFrontEndPending)
Async_NotifyFrontEnd();
}
} }
/* /*
...@@ -394,108 +191,94 @@ Async_NotifyAtAbort() ...@@ -394,108 +191,94 @@ Async_NotifyAtAbort()
* Register a backend (identified by its Unix PID) as listening * Register a backend (identified by its Unix PID) as listening
* on the specified relation. * on the specified relation.
* *
* One listener per relation, pg_listener relation is keyed
* on (relname,pid) to provide multiple listeners in future.
*
* Results: * Results:
* pg_listeners is updated. * XXX
* *
* Side effects: * Side effects:
* XXX * pg_listener is updated.
* *
*-------------------------------------------------------------- *--------------------------------------------------------------
*/ */
void void
Async_Listen(char *relname, int pid) Async_Listen(char *relname, int pid)
{ {
Datum values[Natts_pg_listener]; Relation lRel;
char nulls[Natts_pg_listener];
TupleDesc tdesc; TupleDesc tdesc;
HeapScanDesc scan; HeapScanDesc scan;
HeapTuple tuple, HeapTuple tuple,
newtup; newtup;
Relation lDesc; Datum values[Natts_pg_listener];
char nulls[Natts_pg_listener];
Datum d; Datum d;
int i; int i;
bool isnull; bool isnull;
int alreadyListener = 0; int alreadyListener = 0;
char *relnamei;
TupleDesc tupDesc; TupleDesc tupDesc;
if (whereToSendOutput != Remote)
{
elog(NOTICE, "Async_Listen: "
"listen not available on interactive sessions");
return;
}
TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname); TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
for (i = 0; i < Natts_pg_listener; i++)
{
nulls[i] = ' ';
values[i] = PointerGetDatum(NULL);
}
i = 0; lRel = heap_openr(ListenerRelationName);
values[i++] = (Datum) relname; RelationSetLockForWrite(lRel);
values[i++] = (Datum) pid; tdesc = RelationGetDescr(lRel);
values[i++] = (Datum) 0; /* no notifies pending */
lDesc = heap_openr(ListenerRelationName);
RelationSetLockForWrite(lDesc);
/* is someone already listening. One listener per relation */ /* Detect whether we are already listening on this relname */
tdesc = RelationGetDescr(lDesc); scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
scan = heap_beginscan(lDesc, 0, SnapshotNow, 0, (ScanKey) NULL);
while (HeapTupleIsValid(tuple = heap_getnext(scan, 0))) while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
{ {
d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, &isnull);
&isnull); if (!strncmp((char *) DatumGetPointer(d), relname, NAMEDATALEN))
relnamei = DatumGetPointer(d);
if (!strncmp(relnamei, relname, NAMEDATALEN))
{ {
d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull); d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
pid = DatumGetInt32(d); if (DatumGetInt32(d) == pid)
if (pid == MyProcPid) {
alreadyListener = 1; alreadyListener = 1;
} /* No need to scan the rest of the table */
if (alreadyListener) break;
{ }
/* No need to scan the rest of the table */
break;
} }
} }
heap_endscan(scan); heap_endscan(scan);
if (alreadyListener) if (alreadyListener)
{ {
elog(NOTICE, "Async_Listen: We are already listening on %s", elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
relname); RelationUnsetLockForWrite(lRel);
RelationUnsetLockForWrite(lDesc); heap_close(lRel);
heap_close(lDesc);
return; return;
} }
tupDesc = lDesc->rd_att;
newtup = heap_formtuple(tupDesc, values, nulls);
heap_insert(lDesc, newtup);
pfree(newtup);
/* /*
* if (alreadyListener) { elog(NOTICE,"Async_Listen: already one * OK to insert a new tuple
* listener on %s (possibly dead)",relname); }
*/ */
RelationUnsetLockForWrite(lDesc); for (i = 0; i < Natts_pg_listener; i++)
heap_close(lDesc); {
nulls[i] = ' ';
values[i] = PointerGetDatum(NULL);
}
i = 0;
values[i++] = (Datum) relname;
values[i++] = (Datum) pid;
values[i++] = (Datum) 0; /* no notifies pending */
tupDesc = lRel->rd_att;
newtup = heap_formtuple(tupDesc, values, nulls);
heap_insert(lRel, newtup);
pfree(newtup);
RelationUnsetLockForWrite(lRel);
heap_close(lRel);
/* /*
* now that we are listening, we should make a note to ourselves to * now that we are listening, make sure we will unlisten before dying.
* unlisten prior to dying.
*/ */
relnamei = malloc(NAMEDATALEN); /* persists to process exit */ if (! unlistenExitRegistered)
StrNCpy(relnamei, relname, NAMEDATALEN); {
on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei); if (on_shmem_exit(Async_UnlistenOnExit, (caddr_t) NULL) < 0)
elog(NOTICE, "Async_Listen: out of shmem_exit slots");
unlistenExitRegistered = 1;
}
} }
/* /*
...@@ -508,17 +291,17 @@ Async_Listen(char *relname, int pid) ...@@ -508,17 +291,17 @@ Async_Listen(char *relname, int pid)
* for the specified relation. * for the specified relation.
* *
* Results: * Results:
* pg_listeners is updated. * XXX
* *
* Side effects: * Side effects:
* XXX * pg_listener is updated.
* *
*-------------------------------------------------------------- *--------------------------------------------------------------
*/ */
void void
Async_Unlisten(char *relname, int pid) Async_Unlisten(char *relname, int pid)
{ {
Relation lDesc; Relation lRel;
HeapTuple lTuple; HeapTuple lTuple;
/* Handle specially the `unlisten "*"' command */ /* Handle specially the `unlisten "*"' command */
...@@ -530,17 +313,21 @@ Async_Unlisten(char *relname, int pid) ...@@ -530,17 +313,21 @@ Async_Unlisten(char *relname, int pid)
TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname); TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
/* Note we assume there can be only one matching tuple. */
lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname), lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
Int32GetDatum(pid), Int32GetDatum(pid),
0, 0); 0, 0);
if (lTuple != NULL) if (lTuple != NULL)
{ {
lDesc = heap_openr(ListenerRelationName); lRel = heap_openr(ListenerRelationName);
RelationSetLockForWrite(lDesc); RelationSetLockForWrite(lRel);
heap_delete(lDesc, &lTuple->t_ctid); heap_delete(lRel, &lTuple->t_ctid);
RelationUnsetLockForWrite(lDesc); RelationUnsetLockForWrite(lRel);
heap_close(lDesc); heap_close(lRel);
} }
/* We do not complain about unlistening something not being listened;
* should we?
*/
} }
/* /*
...@@ -549,187 +336,487 @@ Async_Unlisten(char *relname, int pid) ...@@ -549,187 +336,487 @@ Async_Unlisten(char *relname, int pid)
* *
* Unlisten all relations for this backend. * Unlisten all relations for this backend.
* *
* This is invoked by UNLISTEN "*" command, and also at backend exit.
*
* Results: * Results:
* pg_listeners is updated. * XXX
* *
* Side effects: * Side effects:
* XXX * pg_listener is updated.
* *
*-------------------------------------------------------------- *--------------------------------------------------------------
*/ */
static void static void
Async_UnlistenAll() Async_UnlistenAll()
{ {
HeapTuple lTuple;
Relation lRel; Relation lRel;
HeapScanDesc sRel;
TupleDesc tdesc; TupleDesc tdesc;
HeapScanDesc sRel;
HeapTuple lTuple;
ScanKeyData key[1]; ScanKeyData key[1];
TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll"); TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
lRel = heap_openr(ListenerRelationName);
RelationSetLockForWrite(lRel);
tdesc = RelationGetDescr(lRel);
/* Find and delete all entries with my listenerPID */
ScanKeyEntryInitialize(&key[0], 0, ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_pid, Anum_pg_listener_pid,
F_INT4EQ, F_INT4EQ,
Int32GetDatum(MyProcPid)); Int32GetDatum(MyProcPid));
lRel = heap_openr(ListenerRelationName);
RelationSetLockForWrite(lRel);
tdesc = RelationGetDescr(lRel);
sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key); sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0))) while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
heap_delete(lRel, &lTuple->t_ctid); heap_delete(lRel, &lTuple->t_ctid);
heap_endscan(sRel); heap_endscan(sRel);
RelationUnsetLockForWrite(lRel); RelationUnsetLockForWrite(lRel);
heap_close(lRel); heap_close(lRel);
TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll: done");
} }
/* /*
* -------------------------------------------------------------- *--------------------------------------------------------------
* Async_UnlistenOnExit -- * Async_UnlistenOnExit --
* *
* This is called at backend exit for each registered listen. * Clean up the pg_listener table at backend exit.
*
* This is executed if we have done any LISTENs in this backend.
* It might not be necessary anymore, if the user UNLISTENed everything,
* but we don't try to detect that case.
* *
* Results: * Results:
* XXX * XXX
* *
* -------------------------------------------------------------- * Side effects:
*/ * pg_listener is updated if necessary.
static void
Async_UnlistenOnExit(int code, /* from exitpg */
char *relname)
{
Async_Unlisten((char *) relname, MyProcPid);
}
/*
* --------------------------------------------------------------
* Async_NotifyFrontEnd --
*
* This is called outside transactions. The real work is done
* by Async_NotifyFrontEnd_Aux().
* *
* -------------------------------------------------------------- *--------------------------------------------------------------
*/ */
static void static void
Async_NotifyFrontEnd() Async_UnlistenOnExit()
{ {
/*
* We need to start/commit a transaction for the unlisten,
* but if there is already an active transaction we had better
* abort that one first. Otherwise we'd end up committing changes
* that probably ought to be discarded.
*/
AbortOutOfAnyTransaction();
/* Now we can do the unlisten */
StartTransactionCommand(); StartTransactionCommand();
Async_NotifyFrontEnd_Aux(); Async_UnlistenAll();
CommitTransactionCommand(); CommitTransactionCommand();
} }
/* /*
* -------------------------------------------------------------- *--------------------------------------------------------------
* Async_NotifyFrontEnd_Aux -- * AtCommit_Notify --
* *
* This must be called inside a transaction block. * This is called at transaction commit.
* *
* Perform an asynchronous notification to front end over * If there are outbound notify requests in the pendingNotifies list,
* portal comm channel. The name of the relation which contains the * scan pg_listener for matching tuples, and either signal the other
* data is sent to the front end. * backend or send a message to our own frontend.
* *
* We remove the notification flag from the pg_listener tuple * NOTE: we are still inside the current transaction, therefore can
* associated with our process. * piggyback on its committing of changes.
* *
* Results: * Results:
* XXX * XXX
* *
* -------------------------------------------------------------- * Side effects:
* Tuples in pg_listener that have matching relnames and other peoples'
* listenerPIDs are updated with a nonzero notification field.
*
*--------------------------------------------------------------
*/ */
static void void
Async_NotifyFrontEnd_Aux() AtCommit_Notify()
{ {
HeapTuple lTuple,
rTuple;
Relation lRel; Relation lRel;
HeapScanDesc sRel;
TupleDesc tdesc; TupleDesc tdesc;
ScanKeyData key[2]; HeapScanDesc sRel;
HeapTuple lTuple,
rTuple;
Datum d, Datum d,
value[3]; value[Natts_pg_listener];
char repl[3], char repl[Natts_pg_listener],
nulls[3]; nulls[Natts_pg_listener];
bool isnull; bool isnull;
char *relname;
int32 listenerPID;
#define MAX_DONE 64 if (!pendingNotifies)
return; /* no NOTIFY statements in this transaction */
char *done[MAX_DONE]; /* NOTIFY is disabled if not normal processing mode.
int ndone = 0; * This test used to be in xact.c, but it seems cleaner to do it here.
int i; */
if (! IsNormalProcessingMode())
{
ClearPendingNotifies();
return;
}
notifyFrontEndPending = 0; TPRINTF(TRACE_NOTIFY, "AtCommit_Notify");
TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
StartTransactionCommand();
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_notify,
F_INT4EQ,
Int32GetDatum(1));
ScanKeyEntryInitialize(&key[1], 0,
Anum_pg_listener_pid,
F_INT4EQ,
Int32GetDatum(MyProcPid));
lRel = heap_openr(ListenerRelationName); lRel = heap_openr(ListenerRelationName);
RelationSetLockForWrite(lRel); RelationSetLockForWrite(lRel);
tdesc = RelationGetDescr(lRel); tdesc = RelationGetDescr(lRel);
sRel = heap_beginscan(lRel, 0, SnapshotNow, 2, key); sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
/* preset data to update notify column to MyProcPid */
nulls[0] = nulls[1] = nulls[2] = ' '; nulls[0] = nulls[1] = nulls[2] = ' ';
repl[0] = repl[1] = repl[2] = ' '; repl[0] = repl[1] = repl[2] = ' ';
repl[Anum_pg_listener_notify - 1] = 'r'; repl[Anum_pg_listener_notify - 1] = 'r';
value[0] = value[1] = value[2] = (Datum) 0; value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(0); value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0))) while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
{ {
d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
&isnull); relname = (char *) DatumGetPointer(d);
/* if (AsyncExistsPendingNotify(relname))
* This hack deletes duplicate tuples which can be left in the
* table if the NotifyUnlock option is set. I'm further
* investigating this. -- dz
*/
if (NotifyHack)
{ {
for (i = 0; i < ndone; i++) d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
listenerPID = DatumGetInt32(d);
if (listenerPID == MyProcPid)
{
/* Self-notify: no need to bother with table update.
* Indeed, we *must not* clear the notification field in
* this path, or we could lose an outside notify, which'd be
* bad for applications that ignore self-notify messages.
*/
TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self");
NotifyMyFrontEnd(relname, listenerPID);
}
else
{ {
if (strcmp(DatumGetName(d)->data, done[i]) == 0) TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d",
listenerPID);
/*
* If someone has already notified this listener,
* we don't bother modifying the table, but we do still send
* a SIGUSR2 signal, just in case that backend missed the
* earlier signal for some reason. It's OK to send the signal
* first, because the other guy can't read pg_listener until
* we unlock it.
*/
#ifdef HAVE_KILL
if (kill(listenerPID, SIGUSR2) < 0)
{ {
TPRINTF(TRACE_NOTIFY, /* Get rid of pg_listener entry if it refers to a PID
"Async_NotifyFrontEnd: duplicate %s", * that no longer exists. Presumably, that backend
DatumGetName(d)->data); * crashed without deleting its pg_listener entries.
* This code used to only delete the entry if errno==ESRCH,
* but as far as I can see we should just do it for any
* failure (certainly at least for EPERM too...)
*/
heap_delete(lRel, &lTuple->t_ctid); heap_delete(lRel, &lTuple->t_ctid);
continue;
} }
else
#endif
{
d = heap_getattr(lTuple, Anum_pg_listener_notify,
tdesc, &isnull);
if (DatumGetInt32(d) == 0)
{
rTuple = heap_modifytuple(lTuple, lRel,
value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);
}
}
}
}
}
heap_endscan(sRel);
/*
* We do not do RelationUnsetLockForWrite(lRel) here, because the
* transaction is about to be committed anyway.
*/
heap_close(lRel);
ClearPendingNotifies();
TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");
}
/*
*--------------------------------------------------------------
* AtAbort_Notify --
*
* This is called at transaction abort.
*
* Gets rid of pending outbound notifies that we would have executed
* if the transaction got committed.
*
* Results:
* XXX
*
*--------------------------------------------------------------
*/
void
AtAbort_Notify()
{
ClearPendingNotifies();
}
/*
*--------------------------------------------------------------
* Async_NotifyHandler --
*
* This is the signal handler for SIGUSR2.
*
* If we are idle (notifyInterruptEnabled is set), we can safely invoke
* ProcessIncomingNotify directly. Otherwise, just set a flag
* to do it later.
*
* Results:
* none
*
* Side effects:
* per above
*--------------------------------------------------------------
*/
void
Async_NotifyHandler(SIGNAL_ARGS)
{
/*
* Note: this is a SIGNAL HANDLER. You must be very wary what you do here.
* Some helpful soul had this routine sprinkled with TPRINTFs, which would
* likely lead to corruption of stdio buffers if they were ever turned on.
*/
if (notifyInterruptEnabled)
{
/* I'm not sure whether some flavors of Unix might allow another
* SIGUSR2 occurrence to recursively interrupt this routine.
* To cope with the possibility, we do the same sort of dance that
* EnableNotifyInterrupt must do --- see that routine for comments.
*/
notifyInterruptEnabled = 0; /* disable any recursive signal */
notifyInterruptOccurred = 1; /* do at least one iteration */
for (;;)
{
notifyInterruptEnabled = 1;
if (! notifyInterruptOccurred)
break;
notifyInterruptEnabled = 0;
if (notifyInterruptOccurred)
{
/* Here, it is finally safe to do stuff. */
TPRINTF(TRACE_NOTIFY,
"Async_NotifyHandler: perform async notify");
ProcessIncomingNotify();
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
} }
if (ndone < MAX_DONE)
done[ndone++] = pstrdup(DatumGetName(d)->data);
} }
}
else
{
/* In this path it is NOT SAFE to do much of anything, except this: */
notifyInterruptOccurred = 1;
}
}
/*
* --------------------------------------------------------------
* EnableNotifyInterrupt --
*
* This is called by the PostgresMain main loop just before waiting
* for a frontend command. If we are truly idle (ie, *not* inside
* a transaction block), then process any pending inbound notifies,
* and enable the signal handler to process future notifies directly.
*
* NOTE: the signal handler starts out disabled, and stays so until
* PostgresMain calls this the first time.
* --------------------------------------------------------------
*/
void
EnableNotifyInterrupt(void)
{
if (CurrentTransactionState->blockState != TRANS_DEFAULT)
return; /* not really idle */
/*
* This code is tricky because we are communicating with a signal
* handler that could interrupt us at any point. If we just checked
* notifyInterruptOccurred and then set notifyInterruptEnabled, we
* could fail to respond promptly to a signal that happens in between
* those two steps. (A very small time window, perhaps, but Murphy's
* Law says you can hit it...) Instead, we first set the enable flag,
* then test the occurred flag. If we see an unserviced interrupt
* has occurred, we re-clear the enable flag before going off to do
* the service work. (That prevents re-entrant invocation of
* ProcessIncomingNotify() if another interrupt occurs.)
* If an interrupt comes in between the setting and clearing of
* notifyInterruptEnabled, then it will have done the service
* work and left notifyInterruptOccurred zero, so we have to check
* again after clearing enable. The whole thing has to be in a loop
* in case another interrupt occurs while we're servicing the first.
* Once we get out of the loop, enable is set and we know there is no
* unserviced interrupt.
*
* NB: an overenthusiastic optimizing compiler could easily break this
* code. Hopefully, they all understand what "volatile" means these days.
*/
for (;;)
{
notifyInterruptEnabled = 1;
if (! notifyInterruptOccurred)
break;
notifyInterruptEnabled = 0;
if (notifyInterruptOccurred)
{
TPRINTF(TRACE_NOTIFY,
"EnableNotifyInterrupt: perform async notify");
ProcessIncomingNotify();
TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done");
}
}
}
/*
* --------------------------------------------------------------
* DisableNotifyInterrupt --
*
* This is called by the PostgresMain main loop just after receiving
* a frontend command. Signal handler execution of inbound notifies
* is disabled until the next EnableNotifyInterrupt call.
* --------------------------------------------------------------
*/
void
DisableNotifyInterrupt(void)
{
notifyInterruptEnabled = 0;
}
/*
* --------------------------------------------------------------
* ProcessIncomingNotify --
*
* Deal with arriving NOTIFYs from other backends.
* This is called either directly from the SIGUSR2 signal handler,
* or the next time control reaches the outer idle loop.
* Scan pg_listener for arriving notifies, report them to my front end,
* and clear the notification field in pg_listener until next time.
*
* NOTE: since we are outside any transaction, we must create our own.
*
* Results:
* XXX
*
* --------------------------------------------------------------
*/
static void
ProcessIncomingNotify(void)
{
Relation lRel;
TupleDesc tdesc;
ScanKeyData key[1];
HeapScanDesc sRel;
HeapTuple lTuple,
rTuple;
Datum d,
value[Natts_pg_listener];
char repl[Natts_pg_listener],
nulls[Natts_pg_listener];
bool isnull;
char *relname;
int32 sourcePID;
TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify");
PS_SET_STATUS("async_notify");
notifyInterruptOccurred = 0;
StartTransactionCommand();
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl); lRel = heap_openr(ListenerRelationName);
heap_replace(lRel, &lTuple->t_ctid, rTuple); RelationSetLockForWrite(lRel);
tdesc = RelationGetDescr(lRel);
/* Scan only entries with my listenerPID */
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_pid,
F_INT4EQ,
Int32GetDatum(MyProcPid));
sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
/* notifying the front end */ /* Prepare data for rewriting 0 into notification field */
TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: notifying %s", nulls[0] = nulls[1] = nulls[2] = ' ';
DatumGetName(d)->data); repl[0] = repl[1] = repl[2] = ' ';
repl[Anum_pg_listener_notify - 1] = 'r';
value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
if (whereToSendOutput == Remote) while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
{
d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
sourcePID = DatumGetInt32(d);
if (sourcePID != 0)
{ {
pq_putnchar("A", 1); d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
pq_putint((int32) MyProcPid, sizeof(int32)); relname = (char *) DatumGetPointer(d);
pq_putstr(DatumGetName(d)->data); /* Notify the frontend */
pq_flush(); TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d",
relname, (int) sourcePID);
NotifyMyFrontEnd(relname, sourcePID);
/* Rewrite the tuple with 0 in notification column */
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);
} }
} }
heap_endscan(sRel); heap_endscan(sRel);
RelationUnsetLockForWrite(lRel); /*
* We do not do RelationUnsetLockForWrite(lRel) here, because the
* transaction is about to be committed anyway.
*/
heap_close(lRel); heap_close(lRel);
TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: done"); CommitTransactionCommand();
/* Must flush the notify messages to ensure frontend gets them promptly. */
pq_flush();
PS_SET_STATUS("idle");
TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");
} }
/* Send NOTIFY message to my front end. */
static void
NotifyMyFrontEnd(char *relname, int32 listenerPID)
{
if (whereToSendOutput == Remote)
{
pq_putnchar("A", 1);
pq_putint(listenerPID, sizeof(int32));
pq_putstr(relname);
/* NOTE: we do not do pq_flush() here. For a self-notify, it will
* happen at the end of the transaction, and for incoming notifies
* ProcessIncomingNotify will do it after finding all the notifies.
*/
}
else
{
elog(NOTICE, "NOTIFY for %s", relname);
}
}
/* Does pendingNotifies include the given relname?
*
* NB: not called unless pendingNotifies != NULL.
*/
static int static int
AsyncExistsPendingNotify(char *relname) AsyncExistsPendingNotify(char *relname)
{ {
...@@ -747,11 +834,26 @@ AsyncExistsPendingNotify(char *relname) ...@@ -747,11 +834,26 @@ AsyncExistsPendingNotify(char *relname)
return 0; return 0;
} }
/* Clear the pendingNotifies list. */
static void static void
ClearPendingNotify() ClearPendingNotifies()
{ {
Dlelem *p; Dlelem *p;
while ((p = DLRemHead(pendingNotifies)) != NULL) if (pendingNotifies)
free(DLE_VAL(p)); {
/* Since the referenced strings are malloc'd, we have to scan the
* list and delete them individually. If we used palloc for the
* strings then we could just do DLFreeList to get rid of both
* the list nodes and the list base...
*/
while ((p = DLRemHead(pendingNotifies)) != NULL)
{
free(DLE_VAL(p));
DLFreeElem(p);
}
DLFreeList(pendingNotifies);
pendingNotifies = NULL;
}
} }
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.90 1998/10/02 01:14:14 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.91 1998/10/06 02:40:01 tgl Exp $
* *
* NOTES * NOTES
* this is the "main" module of the postgres backend and * this is the "main" module of the postgres backend and
...@@ -1511,7 +1511,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[]) ...@@ -1511,7 +1511,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
if (!IsUnderPostmaster) if (!IsUnderPostmaster)
{ {
puts("\nPOSTGRES backend interactive interface "); puts("\nPOSTGRES backend interactive interface ");
puts("$Revision: 1.90 $ $Date: 1998/10/02 01:14:14 $\n"); puts("$Revision: 1.91 $ $Date: 1998/10/06 02:40:01 $\n");
} }
/* ---------------- /* ----------------
...@@ -1559,7 +1559,16 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[]) ...@@ -1559,7 +1559,16 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
ReadyForQuery(whereToSendOutput); ReadyForQuery(whereToSendOutput);
/* ---------------- /* ----------------
* (2) read a command. * (2) deal with pending asynchronous NOTIFY from other backends,
* and enable async.c's signal handler to execute NOTIFY directly.
* ----------------
*/
QueryCancel = false; /* forget any earlier CANCEL signal */
EnableNotifyInterrupt();
/* ----------------
* (3) read a command.
* ---------------- * ----------------
*/ */
MemSet(parser_input, 0, MAX_PARSE_BUFFER); MemSet(parser_input, 0, MAX_PARSE_BUFFER);
...@@ -1569,7 +1578,13 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[]) ...@@ -1569,7 +1578,13 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
QueryCancel = false; /* forget any earlier CANCEL signal */ QueryCancel = false; /* forget any earlier CANCEL signal */
/* ---------------- /* ----------------
* (3) process the command. * (4) disable async.c's signal handler.
* ----------------
*/
DisableNotifyInterrupt();
/* ----------------
* (5) process the command.
* ---------------- * ----------------
*/ */
switch (firstchar) switch (firstchar)
...@@ -1640,7 +1655,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[]) ...@@ -1640,7 +1655,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
} }
/* ---------------- /* ----------------
* (4) commit the current transaction * (6) commit the current transaction
* *
* Note: if we had an empty input buffer, then we didn't * Note: if we had an empty input buffer, then we didn't
* call pg_exec_query, so we don't bother to commit this transaction. * call pg_exec_query, so we don't bother to commit this transaction.
......
...@@ -70,10 +70,6 @@ static char *opt_names[] = { ...@@ -70,10 +70,6 @@ static char *opt_names[] = {
"syslog", /* use syslog for error messages */ "syslog", /* use syslog for error messages */
"hostlookup", /* enable hostname lookup in ps_status */ "hostlookup", /* enable hostname lookup in ps_status */
"showportnumber", /* show port number in ps_status */ "showportnumber", /* show port number in ps_status */
"notifyunlock", /* enable unlock of pg_listener after
* notify */
"notifyhack" /* enable notify hack to remove duplicate
* tuples */
}; };
/* /*
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: xact.h,v 1.15 1998/09/01 04:34:35 momjian Exp $ * $Id: xact.h,v 1.16 1998/10/06 02:40:06 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -107,6 +107,7 @@ extern void BeginTransactionBlock(void); ...@@ -107,6 +107,7 @@ extern void BeginTransactionBlock(void);
extern void EndTransactionBlock(void); extern void EndTransactionBlock(void);
extern bool IsTransactionBlock(void); extern bool IsTransactionBlock(void);
extern void UserAbortTransactionBlock(void); extern void UserAbortTransactionBlock(void);
extern void AbortOutOfAnyTransaction(void);
extern TransactionId DisabledTransactionId; extern TransactionId DisabledTransactionId;
......
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* async.h-- * async.h--
* * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
*
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: async.h,v 1.9 1998/09/01 04:35:22 momjian Exp $ * $Id: async.h,v 1.10 1998/10/06 02:40:08 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#ifndef ASYNC_H #ifndef ASYNC_H
#define ASYNC_H #define ASYNC_H
#include <nodes/memnodes.h> #include <postgres.h>
extern void Async_NotifyHandler(SIGNAL_ARGS); /* notify-related SQL statements */
extern void Async_Notify(char *relname); extern void Async_Notify(char *relname);
extern void Async_NotifyAtCommit(void);
extern void Async_NotifyAtAbort(void);
extern void Async_Listen(char *relname, int pid); extern void Async_Listen(char *relname, int pid);
extern void Async_Unlisten(char *relname, int pid); extern void Async_Unlisten(char *relname, int pid);
extern GlobalMemory notifyContext; /* perform (or cancel) outbound notify processing at transaction commit */
extern void AtCommit_Notify(void);
extern void AtAbort_Notify(void);
/* signal handler for inbound notifies (SIGUSR2) */
extern void Async_NotifyHandler(SIGNAL_ARGS);
/*
* enable/disable processing of inbound notifies directly from signal handler.
* The enable routine first performs processing of any inbound notifies that
* have occurred since the last disable. These are meant to be called ONLY
* from the appropriate places in PostgresMain().
*/
extern void EnableNotifyInterrupt(void);
extern void DisableNotifyInterrupt(void);
#endif /* ASYNC_H */ #endif /* ASYNC_H */
...@@ -66,10 +66,6 @@ enum pg_option_enum ...@@ -66,10 +66,6 @@ enum pg_option_enum
OPT_SYSLOG, /* use syslog for error messages */ OPT_SYSLOG, /* use syslog for error messages */
OPT_HOSTLOOKUP, /* enable hostname lookup in ps_status */ OPT_HOSTLOOKUP, /* enable hostname lookup in ps_status */
OPT_SHOWPORTNUMBER, /* show port number in ps_status */ OPT_SHOWPORTNUMBER, /* show port number in ps_status */
OPT_NOTIFYUNLOCK, /* enable unlock of pg_listener after
* notify */
OPT_NOTIFYHACK, /* enable notify hack to remove duplicate
* tuples */
NUM_PG_OPTIONS /* must be the last item of enum */ NUM_PG_OPTIONS /* must be the last item of enum */
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment