/*-------------------------------------------------------------------------
 *
 * async.c--
 *	  Asynchronous notification
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.34 1998/06/27 04:53:29 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
/* New Async Notification Model:
 * 1. Multiple backends on same machine.  Multiple backends listening on
 *	  one relation.
 *
 * 2. One of the backends does a 'notify <relname>'.  For all backends that
 *	  are listening to this relation (all notifications take place at the
 *	  end of commit),
 *	  2.a  If the process is the same as the backend process that issued
 *		   the notification (we are notifying something that we are
 *		   listening on), signal the corresponding frontend over the comm
 *		   channel.
 *	  2.b  For all other listening processes, we send kill(2) to wake up
 *		   the listening backend.
 * 3. Upon receiving a kill(2) signal from another backend process notifying
 *	  that one of the relations that we are listening on is being notified,
 *	  we can be in either of the two following states:
 *	  3.a  We are sleeping: wake up and signal our frontend.
 *	  3.b  We are in the middle of another transaction: wait until the end
 *		   of the current transaction and signal our frontend.
 * 4. Each frontend receives this notification and processes accordingly.
 *
 * -- jw, 12/28/93
 *
 */
/*
 * The following is the old model, which does not work.
 */
/*
 * Model is:
 * 1. Multiple backends on same machine.
 *
 * 2. Query on one backend sends stuff over an asynchronous portal by
 *	  appending to a relation, and then doing an async. notification
 *	  (which takes place after commit) to all listeners on this relation.
 *
 * 3. Async. notification results in all backends listening on the relation
 *	  being woken up by a process signal, kill(2), with the name of the
 *	  relation passed in shared memory.
 *
 * 4. Each backend notifies its respective frontend over the comm
 *	  channel using the out-of-band channel.
 *
 * 5. Each frontend receives this notification and processes accordingly.
 *
 * #4,#5 are changing soon with pending rewrite of portal/protocol.
 *
 */
Marc G. Fournier's avatar
Marc G. Fournier committed
60 61 62 63
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <errno.h>
64
#include <sys/types.h>			/* Needed by in.h on Ultrix */
Marc G. Fournier's avatar
Marc G. Fournier committed
65
#include <netinet/in.h>
66

Bruce Momjian's avatar
Bruce Momjian committed
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
#include "postgres.h"

#include "access/heapam.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "catalog/catname.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
#include "fmgr.h"
#include "lib/dllist.h"
#include "libpq/libpq.h"
#include "miscadmin.h"
#include "nodes/memnodes.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "tcop/dest.h"
#include "utils/mcxt.h"
#include "utils/syscache.h"
85

86 87 88
/*
 * File-local notification state.
 *
 * notifyFrontEndPending: nonzero when a notification still has to be
 *		delivered to our frontend; set by Async_NotifyHandler and
 *		Async_NotifyAtCommit, cleared by Async_NotifyFrontEnd.
 * notifyIssued: nonzero once Async_Notify has run; cleared by
 *		Async_NotifyAtCommit and Async_NotifyAtAbort.
 * pendingNotifies: list of strdup()'d relation names queued by
 *		Async_Notify.
 */
static int	notifyFrontEndPending = 0;
static int	notifyIssued = 0;
static Dllist *pendingNotifies = NULL;

/* Forward declarations for routines defined later in this file. */
static int	AsyncExistsPendingNotify(char *relname);
static void ClearPendingNotify(void);
static void Async_NotifyFrontEnd(void);
void		Async_Unlisten(char *relname, int pid);
static void Async_UnlistenOnExit(int code, char *relname);
96

97 98 99 100
/*
 *--------------------------------------------------------------
 * Async_NotifyHandler --
 *
101 102 103 104 105 106 107
 *		This is the signal handler for SIGUSR2.  When the backend
 *		is signaled, the backend can be in two states.
 *		1. If the backend is in the middle of another transaction,
 *		   we set the flag, notifyFrontEndPending, and wait until
 *		   the end of the transaction to notify the front end.
 *		2. If the backend is not in the middle of another transaction,
 *		   we notify the front end immediately.
108
 *
109
 *		-- jw, 12/28/93
110
 * Results:
111
 *		none
112 113
 *
 * Side effects:
114
 *		none
115 116
 */
void
Bruce Momjian's avatar
Bruce Momjian committed
117
Async_NotifyHandler(SIGNAL_ARGS)
118
{
119 120 121 122 123
	extern TransactionState CurrentTransactionState;

	if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
		(CurrentTransactionState->blockState == TRANS_DEFAULT))
	{
124

125
#ifdef ASYNC_DEBUG
126
		elog(DEBUG, "Waking up sleeping backend process");
127
#endif
128
		Async_NotifyFrontEnd();
129

130 131 132
	}
	else
	{
133
#ifdef ASYNC_DEBUG
134 135 136
		elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
			 CurrentTransactionState->state,
			 CurrentTransactionState->blockState);
137
#endif
138 139
		notifyFrontEndPending = 1;
	}
140 141 142 143 144 145
}

/*
 *--------------------------------------------------------------
 * Async_Notify --
 *
146 147 148
 *		Adds the relation to the list of pending notifies.
 *		All notification happens at end of commit.
 *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
149
 *
150 151 152
 *		All notification of backend processes happens here,
 *		then each backend notifies its corresponding front end at
 *		the end of commit.
153
 *
154 155
 *		This correspond to 'notify <relname>' command
 *		-- jw, 12/28/93
156 157
 *
 * Results:
158
 *		XXX
159 160
 *
 * Side effects:
161
 *		All tuples for relname in pg_listener are updated.
162 163 164 165 166 167
 *
 *--------------------------------------------------------------
 */
void
Async_Notify(char *relname)
{
168

169 170 171 172 173 174 175 176 177 178 179 180 181 182
	HeapTuple	lTuple,
				rTuple;
	Relation	lRel;
	HeapScanDesc sRel;
	TupleDesc	tdesc;
	ScanKeyData key;
	Buffer		b;
	Datum		d,
				value[3];
	bool		isnull;
	char		repl[3],
				nulls[3];

	char	   *notifyName;
183

184
#ifdef ASYNC_DEBUG
185
	elog(DEBUG, "Async_Notify: %s", relname);
186
#endif
187 188 189 190 191 192 193 194 195 196 197 198 199

	if (!pendingNotifies)
		pendingNotifies = DLNewList();

	/*
	 * Allocate memory from the global malloc pool because it needs to be
	 * referenced also when the transaction is finished.  DZ - 26-08-1996
	 */
	notifyName = strdup(relname);
	DLAddHead(pendingNotifies, DLNewElem(notifyName));

	ScanKeyEntryInitialize(&key, 0,
						   Anum_pg_listener_relname,
Bruce Momjian's avatar
Bruce Momjian committed
200
						   F_NAMEEQ,
201 202 203 204 205
						   PointerGetDatum(notifyName));

	lRel = heap_openr(ListenerRelationName);
	tdesc = RelationGetTupleDescriptor(lRel);
	RelationSetLockForWrite(lRel);
206
	sRel = heap_beginscan(lRel, 0, false, 1, &key);
207 208 209 210 211 212 213 214 215

	nulls[0] = nulls[1] = nulls[2] = ' ';
	repl[0] = repl[1] = repl[2] = ' ';
	repl[Anum_pg_listener_notify - 1] = 'r';
	value[0] = value[1] = value[2] = (Datum) 0;
	value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);

	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
	{
216
		d = heap_getattr(lTuple, Anum_pg_listener_notify,
217
						 tdesc, &isnull);
218 219 220 221 222 223
		if (!DatumGetInt32(d))
		{
			rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
			heap_replace(lRel, &lTuple->t_ctid, rTuple);
		}
		ReleaseBuffer(b);
224
	}
225 226 227 228
	heap_endscan(sRel);
	RelationUnsetLockForWrite(lRel);
	heap_close(lRel);
	notifyIssued = 1;
229 230 231 232 233 234
}

/*
 *--------------------------------------------------------------
 * Async_NotifyAtCommit --
 *
235 236 237
 *		Signal our corresponding frontend process on relations that
 *		were notified.	Signal all other backend process that
 *		are listening also.
238
 *
239
 *		-- jw, 12/28/93
240 241
 *
 * Results:
242
 *		XXX
243 244
 *
 * Side effects:
245 246 247
 *		Tuples in pg_listener that has our listenerpid are updated so
 *		that the notification is 0.  We do not want to notify frontend
 *		more than once.
248
 *
249
 *		-- jw, 12/28/93
250 251 252 253 254 255
 *
 *--------------------------------------------------------------
 */
void
Async_NotifyAtCommit()
{
256 257 258 259 260 261 262 263
	HeapTuple	lTuple;
	Relation	lRel;
	HeapScanDesc sRel;
	TupleDesc	tdesc;
	ScanKeyData key;
	Datum		d;
	bool		isnull;
	Buffer		b;
264 265 266 267 268 269 270 271 272 273 274 275 276
	extern TransactionState CurrentTransactionState;

	if (!pendingNotifies)
		pendingNotifies = DLNewList();

	if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
		(CurrentTransactionState->blockState == TRANS_DEFAULT))
	{

		if (notifyIssued)
		{						/* 'notify <relname>' issued by us */
			notifyIssued = 0;
			StartTransactionCommand();
277
#ifdef ASYNC_DEBUG
278
			elog(DEBUG, "Async_NotifyAtCommit.");
279
#endif
280 281
			ScanKeyEntryInitialize(&key, 0,
								   Anum_pg_listener_notify,
Bruce Momjian's avatar
Bruce Momjian committed
282
								   F_INT4EQ,
283 284 285
								   Int32GetDatum(1));
			lRel = heap_openr(ListenerRelationName);
			RelationSetLockForWrite(lRel);
286
			sRel = heap_beginscan(lRel, 0, false, 1, &key);
287 288 289 290
			tdesc = RelationGetTupleDescriptor(lRel);

			while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
			{
291
				d = heap_getattr(lTuple, Anum_pg_listener_relname,
292
								 tdesc, &isnull);
293 294 295

				if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
				{
296
					d = heap_getattr(lTuple, Anum_pg_listener_pid,
297
									 tdesc, &isnull);
298

Bruce Momjian's avatar
Bruce Momjian committed
299
					if (MyProcPid == DatumGetInt32(d))
300
					{
301
#ifdef ASYNC_DEBUG
302
						elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
303
#endif
304 305 306 307
						notifyFrontEndPending = 1;
					}
					else
					{
308
#ifdef ASYNC_DEBUG
309
						elog(DEBUG, "Notifying others");
310
#endif
311
#ifdef HAVE_KILL
312 313 314 315 316 317 318 319 320
						if (kill(DatumGetInt32(d), SIGUSR2) < 0)
						{
							if (errno == ESRCH)
								heap_delete(lRel, &lTuple->t_ctid);
						}
#endif
					}
				}
				ReleaseBuffer(b);
321
			}
322 323 324 325 326 327
			heap_endscan(sRel);
			RelationUnsetLockForWrite(lRel);
			heap_close(lRel);

			CommitTransactionCommand();
			ClearPendingNotify();
328
		}
329

330 331 332 333 334 335
		if (notifyFrontEndPending)
		{						/* we need to notify the frontend of all
								 * pending notifies. */
			notifyFrontEndPending = 1;
			Async_NotifyFrontEnd();
		}
336 337 338 339 340 341 342
	}
}

/*
 *--------------------------------------------------------------
 * Async_NotifyAtAbort --
 *
343 344 345
 *		Gets rid of pending notifies.  List elements are automatically
 *		freed through memory context.
 *
346 347
 *
 * Results:
348
 *		XXX
349 350
 *
 * Side effects:
351
 *		XXX
352 353 354 355 356 357
 *
 *--------------------------------------------------------------
 */
void
Async_NotifyAtAbort()
{
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
	extern TransactionState CurrentTransactionState;

	if (notifyIssued)
		ClearPendingNotify();
	notifyIssued = 0;
	if (pendingNotifies)
		DLFreeList(pendingNotifies);
	pendingNotifies = DLNewList();

	if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
		(CurrentTransactionState->blockState == TRANS_DEFAULT))
	{
		if (notifyFrontEndPending)
		{						/* don't forget to notify front end */
			Async_NotifyFrontEnd();
		}
374 375 376 377 378 379 380
	}
}

/*
 *--------------------------------------------------------------
 * Async_Listen --
 *
381 382
 *		Register a backend (identified by its Unix PID) as listening
 *		on the specified relation.
383
 *
384
 *		This corresponds to the 'listen <relation>' command in SQL
385
 *
386 387
 *		One listener per relation, pg_listener relation is keyed
 *		on (relname,pid) to provide multiple listeners in future.
388 389
 *
 * Results:
390
 *		pg_listeners is updated.
391 392
 *
 * Side effects:
393
 *		XXX
394 395 396 397 398 399
 *
 *--------------------------------------------------------------
 */
void
Async_Listen(char *relname, int pid)
{
400 401 402 403 404 405 406 407 408 409 410 411 412 413
	Datum		values[Natts_pg_listener];
	char		nulls[Natts_pg_listener];
	TupleDesc	tdesc;
	HeapScanDesc s;
	HeapTuple	htup,
				tup;
	Relation	lDesc;
	Buffer		b;
	Datum		d;
	int			i;
	bool		isnull;
	int			alreadyListener = 0;
	char	   *relnamei;
	TupleDesc	tupDesc;
414

415
#ifdef ASYNC_DEBUG
416
	elog(DEBUG, "Async_Listen: %s", relname);
417
#endif
418 419 420 421
	for (i = 0; i < Natts_pg_listener; i++)
	{
		nulls[i] = ' ';
		values[i] = PointerGetDatum(NULL);
422
	}
423 424 425 426 427 428 429 430 431 432 433

	i = 0;
	values[i++] = (Datum) relname;
	values[i++] = (Datum) pid;
	values[i++] = (Datum) 0;	/* no notifies pending */

	lDesc = heap_openr(ListenerRelationName);
	RelationSetLockForWrite(lDesc);

	/* is someone already listening.  One listener per relation */
	tdesc = RelationGetTupleDescriptor(lDesc);
434
	s = heap_beginscan(lDesc, 0, false, 0, (ScanKey) NULL);
435 436
	while (HeapTupleIsValid(htup = heap_getnext(s, 0, &b)))
	{
437
		d = heap_getattr(htup, Anum_pg_listener_relname, tdesc,
438
						 &isnull);
439 440 441
		relnamei = DatumGetPointer(d);
		if (!strncmp(relnamei, relname, NAMEDATALEN))
		{
442
			d = heap_getattr(htup, Anum_pg_listener_pid, tdesc, &isnull);
443
			pid = DatumGetInt32(d);
Bruce Momjian's avatar
Bruce Momjian committed
444
			if (pid == MyProcPid)
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478
				alreadyListener = 1;
		}
		ReleaseBuffer(b);
	}
	heap_endscan(s);

	if (alreadyListener)
	{
		elog(NOTICE, "Async_Listen: We are already listening on %s",
			 relname);
		return;
	}

	tupDesc = lDesc->rd_att;
	tup = heap_formtuple(tupDesc,
						 values,
						 nulls);
	heap_insert(lDesc, tup);

	pfree(tup);

	/*
	 * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
	 * listener on %s (possibly dead)",relname); }
	 */

	RelationUnsetLockForWrite(lDesc);
	heap_close(lDesc);

	/*
	 * now that we are listening, we should make a note to ourselves to
	 * unlisten prior to dying.
	 */
	relnamei = malloc(NAMEDATALEN);		/* persists to process exit */
479
	StrNCpy(relnamei, relname, NAMEDATALEN);
480
	on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei);
481 482 483 484 485 486
}

/*
 *--------------------------------------------------------------
 * Async_Unlisten --
 *
487 488 489 490 491
 *		Remove the backend from the list of listening backends
 *		for the specified relation.
 *
 *		This would correspond to the 'unlisten <relation>'
 *		command, but there isn't one yet.
492 493
 *
 * Results:
494
 *		pg_listeners is updated.
495 496
 *
 * Side effects:
497
 *		XXX
498 499 500
 *
 *--------------------------------------------------------------
 */
Bruce Momjian's avatar
Bruce Momjian committed
501
void
502 503
Async_Unlisten(char *relname, int pid)
{
504 505
	Relation	lDesc;
	HeapTuple	lTuple;
506 507 508 509 510 511 512 513 514 515 516 517

	lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
								 Int32GetDatum(pid),
								 0, 0);
	lDesc = heap_openr(ListenerRelationName);
	RelationSetLockForWrite(lDesc);

	if (lTuple != NULL)
		heap_delete(lDesc, &lTuple->t_ctid);

	RelationUnsetLockForWrite(lDesc);
	heap_close(lDesc);
518 519
}

520
static void
521
Async_UnlistenOnExit(int code,	/* from exitpg */
522
					 char *relname)
523
{
Bruce Momjian's avatar
Bruce Momjian committed
524
	Async_Unlisten((char *) relname, MyProcPid);
525 526 527 528 529 530
}

/*
 * --------------------------------------------------------------
 * Async_NotifyFrontEnd --
 *
531 532 533
 *		Perform an asynchronous notification to front end over
 *		portal comm channel.  The name of the relation which contains the
 *		data is sent to the front end.
534
 *
535 536
 *		We remove the notification flag from the pg_listener tuple
 *		associated with our process.
537 538
 *
 * Results:
539
 *		XXX
540 541 542
 *
 * --------------------------------------------------------------
 */
543
GlobalMemory notifyContext = NULL;
544

545
static void
546 547
Async_NotifyFrontEnd()
{
548
	extern CommandDest whereToSendOutput;
549 550 551 552 553 554 555 556 557 558 559 560
	HeapTuple	lTuple,
				rTuple;
	Relation	lRel;
	HeapScanDesc sRel;
	TupleDesc	tdesc;
	ScanKeyData key[2];
	Datum		d,
				value[3];
	char		repl[3],
				nulls[3];
	Buffer		b;
	bool		isnull;
561 562 563

	notifyFrontEndPending = 0;

564
#ifdef ASYNC_DEBUG
565
	elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
566
#endif
567 568 569 570

	StartTransactionCommand();
	ScanKeyEntryInitialize(&key[0], 0,
						   Anum_pg_listener_notify,
Bruce Momjian's avatar
Bruce Momjian committed
571
						   F_INT4EQ,
572 573 574
						   Int32GetDatum(1));
	ScanKeyEntryInitialize(&key[1], 0,
						   Anum_pg_listener_pid,
Bruce Momjian's avatar
Bruce Momjian committed
575
						   F_INT4EQ,
Bruce Momjian's avatar
Bruce Momjian committed
576
						   Int32GetDatum(MyProcPid));
577 578 579
	lRel = heap_openr(ListenerRelationName);
	RelationSetLockForWrite(lRel);
	tdesc = RelationGetTupleDescriptor(lRel);
580
	sRel = heap_beginscan(lRel, 0, false, 2, key);
581 582 583 584 585 586 587 588 589

	nulls[0] = nulls[1] = nulls[2] = ' ';
	repl[0] = repl[1] = repl[2] = ' ';
	repl[Anum_pg_listener_notify - 1] = 'r';
	value[0] = value[1] = value[2] = (Datum) 0;
	value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);

	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
	{
590
		d = heap_getattr(lTuple, Anum_pg_listener_relname,
591
						 tdesc, &isnull);
592 593 594 595 596 597 598 599
		rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
		heap_replace(lRel, &lTuple->t_ctid, rTuple);

		/* notifying the front end */

		if (whereToSendOutput == Remote)
		{
			pq_putnchar("A", 1);
600
			pq_putint((int32) MyProcPid, sizeof(int32));
601 602 603 604 605 606
			pq_putstr(DatumGetName(d)->data);
			pq_flush();
		}
		else
			elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
		ReleaseBuffer(b);
607
	}
608
	CommitTransactionCommand();
609 610 611 612 613
}

static int
AsyncExistsPendingNotify(char *relname)
{
614
	Dlelem	   *p;
615 616 617 618 619 620

	for (p = DLGetHead(pendingNotifies);
		 p != NULL;
		 p = DLGetSucc(p))
	{
		/* Use NAMEDATALEN for relname comparison.	  DZ - 26-08-1996 */
621
		if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
622 623 624 625
			return 1;
	}

	return 0;
626 627 628 629 630
}

static void
ClearPendingNotify()
{
631
	Dlelem	   *p;
632

633 634 635
	while ((p = DLRemHead(pendingNotifies)) != NULL)
		free(DLE_VAL(p));
}