Commit d6fa44fc authored by Andres Freund's avatar Andres Freund

Add skip-empty-xacts option to test_decoding for use in the regression tests.

The regression tests for contrib/test_decoding regularly failed on
postgres instances that were very slow. Either because the hardware
itself was slow or because very expensive debugging options like
CLOBBER_CACHE_ALWAYS were used.

The reason they failed was just that some additional transactions were
decoded. Analyze and vacuum, triggered by autovac.

To fix just add a option to test_decoding to only display transactions
in which a change was actually displayed. That's not pretty because it
removes information from the tests; but better than constantly failing
tests in very likely harmless ways.

Backpatch to 9.4 where logical decoding was introduced.

Discussion: 20140629142511.GA26930@awork2.anarazel.de
parent 9a0a12f6
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -58,19 +58,17 @@ SELECT txid_current() = 0; ...@@ -58,19 +58,17 @@ SELECT txid_current() = 0;
-- don't show yet, haven't committed -- don't show yet, haven't committed
INSERT INTO nobarf(data) VALUES('2'); INSERT INTO nobarf(data) VALUES('2');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data data
----------------------------------------------------------- -----------------------------------------------------------
BEGIN
COMMIT
BEGIN BEGIN
table public.nobarf: INSERT: id[integer]:1 data[text]:'1' table public.nobarf: INSERT: id[integer]:1 data[text]:'1'
COMMIT COMMIT
(5 rows) (3 rows)
COMMIT; COMMIT;
INSERT INTO nobarf(data) VALUES('3'); INSERT INTO nobarf(data) VALUES('3');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data data
----------------------------------------------------------- -----------------------------------------------------------
BEGIN BEGIN
......
...@@ -14,7 +14,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d ...@@ -14,7 +14,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
(1 row) (1 row)
INSERT INTO lr_test VALUES('lr_superuser_init'); INSERT INTO lr_test VALUES('lr_superuser_init');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data data
-------------------------------------------------------------- --------------------------------------------------------------
BEGIN BEGIN
...@@ -39,7 +39,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d ...@@ -39,7 +39,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
INSERT INTO lr_test VALUES('lr_superuser_init'); INSERT INTO lr_test VALUES('lr_superuser_init');
ERROR: permission denied for relation lr_test ERROR: permission denied for relation lr_test
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data data
------ ------
(0 rows) (0 rows)
...@@ -57,7 +57,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d ...@@ -57,7 +57,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
ERROR: must be superuser or replication role to use replication slots ERROR: must be superuser or replication role to use replication slots
INSERT INTO lr_test VALUES('lr_superuser_init'); INSERT INTO lr_test VALUES('lr_superuser_init');
ERROR: permission denied for relation lr_test ERROR: permission denied for relation lr_test
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
ERROR: must be superuser or replication role to use replication slots ERROR: must be superuser or replication role to use replication slots
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
ERROR: must be superuser or replication role to use replication slots ERROR: must be superuser or replication role to use replication slots
......
...@@ -39,13 +39,9 @@ INSERT INTO test_prepared2 VALUES (9); ...@@ -39,13 +39,9 @@ INSERT INTO test_prepared2 VALUES (9);
DROP TABLE test_prepared1; DROP TABLE test_prepared1;
DROP TABLE test_prepared2; DROP TABLE test_prepared2;
-- show results -- show results
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data data
------------------------------------------------------------------------- -------------------------------------------------------------------------
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN BEGIN
table public.test_prepared1: INSERT: id[integer]:1 table public.test_prepared1: INSERT: id[integer]:1
COMMIT COMMIT
...@@ -68,11 +64,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc ...@@ -68,11 +64,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
BEGIN BEGIN
table public.test_prepared2: INSERT: id[integer]:9 table public.test_prepared2: INSERT: id[integer]:9
COMMIT COMMIT
BEGIN (22 rows)
COMMIT
BEGIN
COMMIT
(30 rows)
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot pg_drop_replication_slot
......
...@@ -9,15 +9,13 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d ...@@ -9,15 +9,13 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120)); CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));
INSERT INTO replication_example(somedata) VALUES (1); INSERT INTO replication_example(somedata) VALUES (1);
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data data
---------------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------------
BEGIN
COMMIT
BEGIN BEGIN
table public.replication_example: INSERT: id[integer]:1 somedata[integer]:1 text[character varying]:null table public.replication_example: INSERT: id[integer]:1 somedata[integer]:1 text[character varying]:null
COMMIT COMMIT
(5 rows) (3 rows)
BEGIN; BEGIN;
INSERT INTO replication_example(somedata) VALUES (2); INSERT INTO replication_example(somedata) VALUES (2);
...@@ -58,7 +56,7 @@ INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (7, 5 ...@@ -58,7 +56,7 @@ INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (7, 5
COMMIT; COMMIT;
-- make old files go away -- make old files go away
CHECKPOINT; CHECKPOINT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data data
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
BEGIN BEGIN
...@@ -70,33 +68,13 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc ...@@ -70,33 +68,13 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
table public.replication_example: INSERT: id[integer]:5 somedata[integer]:4 text[character varying]:null testcolumn1[integer]:2 testcolumn2[integer]:1 table public.replication_example: INSERT: id[integer]:5 somedata[integer]:4 text[character varying]:null testcolumn1[integer]:2 testcolumn2[integer]:1
COMMIT COMMIT
BEGIN BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
table public.replication_example: INSERT: id[integer]:6 somedata[integer]:5 text[character varying]:null testcolumn1[integer]:3 testcolumn2[integer]:null table public.replication_example: INSERT: id[integer]:6 somedata[integer]:5 text[character varying]:null testcolumn1[integer]:3 testcolumn2[integer]:null
COMMIT COMMIT
BEGIN BEGIN
table public.replication_example: INSERT: id[integer]:7 somedata[integer]:6 text[character varying]:null testcolumn1[integer]:4 testcolumn2[integer]:null table public.replication_example: INSERT: id[integer]:7 somedata[integer]:6 text[character varying]:null testcolumn1[integer]:4 testcolumn2[integer]:null
table public.replication_example: INSERT: id[integer]:8 somedata[integer]:7 text[character varying]:null testcolumn1[integer]:5 testcolumn2[integer]:null testcolumn3[integer]:1 table public.replication_example: INSERT: id[integer]:8 somedata[integer]:7 text[character varying]:null testcolumn1[integer]:5 testcolumn2[integer]:null testcolumn3[integer]:1
COMMIT COMMIT
(35 rows) (15 rows)
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot pg_drop_replication_slot
......
...@@ -48,13 +48,9 @@ CREATE TABLE toasted_copy ( ...@@ -48,13 +48,9 @@ CREATE TABLE toasted_copy (
); );
ALTER TABLE toasted_copy ALTER COLUMN data SET STORAGE EXTERNAL; ALTER TABLE toasted_copy ALTER COLUMN data SET STORAGE EXTERNAL;
\copy toasted_copy FROM STDIN \copy toasted_copy FROM STDIN
SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
substr substr
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN BEGIN
table public.xpto: INSERT: id[integer]:1 toasted_col1[text]:'1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 table public.xpto: INSERT: id[integer]:1 toasted_col1[text]:'1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
COMMIT COMMIT
...@@ -71,12 +67,6 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', ...@@ -71,12 +67,6 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot',
table public.xpto: DELETE: id[integer]:1 table public.xpto: DELETE: id[integer]:1
COMMIT COMMIT
BEGIN BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
table public.toasted_key: INSERT: id[integer]:1 toasted_key[text]:'1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123 table public.toasted_key: INSERT: id[integer]:1 toasted_key[text]:'1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123
COMMIT COMMIT
BEGIN BEGIN
...@@ -89,10 +79,6 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', ...@@ -89,10 +79,6 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot',
table public.toasted_key: DELETE: toasted_key[text]:'123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567 table public.toasted_key: DELETE: toasted_key[text]:'123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567
COMMIT COMMIT
BEGIN BEGIN
COMMIT
BEGIN
COMMIT
BEGIN
table public.toasted_copy: INSERT: id[integer]:1 data[text]:'untoasted1' table public.toasted_copy: INSERT: id[integer]:1 data[text]:'untoasted1'
table public.toasted_copy: INSERT: id[integer]:2 data[text]:'toasted1-1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 table public.toasted_copy: INSERT: id[integer]:2 data[text]:'toasted1-1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
table public.toasted_copy: INSERT: id[integer]:3 data[text]:'untoasted2' table public.toasted_copy: INSERT: id[integer]:3 data[text]:'untoasted2'
...@@ -297,7 +283,7 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', ...@@ -297,7 +283,7 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot',
table public.toasted_copy: INSERT: id[integer]:202 data[text]:'untoasted199' table public.toasted_copy: INSERT: id[integer]:202 data[text]:'untoasted199'
table public.toasted_copy: INSERT: id[integer]:203 data[text]:'untoasted200' table public.toasted_copy: INSERT: id[integer]:203 data[text]:'untoasted200'
COMMIT COMMIT
(246 rows) (232 rows)
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot pg_drop_replication_slot
......
...@@ -50,7 +50,7 @@ step "s2_alter_tbl2_3rd_char" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE characte ...@@ -50,7 +50,7 @@ step "s2_alter_tbl2_3rd_char" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE characte
step "s2_alter_tbl2_3rd_text" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; } step "s2_alter_tbl2_3rd_text" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; }
step "s2_alter_tbl2_3rd_int" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; } step "s2_alter_tbl2_3rd_int" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; }
step "s2_get_changes" { SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); } step "s2_get_changes" { SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
......
...@@ -19,7 +19,7 @@ SELECT pg_drop_replication_slot('regression_slot'); ...@@ -19,7 +19,7 @@ SELECT pg_drop_replication_slot('regression_slot');
-- check that we're detecting a streaming rep slot used for logical decoding -- check that we're detecting a streaming rep slot used for logical decoding
SELECT 'init' FROM pg_create_physical_replication_slot('repl'); SELECT 'init' FROM pg_create_physical_replication_slot('repl');
SELECT data FROM pg_logical_slot_get_changes('repl', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('repl', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('repl'); SELECT pg_drop_replication_slot('repl');
...@@ -64,11 +64,11 @@ ALTER TABLE replication_example RENAME COLUMN text TO somenum; ...@@ -64,11 +64,11 @@ ALTER TABLE replication_example RENAME COLUMN text TO somenum;
INSERT INTO replication_example(somedata, somenum) VALUES (4, 1); INSERT INTO replication_example(somedata, somenum) VALUES (4, 1);
-- collect all changes -- collect all changes
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4); ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
-- throw away changes, they contain oids -- throw away changes, they contain oids
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
INSERT INTO replication_example(somedata, somenum) VALUES (5, 1); INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
...@@ -82,21 +82,21 @@ INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 4, 2); ...@@ -82,21 +82,21 @@ INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 4, 2);
COMMIT; COMMIT;
-- show changes -- show changes
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- hide changes bc of oid visible in full table rewrites -- hide changes bc of oid visible in full table rewrites
CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int); CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
INSERT INTO tr_unique(data) VALUES(10); INSERT INTO tr_unique(data) VALUES(10);
ALTER TABLE tr_unique RENAME TO tr_pkey; ALTER TABLE tr_unique RENAME TO tr_pkey;
ALTER TABLE tr_pkey ADD COLUMN id serial primary key; ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
INSERT INTO tr_pkey(data) VALUES(1); INSERT INTO tr_pkey(data) VALUES(1);
--show deletion with primary key --show deletion with primary key
DELETE FROM tr_pkey; DELETE FROM tr_pkey;
/* display results */ /* display results */
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
/* /*
* check that disk spooling works * check that disk spooling works
...@@ -110,7 +110,7 @@ COMMIT; ...@@ -110,7 +110,7 @@ COMMIT;
/* display results, but hide most of the output */ /* display results, but hide most of the output */
SELECT count(*), min(data), max(data) SELECT count(*), min(data), max(data)
FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
GROUP BY substring(data, 1, 24) GROUP BY substring(data, 1, 24)
ORDER BY 1,2; ORDER BY 1,2;
...@@ -138,7 +138,7 @@ INSERT INTO tr_sub(path) VALUES ('1-top-2-#1'); ...@@ -138,7 +138,7 @@ INSERT INTO tr_sub(path) VALUES ('1-top-2-#1');
RELEASE SAVEPOINT b; RELEASE SAVEPOINT b;
COMMIT; COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- check that we handle xlog assignments correctly -- check that we handle xlog assignments correctly
BEGIN; BEGIN;
...@@ -167,7 +167,7 @@ RELEASE SAVEPOINT subtop; ...@@ -167,7 +167,7 @@ RELEASE SAVEPOINT subtop;
INSERT INTO tr_sub(path) VALUES ('2-top-#1'); INSERT INTO tr_sub(path) VALUES ('2-top-#1');
COMMIT; COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- make sure rollbacked subtransactions aren't decoded -- make sure rollbacked subtransactions aren't decoded
BEGIN; BEGIN;
...@@ -180,7 +180,7 @@ ROLLBACK TO SAVEPOINT b; ...@@ -180,7 +180,7 @@ ROLLBACK TO SAVEPOINT b;
INSERT INTO tr_sub(path) VALUES ('3-top-2-#2'); INSERT INTO tr_sub(path) VALUES ('3-top-2-#2');
COMMIT; COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- test whether a known, but not yet logged toplevel xact, followed by a -- test whether a known, but not yet logged toplevel xact, followed by a
-- subxact commit is handled correctly -- subxact commit is handled correctly
...@@ -199,7 +199,7 @@ INSERT INTO tr_sub(path) VALUES ('5-top-1-#1'); ...@@ -199,7 +199,7 @@ INSERT INTO tr_sub(path) VALUES ('5-top-1-#1');
COMMIT; COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
/* /*
...@@ -239,7 +239,7 @@ ALTER TABLE replication_metadata SET (user_catalog_table = false); ...@@ -239,7 +239,7 @@ ALTER TABLE replication_metadata SET (user_catalog_table = false);
INSERT INTO replication_metadata(relation, options) INSERT INTO replication_metadata(relation, options)
VALUES ('zaphod', NULL); VALUES ('zaphod', NULL);
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
/* /*
* check whether we handle updates/deletes correct with & without a pkey * check whether we handle updates/deletes correct with & without a pkey
...@@ -315,7 +315,7 @@ UPDATE toasttable ...@@ -315,7 +315,7 @@ UPDATE toasttable
SET toasted_col1 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i)) SET toasted_col1 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i))
WHERE id = 1; WHERE id = 1;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i); INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i);
...@@ -327,10 +327,10 @@ WHERE id = 1; ...@@ -327,10 +327,10 @@ WHERE id = 1;
-- make sure we decode correctly even if the toast table is gone -- make sure we decode correctly even if the toast table is gone
DROP TABLE toasttable; DROP TABLE toasttable;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- done, free logical replication slot -- done, free logical replication slot
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
......
...@@ -32,10 +32,10 @@ BEGIN; ...@@ -32,10 +32,10 @@ BEGIN;
SELECT txid_current() = 0; SELECT txid_current() = 0;
-- don't show yet, haven't committed -- don't show yet, haven't committed
INSERT INTO nobarf(data) VALUES('2'); INSERT INTO nobarf(data) VALUES('2');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
COMMIT; COMMIT;
INSERT INTO nobarf(data) VALUES('3'); INSERT INTO nobarf(data) VALUES('3');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
...@@ -11,7 +11,7 @@ CREATE TABLE lr_test(data text); ...@@ -11,7 +11,7 @@ CREATE TABLE lr_test(data text);
SET ROLE lr_superuser; SET ROLE lr_superuser;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
INSERT INTO lr_test VALUES('lr_superuser_init'); INSERT INTO lr_test VALUES('lr_superuser_init');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
RESET ROLE; RESET ROLE;
...@@ -19,7 +19,7 @@ RESET ROLE; ...@@ -19,7 +19,7 @@ RESET ROLE;
SET ROLE lr_replication; SET ROLE lr_replication;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
INSERT INTO lr_test VALUES('lr_superuser_init'); INSERT INTO lr_test VALUES('lr_superuser_init');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
RESET ROLE; RESET ROLE;
...@@ -27,7 +27,7 @@ RESET ROLE; ...@@ -27,7 +27,7 @@ RESET ROLE;
SET ROLE lr_normal; SET ROLE lr_normal;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
INSERT INTO lr_test VALUES('lr_superuser_init'); INSERT INTO lr_test VALUES('lr_superuser_init');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
RESET ROLE; RESET ROLE;
......
...@@ -45,6 +45,6 @@ DROP TABLE test_prepared1; ...@@ -45,6 +45,6 @@ DROP TABLE test_prepared1;
DROP TABLE test_prepared2; DROP TABLE test_prepared2;
-- show results -- show results
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
...@@ -6,7 +6,7 @@ DROP TABLE IF EXISTS replication_example; ...@@ -6,7 +6,7 @@ DROP TABLE IF EXISTS replication_example;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120)); CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));
INSERT INTO replication_example(somedata) VALUES (1); INSERT INTO replication_example(somedata) VALUES (1);
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
BEGIN; BEGIN;
INSERT INTO replication_example(somedata) VALUES (2); INSERT INTO replication_example(somedata) VALUES (2);
...@@ -56,7 +56,7 @@ COMMIT; ...@@ -56,7 +56,7 @@ COMMIT;
-- make old files go away -- make old files go away
CHECKPOINT; CHECKPOINT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
DROP TABLE IF EXISTS replication_example; DROP TABLE IF EXISTS replication_example;
...@@ -259,5 +259,5 @@ ALTER TABLE toasted_copy ALTER COLUMN data SET STORAGE EXTERNAL; ...@@ -259,5 +259,5 @@ ALTER TABLE toasted_copy ALTER COLUMN data SET STORAGE EXTERNAL;
202 untoasted199 202 untoasted199
203 untoasted200 203 untoasted200
\. \.
SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0'); SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
...@@ -41,6 +41,8 @@ typedef struct ...@@ -41,6 +41,8 @@ typedef struct
MemoryContext context; MemoryContext context;
bool include_xids; bool include_xids;
bool include_timestamp; bool include_timestamp;
bool skip_empty_xacts;
bool xact_wrote_changes;
} TestDecodingData; } TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
...@@ -48,6 +50,10 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions * ...@@ -48,6 +50,10 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *
static void pg_decode_shutdown(LogicalDecodingContext *ctx); static void pg_decode_shutdown(LogicalDecodingContext *ctx);
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn); ReorderBufferTXN *txn);
static void pg_output_begin(LogicalDecodingContext *ctx,
TestDecodingData *data,
ReorderBufferTXN *txn,
bool last_write);
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn); ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pg_decode_change(LogicalDecodingContext *ctx, static void pg_decode_change(LogicalDecodingContext *ctx,
...@@ -82,7 +88,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ...@@ -82,7 +88,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ListCell *option; ListCell *option;
TestDecodingData *data; TestDecodingData *data;
data = palloc(sizeof(TestDecodingData)); data = palloc0(sizeof(TestDecodingData));
data->context = AllocSetContextCreate(ctx->context, data->context = AllocSetContextCreate(ctx->context,
"text conversion context", "text conversion context",
ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_MINSIZE,
...@@ -90,6 +96,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ...@@ -90,6 +96,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
data->include_xids = true; data->include_xids = true;
data->include_timestamp = false; data->include_timestamp = false;
data->skip_empty_xacts = false;
ctx->output_plugin_private = data; ctx->output_plugin_private = data;
...@@ -137,6 +144,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ...@@ -137,6 +144,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
if (force_binary) if (force_binary)
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
} }
else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
{
if (elem->arg == NULL)
data->skip_empty_xacts = true;
else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else else
{ {
ereport(ERROR, ereport(ERROR,
...@@ -164,12 +182,22 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) ...@@ -164,12 +182,22 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{ {
TestDecodingData *data = ctx->output_plugin_private; TestDecodingData *data = ctx->output_plugin_private;
OutputPluginPrepareWrite(ctx, true); data->xact_wrote_changes = false;
if (data->skip_empty_xacts)
return;
pg_output_begin(ctx, data, txn, true);
}
static void
pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
{
OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids) if (data->include_xids)
appendStringInfo(ctx->out, "BEGIN %u", txn->xid); appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
else else
appendStringInfoString(ctx->out, "BEGIN"); appendStringInfoString(ctx->out, "BEGIN");
OutputPluginWrite(ctx, true); OutputPluginWrite(ctx, last_write);
} }
/* COMMIT callback */ /* COMMIT callback */
...@@ -179,6 +207,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ...@@ -179,6 +207,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{ {
TestDecodingData *data = ctx->output_plugin_private; TestDecodingData *data = ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true); OutputPluginPrepareWrite(ctx, true);
if (data->include_xids) if (data->include_xids)
appendStringInfo(ctx->out, "COMMIT %u", txn->xid); appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
...@@ -338,6 +369,14 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ...@@ -338,6 +369,14 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContext old; MemoryContext old;
data = ctx->output_plugin_private; data = ctx->output_plugin_private;
/* output BEGIN if we haven't yet */
if (data->skip_empty_xacts && !data->xact_wrote_changes)
{
pg_output_begin(ctx, data, txn, false);
}
data->xact_wrote_changes = true;
class_form = RelationGetForm(relation); class_form = RelationGetForm(relation);
tupdesc = RelationGetDescr(relation); tupdesc = RelationGetDescr(relation);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment