Commit bbd3363e authored by Peter Eisentraut's avatar Peter Eisentraut

Refactor subscription tests to use PostgresNode's wait_for_catchup

This was nearly the same code.  Extend wait_for_catchup to allow waiting
for pg_current_wal_lsn() and use that in the subscription tests.  Also
change one use in the pg_rewind tests to use this.

Also remove some broken code in wait_for_catchup and
wait_for_slot_catchup.  The error message in case the waiting failed
wanted to show the current LSN, but the way it was written never
worked.  So since nobody ever cared, just remove it.
Reviewed-by: default avatarMichael Paquier <michael.paquier@gmail.com>
parent 4d41b2e0
...@@ -163,10 +163,7 @@ sub promote_standby ...@@ -163,10 +163,7 @@ sub promote_standby
# up standby # up standby
# Wait for the standby to receive and write all WAL. # Wait for the standby to receive and write all WAL.
my $wal_received_query = $node_master->wait_for_catchup('rewind_standby', 'write');
"SELECT pg_current_wal_lsn() = write_lsn FROM pg_stat_replication WHERE application_name = 'rewind_standby';";
$node_master->poll_query_until('postgres', $wal_received_query)
or die "Timed out while waiting for standby to receive and write WAL";
# Now promote standby and insert some new data on master, this will put # Now promote standby and insert some new data on master, this will put
# the master out-of-sync with the standby. # the master out-of-sync with the standby.
......
...@@ -1465,7 +1465,8 @@ sub lsn ...@@ -1465,7 +1465,8 @@ sub lsn
=item $node->wait_for_catchup(standby_name, mode, target_lsn) =item $node->wait_for_catchup(standby_name, mode, target_lsn)
Wait for the node with application_name standby_name (usually from node->name) Wait for the node with application_name standby_name (usually from node->name,
also works for logical subscriptions)
until its replication location in pg_stat_replication equals or passes the until its replication location in pg_stat_replication equals or passes the
upstream's WAL insert point at the time this function is called. By default upstream's WAL insert point at the time this function is called. By default
the replay_lsn is waited for, but 'mode' may be specified to wait for any of the replay_lsn is waited for, but 'mode' may be specified to wait for any of
...@@ -1477,6 +1478,7 @@ poll_query_until timeout. ...@@ -1477,6 +1478,7 @@ poll_query_until timeout.
Requires that the 'postgres' db exists and is accessible. Requires that the 'postgres' db exists and is accessible.
target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert'). target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert').
If omitted, pg_current_wal_lsn() is used.
This is not a test. It die()s on failure. This is not a test. It die()s on failure.
...@@ -1497,7 +1499,15 @@ sub wait_for_catchup ...@@ -1497,7 +1499,15 @@ sub wait_for_catchup
{ {
$standby_name = $standby_name->name; $standby_name = $standby_name->name;
} }
die 'target_lsn must be specified' unless defined($target_lsn); my $lsn_expr;
if (defined($target_lsn))
{
$lsn_expr = "'$target_lsn'";
}
else
{
$lsn_expr = 'pg_current_wal_lsn()'
}
print "Waiting for replication conn " print "Waiting for replication conn "
. $standby_name . "'s " . $standby_name . "'s "
. $mode . $mode
...@@ -1505,10 +1515,9 @@ sub wait_for_catchup ...@@ -1505,10 +1515,9 @@ sub wait_for_catchup
. $target_lsn . " on " . $target_lsn . " on "
. $self->name . "\n"; . $self->name . "\n";
my $query = my $query =
qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';]; qq[SELECT $lsn_expr <= ${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
$self->poll_query_until('postgres', $query) $self->poll_query_until('postgres', $query)
or die "timed out waiting for catchup, current location is " or die "timed out waiting for catchup";
. ($self->safe_psql('postgres', $query) || '(unknown)');
print "done\n"; print "done\n";
} }
...@@ -1550,8 +1559,7 @@ sub wait_for_slot_catchup ...@@ -1550,8 +1559,7 @@ sub wait_for_slot_catchup
my $query = my $query =
qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';]; qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';];
$self->poll_query_until('postgres', $query) $self->poll_query_until('postgres', $query)
or die "timed out waiting for catchup, current location is " or die "timed out waiting for catchup";
. ($self->safe_psql('postgres', $query) || '(unknown)');
print "done\n"; print "done\n";
} }
......
...@@ -60,11 +60,7 @@ $node_subscriber->safe_psql('postgres', ...@@ -60,11 +60,7 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub, tap_pub_ins_only" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub, tap_pub_ins_only"
); );
# Wait for subscriber to finish initialization $node_publisher->wait_for_catchup($appname);
my $caughtup_query =
"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
$node_publisher->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for subscriber to catch up";
# Also wait for initial table sync to finish # Also wait for initial table sync to finish
my $synced_query = my $synced_query =
...@@ -93,8 +89,7 @@ $node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a"); ...@@ -93,8 +89,7 @@ $node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a");
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"INSERT INTO tab_mixed VALUES (2, 'bar')"); "INSERT INTO tab_mixed VALUES (2, 'bar')");
$node_publisher->poll_query_until('postgres', $caughtup_query) $node_publisher->wait_for_catchup($appname);
or die "Timed out while waiting for subscriber to catch up";
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins"); "SELECT count(*), min(a), max(a) FROM tab_ins");
...@@ -132,9 +127,7 @@ $node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a"); ...@@ -132,9 +127,7 @@ $node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a");
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'"); "UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'");
# Wait for subscription to catch up $node_publisher->wait_for_catchup($appname);
$node_publisher->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for subscriber to catch up";
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_full"); "SELECT count(*), min(a), max(a) FROM tab_full");
...@@ -176,8 +169,7 @@ $node_publisher->safe_psql('postgres', ...@@ -176,8 +169,7 @@ $node_publisher->safe_psql('postgres',
"INSERT INTO tab_ins SELECT generate_series(1001,1100)"); "INSERT INTO tab_ins SELECT generate_series(1001,1100)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
$node_publisher->poll_query_until('postgres', $caughtup_query) $node_publisher->wait_for_catchup($appname);
or die "Timed out while waiting for subscriber to catch up";
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins"); "SELECT count(*), min(a), max(a) FROM tab_ins");
...@@ -200,8 +192,7 @@ $node_subscriber->safe_psql('postgres', ...@@ -200,8 +192,7 @@ $node_subscriber->safe_psql('postgres',
); );
$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)");
$node_publisher->poll_query_until('postgres', $caughtup_query) $node_publisher->wait_for_catchup($appname);
or die "Timed out while waiting for subscriber to catch up";
# note that data are different on provider and subscriber # note that data are different on provider and subscriber
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
......
...@@ -106,11 +106,7 @@ $node_subscriber->safe_psql('postgres', ...@@ -106,11 +106,7 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
); );
# Wait for subscriber to finish initialization $node_publisher->wait_for_catchup($appname);
my $caughtup_query =
"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
$node_publisher->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for subscriber to catch up";
# Wait for initial sync to finish as well # Wait for initial sync to finish as well
my $synced_query = my $synced_query =
...@@ -246,8 +242,7 @@ $node_publisher->safe_psql( ...@@ -246,8 +242,7 @@ $node_publisher->safe_psql(
(4, '"yellow horse"=>"moaned"'); (4, '"yellow horse"=>"moaned"');
)); ));
$node_publisher->poll_query_until('postgres', $caughtup_query) $node_publisher->wait_for_catchup($appname);
or die "Timed out while waiting for subscriber to catch up";
# Check the data on subscriber # Check the data on subscriber
my $result = $node_subscriber->safe_psql( my $result = $node_subscriber->safe_psql(
...@@ -368,8 +363,7 @@ $node_publisher->safe_psql( ...@@ -368,8 +363,7 @@ $node_publisher->safe_psql(
UPDATE tst_hstore SET b = '"also"=>"updated"' WHERE a = 3; UPDATE tst_hstore SET b = '"also"=>"updated"' WHERE a = 3;
)); ));
$node_publisher->poll_query_until('postgres', $caughtup_query) $node_publisher->wait_for_catchup($appname);
or die "Timed out while waiting for subscriber to catch up";
# Check the data on subscriber # Check the data on subscriber
$result = $node_subscriber->safe_psql( $result = $node_subscriber->safe_psql(
...@@ -489,8 +483,7 @@ $node_publisher->safe_psql( ...@@ -489,8 +483,7 @@ $node_publisher->safe_psql(
DELETE FROM tst_hstore WHERE a = 1; DELETE FROM tst_hstore WHERE a = 1;
)); ));
$node_publisher->poll_query_until('postgres', $caughtup_query) $node_publisher->wait_for_catchup($appname);
or die "Timed out while waiting for subscriber to catch up";
# Check the data on subscriber # Check the data on subscriber
$result = $node_subscriber->safe_psql( $result = $node_subscriber->safe_psql(
......
...@@ -39,19 +39,14 @@ $node_subscriber->safe_psql('postgres', ...@@ -39,19 +39,14 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (copy_data = false)" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (copy_data = false)"
); );
# Wait for subscriber to finish initialization $node_publisher->wait_for_catchup($appname);
my $caughtup_query =
"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
$node_publisher->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for subscriber to catch up";
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk (bid) VALUES (1);"); "INSERT INTO tab_fk (bid) VALUES (1);");
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);"); "INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);");
$node_publisher->poll_query_until('postgres', $caughtup_query) $node_publisher->wait_for_catchup($appname);
or die "Timed out while waiting for subscriber to catch up";
# Check data on subscriber # Check data on subscriber
my $result = $node_subscriber->safe_psql('postgres', my $result = $node_subscriber->safe_psql('postgres',
...@@ -69,8 +64,7 @@ $node_publisher->safe_psql('postgres', "DROP TABLE tab_fk CASCADE;"); ...@@ -69,8 +64,7 @@ $node_publisher->safe_psql('postgres', "DROP TABLE tab_fk CASCADE;");
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);"); "INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);");
$node_publisher->poll_query_until('postgres', $caughtup_query) $node_publisher->wait_for_catchup($appname);
or die "Timed out while waiting for subscriber to catch up";
# FK is not enforced on subscriber # FK is not enforced on subscriber
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
...@@ -104,8 +98,7 @@ ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg; ...@@ -104,8 +98,7 @@ ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);"); "INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);");
$node_publisher->poll_query_until('postgres', $caughtup_query) $node_publisher->wait_for_catchup($appname);
or die "Timed out while waiting for subscriber to catch up";
# The row should be skipped on subscriber # The row should be skipped on subscriber
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
......
...@@ -37,11 +37,7 @@ $node_subscriber->safe_psql('postgres', ...@@ -37,11 +37,7 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
); );
# Wait for subscriber to finish initialization $node_publisher->wait_for_catchup($appname);
my $caughtup_query =
"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
$node_publisher->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for subscriber to catch up";
# Also wait for initial table sync to finish # Also wait for initial table sync to finish
my $synced_query = my $synced_query =
...@@ -124,9 +120,7 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rep_next (a int)"); ...@@ -124,9 +120,7 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rep_next (a int)");
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)"); "CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)");
# Wait for subscription to catch up $node_publisher->wait_for_catchup($appname);
$node_publisher->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for subscriber to catch up";
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM tab_rep_next"); "SELECT count(*) FROM tab_rep_next");
...@@ -149,9 +143,7 @@ is($result, qq(10), ...@@ -149,9 +143,7 @@ is($result, qq(10),
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"INSERT INTO tab_rep_next SELECT generate_series(1,10)"); "INSERT INTO tab_rep_next SELECT generate_series(1,10)");
# Wait for subscription to catch up $node_publisher->wait_for_catchup($appname);
$node_publisher->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for subscriber to catch up";
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM tab_rep_next"); "SELECT count(*) FROM tab_rep_next");
......
...@@ -5,15 +5,6 @@ use PostgresNode; ...@@ -5,15 +5,6 @@ use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 1; use Test::More tests => 1;
sub wait_for_caught_up
{
my ($node, $appname) = @_;
$node->poll_query_until('postgres',
"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
) or die "Timed out while waiting for subscriber to catch up";
}
my $node_publisher = get_new_node('publisher'); my $node_publisher = get_new_node('publisher');
$node_publisher->init( $node_publisher->init(
allows_streaming => 'logical', allows_streaming => 'logical',
...@@ -39,7 +30,7 @@ $node_subscriber->safe_psql('postgres', ...@@ -39,7 +30,7 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;" "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;"
); );
wait_for_caught_up($node_publisher, $appname); $node_publisher->wait_for_catchup($appname);
# Wait for initial sync to finish as well # Wait for initial sync to finish as well
my $synced_query = my $synced_query =
...@@ -50,7 +41,7 @@ $node_subscriber->poll_query_until('postgres', $synced_query) ...@@ -50,7 +41,7 @@ $node_subscriber->poll_query_until('postgres', $synced_query)
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8 q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
wait_for_caught_up($node_publisher, $appname); $node_publisher->wait_for_catchup($appname);
is( $node_subscriber->safe_psql( is( $node_subscriber->safe_psql(
'postgres', q{SELECT a FROM test1 WHERE b = E'Mot\xf6rhead'} 'postgres', q{SELECT a FROM test1 WHERE b = E'Mot\xf6rhead'}
......
...@@ -5,15 +5,6 @@ use PostgresNode; ...@@ -5,15 +5,6 @@ use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 2; use Test::More tests => 2;
sub wait_for_caught_up
{
my ($node, $appname) = @_;
$node->poll_query_until('postgres',
"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
) or die "Timed out while waiting for subscriber to catch up";
}
my $node_publisher = get_new_node('publisher'); my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical'); $node_publisher->init(allows_streaming => 'logical');
$node_publisher->start; $node_publisher->start;
...@@ -35,7 +26,7 @@ $node_subscriber->safe_psql('postgres', ...@@ -35,7 +26,7 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;" "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;"
); );
wait_for_caught_up($node_publisher, $appname); $node_publisher->wait_for_catchup($appname);
# Wait for initial sync to finish as well # Wait for initial sync to finish as well
my $synced_query = my $synced_query =
...@@ -45,7 +36,7 @@ $node_subscriber->poll_query_until('postgres', $synced_query) ...@@ -45,7 +36,7 @@ $node_subscriber->poll_query_until('postgres', $synced_query)
$node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');}); $node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
wait_for_caught_up($node_publisher, $appname); $node_publisher->wait_for_catchup($appname);
is($node_subscriber->safe_psql('postgres', q{SELECT a, b FROM test1}), is($node_subscriber->safe_psql('postgres', q{SELECT a, b FROM test1}),
qq(1|one qq(1|one
...@@ -57,11 +48,11 @@ my $ddl2 = "ALTER TABLE test1 ADD c int NOT NULL DEFAULT 0;"; ...@@ -57,11 +48,11 @@ my $ddl2 = "ALTER TABLE test1 ADD c int NOT NULL DEFAULT 0;";
$node_subscriber->safe_psql('postgres', $ddl2); $node_subscriber->safe_psql('postgres', $ddl2);
$node_publisher->safe_psql('postgres', $ddl2); $node_publisher->safe_psql('postgres', $ddl2);
wait_for_caught_up($node_publisher, $appname); $node_publisher->wait_for_catchup($appname);
$node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b, c) VALUES (3, 'three', 33);}); $node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b, c) VALUES (3, 'three', 33);});
wait_for_caught_up($node_publisher, $appname); $node_publisher->wait_for_catchup($appname);
is($node_subscriber->safe_psql('postgres', q{SELECT a, b, c FROM test1}), is($node_subscriber->safe_psql('postgres', q{SELECT a, b, c FROM test1}),
qq(1|one|0 qq(1|one|0
......
...@@ -5,15 +5,6 @@ use PostgresNode; ...@@ -5,15 +5,6 @@ use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 1; use Test::More tests => 1;
sub wait_for_caught_up
{
my ($node, $appname) = @_;
$node->poll_query_until('postgres',
"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
) or die "Timed out while waiting for subscriber to catch up";
}
my $node_publisher = get_new_node('publisher'); my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical'); $node_publisher->init(allows_streaming => 'logical');
$node_publisher->start; $node_publisher->start;
...@@ -35,7 +26,7 @@ $node_subscriber->safe_psql('postgres', ...@@ -35,7 +26,7 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;" "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;"
); );
wait_for_caught_up($node_publisher, $appname); $node_publisher->wait_for_catchup($appname);
$node_subscriber->safe_psql('postgres', q{ $node_subscriber->safe_psql('postgres', q{
BEGIN; BEGIN;
......
...@@ -5,15 +5,6 @@ use PostgresNode; ...@@ -5,15 +5,6 @@ use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 3; use Test::More tests => 3;
sub wait_for_caught_up
{
my ($node, $appname) = @_;
$node->poll_query_until('postgres',
"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
) or die "Timed out while waiting for subscriber to catch up";
}
# Create publisher node # Create publisher node
my $node_publisher = get_new_node('publisher'); my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical'); $node_publisher->init(allows_streaming => 'logical');
...@@ -42,7 +33,7 @@ $node_subscriber->safe_psql('postgres', ...@@ -42,7 +33,7 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
); );
wait_for_caught_up($node_publisher, $appname); $node_publisher->wait_for_catchup($appname);
# Also wait for initial table sync to finish # Also wait for initial table sync to finish
my $synced_query = my $synced_query =
...@@ -58,7 +49,7 @@ is($result, qq(2|2|2), 'check initial data was copied to subscriber'); ...@@ -58,7 +49,7 @@ is($result, qq(2|2|2), 'check initial data was copied to subscriber');
# subscriber didn't change # subscriber didn't change
$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(b)"); $node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(b)");
wait_for_caught_up($node_publisher, $appname); $node_publisher->wait_for_catchup($appname);
$result = $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
...@@ -70,7 +61,7 @@ is($result, qq(2|2|2), 'check extra columns contain local defaults'); ...@@ -70,7 +61,7 @@ is($result, qq(2|2|2), 'check extra columns contain local defaults');
$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"); $node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); $node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
wait_for_caught_up($node_publisher, $appname); $node_publisher->wait_for_catchup($appname);
$result = $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"); $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment