Commit b10f40bf authored by Tom Lane's avatar Tom Lane

Improve test coverage for LISTEN/NOTIFY.

We had no actual end-to-end test of NOTIFY message delivery.  In the
core async.sql regression test, testing this is problematic because psql
traditionally prints the PID of the sending backend, making the output
unstable.  We also have an isolation test script, but it likewise
failed to prove that delivery worked, because isolationtester.c had
no provisions for detecting/reporting NOTIFY messages.

Hence, add such provisions to isolationtester.c, and extend
async-notify.spec to include direct tests of basic NOTIFY functionality.

I also added tests showing that NOTIFY de-duplicates messages normally,
but not across subtransaction boundaries.  (That's the historical
behavior since we introduced subtransactions, though perhaps we ought
to change it.)

Patch by me, with suggestions/review by Andres Freund.

Discussion: https://postgr.es/m/31304.1564246011@sss.pgh.pa.us
parent 44460d70
Parsed test spec with 2 sessions Parsed test spec with 2 sessions
starting permutation: listen begin check notify check starting permutation: listenc notify1 notify2 notify3 notifyf
step listen: LISTEN a; step listenc: LISTEN c1; LISTEN c2;
step begin: BEGIN; step notify1: NOTIFY c1;
step check: SELECT pg_notification_queue_usage() > 0 AS nonzero; notifier: NOTIFY "c1" with payload "" from notifier
step notify2: NOTIFY c2, 'payload';
notifier: NOTIFY "c2" with payload "payload" from notifier
step notify3: NOTIFY c3, 'payload3';
step notifyf: SELECT pg_notify('c2', NULL);
pg_notify
notifier: NOTIFY "c2" with payload "" from notifier
starting permutation: listenc notifyd1 notifyd2 notifys1
step listenc: LISTEN c1; LISTEN c2;
step notifyd1: NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload';
notifier: NOTIFY "c2" with payload "payload" from notifier
notifier: NOTIFY "c1" with payload "" from notifier
step notifyd2: NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2';
notifier: NOTIFY "c1" with payload "" from notifier
notifier: NOTIFY "c1" with payload "p1" from notifier
notifier: NOTIFY "c1" with payload "p2" from notifier
step notifys1:
BEGIN;
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
SAVEPOINT s1;
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
ROLLBACK TO SAVEPOINT s2;
COMMIT;
notifier: NOTIFY "c1" with payload "payload" from notifier
notifier: NOTIFY "c2" with payload "payload" from notifier
notifier: NOTIFY "c1" with payload "payload" from notifier
notifier: NOTIFY "c2" with payload "payload" from notifier
notifier: NOTIFY "c1" with payload "payloads" from notifier
notifier: NOTIFY "c2" with payload "payloads" from notifier
starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
step llisten: LISTEN c1; LISTEN c2;
step notify1: NOTIFY c1;
step notify2: NOTIFY c2, 'payload';
step notify3: NOTIFY c3, 'payload3';
step notifyf: SELECT pg_notify('c2', NULL);
pg_notify
step lcheck: SELECT 1 AS x;
x
1
listener: NOTIFY "c1" with payload "" from notifier
listener: NOTIFY "c2" with payload "payload" from notifier
listener: NOTIFY "c2" with payload "" from notifier
starting permutation: listenc llisten notify1 notify2 notify3 notifyf lcheck
step listenc: LISTEN c1; LISTEN c2;
step llisten: LISTEN c1; LISTEN c2;
step notify1: NOTIFY c1;
notifier: NOTIFY "c1" with payload "" from notifier
step notify2: NOTIFY c2, 'payload';
notifier: NOTIFY "c2" with payload "payload" from notifier
step notify3: NOTIFY c3, 'payload3';
step notifyf: SELECT pg_notify('c2', NULL);
pg_notify
notifier: NOTIFY "c2" with payload "" from notifier
step lcheck: SELECT 1 AS x;
x
1
listener: NOTIFY "c1" with payload "" from notifier
listener: NOTIFY "c2" with payload "payload" from notifier
listener: NOTIFY "c2" with payload "" from notifier
starting permutation: llisten lbegin usage bignotify usage
step llisten: LISTEN c1; LISTEN c2;
step lbegin: BEGIN;
step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
nonzero nonzero
f f
step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; step bignotify: SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s;
count count
1000 1000
step check: SELECT pg_notification_queue_usage() > 0 AS nonzero; step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
nonzero nonzero
t t
...@@ -23,10 +23,12 @@ ...@@ -23,10 +23,12 @@
/* /*
* conns[0] is the global setup, teardown, and watchdog connection. Additional * conns[0] is the global setup, teardown, and watchdog connection. Additional
* connections represent spec-defined sessions. * connections represent spec-defined sessions. We also track the backend
* PID, in numeric and string formats, for each connection.
*/ */
static PGconn **conns = NULL; static PGconn **conns = NULL;
static const char **backend_pids = NULL; static int *backend_pids = NULL;
static const char **backend_pid_strs = NULL;
static int nconns = 0; static int nconns = 0;
/* In dry run only output permutations to be run by the tester. */ /* In dry run only output permutations to be run by the tester. */
...@@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps); ...@@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps);
#define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */ #define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */
#define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */ #define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */
static bool try_complete_step(Step *step, int flags); static bool try_complete_step(TestSpec *testspec, Step *step, int flags);
static int step_qsort_cmp(const void *a, const void *b); static int step_qsort_cmp(const void *a, const void *b);
static int step_bsearch_cmp(const void *a, const void *b); static int step_bsearch_cmp(const void *a, const void *b);
...@@ -159,9 +161,11 @@ main(int argc, char **argv) ...@@ -159,9 +161,11 @@ main(int argc, char **argv)
* extra for lock wait detection and global work. * extra for lock wait detection and global work.
*/ */
nconns = 1 + testspec->nsessions; nconns = 1 + testspec->nsessions;
conns = calloc(nconns, sizeof(PGconn *)); conns = (PGconn **) pg_malloc0(nconns * sizeof(PGconn *));
backend_pids = pg_malloc0(nconns * sizeof(*backend_pids));
backend_pid_strs = pg_malloc0(nconns * sizeof(*backend_pid_strs));
atexit(disconnect_atexit); atexit(disconnect_atexit);
backend_pids = calloc(nconns, sizeof(*backend_pids));
for (i = 0; i < nconns; i++) for (i = 0; i < nconns; i++)
{ {
conns[i] = PQconnectdb(conninfo); conns[i] = PQconnectdb(conninfo);
...@@ -187,26 +191,9 @@ main(int argc, char **argv) ...@@ -187,26 +191,9 @@ main(int argc, char **argv)
blackholeNoticeProcessor, blackholeNoticeProcessor,
NULL); NULL);
/* Get the backend pid for lock wait checking. */ /* Save each connection's backend PID for subsequent use. */
res = PQexec(conns[i], "SELECT pg_catalog.pg_backend_pid()"); backend_pids[i] = PQbackendPID(conns[i]);
if (PQresultStatus(res) == PGRES_TUPLES_OK) backend_pid_strs[i] = psprintf("%d", backend_pids[i]);
{
if (PQntuples(res) == 1 && PQnfields(res) == 1)
backend_pids[i] = pg_strdup(PQgetvalue(res, 0, 0));
else
{
fprintf(stderr, "backend pid query returned %d rows and %d columns, expected 1 row and 1 column",
PQntuples(res), PQnfields(res));
exit(1);
}
}
else
{
fprintf(stderr, "backend pid query failed: %s",
PQerrorMessage(conns[i]));
exit(1);
}
PQclear(res);
} }
/* Set the session index fields in steps. */ /* Set the session index fields in steps. */
...@@ -231,9 +218,9 @@ main(int argc, char **argv) ...@@ -231,9 +218,9 @@ main(int argc, char **argv)
appendPQExpBufferStr(&wait_query, appendPQExpBufferStr(&wait_query,
"SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{"); "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
/* The spec syntax requires at least one session; assume that here. */ /* The spec syntax requires at least one session; assume that here. */
appendPQExpBufferStr(&wait_query, backend_pids[1]); appendPQExpBufferStr(&wait_query, backend_pid_strs[1]);
for (i = 2; i < nconns; i++) for (i = 2; i < nconns; i++)
appendPQExpBuffer(&wait_query, ",%s", backend_pids[i]); appendPQExpBuffer(&wait_query, ",%s", backend_pid_strs[i]);
appendPQExpBufferStr(&wait_query, "}')"); appendPQExpBufferStr(&wait_query, "}')");
res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL); res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL);
...@@ -549,7 +536,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) ...@@ -549,7 +536,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
oldstep = waiting[w]; oldstep = waiting[w];
/* Wait for previous step on this connection. */ /* Wait for previous step on this connection. */
try_complete_step(oldstep, STEP_RETRY); try_complete_step(testspec, oldstep, STEP_RETRY);
/* Remove that step from the waiting[] array. */ /* Remove that step from the waiting[] array. */
if (w + 1 < nwaiting) if (w + 1 < nwaiting)
...@@ -571,7 +558,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) ...@@ -571,7 +558,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
nerrorstep = 0; nerrorstep = 0;
while (w < nwaiting) while (w < nwaiting)
{ {
if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY)) if (try_complete_step(testspec, waiting[w],
STEP_NONBLOCK | STEP_RETRY))
{ {
/* Still blocked on a lock, leave it alone. */ /* Still blocked on a lock, leave it alone. */
w++; w++;
...@@ -600,14 +588,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) ...@@ -600,14 +588,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
} }
/* Try to complete this step without blocking. */ /* Try to complete this step without blocking. */
mustwait = try_complete_step(step, STEP_NONBLOCK); mustwait = try_complete_step(testspec, step, STEP_NONBLOCK);
/* Check for completion of any steps that were previously waiting. */ /* Check for completion of any steps that were previously waiting. */
w = 0; w = 0;
nerrorstep = 0; nerrorstep = 0;
while (w < nwaiting) while (w < nwaiting)
{ {
if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY)) if (try_complete_step(testspec, waiting[w],
STEP_NONBLOCK | STEP_RETRY))
w++; w++;
else else
{ {
...@@ -630,7 +619,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) ...@@ -630,7 +619,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
/* Wait for any remaining queries. */ /* Wait for any remaining queries. */
for (w = 0; w < nwaiting; ++w) for (w = 0; w < nwaiting; ++w)
{ {
try_complete_step(waiting[w], STEP_RETRY); try_complete_step(testspec, waiting[w], STEP_RETRY);
report_error_message(waiting[w]); report_error_message(waiting[w]);
} }
...@@ -693,7 +682,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) ...@@ -693,7 +682,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
* a lock, returns true. Otherwise, returns false. * a lock, returns true. Otherwise, returns false.
*/ */
static bool static bool
try_complete_step(Step *step, int flags) try_complete_step(TestSpec *testspec, Step *step, int flags)
{ {
PGconn *conn = conns[1 + step->session]; PGconn *conn = conns[1 + step->session];
fd_set read_set; fd_set read_set;
...@@ -702,6 +691,7 @@ try_complete_step(Step *step, int flags) ...@@ -702,6 +691,7 @@ try_complete_step(Step *step, int flags)
int sock = PQsocket(conn); int sock = PQsocket(conn);
int ret; int ret;
PGresult *res; PGresult *res;
PGnotify *notify;
bool canceled = false; bool canceled = false;
if (sock < 0) if (sock < 0)
...@@ -738,7 +728,7 @@ try_complete_step(Step *step, int flags) ...@@ -738,7 +728,7 @@ try_complete_step(Step *step, int flags)
bool waiting; bool waiting;
res = PQexecPrepared(conns[0], PREP_WAITING, 1, res = PQexecPrepared(conns[0], PREP_WAITING, 1,
&backend_pids[step->session + 1], &backend_pid_strs[step->session + 1],
NULL, NULL, 0); NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_TUPLES_OK || if (PQresultStatus(res) != PGRES_TUPLES_OK ||
PQntuples(res) != 1) PQntuples(res) != 1)
...@@ -880,6 +870,35 @@ try_complete_step(Step *step, int flags) ...@@ -880,6 +870,35 @@ try_complete_step(Step *step, int flags)
PQclear(res); PQclear(res);
} }
/* Report any available NOTIFY messages, too */
PQconsumeInput(conn);
while ((notify = PQnotifies(conn)) != NULL)
{
/* Try to identify which session it came from */
const char *sendername = NULL;
char pidstring[32];
for (int i = 0; i < testspec->nsessions; i++)
{
if (notify->be_pid == backend_pids[i + 1])
{
sendername = testspec->sessions[i]->name;
break;
}
}
if (sendername == NULL)
{
/* Doesn't seem to be any test session, so show the hard way */
snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid);
sendername = pidstring;
}
printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n",
testspec->sessions[step->session]->name,
notify->relname, notify->extra, sendername);
PQfreemem(notify);
PQconsumeInput(conn);
}
return false; return false;
} }
......
# Verify that pg_notification_queue_usage correctly reports a non-zero result, # Tests for LISTEN/NOTIFY
# after submitting notifications while another connection is listening for
# those notifications and waiting inside an active transaction.
session "listener" # Most of these tests use only the "notifier" session and hence exercise only
step "listen" { LISTEN a; } # self-notifies, which are convenient because they minimize timing concerns.
step "begin" { BEGIN; } # Note we assume that each step is delivered to the backend as a single Query
teardown { ROLLBACK; UNLISTEN *; } # message so it will run as one transaction.
session "notifier" session "notifier"
step "check" { SELECT pg_notification_queue_usage() > 0 AS nonzero; } step "listenc" { LISTEN c1; LISTEN c2; }
step "notify" { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; } step "notify1" { NOTIFY c1; }
step "notify2" { NOTIFY c2, 'payload'; }
step "notify3" { NOTIFY c3, 'payload3'; } # not listening to c3
step "notifyf" { SELECT pg_notify('c2', NULL); }
step "notifyd1" { NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; }
step "notifyd2" { NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2'; }
step "notifys1" {
BEGIN;
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
SAVEPOINT s1;
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
ROLLBACK TO SAVEPOINT s2;
COMMIT;
}
step "usage" { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
step "bignotify" { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
teardown { UNLISTEN *; }
# The listener session is used for cross-backend notify checks.
session "listener"
step "llisten" { LISTEN c1; LISTEN c2; }
step "lcheck" { SELECT 1 AS x; }
step "lbegin" { BEGIN; }
teardown { UNLISTEN *; }
# Trivial cases.
permutation "listenc" "notify1" "notify2" "notify3" "notifyf"
# Check simple and less-simple deduplication.
permutation "listenc" "notifyd1" "notifyd2" "notifys1"
# Cross-backend notification delivery. We use a "select 1" to force the
# listener session to check for notifies. In principle we could just wait
# for delivery, but that would require extra support in isolationtester
# and might have portability-of-timing issues.
permutation "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck"
# Again, with local delivery too.
permutation "listenc" "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck"
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
# after submitting notifications while another connection is listening for
# those notifications and waiting inside an active transaction. We have to
# fill a page of the notify SLRU to make this happen, which is a good deal
# of traffic. To not bloat the expected output, we intentionally don't
# commit the listener's transaction, so that it never reports these events.
# Hence, this should be the last test in this script.
permutation "listen" "begin" "check" "notify" "check" permutation "llisten" "lbegin" "usage" "bignotify" "usage"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment