/*-------------------------------------------------------------------------
 *
 * async.c--
 *	  Asynchronous notification
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.34 1998/06/27 04:53:29 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
/* New Async Notification Model:
 * 1. Multiple backends on same machine.  Multiple backends listening on
 *	  one relation.
 *
 * 2. One of the backends does a 'notify <relname>'.  For all backends that
 *	  are listening to this relation (all notifications take place at the
 *	  end of commit),
 *	  2.a  If the process is the same as the backend process that issued
 *		   notification (we are notifying something that we are listening),
 *		   signal the corresponding frontend over the comm channel.
 *	  2.b  For all other listening processes, we send kill(2) to wake up
 *		   the listening backend.
 * 3. Upon receiving a kill(2) signal from another backend process notifying
 *	  that one of the relations that we are listening on is being notified,
 *	  we can be in either of the two following states:
 *	  3.a  We are sleeping, wake up and signal our frontend.
 *	  3.b  We are in the middle of another transaction, wait until the end
 *		   of the current transaction and signal our frontend.
 * 4. Each frontend receives this notification and processes accordingly.
 *
 * -- jw, 12/28/93
 *
 */
/*
 * The following is the old model which does not work.
 */
/*
 * Model is:
 * 1. Multiple backends on same machine.
 *
 * 2. Query on one backend sends stuff over an asynchronous portal by
 *	  appending to a relation, and then doing an async. notification
 *	  (which takes place after commit) to all listeners on this relation.
 *
 * 3. Async. notification results in all backends listening on relation
 *	  to be woken up, by a process signal kill(2), with name of relation
 *	  passed in shared memory.
 *
 * 4. Each backend notifies its respective frontend over the comm
 *	  channel using the out-of-band channel.
 *
 * 5. Each frontend receives this notification and processes accordingly.
 *
 * #4,#5 are changing soon with pending rewrite of portal/protocol.
 *
 */
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>			/* Needed by in.h on Ultrix */
#include <netinet/in.h>

#include "postgres.h"

#include "access/heapam.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "catalog/catname.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
#include "fmgr.h"
#include "lib/dllist.h"
#include "libpq/libpq.h"
#include "miscadmin.h"
#include "nodes/memnodes.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "tcop/dest.h"
#include "utils/mcxt.h"
#include "utils/syscache.h"

/*
 * Process-local notification state.
 *
 * notifyFrontEndPending: set when a notification for this backend has been
 *		detected (SIGUSR2 received mid-transaction, or we notified a
 *		relation we listen on ourselves) but the front end has not been
 *		told yet.  Cleared by Async_NotifyFrontEnd.
 * notifyIssued: set by Async_Notify once this backend has issued
 *		'notify <relname>'; reset when Async_NotifyAtCommit processes it or
 *		Async_NotifyAtAbort discards it.
 * pendingNotifies: list of relation names notified in the current
 *		transaction.  The names are strdup'd (malloc'd) by Async_Notify so
 *		they survive past the end of the transaction.
 */
static int	notifyFrontEndPending = 0;
static int	notifyIssued = 0;
static Dllist *pendingNotifies = NULL;


static int	AsyncExistsPendingNotify(char *);
static void ClearPendingNotify(void);
static void Async_NotifyFrontEnd(void);
void		Async_Unlisten(char *relname, int pid);
static void Async_UnlistenOnExit(int code, char *relname);

/*
 *--------------------------------------------------------------
 * Async_NotifyHandler --
 *
 *		This is the signal handler for SIGUSR2.  When the backend
 *		is signaled, the backend can be in two states.
 *		1. If the backend is in the middle of another transaction,
 *		   we set the flag, notifyFrontEndPending, and wait until
 *		   the end of the transaction to notify the front end.
 *		2. If the backend is not in the middle of another transaction,
 *		   we notify the front end immediately.
 *
 *		-- jw, 12/28/93
 * Results:
 *		none
 *
 * Side effects:
 *		none
 */
void
Async_NotifyHandler(SIGNAL_ARGS)
{
	extern TransactionState CurrentTransactionState;

	if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
		(CurrentTransactionState->blockState == TRANS_DEFAULT))
	{

#ifdef ASYNC_DEBUG
		elog(DEBUG, "Waking up sleeping backend process");
#endif
		Async_NotifyFrontEnd();

	}
	else
	{
#ifdef ASYNC_DEBUG
		elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
			 CurrentTransactionState->state,
			 CurrentTransactionState->blockState);
#endif
		notifyFrontEndPending = 1;
	}
}

/*
 *--------------------------------------------------------------
 * Async_Notify --
 *
 *		Adds the relation to the list of pending notifies.
 *		All notification happens at end of commit.
 *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 *
 *		All notification of backend processes happens here,
 *		then each backend notifies its corresponding front end at
 *		the end of commit.
 *
 *		This correspond to 'notify <relname>' command
 *		-- jw, 12/28/93
 *
 * Results:
 *		XXX
 *
 * Side effects:
 *		All tuples for relname in pg_listener are updated.
 *
 *--------------------------------------------------------------
 */
void
Async_Notify(char *relname)
{

	HeapTuple	lTuple,
				rTuple;
	Relation	lRel;
	HeapScanDesc sRel;
	TupleDesc	tdesc;
	ScanKeyData key;
	Buffer		b;
	Datum		d,
				value[3];
	bool		isnull;
	char		repl[3],
				nulls[3];

	char	   *notifyName;

#ifdef ASYNC_DEBUG
	elog(DEBUG, "Async_Notify: %s", relname);
#endif

	if (!pendingNotifies)
		pendingNotifies = DLNewList();

	/*
	 * Allocate memory from the global malloc pool because it needs to be
	 * referenced also when the transaction is finished.  DZ - 26-08-1996
	 */
	notifyName = strdup(relname);
	DLAddHead(pendingNotifies, DLNewElem(notifyName));

	ScanKeyEntryInitialize(&key, 0,
						   Anum_pg_listener_relname,
						   F_NAMEEQ,
						   PointerGetDatum(notifyName));

	lRel = heap_openr(ListenerRelationName);
	tdesc = RelationGetTupleDescriptor(lRel);
	RelationSetLockForWrite(lRel);
	sRel = heap_beginscan(lRel, 0, false, 1, &key);

	nulls[0] = nulls[1] = nulls[2] = ' ';
	repl[0] = repl[1] = repl[2] = ' ';
	repl[Anum_pg_listener_notify - 1] = 'r';
	value[0] = value[1] = value[2] = (Datum) 0;
	value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);

	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
	{
		d = heap_getattr(lTuple, Anum_pg_listener_notify,
						 tdesc, &isnull);
		if (!DatumGetInt32(d))
		{
			rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
			heap_replace(lRel, &lTuple->t_ctid, rTuple);
		}
		ReleaseBuffer(b);
	}
	heap_endscan(sRel);
	RelationUnsetLockForWrite(lRel);
	heap_close(lRel);
	notifyIssued = 1;
}

/*
 *--------------------------------------------------------------
 * Async_NotifyAtCommit --
 *
 *		Signal our corresponding frontend process on relations that
 *		were notified.	Signal all other backend process that
 *		are listening also.
 *
 *		-- jw, 12/28/93
 *
 * Results:
 *		XXX
 *
 * Side effects:
 *		Tuples in pg_listener that has our listenerpid are updated so
 *		that the notification is 0.  We do not want to notify frontend
 *		more than once.
 *
 *		-- jw, 12/28/93
 *
 *--------------------------------------------------------------
 */
void
Async_NotifyAtCommit()
{
	HeapTuple	lTuple;
	Relation	lRel;
	HeapScanDesc sRel;
	TupleDesc	tdesc;
	ScanKeyData key;
	Datum		d;
	bool		isnull;
	Buffer		b;
	extern TransactionState CurrentTransactionState;

	if (!pendingNotifies)
		pendingNotifies = DLNewList();

	if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
		(CurrentTransactionState->blockState == TRANS_DEFAULT))
	{

		if (notifyIssued)
		{						/* 'notify <relname>' issued by us */
			notifyIssued = 0;
			StartTransactionCommand();
#ifdef ASYNC_DEBUG
			elog(DEBUG, "Async_NotifyAtCommit.");
#endif
			ScanKeyEntryInitialize(&key, 0,
								   Anum_pg_listener_notify,
								   F_INT4EQ,
								   Int32GetDatum(1));
			lRel = heap_openr(ListenerRelationName);
			RelationSetLockForWrite(lRel);
			sRel = heap_beginscan(lRel, 0, false, 1, &key);
			tdesc = RelationGetTupleDescriptor(lRel);

			while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
			{
				d = heap_getattr(lTuple, Anum_pg_listener_relname,
								 tdesc, &isnull);

				if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
				{
					d = heap_getattr(lTuple, Anum_pg_listener_pid,
									 tdesc, &isnull);

					if (MyProcPid == DatumGetInt32(d))
					{
#ifdef ASYNC_DEBUG
						elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
#endif
						notifyFrontEndPending = 1;
					}
					else
					{
#ifdef ASYNC_DEBUG
						elog(DEBUG, "Notifying others");
#endif
#ifdef HAVE_KILL
						if (kill(DatumGetInt32(d), SIGUSR2) < 0)
						{
							if (errno == ESRCH)
								heap_delete(lRel, &lTuple->t_ctid);
						}
#endif
					}
				}
				ReleaseBuffer(b);
			}
			heap_endscan(sRel);
			RelationUnsetLockForWrite(lRel);
			heap_close(lRel);

			CommitTransactionCommand();
			ClearPendingNotify();
		}

		if (notifyFrontEndPending)
		{						/* we need to notify the frontend of all
								 * pending notifies. */
			notifyFrontEndPending = 1;
			Async_NotifyFrontEnd();
		}
	}
}

/*
 *--------------------------------------------------------------
 * Async_NotifyAtAbort --
 *
 *		Gets rid of pending notifies.  List elements are automatically
 *		freed through memory context.
 *
 *
 * Results:
 *		XXX
 *
 * Side effects:
 *		XXX
 *
 *--------------------------------------------------------------
 */
void
Async_NotifyAtAbort()
{
	extern TransactionState CurrentTransactionState;

	if (notifyIssued)
		ClearPendingNotify();
	notifyIssued = 0;
	if (pendingNotifies)
		DLFreeList(pendingNotifies);
	pendingNotifies = DLNewList();

	if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
		(CurrentTransactionState->blockState == TRANS_DEFAULT))
	{
		if (notifyFrontEndPending)
		{						/* don't forget to notify front end */
			Async_NotifyFrontEnd();
		}
	}
}

/*
 *--------------------------------------------------------------
 * Async_Listen --
 *
 *		Register a backend (identified by its Unix PID) as listening
 *		on the specified relation.
 *
 *		This corresponds to the 'listen <relation>' command in SQL.
 *
 *		Several backends may listen on the same relation; pg_listener
 *		is keyed on (relname, pid).  If this backend is already
 *		listening on relname, a NOTICE is issued and nothing changes.
 *
 * Results:
 *		none
 *
 * Side effects:
 *		A (relname, pid, notify = 0) tuple is inserted into
 *		pg_listener.  An exit callback is registered to remove it
 *		when the backend exits.
 *
 *--------------------------------------------------------------
 */
void
Async_Listen(char *relname, int pid)
{
	Datum		values[Natts_pg_listener];
	char		nulls[Natts_pg_listener];
	TupleDesc	tdesc;
	HeapScanDesc s;
	HeapTuple	htup,
				tup;
	Relation	lDesc;
	Buffer		b;
	Datum		d;
	int			i;
	bool		isnull;
	int			alreadyListener = 0;
	char	   *relnamei;

#ifdef ASYNC_DEBUG
	elog(DEBUG, "Async_Listen: %s", relname);
#endif
	for (i = 0; i < Natts_pg_listener; i++)
	{
		nulls[i] = ' ';
		values[i] = PointerGetDatum(NULL);
	}

	i = 0;
	values[i++] = (Datum) relname;
	values[i++] = (Datum) pid;
	values[i++] = (Datum) 0;	/* no notifies pending */

	lDesc = heap_openr(ListenerRelationName);
	RelationSetLockForWrite(lDesc);

	/* Is this backend already listening on relname? */
	tdesc = RelationGetTupleDescriptor(lDesc);
	s = heap_beginscan(lDesc, 0, false, 0, (ScanKey) NULL);
	while (HeapTupleIsValid(htup = heap_getnext(s, 0, &b)))
	{
		d = heap_getattr(htup, Anum_pg_listener_relname, tdesc,
						 &isnull);
		relnamei = DatumGetPointer(d);
		if (!strncmp(relnamei, relname, NAMEDATALEN))
		{
			/* read into d rather than clobbering the pid parameter */
			d = heap_getattr(htup, Anum_pg_listener_pid, tdesc, &isnull);
			if (DatumGetInt32(d) == MyProcPid)
				alreadyListener = 1;
		}
		ReleaseBuffer(b);
	}
	heap_endscan(s);

	if (alreadyListener)
	{
		elog(NOTICE, "Async_Listen: We are already listening on %s",
			 relname);
		/* release the write lock and the relation on this path too */
		RelationUnsetLockForWrite(lDesc);
		heap_close(lDesc);
		return;
	}

	tup = heap_formtuple(lDesc->rd_att,
						 values,
						 nulls);
	heap_insert(lDesc, tup);

	pfree(tup);

	RelationUnsetLockForWrite(lDesc);
	heap_close(lDesc);

	/*
	 * now that we are listening, we should make a note to ourselves to
	 * unlisten prior to dying.
	 */
	relnamei = malloc(NAMEDATALEN);		/* persists to process exit */
	if (relnamei == NULL)
		elog(ERROR, "Async_Listen: out of memory");
	StrNCpy(relnamei, relname, NAMEDATALEN);
	on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei);
}

/*
 *--------------------------------------------------------------
 * Async_Unlisten --
 *
 *		Delete the pg_listener tuple for (relname, pid), if one exists,
 *		so that the backend no longer listens on the relation.
 *
 *		This would be the implementation of an 'unlisten <relation>'
 *		command; there is no such command yet.  It is also used by the
 *		exit callback registered in Async_Listen.
 *
 * Results:
 *		none
 *
 * Side effects:
 *		pg_listener may lose one tuple.  The relation is write-locked
 *		while this happens.
 *
 *--------------------------------------------------------------
 */
void
Async_Unlisten(char *relname, int pid)
{
	HeapTuple	listenTup;
	Relation	listenRel;

	/* Look up the (relname, pid) row through the LISTENREL syscache. */
	listenTup = SearchSysCacheTuple(LISTENREL,
									PointerGetDatum(relname),
									Int32GetDatum(pid),
									0,
									0);

	listenRel = heap_openr(ListenerRelationName);
	RelationSetLockForWrite(listenRel);

	if (listenTup)
		heap_delete(listenRel, &listenTup->t_ctid);

	RelationUnsetLockForWrite(listenRel);
	heap_close(listenRel);
}

/*
 * Async_UnlistenOnExit --
 *		Exit callback registered by Async_Listen: stop listening on the
 *		relation whose name was saved there, on behalf of this process.
 */
static void
Async_UnlistenOnExit(int code,	/* from exitpg; not used */
					 char *relname)
{
	Async_Unlisten(relname, MyProcPid);
}

/*
 * --------------------------------------------------------------
 * Async_NotifyFrontEnd --
 *
 *		Perform an asynchronous notification to front end over
 *		portal comm channel.  For every pg_listener tuple with our pid
 *		and notify = 1, the relation name is sent to the front end
 *		(an 'A' message followed by our pid and the relation name),
 *		and the tuple's notify flag is reset to 0.
 *
 *		Runs inside its own transaction (Start/CommitTransactionCommand).
 *
 * Results:
 *		none
 *
 * Side effects:
 *		Clears notifyFrontEndPending and updates pg_listener.  When
 *		output is not going to a remote front end, a NOTICE is emitted
 *		per tuple instead.
 * --------------------------------------------------------------
 */

/* Not referenced by the code in this file. */
GlobalMemory notifyContext = NULL;

static void
Async_NotifyFrontEnd()
{
	extern CommandDest whereToSendOutput;
	HeapTuple	lTuple,
				rTuple;
	Relation	lRel;
	HeapScanDesc sRel;
	TupleDesc	tdesc;
	ScanKeyData key[2];
	Datum		d,
				value[3];
	char		repl[3],
				nulls[3];
	Buffer		b;
	bool		isnull;

	notifyFrontEndPending = 0;

#ifdef ASYNC_DEBUG
	elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
#endif

	StartTransactionCommand();
	/* Select tuples with notify = 1 AND listenerpid = our pid. */
	ScanKeyEntryInitialize(&key[0], 0,
						   Anum_pg_listener_notify,
						   F_INT4EQ,
						   Int32GetDatum(1));
	ScanKeyEntryInitialize(&key[1], 0,
						   Anum_pg_listener_pid,
						   F_INT4EQ,
						   Int32GetDatum(MyProcPid));
	lRel = heap_openr(ListenerRelationName);
	RelationSetLockForWrite(lRel);
	tdesc = RelationGetTupleDescriptor(lRel);
	sRel = heap_beginscan(lRel, 0, false, 2, key);

	/* Replace only the notify column, setting it back to 0. */
	nulls[0] = nulls[1] = nulls[2] = ' ';
	repl[0] = repl[1] = repl[2] = ' ';
	repl[Anum_pg_listener_notify - 1] = 'r';
	value[0] = value[1] = value[2] = (Datum) 0;
	value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);

	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
	{
		d = heap_getattr(lTuple, Anum_pg_listener_relname,
						 tdesc, &isnull);
		rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
		heap_replace(lRel, &lTuple->t_ctid, rTuple);

		/* notifying the front end */

		if (whereToSendOutput == Remote)
		{
			pq_putnchar("A", 1);
			pq_putint((int32) MyProcPid, sizeof(int32));
			pq_putstr(DatumGetName(d)->data);
			pq_flush();
		}
		else
			elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
		ReleaseBuffer(b);
	}

	/*
	 * Release the scan, lock and relation before committing, as
	 * Async_NotifyAtCommit does.
	 */
	heap_endscan(sRel);
	RelationUnsetLockForWrite(lRel);
	heap_close(lRel);

	CommitTransactionCommand();
}

/*
 * AsyncExistsPendingNotify --
 *		Return 1 if relname matches (on at most NAMEDATALEN bytes) one of
 *		the names in pendingNotifies, otherwise 0.  The caller must have
 *		created pendingNotifies already.
 */
static int
AsyncExistsPendingNotify(char *relname)
{
	Dlelem	   *elem = DLGetHead(pendingNotifies);

	while (elem != NULL)
	{
		const char *pending = (const char *) DLE_VAL(elem);

		/* Use NAMEDATALEN for relname comparison.  DZ - 26-08-1996 */
		if (strncmp(pending, relname, NAMEDATALEN) == 0)
			return 1;

		elem = DLGetSucc(elem);
	}
	return 0;
}

/*
 * ClearPendingNotify --
 *		Empty pendingNotifies.  Both the strdup'd relation names and the
 *		list elements holding them are freed; the list header itself is
 *		kept for reuse.  Does nothing if the list was never created.
 */
static void
ClearPendingNotify(void)
{
	Dlelem	   *p;

	if (pendingNotifies == NULL)
		return;

	while ((p = DLRemHead(pendingNotifies)) != NULL)
	{
		free(DLE_VAL(p));
		DLFreeElem(p);			/* DLRemHead only unlinks the element */
	}
}