Commit 61633f79 authored by Andres Freund's avatar Andres Freund

Correct logical decoding restore behaviour for subtransactions.

Before initializing iteration over a subtransaction's changes, the last
few changes were not spilled to disk. That's correct if the transaction
didn't spill to disk, but otherwise... This bug can lead to missed or
misorderd subtransaction contents when they were spilled to disk.

Move spilling of the remaining in-memory changes to
ReorderBufferIterTXNInit(), where it can easily be applied to the top
transaction and, if present, subtransactions.

Since this code had too many bugs already, noticeably increase test
coverage.

Fixes: #14319
Reported-By: Huan Ruan
Discussion: <20160909012610.20024.58169@wrigleys.postgresql.org>
Backport: 9,4-, where logical decoding was added
parent d51924be
...@@ -38,7 +38,8 @@ submake-test_decoding: ...@@ -38,7 +38,8 @@ submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding $(MAKE) -C $(top_builddir)/contrib/test_decoding
REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \ REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages decoding_into_rel binary prepared replorigin time messages \
spill
regresscheck: | submake-regress submake-test_decoding temp-install regresscheck: | submake-regress submake-test_decoding temp-install
$(MKDIR_P) regression_output $(MKDIR_P) regression_output
......
This diff is collapsed.
-- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
CREATE TABLE spill_test(data text);
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- spilling main xact
BEGIN;
INSERT INTO spill_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- spilling subxact, nothing in main
BEGIN;
SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
RELEASE SAVEPOINT s;
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- spilling subxact, spilling main xact
BEGIN;
SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-subbig-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
RELEASE SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-subbig-topbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- spilling subxact, non-spilling main xact
BEGIN;
SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-subbig-topsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
RELEASE SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-subbig-topsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- not-spilling subxact, spilling main xact
BEGIN;
SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-subbig-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
RELEASE SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-subbig-topbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- spilling main xact, spilling subxact
BEGIN;
INSERT INTO spill_test SELECT 'serialize-topbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-topbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
RELEASE SAVEPOINT s;
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- spilling main xact, not spilling subxact
BEGIN;
INSERT INTO spill_test SELECT 'serialize-topbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-topbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
RELEASE SAVEPOINT s;
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- spilling subxact, followed by another spilling subxact
BEGIN;
SAVEPOINT s1;
INSERT INTO spill_test SELECT 'serialize-subbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO spill_test SELECT 'serialize-subbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
RELEASE SAVEPOINT s2;
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- spilling subxact, followed by not spilling subxact
BEGIN;
SAVEPOINT s1;
INSERT INTO spill_test SELECT 'serialize-subbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO spill_test SELECT 'serialize-subbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
RELEASE SAVEPOINT s2;
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- not spilling subxact, followed by spilling subxact
BEGIN;
SAVEPOINT s1;
INSERT INTO spill_test SELECT 'serialize-subsmall-subbig--1:'||g.i FROM generate_series(1, 1) g(i);
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO spill_test SELECT 'serialize-subsmall-subbig--2:'||g.i FROM generate_series(2, 5001) g(i);
RELEASE SAVEPOINT s2;
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- spilling subxact, containing another spilling subxact
BEGIN;
SAVEPOINT s1;
INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
SAVEPOINT s2;
INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
RELEASE SAVEPOINT s2;
RELEASE SAVEPOINT s1;
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- spilling subxact, containing a not spilling subxact
BEGIN;
SAVEPOINT s1;
INSERT INTO spill_test SELECT 'serialize-nested-subbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
SAVEPOINT s2;
INSERT INTO spill_test SELECT 'serialize-nested-subbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
RELEASE SAVEPOINT s2;
RELEASE SAVEPOINT s1;
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- not spilling subxact, containing a spilling subxact
BEGIN;
SAVEPOINT s1;
INSERT INTO spill_test SELECT 'serialize-nested-subsmall-subbig--1:'||g.i FROM generate_series(1, 1) g(i);
SAVEPOINT s2;
INSERT INTO spill_test SELECT 'serialize-nested-subsmall-subbig--2:'||g.i FROM generate_series(2, 5001) g(i);
RELEASE SAVEPOINT s2;
RELEASE SAVEPOINT s1;
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
-- not spilling subxact, containing a spilling subxact that aborts and one that commits
BEGIN;
SAVEPOINT s1;
INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort--1:'||g.i FROM generate_series(1, 5000) g(i);
SAVEPOINT s2;
INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort--2:'||g.i FROM generate_series(5001, 10000) g(i);
ROLLBACK TO SAVEPOINT s2;
SAVEPOINT s3;
INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort-subbig-3:'||g.i FROM generate_series(5001, 10000) g(i);
RELEASE SAVEPOINT s1;
COMMIT;
SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
DROP TABLE spill_test;
SELECT pg_drop_replication_slot('regression_slot');
...@@ -935,8 +935,12 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -935,8 +935,12 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferChange *cur_change; ReorderBufferChange *cur_change;
if (txn->nentries != txn->nentries_mem) if (txn->nentries != txn->nentries_mem)
{
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, txn);
ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd, ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
&state->entries[off].segno); &state->entries[off].segno);
}
cur_change = dlist_head_element(ReorderBufferChange, node, cur_change = dlist_head_element(ReorderBufferChange, node,
&txn->changes); &txn->changes);
...@@ -960,10 +964,13 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -960,10 +964,13 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferChange *cur_change; ReorderBufferChange *cur_change;
if (cur_txn->nentries != cur_txn->nentries_mem) if (cur_txn->nentries != cur_txn->nentries_mem)
{
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, cur_txn);
ReorderBufferRestoreChanges(rb, cur_txn, ReorderBufferRestoreChanges(rb, cur_txn,
&state->entries[off].fd, &state->entries[off].fd,
&state->entries[off].segno); &state->entries[off].segno);
}
cur_change = dlist_head_element(ReorderBufferChange, node, cur_change = dlist_head_element(ReorderBufferChange, node,
&cur_txn->changes); &cur_txn->changes);
...@@ -1367,10 +1374,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ...@@ -1367,10 +1374,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
txn->origin_id = origin_id; txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn; txn->origin_lsn = origin_lsn;
/* serialize the last bunch of changes if we need start earlier anyway */
if (txn->nentries_mem != txn->nentries)
ReorderBufferSerializeTXN(rb, txn);
/* /*
* If this transaction didn't have any real changes in our database, it's * If this transaction didn't have any real changes in our database, it's
* OK not to have a snapshot. Note that ReorderBufferCommitChild will have * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment