Commit 58b5ae9d authored by Amit Kapila's avatar Amit Kapila

Add additional tests to test streaming of in-progress transactions.

This covers the functionality tests for streaming in-progress
subtransactions, streaming transactions containing rollback to savepoints,
and streaming transactions having DDLs.

Author: Tomas Vondra, Amit Kapila and Dilip Kumar
Reviewed-by: Dilip Kumar
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
parent 88709176
# Test streaming of large transaction containing large subtransactions
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 2;
# Create publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
$node_publisher->wait_for_catchup($appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
# Insert, update and delete enough rows to exceed 64kB limit.
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series( 3, 500) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s1;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s2;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s3;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s4;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(1667|1667|1667), 'check data was copied to subscriber in streaming mode and extra columns contain local defaults');
$node_subscriber->stop;
$node_publisher->stop;
# Test streaming of large transaction with DDL and subtransactions
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 3;
# Create publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT, f INT)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
$node_publisher->wait_for_catchup($appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|0|0), 'check initial data was copied to subscriber');
# a small (non-streamed) transaction with DDL and DML
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab VALUES (3, md5(3::text));
ALTER TABLE test_tab ADD COLUMN c INT;
SAVEPOINT s1;
INSERT INTO test_tab VALUES (4, md5(4::text), -4);
COMMIT;
});
# large (streamed) transaction with DDL and DML
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i);
ALTER TABLE test_tab ADD COLUMN d INT;
SAVEPOINT s1;
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i);
COMMIT;
});
# a small (non-streamed) transaction with DDL and DML
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001);
ALTER TABLE test_tab ADD COLUMN e INT;
SAVEPOINT s1;
INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002);
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e) FROM test_tab");
is($result, qq(2002|1999|1002|1), 'check data was copied to subscriber in streaming mode and extra columns contain local defaults');
# A large (streamed) transaction with DDL and DML. One of the DDL is performed
# after DML to ensure that we invalidate the schema sent for test_tab so that
# the next transaction has to send the schema again.
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(2003,5000) s(i);
ALTER TABLE test_tab ADD COLUMN f INT;
COMMIT;
});
# A small transaction that won't get streamed. This is just to ensure that we
# send the schema again to reflect the last column added in the previous test.
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i, 4*i FROM generate_series(5001,5005) s(i);
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e), count(f) FROM test_tab");
is($result, qq(5005|5002|4005|3004|5), 'check data was copied to subscriber for both streaming and non-streaming transactions');
$node_subscriber->stop;
$node_publisher->stop;
# Test streaming of large transaction containing multiple subtransactions and rollbacks
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 4;
# Create publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
$node_publisher->wait_for_catchup($appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
is($result, qq(2|0), 'check initial data was copied to subscriber');
# large (streamed) transaction with DDL, DML and ROLLBACKs
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
SAVEPOINT s1;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i);
SAVEPOINT s2;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i);
SAVEPOINT s3;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i);
ROLLBACK TO s2;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i);
ROLLBACK TO s1;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i);
SAVEPOINT s4;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i);
SAVEPOINT s5;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i);
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
is($result, qq(2000|0), 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults');
# large (streamed) transaction with subscriber receiving out of order
# subtransaction ROLLBACKs
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i);
SAVEPOINT s1;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i);
SAVEPOINT s2;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i);
SAVEPOINT s3;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i);
RELEASE s2;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i);
ROLLBACK TO s1;
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
is($result, qq(2500|0), 'check rollback to savepoint was reflected on subscriber');
# large (streamed) transaction with subscriber receiving rollback
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i);
SAVEPOINT s1;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i);
SAVEPOINT s2;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i);
ROLLBACK;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
is($result, qq(2500|0), 'check rollback was reflected on subscriber');
$node_subscriber->stop;
$node_publisher->stop;
# Test streaming of large transaction with subtransactions, DDLs, DMLs, and
# rollbacks
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 2;
# Create publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
$node_publisher->wait_for_catchup($appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
is($result, qq(2|0), 'check initial data was copied to subscriber');
# large (streamed) transaction with DDL, DML and ROLLBACKs
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
ALTER TABLE test_tab ADD COLUMN c INT;
SAVEPOINT s1;
INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
ALTER TABLE test_tab ADD COLUMN d INT;
SAVEPOINT s2;
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
ALTER TABLE test_tab ADD COLUMN e INT;
SAVEPOINT s3;
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
ALTER TABLE test_tab DROP COLUMN c;
ROLLBACK TO s1;
INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i);
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
is($result, qq(1000|500), 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults');
$node_subscriber->stop;
$node_publisher->stop;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment