Commit d1e02722 authored by Tom Lane's avatar Tom Lane

Replace the pg_listener-based LISTEN/NOTIFY mechanism with an in-memory queue.

In addition, add support for a "payload" string to be passed along with
each notify event.

This implementation should be significantly more efficient than the old one,
and is also more compatible with Hot Standby usage.  There is not yet any
facility for HS slaves to receive notifications generated on the master,
although such a thing is possible in future.

Joachim Wieland, reviewed by Jeff Davis; also hacked on by me.
parent fc5173ad
<!-- $PostgreSQL: pgsql/doc/src/sgml/catalogs.sgml,v 2.221 2010/02/07 20:48:09 tgl Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/catalogs.sgml,v 2.222 2010/02/16 22:34:41 tgl Exp $ -->
<!-- <!--
Documentation of the system catalogs, directed toward PostgreSQL developers Documentation of the system catalogs, directed toward PostgreSQL developers
--> -->
...@@ -168,11 +168,6 @@ ...@@ -168,11 +168,6 @@
<entry>metadata for large objects</entry> <entry>metadata for large objects</entry>
</row> </row>
<row>
<entry><link linkend="catalog-pg-listener"><structname>pg_listener</structname></link></entry>
<entry>asynchronous notification support</entry>
</row>
<row> <row>
<entry><link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link></entry> <entry><link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link></entry>
<entry>schemas</entry> <entry>schemas</entry>
...@@ -3253,68 +3248,6 @@ ...@@ -3253,68 +3248,6 @@
</table> </table>
</sect1> </sect1>
<sect1 id="catalog-pg-listener">
<title><structname>pg_listener</structname></title>
<indexterm zone="catalog-pg-listener">
<primary>pg_listener</primary>
</indexterm>
<para>
The catalog <structname>pg_listener</structname> supports the
<xref linkend="sql-listen" endterm="sql-listen-title"> and
<xref linkend="sql-notify" endterm="sql-notify-title">
commands. A listener creates an entry in
<structname>pg_listener</structname> for each notification name
it is listening for. A notifier scans <structname>pg_listener</structname>
and updates each matching entry to show that a notification has occurred.
The notifier also sends a signal (using the PID recorded in the table)
to awaken the listener from sleep.
</para>
<table>
<title><structname>pg_listener</> Columns</title>
<tgroup cols="3">
<thead>
<row>
<entry>Name</entry>
<entry>Type</entry>
<entry>Description</entry>
</row>
</thead>
<tbody>
<row>
<entry><structfield>relname</structfield></entry>
<entry><type>name</type></entry>
<entry>
Notify condition name. (The name need not match any actual
relation in the database; the name <structfield>relname</> is historical.)
</entry>
</row>
<row>
<entry><structfield>listenerpid</structfield></entry>
<entry><type>int4</type></entry>
<entry>PID of the server process that created this entry</entry>
</row>
<row>
<entry><structfield>notification</structfield></entry>
<entry><type>int4</type></entry>
<entry>
Zero if no event is pending for this listener. If an event is
pending, the PID of the server process that sent the notification
</entry>
</row>
</tbody>
</tgroup>
</table>
</sect1>
<sect1 id="catalog-pg-namespace"> <sect1 id="catalog-pg-namespace">
<title><structname>pg_namespace</structname></title> <title><structname>pg_namespace</structname></title>
......
<!-- $PostgreSQL: pgsql/doc/src/sgml/func.sgml,v 1.503 2010/02/16 21:18:01 momjian Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/func.sgml,v 1.504 2010/02/16 22:34:42 tgl Exp $ -->
<chapter id="functions"> <chapter id="functions">
<title>Functions and Operators</title> <title>Functions and Operators</title>
...@@ -11529,6 +11529,12 @@ postgres=# select * from unnest2(array[[1,2],[3,4]]); ...@@ -11529,6 +11529,12 @@ postgres=# select * from unnest2(array[[1,2],[3,4]]);
</entry> </entry>
</row> </row>
<row>
<entry><literal><function>pg_listening_channels</function>()</literal></entry>
<entry><type>setof text</type></entry>
<entry>channel names that the session is currently listening on</entry>
</row>
<row> <row>
<entry><literal><function>inet_client_addr</function>()</literal></entry> <entry><literal><function>inet_client_addr</function>()</literal></entry>
<entry><type>inet</type></entry> <entry><type>inet</type></entry>
...@@ -11674,6 +11680,16 @@ SET search_path TO <replaceable>schema</> <optional>, <replaceable>schema</>, .. ...@@ -11674,6 +11680,16 @@ SET search_path TO <replaceable>schema</> <optional>, <replaceable>schema</>, ..
</para> </para>
</note> </note>
<indexterm>
<primary>pg_listening_channels</primary>
</indexterm>
<para>
<function>pg_listening_channels</function> returns a set of names of
channels that the current session is listening to. See <xref
linkend="sql-listen" endterm="sql-listen-title"> for more information.
</para>
<indexterm> <indexterm>
<primary>inet_client_addr</primary> <primary>inet_client_addr</primary>
</indexterm> </indexterm>
......
<!-- $PostgreSQL: pgsql/doc/src/sgml/libpq.sgml,v 1.298 2010/02/16 20:58:13 momjian Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/libpq.sgml,v 1.299 2010/02/16 22:34:42 tgl Exp $ -->
<chapter id="libpq"> <chapter id="libpq">
<title><application>libpq</application> - C Library</title> <title><application>libpq</application> - C Library</title>
...@@ -4111,50 +4111,48 @@ typedef struct { ...@@ -4111,50 +4111,48 @@ typedef struct {
<productname>PostgreSQL</productname> offers asynchronous notification <productname>PostgreSQL</productname> offers asynchronous notification
via the <command>LISTEN</command> and <command>NOTIFY</command> via the <command>LISTEN</command> and <command>NOTIFY</command>
commands. A client session registers its interest in a particular commands. A client session registers its interest in a particular
notification condition with the <command>LISTEN</command> command (and notification channel with the <command>LISTEN</command> command (and
can stop listening with the <command>UNLISTEN</command> command). All can stop listening with the <command>UNLISTEN</command> command). All
sessions listening on a particular condition will be notified sessions listening on a particular channel will be notified
asynchronously when a <command>NOTIFY</command> command with that asynchronously when a <command>NOTIFY</command> command with that
condition name is executed by any session. No additional information channel name is executed by any session. A <quote>payload</> string can
is passed from the notifier to the listener. Thus, typically, any be passed to communicate additional data to the listeners.
actual data that needs to be communicated is transferred through a
database table. Commonly, the condition name is the same as the
associated table, but it is not necessary for there to be any associated
table.
</para> </para>
<para> <para>
<application>libpq</application> applications submit <application>libpq</application> applications submit
<command>LISTEN</command> and <command>UNLISTEN</command> commands as <command>LISTEN</command>, <command>UNLISTEN</command>,
and <command>NOTIFY</command> commands as
ordinary SQL commands. The arrival of <command>NOTIFY</command> ordinary SQL commands. The arrival of <command>NOTIFY</command>
messages can subsequently be detected by calling messages can subsequently be detected by calling
<function>PQnotifies</function>.<indexterm><primary>PQnotifies</></> <function>PQnotifies</function>.<indexterm><primary>PQnotifies</></>
</para> </para>
<para> <para>
The function <function>PQnotifies</function> The function <function>PQnotifies</function> returns the next notification
returns the next notification from a list of unhandled from a list of unhandled notification messages received from the server.
notification messages received from the server. It returns a null pointer if It returns a null pointer if there are no pending notifications. Once a
there are no pending notifications. Once a notification is notification is returned from <function>PQnotifies</>, it is considered
returned from <function>PQnotifies</>, it is considered handled and will be handled and will be removed from the list of notifications.
removed from the list of notifications.
<synopsis> <synopsis>
PGnotify *PQnotifies(PGconn *conn); PGnotify *PQnotifies(PGconn *conn);
typedef struct pgNotify { typedef struct pgNotify {
char *relname; /* notification condition name */ char *relname; /* notification channel name */
int be_pid; /* process ID of notifying server process */ int be_pid; /* process ID of notifying server process */
char *extra; /* notification parameter */ char *extra; /* notification payload string */
} PGnotify; } PGnotify;
</synopsis> </synopsis>
After processing a <structname>PGnotify</structname> object returned After processing a <structname>PGnotify</structname> object returned
by <function>PQnotifies</function>, be sure to free it with by <function>PQnotifies</function>, be sure to free it with
<function>PQfreemem</function>. It is sufficient to free the <function>PQfreemem</function>. It is sufficient to free the
<structname>PGnotify</structname> pointer; the <structname>PGnotify</structname> pointer; the
<structfield>relname</structfield> and <structfield>extra</structfield> <structfield>relname</structfield> and <structfield>extra</structfield>
fields do not represent separate allocations. (At present, the fields do not represent separate allocations. (The names of these fields
<structfield>extra</structfield> field is unused and will always point are historical; in particular, channel names need not have anything to
to an empty string.) do with relation names.)
</para> </para>
<para> <para>
......
<!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.80 2010/02/16 20:58:14 momjian Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.81 2010/02/16 22:34:43 tgl Exp $ -->
<chapter id="protocol"> <chapter id="protocol">
<title>Frontend/Backend Protocol</title> <title>Frontend/Backend Protocol</title>
...@@ -1117,7 +1117,7 @@ ...@@ -1117,7 +1117,7 @@
backend will send a NotificationResponse message (not to be backend will send a NotificationResponse message (not to be
confused with NoticeResponse!) whenever a confused with NoticeResponse!) whenever a
<command>NOTIFY</command> command is executed for the same <command>NOTIFY</command> command is executed for the same
notification name. channel name.
</para> </para>
<note> <note>
...@@ -3187,7 +3187,7 @@ NotificationResponse (B) ...@@ -3187,7 +3187,7 @@ NotificationResponse (B)
</term> </term>
<listitem> <listitem>
<para> <para>
The name of the condition that the notify has been raised on. The name of the channel that the notify has been raised on.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -3197,9 +3197,7 @@ NotificationResponse (B) ...@@ -3197,9 +3197,7 @@ NotificationResponse (B)
</term> </term>
<listitem> <listitem>
<para> <para>
Additional information passed from the notifying process. The <quote>payload</> string passed from the notifying process.
(Currently, this feature is unimplemented so the field
is always an empty string.)
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -4353,7 +4351,7 @@ the backend. ...@@ -4353,7 +4351,7 @@ the backend.
<para> <para>
The NotificationResponse ('<literal>A</>') message has an additional string The NotificationResponse ('<literal>A</>') message has an additional string
field, which is presently empty but might someday carry additional data passed field, which can carry a <quote>payload</> string passed
from the <command>NOTIFY</command> event sender. from the <command>NOTIFY</command> event sender.
</para> </para>
...@@ -4364,5 +4362,4 @@ string parameter; this has been removed. ...@@ -4364,5 +4362,4 @@ string parameter; this has been removed.
</sect1> </sect1>
</chapter> </chapter>
<!-- <!--
$PostgreSQL: pgsql/doc/src/sgml/ref/listen.sgml,v 1.23 2008/11/14 10:22:47 petere Exp $ $PostgreSQL: pgsql/doc/src/sgml/ref/listen.sgml,v 1.24 2010/02/16 22:34:43 tgl Exp $
PostgreSQL documentation PostgreSQL documentation
--> -->
...@@ -21,7 +21,7 @@ PostgreSQL documentation ...@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv> <refsynopsisdiv>
<synopsis> <synopsis>
LISTEN <replaceable class="PARAMETER">name</replaceable> LISTEN <replaceable class="PARAMETER">channel</replaceable>
</synopsis> </synopsis>
</refsynopsisdiv> </refsynopsisdiv>
...@@ -30,24 +30,23 @@ LISTEN <replaceable class="PARAMETER">name</replaceable> ...@@ -30,24 +30,23 @@ LISTEN <replaceable class="PARAMETER">name</replaceable>
<para> <para>
<command>LISTEN</command> registers the current session as a <command>LISTEN</command> registers the current session as a
listener on the notification condition <replaceable listener on the notification channel named <replaceable
class="PARAMETER">name</replaceable>. class="PARAMETER">channel</replaceable>.
If the current session is already registered as a listener for If the current session is already registered as a listener for
this notification condition, nothing is done. this notification channel, nothing is done.
</para> </para>
<para> <para>
Whenever the command <command>NOTIFY <replaceable Whenever the command <command>NOTIFY <replaceable
class="PARAMETER">name</replaceable></command> is invoked, either class="PARAMETER">channel</replaceable></command> is invoked, either
by this session or another one connected to the same database, all by this session or another one connected to the same database, all
the sessions currently listening on that notification condition are the sessions currently listening on that notification channel are
notified, and each will in turn notify its connected client notified, and each will in turn notify its connected client
application. See the discussion of <command>NOTIFY</command> for application.
more information.
</para> </para>
<para> <para>
A session can be unregistered for a given notify condition with the A session can be unregistered for a given notification channel with the
<command>UNLISTEN</command> command. A session's listen <command>UNLISTEN</command> command. A session's listen
registrations are automatically cleared when the session ends. registrations are automatically cleared when the session ends.
</para> </para>
...@@ -78,16 +77,31 @@ LISTEN <replaceable class="PARAMETER">name</replaceable> ...@@ -78,16 +77,31 @@ LISTEN <replaceable class="PARAMETER">name</replaceable>
<variablelist> <variablelist>
<varlistentry> <varlistentry>
<term><replaceable class="PARAMETER">name</replaceable></term> <term><replaceable class="PARAMETER">channel</replaceable></term>
<listitem> <listitem>
<para> <para>
Name of a notify condition (any identifier). Name of a notification channel (any identifier).
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
</variablelist> </variablelist>
</refsect1> </refsect1>
<refsect1>
<title>Notes</title>
<para>
<command>LISTEN</command> takes effect at transaction commit.
If <command>LISTEN</command> or <command>UNLISTEN</command> is executed
within a transaction that later rolls back, the set of notification
channels being listened to is unchanged.
</para>
<para>
A transaction that has executed <command>LISTEN</command> cannot be
prepared for two-phase commit.
</para>
</refsect1>
<refsect1> <refsect1>
<title>Examples</title> <title>Examples</title>
......
<!-- <!--
$PostgreSQL: pgsql/doc/src/sgml/ref/notify.sgml,v 1.31 2008/11/14 10:22:47 petere Exp $ $PostgreSQL: pgsql/doc/src/sgml/ref/notify.sgml,v 1.32 2010/02/16 22:34:43 tgl Exp $
PostgreSQL documentation PostgreSQL documentation
--> -->
...@@ -21,7 +21,7 @@ PostgreSQL documentation ...@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv> <refsynopsisdiv>
<synopsis> <synopsis>
NOTIFY <replaceable class="PARAMETER">name</replaceable> NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable class="PARAMETER">payload</replaceable> ]
</synopsis> </synopsis>
</refsynopsisdiv> </refsynopsisdiv>
...@@ -29,35 +29,39 @@ NOTIFY <replaceable class="PARAMETER">name</replaceable> ...@@ -29,35 +29,39 @@ NOTIFY <replaceable class="PARAMETER">name</replaceable>
<title>Description</title> <title>Description</title>
<para> <para>
The <command>NOTIFY</command> command sends a notification event to each The <command>NOTIFY</command> command sends a notification event together
client application that has previously executed with an optional <quote>payload</> string to each client application that
<command>LISTEN <replaceable class="parameter">name</replaceable></command> has previously executed
for the specified notification name in the current database. <command>LISTEN <replaceable class="parameter">channel</></command>
for the specified channel name in the current database.
</para> </para>
<para> <para>
<command>NOTIFY</command> provides a simple form of signal or <command>NOTIFY</command> provides a simple
interprocess communication mechanism for a collection of processes interprocess communication mechanism for a collection of processes
accessing the same <productname>PostgreSQL</productname> database. accessing the same <productname>PostgreSQL</productname> database.
Higher-level mechanisms can be built by using tables in the database to A payload string can be sent along with the notification, and
pass additional data (beyond a mere notification name) from notifier to higher-level mechanisms for passing structured data can be built by using
listener(s). tables in the database to pass additional data from notifier to listener(s).
</para> </para>
<para> <para>
The information passed to the client for a notification event includes the notification The information passed to the client for a notification event includes the
name and the notifying session's server process <acronym>PID</>. It is up to the notification channel
database designer to define the notification names that will be used in a given name, the notifying session's server process <acronym>PID</>, and the
database and what each one means. payload string, which is an empty string if it has not been specified.
</para> </para>
<para> <para>
Commonly, the notification name is the same as the name of some table in It is up to the database designer to define the channel names that will
be used in a given database and what each one means.
Commonly, the channel name is the same as the name of some table in
the database, and the notify event essentially means, <quote>I changed this table, the database, and the notify event essentially means, <quote>I changed this table,
take a look at it to see what's new</quote>. But no such association is enforced by take a look at it to see what's new</quote>. But no such association is enforced by
the <command>NOTIFY</command> and <command>LISTEN</command> commands. For the <command>NOTIFY</command> and <command>LISTEN</command> commands. For
example, a database designer could use several different notification names example, a database designer could use several different channel names
to signal different sorts of changes to a single table. to signal different sorts of changes to a single table. Alternatively,
the payload string could be used to differentiate various cases.
</para> </para>
<para> <para>
...@@ -89,19 +93,22 @@ NOTIFY <replaceable class="PARAMETER">name</replaceable> ...@@ -89,19 +93,22 @@ NOTIFY <replaceable class="PARAMETER">name</replaceable>
</para> </para>
<para> <para>
<command>NOTIFY</command> behaves like Unix signals in one important If the same channel name is signaled multiple times from the same
respect: if the same notification name is signaled multiple times in quick transaction with identical payload strings, the
succession, recipients might get only one notification event for several executions database server can decide to deliver a single notification only.
of <command>NOTIFY</command>. So it is a bad idea to depend on the number On the other hand, notifications with distinct payload strings will
of notifications received. Instead, use <command>NOTIFY</command> to wake up always be delivered as distinct notifications. Similarly, notifications from
applications that need to pay attention to something, and use a database different transactions will never get folded into one notification.
object (such as a sequence) to keep track of what happened or how many times Except for dropping later instances of duplicate notifications,
it happened. <command>NOTIFY</command> guarantees that notifications from the same
transaction get delivered in the order they were sent. It is also
guaranteed that messages from different transactions are delivered in
the order in which the transactions committed.
</para> </para>
<para> <para>
It is common for a client that executes <command>NOTIFY</command> It is common for a client that executes <command>NOTIFY</command>
to be listening on the same notification name itself. In that case to be listening on the same notification channel itself. In that case
it will get back a notification event, just like all the other it will get back a notification event, just like all the other
listening sessions. Depending on the application logic, this could listening sessions. Depending on the application logic, this could
result in useless work, for example, reading a database table to result in useless work, for example, reading a database table to
...@@ -111,12 +118,7 @@ NOTIFY <replaceable class="PARAMETER">name</replaceable> ...@@ -111,12 +118,7 @@ NOTIFY <replaceable class="PARAMETER">name</replaceable>
notification event message) is the same as one's own session's notification event message) is the same as one's own session's
<acronym>PID</> (available from <application>libpq</>). When they <acronym>PID</> (available from <application>libpq</>). When they
are the same, the notification event is one's own work bouncing are the same, the notification event is one's own work bouncing
back, and can be ignored. (Despite what was said in the preceding back, and can be ignored.
paragraph, this is a safe technique.
<productname>PostgreSQL</productname> keeps self-notifications
separate from notifications arriving from other sessions, so you
cannot miss an outside notification by ignoring your own
notifications.)
</para> </para>
</refsect1> </refsect1>
...@@ -125,16 +127,61 @@ NOTIFY <replaceable class="PARAMETER">name</replaceable> ...@@ -125,16 +127,61 @@ NOTIFY <replaceable class="PARAMETER">name</replaceable>
<variablelist> <variablelist>
<varlistentry> <varlistentry>
<term><replaceable class="PARAMETER">name</replaceable></term> <term><replaceable class="PARAMETER">channel</replaceable></term>
<listitem> <listitem>
<para> <para>
Name of the notification to be signaled (any identifier). Name of the notification channel to be signaled (any identifier).
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">payload</replaceable></term>
<listitem>
<para>
The <quote>payload</> string to be communicated along with the
notification. This string must be shorter than 8000 bytes, and
is treated as text.
(If binary data or large amounts of information need to be communicated,
it's best to put it in a database table and send the key of the record.)
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
</variablelist> </variablelist>
</refsect1> </refsect1>
<refsect1>
<title>Notes</title>
<indexterm>
<primary>pg_notify</primary>
</indexterm>
<para>
To send a notification you can also use the function
<literal><function>pg_notify</function>(<type>text</type>,
<type>text</type>)</literal>. The function takes the channel name as the
first argument and the payload as the second. The function is much easier
to use than the <command>NOTIFY</command> command if you need to work with
non-constant channel names and payloads.
</para>
<para>
There is a queue that holds notifications that have been sent but not
yet processed by all listening sessions. If this queue becomes full,
transactions calling <command>NOTIFY</command> will fail at commit.
The queue is quite large (8GB in a standard installation) and should be
sufficiently sized for almost every use case. However, no cleanup can take
place if a session executes <command>LISTEN</command> and then enters a
transaction for a very long time. Once the queue is half full you will see
warnings in the log file pointing you to the session that is preventing
cleanup. In this case you should make sure that this session ends its
current transaction so that cleanup can proceed.
</para>
<para>
A transaction that has executed <command>NOTIFY</command> cannot be
prepared for two-phase commit.
</para>
</refsect1>
<refsect1> <refsect1>
<title>Examples</title> <title>Examples</title>
...@@ -146,6 +193,12 @@ NOTIFY <replaceable class="PARAMETER">name</replaceable> ...@@ -146,6 +193,12 @@ NOTIFY <replaceable class="PARAMETER">name</replaceable>
LISTEN virtual; LISTEN virtual;
NOTIFY virtual; NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448. Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.
LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.
</programlisting> </programlisting>
</para> </para>
</refsect1> </refsect1>
......
<!-- <!--
$PostgreSQL: pgsql/doc/src/sgml/ref/unlisten.sgml,v 1.30 2008/11/14 10:22:47 petere Exp $ $PostgreSQL: pgsql/doc/src/sgml/ref/unlisten.sgml,v 1.31 2010/02/16 22:34:43 tgl Exp $
PostgreSQL documentation PostgreSQL documentation
--> -->
...@@ -21,7 +21,7 @@ PostgreSQL documentation ...@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv> <refsynopsisdiv>
<synopsis> <synopsis>
UNLISTEN { <replaceable class="PARAMETER">name</replaceable> | * } UNLISTEN { <replaceable class="PARAMETER">channel</replaceable> | * }
</synopsis> </synopsis>
</refsynopsisdiv> </refsynopsisdiv>
...@@ -33,8 +33,8 @@ UNLISTEN { <replaceable class="PARAMETER">name</replaceable> | * } ...@@ -33,8 +33,8 @@ UNLISTEN { <replaceable class="PARAMETER">name</replaceable> | * }
registration for <command>NOTIFY</command> events. registration for <command>NOTIFY</command> events.
<command>UNLISTEN</command> cancels any existing registration of <command>UNLISTEN</command> cancels any existing registration of
the current <productname>PostgreSQL</productname> session as a the current <productname>PostgreSQL</productname> session as a
listener on the notification <replaceable listener on the notification channel named <replaceable
class="PARAMETER">name</replaceable>. The special wildcard class="PARAMETER">channel</replaceable>. The special wildcard
<literal>*</literal> cancels all listener registrations for the <literal>*</literal> cancels all listener registrations for the
current session. current session.
</para> </para>
...@@ -52,10 +52,10 @@ UNLISTEN { <replaceable class="PARAMETER">name</replaceable> | * } ...@@ -52,10 +52,10 @@ UNLISTEN { <replaceable class="PARAMETER">name</replaceable> | * }
<variablelist> <variablelist>
<varlistentry> <varlistentry>
<term><replaceable class="PARAMETER">name</replaceable></term> <term><replaceable class="PARAMETER">channel</replaceable></term>
<listitem> <listitem>
<para> <para>
Name of a notification (any identifier). Name of a notification channel (any identifier).
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -83,6 +83,11 @@ UNLISTEN { <replaceable class="PARAMETER">name</replaceable> | * } ...@@ -83,6 +83,11 @@ UNLISTEN { <replaceable class="PARAMETER">name</replaceable> | * }
At the end of each session, <command>UNLISTEN *</command> is At the end of each session, <command>UNLISTEN *</command> is
automatically executed. automatically executed.
</para> </para>
<para>
A transaction that has executed <command>UNLISTEN</command> cannot be
prepared for two-phase commit.
</para>
</refsect1> </refsect1>
<refsect1> <refsect1>
...@@ -100,7 +105,7 @@ Asynchronous notification "virtual" received from server process with PID 8448. ...@@ -100,7 +105,7 @@ Asynchronous notification "virtual" received from server process with PID 8448.
<para> <para>
Once <command>UNLISTEN</> has been executed, further <command>NOTIFY</> Once <command>UNLISTEN</> has been executed, further <command>NOTIFY</>
commands will be ignored: messages will be ignored:
<programlisting> <programlisting>
UNLISTEN virtual; UNLISTEN virtual;
......
<!-- $PostgreSQL: pgsql/doc/src/sgml/storage.sgml,v 1.31 2010/02/07 20:48:09 tgl Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/storage.sgml,v 1.32 2010/02/16 22:34:43 tgl Exp $ -->
<chapter id="storage"> <chapter id="storage">
...@@ -77,6 +77,11 @@ Item ...@@ -77,6 +77,11 @@ Item
(used for shared row locks)</entry> (used for shared row locks)</entry>
</row> </row>
<row>
<entry><filename>pg_notify</></entry>
<entry>Subdirectory containing LISTEN/NOTIFY status data</entry>
</row>
<row> <row>
<entry><filename>pg_stat_tmp</></entry> <entry><filename>pg_stat_tmp</></entry>
<entry>Subdirectory containing temporary files for the statistics <entry>Subdirectory containing temporary files for the statistics
......
...@@ -41,7 +41,7 @@ ...@@ -41,7 +41,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.48 2010/01/02 16:57:35 momjian Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.49 2010/02/16 22:34:43 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -59,25 +59,6 @@ ...@@ -59,25 +59,6 @@
#include "miscadmin.h" #include "miscadmin.h"
/*
* Define segment size. A page is the same BLCKSZ as is used everywhere
* else in Postgres. The segment size can be chosen somewhat arbitrarily;
* we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
* or 64K transactions for SUBTRANS.
*
* Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
* page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
* xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
* 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need
* take no explicit notice of that fact in this module, except when comparing
* segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
*
* Note: this file currently assumes that segment file names will be four
* hex digits. This sets a lower bound on the segment size (64K transactions
* for 32-bit TransactionIds).
*/
#define SLRU_PAGES_PER_SEGMENT 32
#define SlruFileName(ctl, path, seg) \ #define SlruFileName(ctl, path, seg) \
snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg) snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
...@@ -183,6 +164,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, ...@@ -183,6 +164,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
shared = (SlruShared) ShmemInitStruct(name, shared = (SlruShared) ShmemInitStruct(name,
SimpleLruShmemSize(nslots, nlsns), SimpleLruShmemSize(nslots, nlsns),
&found); &found);
if (!shared)
elog(ERROR, "out of shared memory");
if (!IsUnderPostmaster) if (!IsUnderPostmaster)
{ {
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/transam/twophase_rmgr.c,v 1.12 2010/01/02 16:57:35 momjian Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/twophase_rmgr.c,v 1.13 2010/02/16 22:34:43 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#include "access/multixact.h" #include "access/multixact.h"
#include "access/twophase_rmgr.h" #include "access/twophase_rmgr.h"
#include "commands/async.h"
#include "pgstat.h" #include "pgstat.h"
#include "storage/lock.h" #include "storage/lock.h"
...@@ -25,7 +24,6 @@ const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] = ...@@ -25,7 +24,6 @@ const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{ {
NULL, /* END ID */ NULL, /* END ID */
lock_twophase_recover, /* Lock */ lock_twophase_recover, /* Lock */
NULL, /* notify/listen */
NULL, /* pgstat */ NULL, /* pgstat */
multixact_twophase_recover /* MultiXact */ multixact_twophase_recover /* MultiXact */
}; };
...@@ -34,7 +32,6 @@ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] = ...@@ -34,7 +32,6 @@ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{ {
NULL, /* END ID */ NULL, /* END ID */
lock_twophase_postcommit, /* Lock */ lock_twophase_postcommit, /* Lock */
notify_twophase_postcommit, /* notify/listen */
pgstat_twophase_postcommit, /* pgstat */ pgstat_twophase_postcommit, /* pgstat */
multixact_twophase_postcommit /* MultiXact */ multixact_twophase_postcommit /* MultiXact */
}; };
...@@ -43,7 +40,6 @@ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] = ...@@ -43,7 +40,6 @@ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{ {
NULL, /* END ID */ NULL, /* END ID */
lock_twophase_postabort, /* Lock */ lock_twophase_postabort, /* Lock */
NULL, /* notify/listen */
pgstat_twophase_postabort, /* pgstat */ pgstat_twophase_postabort, /* pgstat */
multixact_twophase_postabort /* MultiXact */ multixact_twophase_postabort /* MultiXact */
}; };
...@@ -52,7 +48,6 @@ const TwoPhaseCallback twophase_standby_recover_callbacks[TWOPHASE_RM_MAX_ID + 1 ...@@ -52,7 +48,6 @@ const TwoPhaseCallback twophase_standby_recover_callbacks[TWOPHASE_RM_MAX_ID + 1
{ {
NULL, /* END ID */ NULL, /* END ID */
lock_twophase_standby_recover, /* Lock */ lock_twophase_standby_recover, /* Lock */
NULL, /* notify/listen */
NULL, /* pgstat */ NULL, /* pgstat */
NULL /* MultiXact */ NULL /* MultiXact */
}; };
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.285 2010/02/13 16:15:46 sriggs Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.286 2010/02/16 22:34:43 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -1736,8 +1736,12 @@ CommitTransaction(void) ...@@ -1736,8 +1736,12 @@ CommitTransaction(void)
/* close large objects before lower-level cleanup */ /* close large objects before lower-level cleanup */
AtEOXact_LargeObject(true); AtEOXact_LargeObject(true);
/* NOTIFY commit must come before lower-level cleanup */ /*
AtCommit_Notify(); * Insert notifications sent by NOTIFY commands into the queue. This
* should be late in the pre-commit sequence to minimize time spent
* holding the notify-insertion lock.
*/
PreCommit_Notify();
/* Prevent cancel/die interrupt while cleaning up */ /* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS(); HOLD_INTERRUPTS();
...@@ -1825,6 +1829,7 @@ CommitTransaction(void) ...@@ -1825,6 +1829,7 @@ CommitTransaction(void)
/* Check we've released all catcache entries */ /* Check we've released all catcache entries */
AtEOXact_CatCache(true); AtEOXact_CatCache(true);
AtCommit_Notify();
AtEOXact_GUC(true, 1); AtEOXact_GUC(true, 1);
AtEOXact_SPI(true); AtEOXact_SPI(true);
AtEOXact_on_commit_actions(true); AtEOXact_on_commit_actions(true);
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# #
# Makefile for backend/catalog # Makefile for backend/catalog
# #
# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.76 2010/01/06 19:56:29 tgl Exp $ # $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.77 2010/02/16 22:34:43 tgl Exp $
# #
#------------------------------------------------------------------------- #-------------------------------------------------------------------------
...@@ -30,7 +30,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\ ...@@ -30,7 +30,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \ pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \ pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \ pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \
pg_statistic.h pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h \ pg_statistic.h pg_rewrite.h pg_trigger.h pg_description.h \
pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \ pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \ pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \ pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
......
...@@ -7,109 +7,280 @@ ...@@ -7,109 +7,280 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.150 2010/01/02 16:57:36 momjian Exp $ * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.151 2010/02/16 22:34:43 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* New Async Notification Model: * Async Notification Model as of 9.0:
*
* 1. Multiple backends on same machine. Multiple backends listening on * 1. Multiple backends on same machine. Multiple backends listening on
* one relation. (Note: "listening on a relation" is not really the * several channels. (Channels are also called "conditions" in other
* right way to think about it, since the notify names need not have * parts of the code.)
* anything to do with the names of relations actually in the database. *
* But this terminology is all over the code and docs, and I don't feel * 2. There is one central queue in disk-based storage (directory pg_notify/),
* like trying to replace it.) * with actively-used pages mapped into shared memory by the slru.c module.
* * All notification messages are placed in the queue and later read out
* 2. There is a tuple in relation "pg_listener" for each active LISTEN, * by listening backends.
* ie, each relname/listenerPID pair. The "notification" field of the *
* tuple is zero when no NOTIFY is pending for that listener, or the PID * There is no central knowledge of which backend listens on which channel;
* of the originating backend when a cross-backend NOTIFY is pending. * every backend has its own list of interesting channels.
* (We skip writing to pg_listener when doing a self-NOTIFY, so the *
* notification field should never be equal to the listenerPID field.) * Although there is only one queue, notifications are treated as being
* * database-local; this is done by including the sender's database OID
* 3. The NOTIFY statement itself (routine Async_Notify) just adds the target * in each notification message. Listening backends ignore messages
* relname to a list of outstanding NOTIFY requests. Actual processing * that don't match their database OID. This is important because it
* happens if and only if we reach transaction commit. At that time (in * ensures senders and receivers have the same database encoding and won't
* routine AtCommit_Notify) we scan pg_listener for matching relnames. * misinterpret non-ASCII text in the channel name or payload string.
* If the listenerPID in a matching tuple is ours, we just send a notify *
* message to our own front end. If it is not ours, and "notification" * Since notifications are not expected to survive database crashes,
* is not already nonzero, we set notification to our own PID and send a * we can simply clean out the pg_notify data at any reboot, and there
* PROCSIG_NOTIFY_INTERRUPT signal to the receiving process (indicated by * is no need for WAL support or fsync'ing.
* listenerPID). *
* BTW: if the signal operation fails, we presume that the listener backend * 3. Every backend that is listening on at least one channel registers by
* crashed without removing this tuple, and remove the tuple for it. * entering its PID into the array in AsyncQueueControl. It then scans all
* * incoming notifications in the central queue and first compares the
* 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler * database OID of the notification with its own database OID and then
* compares the notified channel with the list of channels that it listens
* to. In case there is a match it delivers the notification event to its
* frontend. Non-matching events are simply skipped.
*
* 4. The NOTIFY statement (routine Async_Notify) stores the notification in
* a backend-local list which will not be processed until transaction end.
*
* Duplicate notifications from the same transaction are sent out as one
* notification only. This is done to save work when for example a trigger
* on a 2 million row table fires a notification for each row that has been
* changed. If the application needs to receive every single notification
* that has been sent, it can easily add some unique string into the extra
* payload parameter.
*
* When the transaction is ready to commit, PreCommit_Notify() adds the
* pending notifications to the head of the queue. The head pointer of the
* queue always points to the next free position and a position is just a
* page number and the offset in that page. This is done before marking the
* transaction as committed in clog. If we run into problems writing the
* notifications, we can still call elog(ERROR, ...) and the transaction
* will roll back.
*
* Once we have put all of the notifications into the queue, we return to
* CommitTransaction() which will then do the actual transaction commit.
*
* After commit we are called another time (AtCommit_Notify()). Here we
* make the actual updates to the effective listen state (listenChannels).
*
* Finally, after we are out of the transaction altogether, we check if
* we need to signal listening backends. In SignalBackends() we scan the
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
* to every listening backend (we don't know which backend is listening on
* which channel so we must signal them all). We can exclude backends that
* are already up to date, though. We don't bother with a self-signal
* either, but just process the queue directly.
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* can call inbound-notify processing immediately if this backend is idle * can call inbound-notify processing immediately if this backend is idle
* (ie, it is waiting for a frontend command and is not within a transaction * (ie, it is waiting for a frontend command and is not within a transaction
* block). Otherwise the handler may only set a flag, which will cause the * block). Otherwise the handler may only set a flag, which will cause the
* processing to occur just before we next go idle. * processing to occur just before we next go idle.
* *
* 5. Inbound-notify processing consists of scanning pg_listener for tuples * Inbound-notify processing consists of reading all of the notifications
* matching our own listenerPID and having nonzero notification fields. * that have arrived since scanning last time. We read every notification
* For each such tuple, we send a message to our frontend and clear the * until we reach either a notification from an uncommitted transaction or
* notification field. BTW: this routine has to start/commit its own * the head pointer's position. Then we check if we were the laziest
* transaction, since by assumption it is only called from outside any * backend: if our pointer is set to the same position as the global tail
* transaction. * pointer is set, then we move the global tail pointer ahead to where the
* * second-laziest backend is (in general, we take the MIN of the current
* Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list * head position and all active backends' new tail pointers). Whenever we
* of pending actions. If we reach transaction commit, the changes are * move the global tail pointer we also truncate now-unused pages (i.e.,
* applied to pg_listener just before executing any pending NOTIFYs. This * delete files in pg_notify/ that are no longer used).
* method is necessary because to avoid race conditions, we must hold lock *
* on pg_listener from when we insert a new listener tuple until we commit. * An application that listens on the same channel it notifies will get
* To do that and not create undue hazard of deadlock, we don't want to
* touch pg_listener until we are otherwise done with the transaction;
* in particular it'd be uncool to still be taking user-commanded locks
* while holding the pg_listener lock.
*
* Although we grab ExclusiveLock on pg_listener for any operation,
* the lock is never held very long, so it shouldn't cause too much of
* a performance problem. (Previously we used AccessExclusiveLock, but
* there's no real reason to forbid concurrent reads.)
*
* An application that listens on the same relname it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
* by comparing be_pid in the NOTIFY message to the application's own backend's * by comparing be_pid in the NOTIFY message to the application's own backend's
* PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
* frontend during startup.) The above design guarantees that notifies from * frontend during startup.) The above design guarantees that notifies from
* other backends will never be missed by ignoring self-notifies. Note, * other backends will never be missed by ignoring self-notifies.
* however, that we do *not* guarantee that a separate frontend message will *
* be sent for every outside NOTIFY. Since there is only room for one * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS)
* originating PID in pg_listener, outside notifies occurring at about the * can be varied without affecting anything but performance. The maximum
* same time may be collapsed into a single message bearing the PID of the * amount of notification data that can be queued at one time is determined
* first outside backend to perform the NOTIFY. * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
#include <limits.h>
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
#include "access/heapam.h" #include "access/slru.h"
#include "access/twophase_rmgr.h" #include "access/transam.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/pg_listener.h" #include "catalog/pg_database.h"
#include "commands/async.h" #include "commands/async.h"
#include "funcapi.h"
#include "libpq/libpq.h" #include "libpq/libpq.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h" #include "storage/procsignal.h"
#include "storage/sinval.h" #include "storage/sinval.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/ps_status.h" #include "utils/ps_status.h"
#include "utils/tqual.h"
/*
* Maximum size of a NOTIFY payload, including terminating NULL. This
* must be kept small enough so that a notification message fits on one
* SLRU page.
*/
#define NOTIFY_PAYLOAD_MAX_LENGTH 8000
/*
* Struct representing an entry in the global notify queue
*
* This struct declaration has the maximal length, but in a real queue entry
* the data area is only big enough for the actual channel and payload strings
* (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
* entry size, if both channel and payload strings are empty (but note it
* doesn't include alignment padding).
*
* The "length" field should always be rounded up to the next QUEUEALIGN
* multiple so that all fields are properly aligned.
*/
typedef struct AsyncQueueEntry
{
int length; /* total allocated length of entry */
Oid dboid; /* sender's database OID */
TransactionId xid; /* sender's XID */
int32 srcPid; /* sender's PID */
char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;
/* Currently, no field of AsyncQueueEntry requires more than int alignment */
#define QUEUEALIGN(len) INTALIGN(len)
#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)
/*
* Struct describing a queue position, and assorted macros for working with it
*/
typedef struct QueuePosition
{
int page; /* SLRU page number */
int offset; /* byte offset within page */
} QueuePosition;
#define QUEUE_POS_PAGE(x) ((x).page)
#define QUEUE_POS_OFFSET(x) ((x).offset)
#define SET_QUEUE_POS(x,y,z) \
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)
#define QUEUE_POS_EQUAL(x,y) \
((x).page == (y).page && (x).offset == (y).offset)
/* choose logically smaller QueuePosition */
#define QUEUE_POS_MIN(x,y) \
(asyncQueuePagePrecedesLogically((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
/*
* Struct describing a listening backend's status
*/
typedef struct QueueBackendStatus
{
int32 pid; /* either a PID or InvalidPid */
QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
#define InvalidPid (-1)
/*
* Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
*
* The AsyncQueueControl structure is protected by the AsyncQueueLock.
*
* When holding the lock in SHARED mode, backends may only inspect their own
* entries as well as the head and tail pointers. Consequently we can allow a
* backend to update its own record while holding only SHARED lock (since no
* other backend will inspect it).
*
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
* of other backends and also change the head and tail pointers.
*
* In order to avoid deadlocks, whenever we need both locks, we always first
* get AsyncQueueLock and then AsyncCtlLock.
*
* Each backend uses the backend[] array entry with index equal to its
* BackendId (which can range from 1 to MaxBackends). We rely on this to make
* SendProcSignal fast.
*/
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
QueuePosition tail; /* the global tail is equivalent to the
tail of the "slowest" backend */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */
/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
} AsyncQueueControl;
static AsyncQueueControl *asyncQueueControl;
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
/*
* The SLRU buffer area through which we access the notification queue
*/
static SlruCtlData AsyncCtlData;
#define AsyncCtl (&AsyncCtlData)
#define QUEUE_PAGESIZE BLCKSZ
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
/*
* slru.c currently assumes that all filenames are four characters of hex
* digits. That means that we can use segments 0000 through FFFF.
* Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
* the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
*
* It's of course possible to enhance slru.c, but this gives us so much
* space already that it doesn't seem worth the trouble.
*
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
* pages, because more than that would confuse slru.c into thinking there
* was a wraparound condition. With the default BLCKSZ this means there
* can be up to 8GB of queued-and-not-read data.
*
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
* SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
*/
#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
/*
* listenChannels identifies the channels we are actually listening to
* (ie, have committed a LISTEN on). It is a simple list of channel names,
* allocated in TopMemoryContext.
*/
static List *listenChannels = NIL; /* list of C strings */
/* /*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
* all actions requested in the current transaction. As explained above, * all actions requested in the current transaction. As explained above,
* we don't actually modify pg_listener until we reach transaction commit. * we don't actually change listenChannels until we reach transaction commit.
* *
* The list is kept in CurTransactionContext. In subtransactions, each * The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but * subtransaction has its own list in its own CurTransactionContext, but
...@@ -126,7 +297,7 @@ typedef enum ...@@ -126,7 +297,7 @@ typedef enum
typedef struct typedef struct
{ {
ListenActionKind action; ListenActionKind action;
char condname[1]; /* actually, as long as needed */ char channel[1]; /* actually, as long as needed */
} ListenAction; } ListenAction;
static List *pendingActions = NIL; /* list of ListenAction */ static List *pendingActions = NIL; /* list of ListenAction */
...@@ -134,9 +305,9 @@ static List *pendingActions = NIL; /* list of ListenAction */ ...@@ -134,9 +305,9 @@ static List *pendingActions = NIL; /* list of ListenAction */
static List *upperPendingActions = NIL; /* list of upper-xact lists */ static List *upperPendingActions = NIL; /* list of upper-xact lists */
/* /*
* State for outbound notifies consists of a list of all relnames NOTIFYed * State for outbound notifies consists of a list of all channels+payloads
* in the current transaction. We do not actually perform a NOTIFY until * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
* and unless the transaction commits. pendingNotifies is NIL if no * until and unless the transaction commits. pendingNotifies is NIL if no
* NOTIFYs have been done in the current transaction. * NOTIFYs have been done in the current transaction.
* *
* The list is kept in CurTransactionContext. In subtransactions, each * The list is kept in CurTransactionContext. In subtransactions, each
...@@ -149,12 +320,18 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */ ...@@ -149,12 +320,18 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
* condition name, it will get a self-notify at commit. This is a bit odd * condition name, it will get a self-notify at commit. This is a bit odd
* but is consistent with our historical behavior. * but is consistent with our historical behavior.
*/ */
static List *pendingNotifies = NIL; /* list of C strings */ typedef struct Notification
{
char *channel; /* channel name */
char *payload; /* payload string (can be empty) */
} Notification;
static List *pendingNotifies = NIL; /* list of Notifications */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
/* /*
* State for inbound notifies consists of two flags: one saying whether * State for inbound notifications consists of two flags: one saying whether
* the signal handler is currently allowed to call ProcessIncomingNotify * the signal handler is currently allowed to call ProcessIncomingNotify
* directly, and one saying whether the signal has occurred but the handler * directly, and one saying whether the signal has occurred but the handler
* was not allowed to call ProcessIncomingNotify at the time. * was not allowed to call ProcessIncomingNotify at the time.
...@@ -168,57 +345,259 @@ static volatile sig_atomic_t notifyInterruptOccurred = 0; ...@@ -168,57 +345,259 @@ static volatile sig_atomic_t notifyInterruptOccurred = 0;
/* True if we've registered an on_shmem_exit cleanup */ /* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false; static bool unlistenExitRegistered = false;
/* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false;
/* has this backend executed its first LISTEN in the current transaction? */
static bool backendHasExecutedInitialListen = false;
/* GUC parameter */
bool Trace_notify = false; bool Trace_notify = false;
/* local function prototypes */
static void queue_listen(ListenActionKind action, const char *condname); static bool asyncQueuePagePrecedesPhysically(int p, int q);
static bool asyncQueuePagePrecedesLogically(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg); static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_Listen(Relation lRel, const char *relname); static void Exec_ListenPreCommit(void);
static void Exec_Unlisten(Relation lRel, const char *relname); static void Exec_ListenCommit(const char *channel);
static void Exec_UnlistenAll(Relation lRel); static void Exec_UnlistenCommit(const char *channel);
static void Send_Notify(Relation lRel); static void Exec_UnlistenAllCommit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
static void asyncQueueFillWarning(void);
static bool SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(QueuePosition *current,
QueuePosition stop,
char *page_buffer);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void); static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(char *relname, int32 listenerPID); static void NotifyMyFrontEnd(const char *channel,
static bool AsyncExistsPendingNotify(const char *relname); const char *payload,
int32 srcPid);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
static void ClearPendingActionsAndNotifies(void); static void ClearPendingActionsAndNotifies(void);
/*
* We will work on the page range of 0..QUEUE_MAX_PAGE.
*
* asyncQueuePagePrecedesPhysically just checks numerically without any magic
* if one page precedes another one. This is wrong for normal operation but
* is helpful when clearing pg_notify/ during startup.
*
* asyncQueuePagePrecedesLogically compares using wraparound logic, as is
* required by slru.c.
*/
static bool
asyncQueuePagePrecedesPhysically(int p, int q)
{
return p < q;
}
static bool
asyncQueuePagePrecedesLogically(int p, int q)
{
int diff;
/*
* We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should
* be in the range 0..QUEUE_MAX_PAGE.
*/
Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
diff = p - q;
if (diff >= ((QUEUE_MAX_PAGE+1)/2))
diff -= QUEUE_MAX_PAGE+1;
else if (diff < -((QUEUE_MAX_PAGE+1)/2))
diff += QUEUE_MAX_PAGE+1;
return diff < 0;
}
/*
* Report space needed for our shared memory area
*/
Size
AsyncShmemSize(void)
{
Size size;
/* This had better match AsyncShmemInit */
size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
size = add_size(size, sizeof(AsyncQueueControl));
size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
return size;
}
/*
* Initialize our shared memory area
*/
void
AsyncShmemInit(void)
{
bool found;
int slotno;
Size size;
/*
* Create or attach to the AsyncQueueControl structure.
*
* The used entries in the backend[] array run from 1 to MaxBackends.
* sizeof(AsyncQueueControl) already includes space for the unused zero'th
* entry, but we need to add on space for the used entries.
*/
size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
size = add_size(size, sizeof(AsyncQueueControl));
asyncQueueControl = (AsyncQueueControl *)
ShmemInitStruct("Async Queue Control", size, &found);
if (!asyncQueueControl)
elog(ERROR, "out of shared memory");
if (!found)
{
/* First time through, so initialize it */
int i;
SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
asyncQueueControl->lastQueueFillWarn = 0;
/* zero'th entry won't be used, but let's initialize it anyway */
for (i = 0; i <= MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
}
}
/*
* Set up SLRU management of the pg_notify data.
*/
AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically;
SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0,
AsyncCtlLock, "pg_notify");
/* Override default assumption that writes should be fsync'd */
AsyncCtl->do_fsync = false;
if (!found)
{
/*
* During start or reboot, clean out the pg_notify directory.
*
* Since we want to remove every file, we temporarily use
* asyncQueuePagePrecedesPhysically() and pass INT_MAX as the
* comparison value; every file in the directory should therefore
* appear to be less than that.
*/
AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically;
(void) SlruScanDirectory(AsyncCtl, INT_MAX, true);
AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically;
/* Now initialize page zero to empty */
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
/* This write is just to verify that pg_notify/ is writable */
SimpleLruWritePage(AsyncCtl, slotno, NULL);
LWLockRelease(AsyncCtlLock);
}
}
/*
* pg_notify -
* SQL function to send a notification event
*/
Datum
pg_notify(PG_FUNCTION_ARGS)
{
const char *channel;
const char *payload;
if (PG_ARGISNULL(0))
channel = "";
else
channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
if (PG_ARGISNULL(1))
payload = "";
else
payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
Async_Notify(channel, payload);
PG_RETURN_VOID();
}
/* /*
* Async_Notify * Async_Notify
* *
* This is executed by the SQL notify command. * This is executed by the SQL notify command.
* *
* Adds the relation to the list of pending notifies. * Adds the message to the list of pending notifies.
* Actual notification happens during transaction commit. * Actual notification happens during transaction commit.
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/ */
void void
Async_Notify(const char *relname) Async_Notify(const char *channel, const char *payload)
{ {
Notification *n;
MemoryContext oldcontext;
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "Async_Notify(%s)", relname); elog(DEBUG1, "Async_Notify(%s)", channel);
/* no point in making duplicate entries in the list ... */ /* a channel name must be specified */
if (!AsyncExistsPendingNotify(relname)) if (!channel || !strlen(channel))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("channel name cannot be empty")));
if (strlen(channel) >= NAMEDATALEN)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("channel name too long")));
if (payload)
{ {
if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("payload string too long")));
}
/* no point in making duplicate entries in the list ... */
if (AsyncExistsPendingNotify(channel, payload))
return;
/* /*
* The name list needs to live until end of transaction, so store it * The notification list needs to live until end of transaction, so store
* in the transaction context. * it in the transaction context.
*/ */
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(CurTransactionContext); oldcontext = MemoryContextSwitchTo(CurTransactionContext);
n = (Notification *) palloc(sizeof(Notification));
n->channel = pstrdup(channel);
if (payload)
n->payload = pstrdup(payload);
else
n->payload = "";
/* /*
* Ordering of the list isn't important. We choose to put new entries * We want to preserve the order so we need to append every
* on the front, as this might make duplicate-elimination a tad faster * notification. See comments at AsyncExistsPendingNotify().
* when the same condition is signaled many times in a row.
*/ */
pendingNotifies = lcons(pstrdup(relname), pendingNotifies); pendingNotifies = lappend(pendingNotifies, n);
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
}
} }
/* /*
...@@ -226,11 +605,11 @@ Async_Notify(const char *relname) ...@@ -226,11 +605,11 @@ Async_Notify(const char *relname)
* Common code for listen, unlisten, unlisten all commands. * Common code for listen, unlisten, unlisten all commands.
* *
* Adds the request to the list of pending actions. * Adds the request to the list of pending actions.
* Actual update of pg_listener happens during transaction commit. * Actual update of the listenChannels list happens during transaction
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * commit.
*/ */
static void static void
queue_listen(ListenActionKind action, const char *condname) queue_listen(ListenActionKind action, const char *channel)
{ {
MemoryContext oldcontext; MemoryContext oldcontext;
ListenAction *actrec; ListenAction *actrec;
...@@ -244,9 +623,9 @@ queue_listen(ListenActionKind action, const char *condname) ...@@ -244,9 +623,9 @@ queue_listen(ListenActionKind action, const char *condname)
oldcontext = MemoryContextSwitchTo(CurTransactionContext); oldcontext = MemoryContextSwitchTo(CurTransactionContext);
/* space for terminating null is included in sizeof(ListenAction) */ /* space for terminating null is included in sizeof(ListenAction) */
actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname)); actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel));
actrec->action = action; actrec->action = action;
strcpy(actrec->condname, condname); strcpy(actrec->channel, channel);
pendingActions = lappend(pendingActions, actrec); pendingActions = lappend(pendingActions, actrec);
...@@ -259,12 +638,12 @@ queue_listen(ListenActionKind action, const char *condname) ...@@ -259,12 +638,12 @@ queue_listen(ListenActionKind action, const char *condname)
* This is executed by the SQL listen command. * This is executed by the SQL listen command.
*/ */
void void
Async_Listen(const char *relname) Async_Listen(const char *channel)
{ {
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
queue_listen(LISTEN_LISTEN, relname); queue_listen(LISTEN_LISTEN, channel);
} }
/* /*
...@@ -273,16 +652,16 @@ Async_Listen(const char *relname) ...@@ -273,16 +652,16 @@ Async_Listen(const char *relname)
* This is executed by the SQL unlisten command. * This is executed by the SQL unlisten command.
*/ */
void void
Async_Unlisten(const char *relname) Async_Unlisten(const char *channel)
{ {
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */ /* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NIL && !unlistenExitRegistered) if (pendingActions == NIL && !unlistenExitRegistered)
return; return;
queue_listen(LISTEN_UNLISTEN, relname); queue_listen(LISTEN_UNLISTEN, channel);
} }
/* /*
...@@ -304,28 +683,63 @@ Async_UnlistenAll(void) ...@@ -304,28 +683,63 @@ Async_UnlistenAll(void)
} }
/* /*
* Async_UnlistenOnExit * SQL function: return a set of the channel names this backend is actively
* listening to.
* *
* Clean up the pg_listener table at backend exit. * Note: this coding relies on the fact that the listenChannels list cannot
* change within a transaction.
*/
Datum
pg_listening_channels(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
ListCell **lcp;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
MemoryContext oldcontext;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* allocate memory for user context */
lcp = (ListCell **) palloc(sizeof(ListCell *));
*lcp = list_head(listenChannels);
funcctx->user_fctx = (void *) lcp;
MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
lcp = (ListCell **) funcctx->user_fctx;
while (*lcp != NULL)
{
char *channel = (char *) lfirst(*lcp);
*lcp = lnext(*lcp);
SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
}
SRF_RETURN_DONE(funcctx);
}
/*
* Async_UnlistenOnExit
* *
* This is executed if we have done any LISTENs in this backend. * This is executed at backend exit if we have done any LISTENs in this
* It might not be necessary anymore, if the user UNLISTENed everything, * backend. It might not be necessary anymore, if the user UNLISTENed
* but we don't try to detect that case. * everything, but we don't try to detect that case.
*/ */
static void static void
Async_UnlistenOnExit(int code, Datum arg) Async_UnlistenOnExit(int code, Datum arg)
{ {
/* Exec_UnlistenAllCommit();
* We need to start/commit a transaction for the unlisten, but if there is
* already an active transaction we had better abort that one first.
* Otherwise we'd end up committing changes that probably ought to be
* discarded.
*/
AbortOutOfAnyTransaction();
/* Now we can do the unlisten */
StartTransactionCommand();
Async_UnlistenAll();
CommitTransactionCommand();
} }
/* /*
...@@ -337,71 +751,144 @@ Async_UnlistenOnExit(int code, Datum arg) ...@@ -337,71 +751,144 @@ Async_UnlistenOnExit(int code, Datum arg)
void void
AtPrepare_Notify(void) AtPrepare_Notify(void)
{ {
ListCell *p; /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
if (pendingActions || pendingNotifies)
/* It's not sensible to have any pending LISTEN/UNLISTEN actions */
if (pendingActions)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN"))); errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN or NOTIFY")));
}
/* We can deal with pending NOTIFY though */ /*
foreach(p, pendingNotifies) * PreCommit_Notify
*
* This is called at transaction commit, before actually committing to
* clog.
*
* If there are pending LISTEN actions, make sure we are listed in the
* shared-memory listener array. This must happen before commit to
* ensure we don't miss any notifies from transactions that commit
* just after ours.
*
* If there are outbound notify requests in the pendingNotifies list,
* add them to the global queue. We do that before commit so that
* we can still throw error if we run out of queue space.
*/
void
PreCommit_Notify(void)
{
ListCell *p;
if (pendingActions == NIL && pendingNotifies == NIL)
return; /* no relevant statements in this xact */
if (Trace_notify)
elog(DEBUG1, "PreCommit_Notify");
Assert(backendHasExecutedInitialListen == false);
/* Preflight for any pending listen/unlisten actions */
foreach(p, pendingActions)
{ {
const char *relname = (const char *) lfirst(p); ListenAction *actrec = (ListenAction *) lfirst(p);
RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0, switch (actrec->action)
relname, strlen(relname) + 1); {
case LISTEN_LISTEN:
Exec_ListenPreCommit();
break;
case LISTEN_UNLISTEN:
/* there is no Exec_UnlistenPreCommit() */
break;
case LISTEN_UNLISTEN_ALL:
/* there is no Exec_UnlistenAllPreCommit() */
break;
} }
}
/* Queue any pending notifies */
if (pendingNotifies)
{
ListCell *nextNotify;
/* /*
* We can clear the state immediately, rather than needing a separate * Make sure that we have an XID assigned to the current transaction.
* PostPrepare call, because if the transaction fails we'd just discard * GetCurrentTransactionId is cheap if we already have an XID, but
* the state anyway. * not so cheap if we don't, and we'd prefer not to do that work
* while holding AsyncQueueLock.
*/ */
ClearPendingActionsAndNotifies(); (void) GetCurrentTransactionId();
/*
* Serialize writers by acquiring a special lock that we hold till
* after commit. This ensures that queue entries appear in commit
* order, and in particular that there are never uncommitted queue
* entries ahead of committed ones, so an uncommitted transaction
* can't block delivery of deliverable notifications.
*
* We use a heavyweight lock so that it'll automatically be released
* after either commit or abort. This also allows deadlocks to be
* detected, though really a deadlock shouldn't be possible here.
*
* The lock is on "database 0", which is pretty ugly but it doesn't
* seem worth inventing a special locktag category just for this.
* (Historical note: before PG 9.0, a similar lock on "database 0" was
* used by the flatfiles mechanism.)
*/
LockSharedObject(DatabaseRelationId, InvalidOid, 0,
AccessExclusiveLock);
/* Now push the notifications into the queue */
backendHasSentNotifications = true;
nextNotify = list_head(pendingNotifies);
while (nextNotify != NULL)
{
/*
* Add the pending notifications to the queue. We acquire and
* release AsyncQueueLock once per page, which might be overkill
* but it does allow readers to get in while we're doing this.
*
* A full queue is very uncommon and should really not happen,
* given that we have so much space available in the SLRU pages.
* Nevertheless we need to deal with this possibility. Note that
* when we get here we are in the process of committing our
* transaction, but we have not yet committed to clog, so at this
* point in time we can still roll the transaction back.
*/
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
asyncQueueFillWarning();
if (asyncQueueIsFull())
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
LWLockRelease(AsyncQueueLock);
}
}
} }
/* /*
* AtCommit_Notify * AtCommit_Notify
* *
* This is called at transaction commit. * This is called at transaction commit, after committing to clog.
*
* If there are pending LISTEN/UNLISTEN actions, insert or delete
* tuples in pg_listener accordingly.
* *
* If there are outbound notify requests in the pendingNotifies list, * Update listenChannels and clear transaction-local state.
* scan pg_listener for matching tuples, and either signal the other
* backend or send a message to our own frontend.
*
* NOTE: we are still inside the current transaction, therefore can
* piggyback on its committing of changes.
*/ */
void void
AtCommit_Notify(void) AtCommit_Notify(void)
{ {
Relation lRel;
ListCell *p; ListCell *p;
if (pendingActions == NIL && pendingNotifies == NIL)
return; /* no relevant statements in this xact */
/* /*
* NOTIFY is disabled if not normal processing mode. This test used to be * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
* in xact.c, but it seems cleaner to do it here. * return as soon as possible
*/ */
if (!IsNormalProcessingMode()) if (!pendingActions && !pendingNotifies)
{
ClearPendingActionsAndNotifies();
return; return;
}
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify"); elog(DEBUG1, "AtCommit_Notify");
/* Acquire ExclusiveLock on pg_listener */
lRel = heap_open(ListenerRelationId, ExclusiveLock);
/* Perform any pending listen/unlisten actions */ /* Perform any pending listen/unlisten actions */
foreach(p, pendingActions) foreach(p, pendingActions)
{ {
...@@ -410,268 +897,609 @@ AtCommit_Notify(void) ...@@ -410,268 +897,609 @@ AtCommit_Notify(void)
switch (actrec->action) switch (actrec->action)
{ {
case LISTEN_LISTEN: case LISTEN_LISTEN:
Exec_Listen(lRel, actrec->condname); Exec_ListenCommit(actrec->channel);
break; break;
case LISTEN_UNLISTEN: case LISTEN_UNLISTEN:
Exec_Unlisten(lRel, actrec->condname); Exec_UnlistenCommit(actrec->channel);
break; break;
case LISTEN_UNLISTEN_ALL: case LISTEN_UNLISTEN_ALL:
Exec_UnlistenAll(lRel); Exec_UnlistenAllCommit();
break; break;
} }
}
/*
* If we did an initial LISTEN, listenChannels now has the entry, so
* we no longer need or want the flag to be set.
*/
backendHasExecutedInitialListen = false;
/* And clean up */
ClearPendingActionsAndNotifies();
}
/*
* Exec_ListenPreCommit --- subroutine for PreCommit_Notify
*
* This function must make sure we are ready to catch any incoming messages.
*/
static void
Exec_ListenPreCommit(void)
{
/*
* Nothing to do if we are already listening to something, nor if we
* already ran this routine in this transaction.
*/
if (listenChannels != NIL || backendHasExecutedInitialListen)
return;
if (Trace_notify)
elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
/*
* We need this variable to detect an aborted initial LISTEN.
* In that case we would set up our pointer but not listen on any channel.
* This flag gets cleared in AtCommit_Notify or AtAbort_Notify().
*/
backendHasExecutedInitialListen = true;
/*
* Before registering, make sure we will unlisten before dying.
* (Note: this action does not get undone if we abort later.)
*/
if (!unlistenExitRegistered)
{
on_shmem_exit(Async_UnlistenOnExit, 0);
unlistenExitRegistered = true;
}
/*
* This is our first LISTEN, so establish our pointer.
*
* We set our pointer to the global tail pointer and then move it forward
* over already-committed notifications. This ensures we cannot miss any
* not-yet-committed notifications. We might get a few more but that
* doesn't hurt.
*/
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
LWLockRelease(AsyncQueueLock);
/*
* Try to move our pointer forward as far as possible. This will skip over
* already-committed notifications. Still, we could get notifications that
* have already committed before we started to LISTEN.
*
* Note that we are not yet listening on anything, so we won't deliver
* any notification to the frontend.
*
* This will also advance the global tail pointer if possible.
*/
asyncQueueReadAllNotifications();
}
/*
* Exec_ListenCommit --- subroutine for AtCommit_Notify
*
* Add the channel to the list of channels we are listening on.
*/
static void
Exec_ListenCommit(const char *channel)
{
MemoryContext oldcontext;
/* We must CCI after each action in case of conflicting actions */ /* Do nothing if we are already listening on this channel */
CommandCounterIncrement(); if (IsListeningOn(channel))
return;
/*
* Add the new channel name to listenChannels.
*
* XXX It is theoretically possible to get an out-of-memory failure here,
* which would be bad because we already committed. For the moment it
* doesn't seem worth trying to guard against that, but maybe improve this
* later.
*/
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
listenChannels = lappend(listenChannels, pstrdup(channel));
MemoryContextSwitchTo(oldcontext);
}
/*
* Exec_UnlistenCommit --- subroutine for AtCommit_Notify
*
* Remove the specified channel name from listenChannels.
*/
static void
Exec_UnlistenCommit(const char *channel)
{
ListCell *q;
ListCell *prev;
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
prev = NULL;
foreach(q, listenChannels)
{
char *lchan = (char *) lfirst(q);
if (strcmp(lchan, channel) == 0)
{
listenChannels = list_delete_cell(listenChannels, q, prev);
pfree(lchan);
break;
}
prev = q;
} }
/* Perform any pending notifies */ /*
if (pendingNotifies) * We do not complain about unlistening something not being listened;
Send_Notify(lRel); * should we?
*/
/* If no longer listening to anything, get out of listener array */
if (listenChannels == NIL)
asyncQueueUnregister();
}
/*
* Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
*
* Unlisten on all channels for this backend.
*/
static void
Exec_UnlistenAllCommit(void)
{
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
list_free_deep(listenChannels);
listenChannels = NIL;
asyncQueueUnregister();
}
/*
* ProcessCompletedNotifies --- send out signals and self-notifies
*
* This is called from postgres.c just before going idle at the completion
* of a transaction. If we issued any notifications in the just-completed
* transaction, send signals to other backends to process them, and also
* process the queue ourselves to send messages to our own frontend.
*
* The reason that this is not done in AtCommit_Notify is that there is
* a nonzero chance of errors here (for example, encoding conversion errors
* while trying to format messages to our frontend). An error during
* AtCommit_Notify would be a PANIC condition. The timing is also arranged
* to ensure that a transaction's self-notifies are delivered to the frontend
* before it gets the terminating ReadyForQuery message.
*
* Note that we send signals and process the queue even if the transaction
* eventually aborted. This is because we need to clean out whatever got
* added to the queue.
*
* NOTE: we are outside of any transaction here.
*/
void
ProcessCompletedNotifies(void)
{
bool signalled;
/* Nothing to do if we didn't send any notifications */
if (!backendHasSentNotifications)
return;
/*
* We reset the flag immediately; otherwise, if any sort of error
* occurs below, we'd be locked up in an infinite loop, because
* control will come right back here after error cleanup.
*/
backendHasSentNotifications = false;
if (Trace_notify)
elog(DEBUG1, "ProcessCompletedNotifies");
/*
* We must run asyncQueueReadAllNotifications inside a transaction,
* else bad things happen if it gets an error.
*/
StartTransactionCommand();
/* Send signals to other backends */
signalled = SignalBackends();
if (listenChannels != NIL)
{
/* Read the queue ourselves, and send relevant stuff to the frontend */
asyncQueueReadAllNotifications();
}
else if (!signalled)
{
/* /*
* We do NOT release the lock on pg_listener here; we need to hold it * If we found no other listening backends, and we aren't listening
* until end of transaction (which is about to happen, anyway) to ensure * ourselves, then we must execute asyncQueueAdvanceTail to flush
* that notified backends see our tuple updates when they look. Else they * the queue, because ain't nobody else gonna do it. This prevents
* might disregard the signal, which would make the application programmer * queue overflow when we're sending useless notifies to nobody.
* very unhappy. Also, this prevents race conditions when we have just * (A new listener could have joined since we looked, but if so this
* inserted a listening tuple. * is harmless.)
*/ */
heap_close(lRel, NoLock); asyncQueueAdvanceTail();
}
ClearPendingActionsAndNotifies(); CommitTransactionCommand();
if (Trace_notify) /* We don't need pq_flush() here since postgres.c will do one shortly */
elog(DEBUG1, "AtCommit_Notify: done");
} }
/* /*
* Exec_Listen --- subroutine for AtCommit_Notify * Test whether we are actively listening on the given channel name.
* *
* Register the current backend as listening on the specified relation. * Note: this function is executed for every notification found in the queue.
* Perhaps it is worth further optimization, eg convert the list to a sorted
* array so we can binary-search it. In practice the list is likely to be
* fairly short, though.
*/ */
static void static bool
Exec_Listen(Relation lRel, const char *relname) IsListeningOn(const char *channel)
{ {
HeapScanDesc scan; ListCell *p;
HeapTuple tuple;
Datum values[Natts_pg_listener];
bool nulls[Natts_pg_listener];
NameData condname;
bool alreadyListener = false;
if (Trace_notify)
elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
/* Detect whether we are already listening on this relname */ foreach(p, listenChannels)
scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{ {
Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); char *lchan = (char *) lfirst(p);
if (listener->listenerpid == MyProcPid && if (strcmp(lchan, channel) == 0)
strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) return true;
{
alreadyListener = true;
/* No need to scan the rest of the table */
break;
}
} }
heap_endscan(scan); return false;
}
if (alreadyListener)
return;
/* /*
* OK to insert a new tuple * Remove our entry from the listeners array when we are no longer listening
* on any channel. NB: must not fail if we're already not listening.
*/ */
memset(nulls, false, sizeof(nulls)); static void
asyncQueueUnregister(void)
{
bool advanceTail;
Assert(listenChannels == NIL); /* else caller error */
namestrcpy(&condname, relname); LWLockAcquire(AsyncQueueLock, LW_SHARED);
values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname); /* check if entry is valid and oldest ... */
values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid); advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
values[Anum_pg_listener_notification - 1] = Int32GetDatum(0); /* no notifies pending */ QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
/* ... then mark it invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
LWLockRelease(AsyncQueueLock);
tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls); /* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
}
/*
* Test whether there is room to insert more notification messages.
*
* Caller must hold at least shared AsyncQueueLock.
*/
static bool
asyncQueueIsFull(void)
{
int nexthead;
int boundary;
simple_heap_insert(lRel, tuple); /*
* The queue is full if creating a new head page would create a page that
* logically precedes the current global tail pointer, ie, the head
* pointer would wrap around compared to the tail. We cannot create such
* a head page for fear of confusing slru.c. For safety we round the tail
* pointer back to a segment boundary (compare the truncation logic in
* asyncQueueAdvanceTail).
*
* Note that this test is *not* dependent on how much space there is on
* the current head page. This is necessary because asyncQueueAddEntries
* might try to create the next head page in any case.
*/
nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
if (nexthead > QUEUE_MAX_PAGE)
nexthead = 0; /* wrap around */
boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
return asyncQueuePagePrecedesLogically(nexthead, boundary);
}
#ifdef NOT_USED /* currently there are no indexes */ /*
CatalogUpdateIndexes(lRel, tuple); * Advance the QueuePosition to the next entry, assuming that the current
#endif * entry is of length entryLength. If we jump to a new page the function
* returns true, else false.
*/
static bool
asyncQueueAdvance(QueuePosition *position, int entryLength)
{
int pageno = QUEUE_POS_PAGE(*position);
int offset = QUEUE_POS_OFFSET(*position);
bool pageJump = false;
heap_freetuple(tuple); /*
* Move to the next writing position: First jump over what we have just
* written or read.
*/
offset += entryLength;
Assert(offset <= QUEUE_PAGESIZE);
/* /*
* now that we are listening, make sure we will unlisten before dying. * In a second step check if another entry can possibly be written to the
* page. If so, stay here, we have reached the next position. If not, then
* we need to move on to the next page.
*/ */
if (!unlistenExitRegistered) if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
{ {
on_shmem_exit(Async_UnlistenOnExit, 0); pageno++;
unlistenExitRegistered = true; if (pageno > QUEUE_MAX_PAGE)
pageno = 0; /* wrap around */
offset = 0;
pageJump = true;
} }
SET_QUEUE_POS(*position, pageno, offset);
return pageJump;
} }
/* /*
* Exec_Unlisten --- subroutine for AtCommit_Notify * Fill the AsyncQueueEntry at *qe with an outbound notification message.
*
* Remove the current backend from the list of listening backends
* for the specified relation.
*/ */
static void static void
Exec_Unlisten(Relation lRel, const char *relname) asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
{ {
HeapScanDesc scan; size_t channellen = strlen(n->channel);
HeapTuple tuple; size_t payloadlen = strlen(n->payload);
int entryLength;
Assert(channellen < NAMEDATALEN);
Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
/* The terminators are already included in AsyncQueueEntryEmptySize */
entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
entryLength = QUEUEALIGN(entryLength);
qe->length = entryLength;
qe->dboid = MyDatabaseId;
qe->xid = GetCurrentTransactionId();
qe->srcPid = MyProcPid;
memcpy(qe->data, n->channel, channellen + 1);
memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1);
}
if (Trace_notify) /*
elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid); * Add pending notifications to the queue.
*
* We go page by page here, i.e. we stop once we have to go to a new page but
* we will be called again and then fill that next page. If an entry does not
* fit into the current page, we write a dummy entry with an InvalidOid as the
* database OID in order to fill the page. So every page is always used up to
* the last byte which simplifies reading the page later.
*
* We are passed the list cell containing the next notification to write
* and return the first still-unwritten cell back. Eventually we will return
* NULL indicating all is done.
*
* We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
* locally in this function.
*/
static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
{
AsyncQueueEntry qe;
int pageno;
int offset;
int slotno;
/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
/* Fetch the current page */
pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
/* Note we mark the page dirty before writing in it */
AsyncCtl->shared->page_dirty[slotno] = true;
scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); while (nextNotify != NULL)
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{ {
Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); Notification *n = (Notification *) lfirst(nextNotify);
if (listener->listenerpid == MyProcPid && /* Construct a valid queue entry in local variable qe */
strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) asyncQueueNotificationToEntry(n, &qe);
offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
/* Check whether the entry really fits on the current page */
if (offset + qe.length <= QUEUE_PAGESIZE)
{
/* OK, so advance nextNotify past this item */
nextNotify = lnext(nextNotify);
}
else
{ {
/* Found the matching tuple, delete it */ /*
simple_heap_delete(lRel, &tuple->t_self); * Write a dummy entry to fill up the page. Actually readers will
* only check dboid and since it won't match any reader's database
* OID, they will ignore this entry and move on.
*/
qe.length = QUEUE_PAGESIZE - offset;
qe.dboid = InvalidOid;
qe.data[0] = '\0'; /* empty channel */
qe.data[1] = '\0'; /* empty payload */
}
/* Now copy qe into the shared buffer page */
memcpy(AsyncCtl->shared->page_buffer[slotno] + offset,
&qe,
qe.length);
/* Advance QUEUE_HEAD appropriately, and note if page is full */
if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length))
{
/* /*
* We assume there can be only one match, so no need to scan the * Page is full, so we're done here, but first fill the next
* rest of the table * page with zeroes. The reason to do this is to ensure that
* slru.c's idea of the head page is always the same as ours,
* which avoids boundary problems in SimpleLruTruncate. The
* test in asyncQueueIsFull() ensured that there is room to
* create this page without overrunning the queue.
*/ */
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
/* And exit the loop */
break; break;
} }
} }
heap_endscan(scan);
/* LWLockRelease(AsyncCtlLock);
* We do not complain about unlistening something not being listened;
* should we? return nextNotify;
*/
} }
/* /*
* Exec_UnlistenAll --- subroutine for AtCommit_Notify * Check whether the queue is at least half full, and emit a warning if so.
*
* This is unlikely given the size of the queue, but possible.
* The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
* *
* Update pg_listener to unlisten all relations for this backend. * Caller must hold exclusive AsyncQueueLock.
*/ */
static void static void
Exec_UnlistenAll(Relation lRel) asyncQueueFillWarning(void)
{ {
HeapScanDesc scan; int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
HeapTuple lTuple; int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
ScanKeyData key[1]; int occupied;
double fillDegree;
TimestampTz t;
if (Trace_notify) occupied = headPage - tailPage;
elog(DEBUG1, "Exec_UnlistenAll");
if (occupied == 0)
return; /* fast exit for common case */
if (occupied < 0)
{
/* head has wrapped around, tail not yet */
occupied += QUEUE_MAX_PAGE+1;
}
fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE+1)/2);
if (fillDegree < 0.5)
return;
/* Find and delete all entries with my listenerPID */ t = GetCurrentTimestamp();
ScanKeyInit(&key[0],
Anum_pg_listener_listenerpid, if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
BTEqualStrategyNumber, F_INT4EQ, t, QUEUE_FULL_WARN_INTERVAL))
Int32GetDatum(MyProcPid)); {
scan = heap_beginscan(lRel, SnapshotNow, 1, key); QueuePosition min = QUEUE_HEAD;
int32 minPid = InvalidPid;
int i;
for (i = 1; i <= MaxBackends; i++)
{
if (QUEUE_BACKEND_PID(i) != InvalidPid)
{
min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
minPid = QUEUE_BACKEND_PID(i);
}
}
while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) ereport(WARNING,
simple_heap_delete(lRel, &lTuple->t_self); (errmsg("pg_notify queue is %.0f%% full", fillDegree * 100),
(minPid != InvalidPid ?
errdetail("PID %d is among the slowest backends.", minPid)
: 0),
(minPid != InvalidPid ?
errhint("Cleanup can only proceed if this backend ends its current transaction.")
: 0)));
heap_endscan(scan); asyncQueueControl->lastQueueFillWarn = t;
}
} }
/* /*
* Send_Notify --- subroutine for AtCommit_Notify * Send signals to all listening backends (except our own).
* *
* Scan pg_listener for tuples matching our pending notifies, and * Returns true if we sent at least one signal.
* either signal the other backend or send a message to our own frontend. *
* Since we need EXCLUSIVE lock anyway we also check the position of the other
* backends and in case one is already up-to-date we don't signal it.
* This can happen if concurrent notifying transactions have sent a signal and
* the signaled backend has read the other notifications and ours in the same
* step.
*
* Since we know the BackendId and the Pid the signalling is quite cheap.
*/ */
static void static bool
Send_Notify(Relation lRel) SignalBackends(void)
{ {
TupleDesc tdesc = RelationGetDescr(lRel); bool signalled = false;
HeapScanDesc scan; int32 *pids;
HeapTuple lTuple, BackendId *ids;
rTuple; int count;
Datum value[Natts_pg_listener]; int i;
bool repl[Natts_pg_listener], int32 pid;
nulls[Natts_pg_listener];
/* preset data to update notify column to MyProcPid */
memset(nulls, false, sizeof(nulls));
memset(repl, false, sizeof(repl));
repl[Anum_pg_listener_notification - 1] = true;
memset(value, 0, sizeof(value));
value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid);
scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
char *relname = NameStr(listener->relname);
int32 listenerPID = listener->listenerpid;
if (!AsyncExistsPendingNotify(relname))
continue;
if (listenerPID == MyProcPid)
{
/* /*
* Self-notify: no need to bother with table update. Indeed, we * Identify all backends that are listening and not already up-to-date.
* *must not* clear the notification field in this path, or we * We don't want to send signals while holding the AsyncQueueLock, so
* could lose an outside notify, which'd be bad for applications * we just build a list of target PIDs.
* that ignore self-notify messages. *
* XXX in principle these pallocs could fail, which would be bad.
* Maybe preallocate the arrays? But in practice this is only run
* in trivial transactions, so there should surely be space available.
*/ */
if (Trace_notify) pids = (int32 *) palloc(MaxBackends * sizeof(int32));
elog(DEBUG1, "AtCommit_Notify: notifying self"); ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
count = 0;
NotifyMyFrontEnd(relname, listenerPID); LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
} for (i = 1; i <= MaxBackends; i++)
else
{ {
if (Trace_notify) pid = QUEUE_BACKEND_PID(i);
elog(DEBUG1, "AtCommit_Notify: notifying pid %d", if (pid != InvalidPid && pid != MyProcPid)
listenerPID); {
QueuePosition pos = QUEUE_BACKEND_POS(i);
/* if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
* If someone has already notified this listener, we don't bother
* modifying the table, but we do still send a NOTIFY_INTERRUPT
* signal, just in case that backend missed the earlier signal for
* some reason. It's OK to send the signal first, because the
* other guy can't read pg_listener until we unlock it.
*
* Note: we don't have the other guy's BackendId available, so
* this will incur a search of the ProcSignal table. That's
* probably not worth worrying about.
*/
if (SendProcSignal(listenerPID, PROCSIG_NOTIFY_INTERRUPT,
InvalidBackendId) < 0)
{ {
/* pids[count] = pid;
* Get rid of pg_listener entry if it refers to a PID that no ids[count] = i;
* longer exists. Presumably, that backend crashed without count++;
* deleting its pg_listener entries. This code used to only
* delete the entry if errno==ESRCH, but as far as I can see
* we should just do it for any failure (certainly at least
* for EPERM too...)
*/
simple_heap_delete(lRel, &lTuple->t_self);
} }
else if (listener->notification == 0)
{
/* Rewrite the tuple with my PID in notification column */
rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
simple_heap_update(lRel, &lTuple->t_self, rTuple);
#ifdef NOT_USED /* currently there are no indexes */
CatalogUpdateIndexes(lRel, rTuple);
#endif
} }
} }
LWLockRelease(AsyncQueueLock);
/* Now send signals */
for (i = 0; i < count; i++)
{
pid = pids[i];
/*
* Note: assuming things aren't broken, a signal failure here could
* only occur if the target backend exited since we released
* AsyncQueueLock; which is unlikely but certainly possible.
* So we just log a low-level debug message if it happens.
*/
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
else
signalled = true;
} }
heap_endscan(scan); pfree(pids);
pfree(ids);
return signalled;
} }
/* /*
...@@ -685,6 +1513,24 @@ Send_Notify(Relation lRel) ...@@ -685,6 +1513,24 @@ Send_Notify(Relation lRel)
void void
AtAbort_Notify(void) AtAbort_Notify(void)
{ {
/*
* If we LISTEN but then roll back the transaction we have set our pointer
* but have not made any entry in listenChannels. In that case, remove
* our pointer again.
*/
if (backendHasExecutedInitialListen)
{
/*
* Checking listenChannels should be redundant but it can't hurt doing
* it for safety reasons.
*/
if (listenChannels == NIL)
asyncQueueUnregister();
backendHasExecutedInitialListen = false;
}
/* And clean up */
ClearPendingActionsAndNotifies(); ClearPendingActionsAndNotifies();
} }
...@@ -939,31 +1785,299 @@ DisableNotifyInterrupt(void) ...@@ -939,31 +1785,299 @@ DisableNotifyInterrupt(void)
return result; return result;
} }
/*
* Read all pending notifications from the queue, and deliver appropriate
* ones to my frontend. Stop when we reach queue head or an uncommitted
* notification.
*/
static void
asyncQueueReadAllNotifications(void)
{
QueuePosition pos;
QueuePosition oldpos;
QueuePosition head;
bool advanceTail;
/* page_buffer must be adequately aligned, so use a union */
union {
char buf[QUEUE_PAGESIZE];
AsyncQueueEntry align;
} page_buffer;
/* Fetch current state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
head = QUEUE_HEAD;
LWLockRelease(AsyncQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
{
/* Nothing to do, we have read all notifications already. */
return;
}
/*----------
* Note that we deliver everything that we see in the queue and that
* matches our _current_ listening state.
* Especially we do not take into account different commit times.
* Consider the following example:
*
* Backend 1: Backend 2:
*
* transaction starts
* NOTIFY foo;
* commit starts
* transaction starts
* LISTEN foo;
* commit starts
* commit to clog
* commit to clog
*
* It could happen that backend 2 sees the notification from backend 1 in
* the queue. Even though the notifying transaction committed before
* the listening transaction, we still deliver the notification.
*
* The idea is that an additional notification does not do any harm, we
* just need to make sure that we do not miss a notification.
*
* It is possible that we fail while trying to send a message to our
* frontend (for example, because of encoding conversion failure).
* If that happens it is critical that we not try to send the same
* message over and over again. Therefore, we place a PG_TRY block
* here that will forcibly advance our backend position before we lose
* control to an error. (We could alternatively retake AsyncQueueLock
* and move the position before handling each individual message, but
* that seems like too much lock traffic.)
*----------
*/
PG_TRY();
{
bool reachedStop;
do
{
int curpage = QUEUE_POS_PAGE(pos);
int curoffset = QUEUE_POS_OFFSET(pos);
int slotno;
int copysize;
/*
* We copy the data from SLRU into a local buffer, so as to avoid
* holding the AsyncCtlLock while we are examining the entries and
* possibly transmitting them to our frontend. Copy only the part
* of the page we will actually inspect.
*/
slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
InvalidTransactionId);
if (curpage == QUEUE_POS_PAGE(head))
{
/* we only want to read as far as head */
copysize = QUEUE_POS_OFFSET(head) - curoffset;
if (copysize < 0)
copysize = 0; /* just for safety */
}
else
{
/* fetch all the rest of the page */
copysize = QUEUE_PAGESIZE - curoffset;
}
memcpy(page_buffer.buf + curoffset,
AsyncCtl->shared->page_buffer[slotno] + curoffset,
copysize);
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
LWLockRelease(AsyncCtlLock);
/*
* Process messages up to the stop position, end of page, or an
* uncommitted message.
*
* Our stop position is what we found to be the head's position
* when we entered this function. It might have changed
* already. But if it has, we will receive (or have already
* received and queued) another signal and come here again.
*
* We are not holding AsyncQueueLock here! The queue can only
* extend beyond the head pointer (see above) and we leave our
* backend's pointer where it is so nobody will truncate or
* rewrite pages under us. Especially we don't want to hold a lock
* while sending the notifications to the frontend.
*/
reachedStop = asyncQueueProcessPageEntries(&pos, head,
page_buffer.buf);
} while (!reachedStop);
}
PG_CATCH();
{
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
/* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
PG_RE_THROW();
}
PG_END_TRY();
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
/* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
}
/*
* Fetch notifications from the shared queue, beginning at position current,
* and deliver relevant ones to my frontend.
*
* The current page must have been fetched into page_buffer from shared
* memory. (We could access the page right in shared memory, but that
* would imply holding the AsyncCtlLock throughout this routine.)
*
* We stop if we reach the "stop" position, or reach a notification from an
* uncommitted transaction, or reach the end of the page.
*
* The function returns true once we have reached the stop position or an
* uncommitted notification, and false if we have finished with the page.
* In other words: once it returns true there is no need to look further.
*/
static bool
asyncQueueProcessPageEntries(QueuePosition *current,
QueuePosition stop,
char *page_buffer)
{
bool reachedStop = false;
bool reachedEndOfPage;
AsyncQueueEntry *qe;
do
{
if (QUEUE_POS_EQUAL(*current, stop))
break;
qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(*current));
/*
* Advance *current over this message, possibly to the next page.
* As noted in the comments for asyncQueueReadAllNotifications, we
* must do this before possibly failing while processing the message.
*/
reachedEndOfPage = asyncQueueAdvance(current, qe->length);
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
if (TransactionIdDidCommit(qe->xid))
{
/* qe->data is the null-terminated channel name */
char *channel = qe->data;
if (IsListeningOn(channel))
{
/* payload follows channel name */
char *payload = qe->data + strlen(channel) + 1;
NotifyMyFrontEnd(channel, payload, qe->srcPid);
}
}
else if (TransactionIdDidAbort(qe->xid))
{
/*
* If the source transaction aborted, we just ignore its
* notifications.
*/
}
else
{
/*
* The transaction has neither committed nor aborted so far,
* so we can't process its message yet. Break out of the loop.
*/
reachedStop = true;
break;
}
}
/* Loop back if we're not at end of page */
} while (!reachedEndOfPage);
if (QUEUE_POS_EQUAL(*current, stop))
reachedStop = true;
return reachedStop;
}
/*
* Advance the shared queue tail variable to the minimum of all the
* per-backend tail pointers. Truncate pg_notify space if possible.
*/
static void
asyncQueueAdvanceTail(void)
{
QueuePosition min;
int i;
int oldtailpage;
int newtailpage;
int boundary;
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
for (i = 1; i <= MaxBackends; i++)
{
if (QUEUE_BACKEND_PID(i) != InvalidPid)
min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
}
oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
QUEUE_TAIL = min;
LWLockRelease(AsyncQueueLock);
/*
* We can truncate something if the global tail advanced across an SLRU
* segment boundary.
*
* XXX it might be better to truncate only once every several segments,
* to reduce the number of directory scans.
*/
newtailpage = QUEUE_POS_PAGE(min);
boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
if (asyncQueuePagePrecedesLogically(oldtailpage, boundary))
{
/*
* SimpleLruTruncate() will ask for AsyncCtlLock but will also
* release the lock again.
*/
SimpleLruTruncate(AsyncCtl, newtailpage);
}
}
/* /*
* ProcessIncomingNotify * ProcessIncomingNotify
* *
* Deal with arriving NOTIFYs from other backends. * Deal with arriving NOTIFYs from other backends.
* This is called either directly from the PROCSIG_NOTIFY_INTERRUPT * This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
* signal handler, or the next time control reaches the outer idle loop. * signal handler, or the next time control reaches the outer idle loop.
* Scan pg_listener for arriving notifies, report them to my front end, * Scan the queue for arriving notifications and report them to my front
* and clear the notification field in pg_listener until next time. * end.
* *
* NOTE: since we are outside any transaction, we must create our own. * NOTE: since we are outside any transaction, we must create our own.
*/ */
static void static void
ProcessIncomingNotify(void) ProcessIncomingNotify(void)
{ {
Relation lRel;
TupleDesc tdesc;
ScanKeyData key[1];
HeapScanDesc scan;
HeapTuple lTuple,
rTuple;
Datum value[Natts_pg_listener];
bool repl[Natts_pg_listener],
nulls[Natts_pg_listener];
bool catchup_enabled; bool catchup_enabled;
/* Do nothing if we aren't actively listening */
if (listenChannels == NIL)
return;
/* Must prevent catchup interrupt while I am running */ /* Must prevent catchup interrupt while I am running */
catchup_enabled = DisableCatchupInterrupt(); catchup_enabled = DisableCatchupInterrupt();
...@@ -974,62 +2088,13 @@ ProcessIncomingNotify(void) ...@@ -974,62 +2088,13 @@ ProcessIncomingNotify(void)
notifyInterruptOccurred = 0; notifyInterruptOccurred = 0;
StartTransactionCommand();
lRel = heap_open(ListenerRelationId, ExclusiveLock);
tdesc = RelationGetDescr(lRel);
/* Scan only entries with my listenerPID */
ScanKeyInit(&key[0],
Anum_pg_listener_listenerpid,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(MyProcPid));
scan = heap_beginscan(lRel, SnapshotNow, 1, key);
/* Prepare data for rewriting 0 into notification field */
memset(nulls, false, sizeof(nulls));
memset(repl, false, sizeof(repl));
repl[Anum_pg_listener_notification - 1] = true;
memset(value, 0, sizeof(value));
value[Anum_pg_listener_notification - 1] = Int32GetDatum(0);
while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
char *relname = NameStr(listener->relname);
int32 sourcePID = listener->notification;
if (sourcePID != 0)
{
/* Notify the frontend */
if (Trace_notify)
elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
relname, (int) sourcePID);
NotifyMyFrontEnd(relname, sourcePID);
/* /*
* Rewrite the tuple with 0 in notification column. * We must run asyncQueueReadAllNotifications inside a transaction,
* else bad things happen if it gets an error.
*/ */
rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl); StartTransactionCommand();
simple_heap_update(lRel, &lTuple->t_self, rTuple);
#ifdef NOT_USED /* currently there are no indexes */
CatalogUpdateIndexes(lRel, rTuple);
#endif
}
}
heap_endscan(scan);
/* asyncQueueReadAllNotifications();
* We do NOT release the lock on pg_listener here; we need to hold it
* until end of transaction (which is about to happen, anyway) to ensure
* that other backends see our tuple updates when they look. Otherwise, a
* transaction started after this one might mistakenly think it doesn't
* need to send this backend a new NOTIFY.
*/
heap_close(lRel, NoLock);
CommitTransactionCommand(); CommitTransactionCommand();
...@@ -1051,20 +2116,17 @@ ProcessIncomingNotify(void) ...@@ -1051,20 +2116,17 @@ ProcessIncomingNotify(void)
* Send NOTIFY message to my front end. * Send NOTIFY message to my front end.
*/ */
static void static void
NotifyMyFrontEnd(char *relname, int32 listenerPID) NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
{ {
if (whereToSendOutput == DestRemote) if (whereToSendOutput == DestRemote)
{ {
StringInfoData buf; StringInfoData buf;
pq_beginmessage(&buf, 'A'); pq_beginmessage(&buf, 'A');
pq_sendint(&buf, listenerPID, sizeof(int32)); pq_sendint(&buf, srcPid, sizeof(int32));
pq_sendstring(&buf, relname); pq_sendstring(&buf, channel);
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
{ pq_sendstring(&buf, payload);
/* XXX Add parameter string here later */
pq_sendstring(&buf, "");
}
pq_endmessage(&buf); pq_endmessage(&buf);
/* /*
...@@ -1074,20 +2136,51 @@ NotifyMyFrontEnd(char *relname, int32 listenerPID) ...@@ -1074,20 +2136,51 @@ NotifyMyFrontEnd(char *relname, int32 listenerPID)
*/ */
} }
else else
elog(INFO, "NOTIFY for %s", relname); elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
} }
/* Does pendingNotifies include the given relname? */ /* Does pendingNotifies include the given channel/payload? */
static bool static bool
AsyncExistsPendingNotify(const char *relname) AsyncExistsPendingNotify(const char *channel, const char *payload)
{ {
ListCell *p; ListCell *p;
Notification *n;
if (pendingNotifies == NIL)
return false;
if (payload == NULL)
payload = "";
/*----------
* We need to append new elements to the end of the list in order to keep
* the order. However, on the other hand we'd like to check the list
* backwards in order to make duplicate-elimination a tad faster when the
* same condition is signaled many times in a row. So as a compromise we
* check the tail element first which we can access directly. If this
* doesn't match, we check the whole list.
*
* As we are not checking our parents' lists, we can still get duplicates
* in combination with subtransactions, like in:
*
* begin;
* notify foo '1';
* savepoint foo;
* notify foo '1';
* commit;
*----------
*/
n = (Notification *) llast(pendingNotifies);
if (strcmp(n->channel, channel) == 0 &&
strcmp(n->payload, payload) == 0)
return true;
foreach(p, pendingNotifies) foreach(p, pendingNotifies)
{ {
const char *prelname = (const char *) lfirst(p); n = (Notification *) lfirst(p);
if (strcmp(prelname, relname) == 0) if (strcmp(n->channel, channel) == 0 &&
strcmp(n->payload, payload) == 0)
return true; return true;
} }
...@@ -1108,21 +2201,3 @@ ClearPendingActionsAndNotifies(void) ...@@ -1108,21 +2201,3 @@ ClearPendingActionsAndNotifies(void)
pendingActions = NIL; pendingActions = NIL;
pendingNotifies = NIL; pendingNotifies = NIL;
} }
/*
* 2PC processing routine for COMMIT PREPARED case.
*
* (We don't have to do anything for ROLLBACK PREPARED.)
*/
void
notify_twophase_postcommit(TransactionId xid, uint16 info,
void *recdata, uint32 len)
{
/*
* Set up to issue the NOTIFY at the end of my own current transaction.
* (XXX this has some issues if my own transaction later rolls back, or if
* there is any significant delay before I commit. OK for now because we
* disallow COMMIT PREPARED inside a transaction block.)
*/
Async_Notify((char *) recdata);
}
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.461 2010/02/12 17:33:20 tgl Exp $ * $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.462 2010/02/16 22:34:43 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -2777,6 +2777,7 @@ _copyNotifyStmt(NotifyStmt *from) ...@@ -2777,6 +2777,7 @@ _copyNotifyStmt(NotifyStmt *from)
NotifyStmt *newnode = makeNode(NotifyStmt); NotifyStmt *newnode = makeNode(NotifyStmt);
COPY_STRING_FIELD(conditionname); COPY_STRING_FIELD(conditionname);
COPY_STRING_FIELD(payload);
return newnode; return newnode;
} }
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.382 2010/02/12 17:33:20 tgl Exp $ * $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.383 2010/02/16 22:34:43 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -1325,6 +1325,7 @@ static bool ...@@ -1325,6 +1325,7 @@ static bool
_equalNotifyStmt(NotifyStmt *a, NotifyStmt *b) _equalNotifyStmt(NotifyStmt *a, NotifyStmt *b)
{ {
COMPARE_STRING_FIELD(conditionname); COMPARE_STRING_FIELD(conditionname);
COMPARE_STRING_FIELD(payload);
return true; return true;
} }
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.382 2010/02/12 17:33:20 tgl Exp $ * $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.383 2010/02/16 22:34:43 tgl Exp $
* *
* NOTES * NOTES
* Every node type that can appear in stored rules' parsetrees *must* * Every node type that can appear in stored rules' parsetrees *must*
...@@ -1820,6 +1820,7 @@ _outNotifyStmt(StringInfo str, NotifyStmt *node) ...@@ -1820,6 +1820,7 @@ _outNotifyStmt(StringInfo str, NotifyStmt *node)
WRITE_NODE_TYPE("NOTIFY"); WRITE_NODE_TYPE("NOTIFY");
WRITE_STRING_FIELD(conditionname); WRITE_STRING_FIELD(conditionname);
WRITE_STRING_FIELD(payload);
} }
static void static void
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/nodes/readfuncs.c,v 1.231 2010/02/12 17:33:20 tgl Exp $ * $PostgreSQL: pgsql/src/backend/nodes/readfuncs.c,v 1.232 2010/02/16 22:34:43 tgl Exp $
* *
* NOTES * NOTES
* Path and Plan nodes do not have any readfuncs support, because we * Path and Plan nodes do not have any readfuncs support, because we
...@@ -231,6 +231,7 @@ _readNotifyStmt(void) ...@@ -231,6 +231,7 @@ _readNotifyStmt(void)
READ_LOCALS(NotifyStmt); READ_LOCALS(NotifyStmt);
READ_STRING_FIELD(conditionname); READ_STRING_FIELD(conditionname);
READ_STRING_FIELD(payload);
READ_DONE(); READ_DONE();
} }
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.708 2010/02/12 17:33:20 tgl Exp $ * $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.709 2010/02/16 22:34:49 tgl Exp $
* *
* HISTORY * HISTORY
* AUTHOR DATE MAJOR EVENT * AUTHOR DATE MAJOR EVENT
...@@ -400,7 +400,7 @@ static TypeName *TableFuncTypeName(List *columns); ...@@ -400,7 +400,7 @@ static TypeName *TableFuncTypeName(List *columns);
%type <ival> Iconst SignedIconst %type <ival> Iconst SignedIconst
%type <list> Iconst_list %type <list> Iconst_list
%type <str> Sconst comment_text %type <str> Sconst comment_text notify_payload
%type <str> RoleId opt_granted_by opt_boolean ColId_or_Sconst %type <str> RoleId opt_granted_by opt_boolean ColId_or_Sconst
%type <list> var_list %type <list> var_list
%type <str> ColId ColLabel var_name type_function_name param_name %type <str> ColId ColLabel var_name type_function_name param_name
...@@ -6123,14 +6123,20 @@ DropRuleStmt: ...@@ -6123,14 +6123,20 @@ DropRuleStmt:
* *
*****************************************************************************/ *****************************************************************************/
NotifyStmt: NOTIFY ColId NotifyStmt: NOTIFY ColId notify_payload
{ {
NotifyStmt *n = makeNode(NotifyStmt); NotifyStmt *n = makeNode(NotifyStmt);
n->conditionname = $2; n->conditionname = $2;
n->payload = $3;
$$ = (Node *)n; $$ = (Node *)n;
} }
; ;
notify_payload:
',' Sconst { $$ = $2; }
| /*EMPTY*/ { $$ = NULL; }
;
ListenStmt: LISTEN ColId ListenStmt: LISTEN ColId
{ {
ListenStmt *n = makeNode(ListenStmt); ListenStmt *n = makeNode(ListenStmt);
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.103 2010/01/15 09:19:03 heikki Exp $ * $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.104 2010/02/16 22:34:50 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "access/nbtree.h" #include "access/nbtree.h"
#include "access/subtrans.h" #include "access/subtrans.h"
#include "access/twophase.h" #include "access/twophase.h"
#include "commands/async.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include "postmaster/autovacuum.h" #include "postmaster/autovacuum.h"
...@@ -122,6 +123,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) ...@@ -122,6 +123,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, WalRcvShmemSize()); size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize()); size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize()); size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
#ifdef EXEC_BACKEND #ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize()); size = add_size(size, ShmemBackendArraySize());
#endif #endif
...@@ -225,6 +227,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) ...@@ -225,6 +227,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
*/ */
BTreeShmemInit(); BTreeShmemInit();
SyncScanShmemInit(); SyncScanShmemInit();
AsyncShmemInit();
#ifdef EXEC_BACKEND #ifdef EXEC_BACKEND
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.55 2010/01/02 16:57:52 momjian Exp $ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.56 2010/02/16 22:34:50 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "access/clog.h" #include "access/clog.h"
#include "access/multixact.h" #include "access/multixact.h"
#include "access/subtrans.h" #include "access/subtrans.h"
#include "commands/async.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pg_trace.h" #include "pg_trace.h"
#include "storage/ipc.h" #include "storage/ipc.h"
...@@ -174,6 +175,9 @@ NumLWLocks(void) ...@@ -174,6 +175,9 @@ NumLWLocks(void)
/* multixact.c needs two SLRU areas */ /* multixact.c needs two SLRU areas */
numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS; numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
/* async.c needs one per Async buffer */
numLocks += NUM_ASYNC_BUFFERS;
/* /*
* Add any requested by loadable modules; for backwards-compatibility * Add any requested by loadable modules; for backwards-compatibility
* reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/tcop/postgres.c,v 1.589 2010/02/16 20:15:14 momjian Exp $ * $PostgreSQL: pgsql/src/backend/tcop/postgres.c,v 1.590 2010/02/16 22:34:50 tgl Exp $
* *
* NOTES * NOTES
* this is the "main" module of the postgres backend and * this is the "main" module of the postgres backend and
...@@ -3779,7 +3779,8 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -3779,7 +3779,8 @@ PostgresMain(int argc, char *argv[], const char *username)
* collector, and to update the PS stats display. We avoid doing * collector, and to update the PS stats display. We avoid doing
* those every time through the message loop because it'd slow down * those every time through the message loop because it'd slow down
* processing of batched messages, and because we don't want to report * processing of batched messages, and because we don't want to report
* uncommitted updates (that confuses autovacuum). * uncommitted updates (that confuses autovacuum). The notification
* processor wants a call too, if we are not in a transaction block.
*/ */
if (send_ready_for_query) if (send_ready_for_query)
{ {
...@@ -3795,6 +3796,7 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -3795,6 +3796,7 @@ PostgresMain(int argc, char *argv[], const char *username)
} }
else else
{ {
ProcessCompletedNotifies();
pgstat_report_stat(false); pgstat_report_stat(false);
set_ps_display("idle", false); set_ps_display("idle", false);
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.332 2010/02/14 18:42:15 rhaas Exp $ * $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.333 2010/02/16 22:34:50 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -926,17 +926,17 @@ standard_ProcessUtility(Node *parsetree, ...@@ -926,17 +926,17 @@ standard_ProcessUtility(Node *parsetree,
case T_NotifyStmt: case T_NotifyStmt:
{ {
NotifyStmt *stmt = (NotifyStmt *) parsetree; NotifyStmt *stmt = (NotifyStmt *) parsetree;
PreventCommandDuringRecovery();
Async_Notify(stmt->conditionname); PreventCommandDuringRecovery();
Async_Notify(stmt->conditionname, stmt->payload);
} }
break; break;
case T_ListenStmt: case T_ListenStmt:
{ {
ListenStmt *stmt = (ListenStmt *) parsetree; ListenStmt *stmt = (ListenStmt *) parsetree;
PreventCommandDuringRecovery();
PreventCommandDuringRecovery();
CheckRestrictedOperation("LISTEN"); CheckRestrictedOperation("LISTEN");
Async_Listen(stmt->conditionname); Async_Listen(stmt->conditionname);
} }
...@@ -945,8 +945,8 @@ standard_ProcessUtility(Node *parsetree, ...@@ -945,8 +945,8 @@ standard_ProcessUtility(Node *parsetree,
case T_UnlistenStmt: case T_UnlistenStmt:
{ {
UnlistenStmt *stmt = (UnlistenStmt *) parsetree; UnlistenStmt *stmt = (UnlistenStmt *) parsetree;
PreventCommandDuringRecovery();
PreventCommandDuringRecovery();
CheckRestrictedOperation("UNLISTEN"); CheckRestrictedOperation("UNLISTEN");
if (stmt->conditionname) if (stmt->conditionname)
Async_Unlisten(stmt->conditionname); Async_Unlisten(stmt->conditionname);
...@@ -1105,8 +1105,8 @@ standard_ProcessUtility(Node *parsetree, ...@@ -1105,8 +1105,8 @@ standard_ProcessUtility(Node *parsetree,
case T_ReindexStmt: case T_ReindexStmt:
{ {
ReindexStmt *stmt = (ReindexStmt *) parsetree; ReindexStmt *stmt = (ReindexStmt *) parsetree;
PreventCommandDuringRecovery();
PreventCommandDuringRecovery();
switch (stmt->kind) switch (stmt->kind)
{ {
case OBJECT_INDEX: case OBJECT_INDEX:
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.322 2010/02/14 18:42:16 rhaas Exp $ * $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.323 2010/02/16 22:34:50 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -3465,6 +3465,11 @@ get_utility_query_def(Query *query, deparse_context *context) ...@@ -3465,6 +3465,11 @@ get_utility_query_def(Query *query, deparse_context *context)
0, PRETTYINDENT_STD, 1); 0, PRETTYINDENT_STD, 1);
appendStringInfo(buf, "NOTIFY %s", appendStringInfo(buf, "NOTIFY %s",
quote_identifier(stmt->conditionname)); quote_identifier(stmt->conditionname));
if (stmt->payload)
{
appendStringInfoString(buf, ", ");
simple_quote_literal(buf, stmt->payload);
}
} }
else else
{ {
......
...@@ -42,7 +42,7 @@ ...@@ -42,7 +42,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* Portions taken from FreeBSD. * Portions taken from FreeBSD.
* *
* $PostgreSQL: pgsql/src/bin/initdb/initdb.c,v 1.184 2010/01/26 16:18:12 tgl Exp $ * $PostgreSQL: pgsql/src/bin/initdb/initdb.c,v 1.185 2010/02/16 22:34:50 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -2458,6 +2458,7 @@ main(int argc, char *argv[]) ...@@ -2458,6 +2458,7 @@ main(int argc, char *argv[])
"pg_xlog", "pg_xlog",
"pg_xlog/archive_status", "pg_xlog/archive_status",
"pg_clog", "pg_clog",
"pg_notify",
"pg_subtrans", "pg_subtrans",
"pg_twophase", "pg_twophase",
"pg_multixact/members", "pg_multixact/members",
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* *
* Copyright (c) 2000-2010, PostgreSQL Global Development Group * Copyright (c) 2000-2010, PostgreSQL Global Development Group
* *
* $PostgreSQL: pgsql/src/bin/psql/common.c,v 1.143 2010/01/02 16:57:59 momjian Exp $ * $PostgreSQL: pgsql/src/bin/psql/common.c,v 1.144 2010/02/16 22:34:50 tgl Exp $
*/ */
#include "postgres_fe.h" #include "postgres_fe.h"
#include "common.h" #include "common.h"
...@@ -555,6 +555,11 @@ PrintNotifications(void) ...@@ -555,6 +555,11 @@ PrintNotifications(void)
while ((notify = PQnotifies(pset.db))) while ((notify = PQnotifies(pset.db)))
{ {
/* for backward compatibility, only show payload if nonempty */
if (notify->extra[0])
fprintf(pset.queryFout, _("Asynchronous notification \"%s\" with payload \"%s\" received from server process with PID %d.\n"),
notify->relname, notify->extra, notify->be_pid);
else
fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"), fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"),
notify->relname, notify->be_pid); notify->relname, notify->be_pid);
fflush(pset.queryFout); fflush(pset.queryFout);
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* *
* Copyright (c) 2000-2010, PostgreSQL Global Development Group * Copyright (c) 2000-2010, PostgreSQL Global Development Group
* *
* $PostgreSQL: pgsql/src/bin/psql/tab-complete.c,v 1.193 2010/02/15 02:55:01 itagaki Exp $ * $PostgreSQL: pgsql/src/bin/psql/tab-complete.c,v 1.194 2010/02/16 22:34:50 tgl Exp $
*/ */
/*---------------------------------------------------------------------- /*----------------------------------------------------------------------
...@@ -1864,7 +1864,7 @@ psql_completion(char *text, int start, int end) ...@@ -1864,7 +1864,7 @@ psql_completion(char *text, int start, int end)
/* NOTIFY */ /* NOTIFY */
else if (pg_strcasecmp(prev_wd, "NOTIFY") == 0) else if (pg_strcasecmp(prev_wd, "NOTIFY") == 0)
COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s'"); COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening_channels() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s'");
/* OPTIONS */ /* OPTIONS */
else if (pg_strcasecmp(prev_wd, "OPTIONS") == 0) else if (pg_strcasecmp(prev_wd, "OPTIONS") == 0)
...@@ -2105,7 +2105,7 @@ psql_completion(char *text, int start, int end) ...@@ -2105,7 +2105,7 @@ psql_completion(char *text, int start, int end)
/* UNLISTEN */ /* UNLISTEN */
else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0) else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0)
COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s' UNION SELECT '*'"); COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening_channels() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s' UNION SELECT '*'");
/* UPDATE */ /* UPDATE */
/* If prev. word is UPDATE suggest a list of tables */ /* If prev. word is UPDATE suggest a list of tables */
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/slru.h,v 1.25 2010/01/02 16:58:00 momjian Exp $ * $PostgreSQL: pgsql/src/include/access/slru.h,v 1.26 2010/02/16 22:34:50 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -17,6 +17,25 @@ ...@@ -17,6 +17,25 @@
#include "storage/lwlock.h" #include "storage/lwlock.h"
/*
* Define SLRU segment size. A page is the same BLCKSZ as is used everywhere
* else in Postgres. The segment size can be chosen somewhat arbitrarily;
* we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
* or 64K transactions for SUBTRANS.
*
* Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
* page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
* xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
* 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need
* take no explicit notice of that fact in slru.c, except when comparing
* segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
*
* Note: slru.c currently assumes that segment file names will be four hex
* digits. This sets a lower bound on the segment size (64K transactions
* for 32-bit TransactionIds).
*/
#define SLRU_PAGES_PER_SEGMENT 32
/* /*
* Page status codes. Note that these do not include the "dirty" bit. * Page status codes. Note that these do not include the "dirty" bit.
* page_dirty can be TRUE only in the VALID or WRITE_IN_PROGRESS states; * page_dirty can be TRUE only in the VALID or WRITE_IN_PROGRESS states;
...@@ -55,8 +74,8 @@ typedef struct SlruSharedData ...@@ -55,8 +74,8 @@ typedef struct SlruSharedData
/* /*
* Optional array of WAL flush LSNs associated with entries in the SLRU * Optional array of WAL flush LSNs associated with entries in the SLRU
* pages. If not zero/NULL, we must flush WAL before writing pages (true * pages. If not zero/NULL, we must flush WAL before writing pages (true
* for pg_clog, false for multixact and pg_subtrans). group_lsn[] has * for pg_clog, false for multixact, pg_subtrans, pg_notify). group_lsn[]
* lsn_groups_per_page entries per buffer slot, each containing the * has lsn_groups_per_page entries per buffer slot, each containing the
* highest LSN known for a contiguous group of SLRU entries on that slot's * highest LSN known for a contiguous group of SLRU entries on that slot's
* page. * page.
*/ */
...@@ -94,7 +113,7 @@ typedef struct SlruCtlData ...@@ -94,7 +113,7 @@ typedef struct SlruCtlData
/* /*
* This flag tells whether to fsync writes (true for pg_clog and multixact * This flag tells whether to fsync writes (true for pg_clog and multixact
* stuff, false for pg_subtrans). * stuff, false for pg_subtrans and pg_notify).
*/ */
bool do_fsync; bool do_fsync;
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/twophase_rmgr.h,v 1.11 2010/01/02 16:58:00 momjian Exp $ * $PostgreSQL: pgsql/src/include/access/twophase_rmgr.h,v 1.12 2010/02/16 22:34:50 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -23,9 +23,8 @@ typedef uint8 TwoPhaseRmgrId; ...@@ -23,9 +23,8 @@ typedef uint8 TwoPhaseRmgrId;
*/ */
#define TWOPHASE_RM_END_ID 0 #define TWOPHASE_RM_END_ID 0
#define TWOPHASE_RM_LOCK_ID 1 #define TWOPHASE_RM_LOCK_ID 1
#define TWOPHASE_RM_NOTIFY_ID 2 #define TWOPHASE_RM_PGSTAT_ID 2
#define TWOPHASE_RM_PGSTAT_ID 3 #define TWOPHASE_RM_MULTIXACT_ID 3
#define TWOPHASE_RM_MULTIXACT_ID 4
#define TWOPHASE_RM_MAX_ID TWOPHASE_RM_MULTIXACT_ID #define TWOPHASE_RM_MAX_ID TWOPHASE_RM_MULTIXACT_ID
extern const TwoPhaseCallback twophase_recover_callbacks[]; extern const TwoPhaseCallback twophase_recover_callbacks[];
......
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.584 2010/02/12 17:33:20 tgl Exp $ * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.585 2010/02/16 22:34:54 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -53,6 +53,6 @@ ...@@ -53,6 +53,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 201002121 #define CATALOG_VERSION_NO 201002161
#endif #endif
/*-------------------------------------------------------------------------
*
* pg_listener.h
* Asynchronous notification
*
*
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/catalog/pg_listener.h,v 1.28 2010/01/05 01:06:56 tgl Exp $
*
* NOTES
* the genbki.pl script reads this file and generates .bki
* information from the DATA() statements.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_LISTENER_H
#define PG_LISTENER_H
#include "catalog/genbki.h"
/* ----------------------------------------------------------------
* pg_listener definition.
*
* cpp turns this into typedef struct FormData_pg_listener
* ----------------------------------------------------------------
*/
#define ListenerRelationId 2614
CATALOG(pg_listener,2614) BKI_WITHOUT_OIDS
{
NameData relname;
int4 listenerpid;
int4 notification;
} FormData_pg_listener;
/* ----------------
* Form_pg_listener corresponds to a pointer to a tuple with
* the format of pg_listener relation.
* ----------------
*/
typedef FormData_pg_listener *Form_pg_listener;
/* ----------------
* compiler constants for pg_listener
* ----------------
*/
#define Natts_pg_listener 3
#define Anum_pg_listener_relname 1
#define Anum_pg_listener_listenerpid 2
#define Anum_pg_listener_notification 3
/* ----------------
* initial contents of pg_listener are NOTHING.
* ----------------
*/
#endif /* PG_LISTENER_H */
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.568 2010/02/07 20:48:11 tgl Exp $ * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.569 2010/02/16 22:34:56 tgl Exp $
* *
* NOTES * NOTES
* The script catalog/genbki.pl reads this file and generates .bki * The script catalog/genbki.pl reads this file and generates .bki
...@@ -4133,6 +4133,10 @@ DATA(insert OID = 2856 ( pg_timezone_names PGNSP PGUID 12 1 1000 0 f f f t t s ...@@ -4133,6 +4133,10 @@ DATA(insert OID = 2856 ( pg_timezone_names PGNSP PGUID 12 1 1000 0 f f f t t s
DESCR("get the available time zone names"); DESCR("get the available time zone names");
DATA(insert OID = 2730 ( pg_get_triggerdef PGNSP PGUID 12 1 0 0 f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ )); DATA(insert OID = 2730 ( pg_get_triggerdef PGNSP PGUID 12 1 0 0 f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ ));
DESCR("trigger description with pretty-print option"); DESCR("trigger description with pretty-print option");
DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
DESCR("get the channels that the current backend listens to");
DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
DESCR("send a notification event");
/* non-persistent series generator */ /* non-persistent series generator */
DATA(insert OID = 1066 ( generate_series PGNSP PGUID 12 1 1000 0 f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ )); DATA(insert OID = 1066 ( generate_series PGNSP PGUID 12 1 1000 0 f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ ));
......
...@@ -6,28 +6,44 @@ ...@@ -6,28 +6,44 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/commands/async.h,v 1.39 2010/01/02 16:58:03 momjian Exp $ * $PostgreSQL: pgsql/src/include/commands/async.h,v 1.40 2010/02/16 22:34:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#ifndef ASYNC_H #ifndef ASYNC_H
#define ASYNC_H #define ASYNC_H
#include "fmgr.h"
/*
* The number of SLRU page buffers we use for the notification queue.
*/
#define NUM_ASYNC_BUFFERS 8
extern bool Trace_notify; extern bool Trace_notify;
extern Size AsyncShmemSize(void);
extern void AsyncShmemInit(void);
/* notify-related SQL statements */ /* notify-related SQL statements */
extern void Async_Notify(const char *relname); extern void Async_Notify(const char *channel, const char *payload);
extern void Async_Listen(const char *relname); extern void Async_Listen(const char *channel);
extern void Async_Unlisten(const char *relname); extern void Async_Unlisten(const char *channel);
extern void Async_UnlistenAll(void); extern void Async_UnlistenAll(void);
/* notify-related SQL functions */
extern Datum pg_listening_channels(PG_FUNCTION_ARGS);
extern Datum pg_notify(PG_FUNCTION_ARGS);
/* perform (or cancel) outbound notify processing at transaction commit */ /* perform (or cancel) outbound notify processing at transaction commit */
extern void PreCommit_Notify(void);
extern void AtCommit_Notify(void); extern void AtCommit_Notify(void);
extern void AtAbort_Notify(void); extern void AtAbort_Notify(void);
extern void AtSubStart_Notify(void); extern void AtSubStart_Notify(void);
extern void AtSubCommit_Notify(void); extern void AtSubCommit_Notify(void);
extern void AtSubAbort_Notify(void); extern void AtSubAbort_Notify(void);
extern void AtPrepare_Notify(void); extern void AtPrepare_Notify(void);
extern void ProcessCompletedNotifies(void);
/* signal handler for inbound notifies (PROCSIG_NOTIFY_INTERRUPT) */ /* signal handler for inbound notifies (PROCSIG_NOTIFY_INTERRUPT) */
extern void HandleNotifyInterrupt(void); extern void HandleNotifyInterrupt(void);
...@@ -40,7 +56,4 @@ extern void HandleNotifyInterrupt(void); ...@@ -40,7 +56,4 @@ extern void HandleNotifyInterrupt(void);
extern void EnableNotifyInterrupt(void); extern void EnableNotifyInterrupt(void);
extern bool DisableNotifyInterrupt(void); extern bool DisableNotifyInterrupt(void);
extern void notify_twophase_postcommit(TransactionId xid, uint16 info,
void *recdata, uint32 len);
#endif /* ASYNC_H */ #endif /* ASYNC_H */
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.429 2010/02/12 17:33:20 tgl Exp $ * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.430 2010/02/16 22:34:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -2097,6 +2097,7 @@ typedef struct NotifyStmt ...@@ -2097,6 +2097,7 @@ typedef struct NotifyStmt
{ {
NodeTag type; NodeTag type;
char *conditionname; /* condition name to notify */ char *conditionname; /* condition name to notify */
char *payload; /* the payload string, or NULL if none */
} NotifyStmt; } NotifyStmt;
/* ---------------------- /* ----------------------
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.44 2010/02/07 20:48:13 tgl Exp $ * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.45 2010/02/16 22:34:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -68,6 +68,8 @@ typedef enum LWLockId ...@@ -68,6 +68,8 @@ typedef enum LWLockId
AutovacuumScheduleLock, AutovacuumScheduleLock,
SyncScanLock, SyncScanLock,
RelationMappingLock, RelationMappingLock,
AsyncCtlLock,
AsyncQueueLock,
/* Individual lock IDs end here */ /* Individual lock IDs end here */
FirstBufMappingLock, FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
......
...@@ -532,9 +532,9 @@ CREATE TEMP TABLE tmp_foo (data text) ON COMMIT DELETE ROWS; ...@@ -532,9 +532,9 @@ CREATE TEMP TABLE tmp_foo (data text) ON COMMIT DELETE ROWS;
CREATE ROLE temp_reset_user; CREATE ROLE temp_reset_user;
SET SESSION AUTHORIZATION temp_reset_user; SET SESSION AUTHORIZATION temp_reset_user;
-- look changes -- look changes
SELECT relname FROM pg_listener; SELECT pg_listening_channels();
relname pg_listening_channels
----------- -----------------------
foo_event foo_event
(1 row) (1 row)
...@@ -571,9 +571,9 @@ SELECT current_user = 'temp_reset_user'; ...@@ -571,9 +571,9 @@ SELECT current_user = 'temp_reset_user';
-- discard everything -- discard everything
DISCARD ALL; DISCARD ALL;
-- look again -- look again
SELECT relname FROM pg_listener; SELECT pg_listening_channels();
relname pg_listening_channels
--------- -----------------------
(0 rows) (0 rows)
SELECT name FROM pg_prepared_statements; SELECT name FROM pg_prepared_statements;
......
...@@ -107,7 +107,6 @@ SELECT relname, relhasindex ...@@ -107,7 +107,6 @@ SELECT relname, relhasindex
pg_language | t pg_language | t
pg_largeobject | t pg_largeobject | t
pg_largeobject_metadata | t pg_largeobject_metadata | t
pg_listener | f
pg_namespace | t pg_namespace | t
pg_opclass | t pg_opclass | t
pg_operator | t pg_operator | t
...@@ -154,7 +153,7 @@ SELECT relname, relhasindex ...@@ -154,7 +153,7 @@ SELECT relname, relhasindex
timetz_tbl | f timetz_tbl | f
tinterval_tbl | f tinterval_tbl | f
varchar_tbl | f varchar_tbl | f
(143 rows) (142 rows)
-- --
-- another sanity check: every system catalog that has OIDs should have -- another sanity check: every system catalog that has OIDs should have
......
...@@ -165,7 +165,7 @@ CREATE TEMP TABLE tmp_foo (data text) ON COMMIT DELETE ROWS; ...@@ -165,7 +165,7 @@ CREATE TEMP TABLE tmp_foo (data text) ON COMMIT DELETE ROWS;
CREATE ROLE temp_reset_user; CREATE ROLE temp_reset_user;
SET SESSION AUTHORIZATION temp_reset_user; SET SESSION AUTHORIZATION temp_reset_user;
-- look changes -- look changes
SELECT relname FROM pg_listener; SELECT pg_listening_channels();
SELECT name FROM pg_prepared_statements; SELECT name FROM pg_prepared_statements;
SELECT name FROM pg_cursors; SELECT name FROM pg_cursors;
SHOW vacuum_cost_delay; SHOW vacuum_cost_delay;
...@@ -174,7 +174,7 @@ SELECT current_user = 'temp_reset_user'; ...@@ -174,7 +174,7 @@ SELECT current_user = 'temp_reset_user';
-- discard everything -- discard everything
DISCARD ALL; DISCARD ALL;
-- look again -- look again
SELECT relname FROM pg_listener; SELECT pg_listening_channels();
SELECT name FROM pg_prepared_statements; SELECT name FROM pg_prepared_statements;
SELECT name FROM pg_cursors; SELECT name FROM pg_cursors;
SHOW vacuum_cost_delay; SHOW vacuum_cost_delay;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment