Commit 9a7b7adc authored by Michael Paquier's avatar Michael Paquier

Make logical WAL sender report streaming state appropriately

WAL senders sending logically-decoded data fail to properly report in
"streaming" state when starting up, hence as long as one extra record is
not replayed, such WAL senders would remain in a "catchup" state, which
is inconsistent with the physical cousin.

This can be easily reproduced by for example using pg_recvlogical and
restarting the upstream server.  The TAP tests have been slightly
modified to detect the failure and strengthened so as future tests also
make sure that a node is in streaming state when waiting for its
catchup.

Backpatch down to 9.4 where this code has been introduced.

Reported-by: Sawada Masahiko
Author: Simon Riggs, Sawada Masahiko
Reviewed-by: Petr Jelinek, Michael Paquier, Vaishnavi Prabakaran
Discussion: https://postgr.es/m/CAD21AoB2ZbCCqOx=bgKMcLrAvs1V0ZMqzs7wBTuDySezTGtMZA@mail.gmail.com
parent 39a96512
...@@ -2169,7 +2169,7 @@ WalSndLoop(WalSndSendDataCallback send_data) ...@@ -2169,7 +2169,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
if (MyWalSnd->state == WALSNDSTATE_CATCHUP) if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
{ {
ereport(DEBUG1, ereport(DEBUG1,
(errmsg("standby \"%s\" has now caught up with primary", (errmsg("\"%s\" has now caught up with upstream server",
application_name))); application_name)));
WalSndSetState(WALSNDSTATE_STREAMING); WalSndSetState(WALSNDSTATE_STREAMING);
} }
...@@ -2758,10 +2758,10 @@ XLogSendLogical(void) ...@@ -2758,10 +2758,10 @@ XLogSendLogical(void)
char *errm; char *errm;
/* /*
* Don't know whether we've caught up yet. We'll set it to true in * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
* WalSndWaitForWal, if we're actually waiting. We also set to true if * true in WalSndWaitForWal, if we're actually waiting. We also set to
* XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait - * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
* i.e. when we're shutting down. * didn't wait - i.e. when we're shutting down.
*/ */
WalSndCaughtUp = false; WalSndCaughtUp = false;
...@@ -2774,6 +2774,9 @@ XLogSendLogical(void) ...@@ -2774,6 +2774,9 @@ XLogSendLogical(void)
if (record != NULL) if (record != NULL)
{ {
/* XXX: Note that logical decoding cannot be used while in recovery */
XLogRecPtr flushPtr = GetFlushRecPtr();
/* /*
* Note the lack of any call to LagTrackerWrite() which is handled by * Note the lack of any call to LagTrackerWrite() which is handled by
* WalSndUpdateProgress which is called by output plugin through * WalSndUpdateProgress which is called by output plugin through
...@@ -2782,6 +2785,13 @@ XLogSendLogical(void) ...@@ -2782,6 +2785,13 @@ XLogSendLogical(void)
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
sentPtr = logical_decoding_ctx->reader->EndRecPtr; sentPtr = logical_decoding_ctx->reader->EndRecPtr;
/*
* If we have sent a record that is at or beyond the flushed point, we
* have caught up.
*/
if (sentPtr >= flushPtr)
WalSndCaughtUp = true;
} }
else else
{ {
......
...@@ -1535,7 +1535,8 @@ also works for logical subscriptions) ...@@ -1535,7 +1535,8 @@ also works for logical subscriptions)
until its replication location in pg_stat_replication equals or passes the until its replication location in pg_stat_replication equals or passes the
upstream's WAL insert point at the time this function is called. By default upstream's WAL insert point at the time this function is called. By default
the replay_lsn is waited for, but 'mode' may be specified to wait for any of the replay_lsn is waited for, but 'mode' may be specified to wait for any of
sent|write|flush|replay. sent|write|flush|replay. The connection catching up must be in a streaming
state.
If there is no active replication connection from this peer, waits until If there is no active replication connection from this peer, waits until
poll_query_until timeout. poll_query_until timeout.
...@@ -1580,7 +1581,7 @@ sub wait_for_catchup ...@@ -1580,7 +1581,7 @@ sub wait_for_catchup
. $lsn_expr . " on " . $lsn_expr . " on "
. $self->name . "\n"; . $self->name . "\n";
my $query = my $query =
qq[SELECT $lsn_expr <= ${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';]; qq[SELECT $lsn_expr <= ${mode}_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
$self->poll_query_until('postgres', $query) $self->poll_query_until('postgres', $query)
or croak "timed out waiting for catchup"; or croak "timed out waiting for catchup";
print "done\n"; print "done\n";
......
...@@ -188,6 +188,11 @@ $node_publisher->safe_psql('postgres', ...@@ -188,6 +188,11 @@ $node_publisher->safe_psql('postgres',
"INSERT INTO tab_ins SELECT generate_series(1001,1100)"); "INSERT INTO tab_ins SELECT generate_series(1001,1100)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
# Restart the publisher and check the state of the subscriber which
# should be in a streaming state after catching up.
$node_publisher->stop('fast');
$node_publisher->start;
$node_publisher->wait_for_catchup($appname); $node_publisher->wait_for_catchup($appname);
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment