Commit a271a1b5 authored by Amit Kapila

Allow decoding at prepare time in ReorderBuffer.

This patch allows PREPARE-time decoding of two-phase transactions (if the
output plugin supports this capability), in which case the transactions
are replayed at PREPARE and then committed later when COMMIT PREPARED
arrives.

Now that we decode the changes before the commit, the concurrent aborts
may cause failures when the output plugin consults catalogs (both system
and user-defined).

We detect such failures with a special sqlerrcode
ERRCODE_TRANSACTION_ROLLBACK introduced by commit 7259736a and stop
decoding the remaining changes. Then we roll back the changes when ROLLBACK
PREPARED is encountered.

Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, Arseny Sher, and Dilip Kumar
Tested-by: Takamichi Osumi
Discussion:
https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
parent ca3b3748
...@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" ...@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \ decoding_into_rel binary prepared replorigin time messages \
spill slot truncate stream stats spill slot truncate stream stats twophase twophase_stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
......
-- Test prepared transactions. When two-phase-commit is enabled, transactions are
-- decoded at PREPARE time rather than at COMMIT PREPARED time.
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
CREATE TABLE test_prepared1(id integer primary key);
CREATE TABLE test_prepared2(id integer primary key);
-- Test that decoding happens at PREPARE time when two-phase-commit is enabled.
-- Decoding after COMMIT PREPARED must have all the commands in the transaction.
BEGIN;
INSERT INTO test_prepared1 VALUES (1);
INSERT INTO test_prepared1 VALUES (2);
-- should show nothing because the xact has not been prepared yet.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
PREPARE TRANSACTION 'test_prepared#1';
-- should show both the above inserts and the PREPARE TRANSACTION.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:1
table public.test_prepared1: INSERT: id[integer]:2
PREPARE TRANSACTION 'test_prepared#1'
(4 rows)
COMMIT PREPARED 'test_prepared#1';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:1
table public.test_prepared1: INSERT: id[integer]:2
PREPARE TRANSACTION 'test_prepared#1'
COMMIT PREPARED 'test_prepared#1'
(5 rows)
-- Test that rollback of a prepared xact is decoded.
BEGIN;
INSERT INTO test_prepared1 VALUES (3);
PREPARE TRANSACTION 'test_prepared#2';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:3
PREPARE TRANSACTION 'test_prepared#2'
(3 rows)
ROLLBACK PREPARED 'test_prepared#2';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
-------------------------------------
ROLLBACK PREPARED 'test_prepared#2'
(1 row)
-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
BEGIN;
ALTER TABLE test_prepared1 ADD COLUMN data text;
INSERT INTO test_prepared1 VALUES (4, 'frakbar');
PREPARE TRANSACTION 'test_prepared#3';
-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table
SELECT 'test_prepared_1' AS relation, locktype, mode
FROM pg_locks
WHERE locktype = 'relation'
AND relation = 'test_prepared1'::regclass;
relation | locktype | mode
-----------------+----------+---------------------
test_prepared_1 | relation | RowExclusiveLock
test_prepared_1 | relation | AccessExclusiveLock
(2 rows)
-- The insert should show the newly altered column but not the DDL.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
-------------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
PREPARE TRANSACTION 'test_prepared#3'
(3 rows)
-- Test that we decode correctly while an uncommitted prepared xact
-- with ddl exists.
--
-- Use a separate table for the concurrent transaction because the lock from
-- the ALTER will stop us inserting into the other one.
--
INSERT INTO test_prepared2 VALUES (5);
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------
BEGIN
table public.test_prepared2: INSERT: id[integer]:5
COMMIT
(3 rows)
COMMIT PREPARED 'test_prepared#3';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
-------------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
PREPARE TRANSACTION 'test_prepared#3'
COMMIT PREPARED 'test_prepared#3'
(4 rows)
-- make sure stuff still works
INSERT INTO test_prepared1 VALUES (6);
INSERT INTO test_prepared2 VALUES (7);
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
--------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:6 data[text]:null
COMMIT
BEGIN
table public.test_prepared2: INSERT: id[integer]:7
COMMIT
(6 rows)
-- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block
-- logical decoding.
BEGIN;
INSERT INTO test_prepared1 VALUES (8, 'othercol');
CLUSTER test_prepared1 USING test_prepared1_pkey;
INSERT INTO test_prepared1 VALUES (9, 'othercol2');
PREPARE TRANSACTION 'test_prepared_lock';
SELECT 'test_prepared1' AS relation, locktype, mode
FROM pg_locks
WHERE locktype = 'relation'
AND relation = 'test_prepared1'::regclass;
relation | locktype | mode
----------------+----------+---------------------
test_prepared1 | relation | RowExclusiveLock
test_prepared1 | relation | ShareLock
test_prepared1 | relation | AccessExclusiveLock
(3 rows)
-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
-- call should return within a second.
SET statement_timeout = '1s';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
---------------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
PREPARE TRANSACTION 'test_prepared_lock'
(4 rows)
RESET statement_timeout;
COMMIT PREPARED 'test_prepared_lock';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
---------------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
PREPARE TRANSACTION 'test_prepared_lock'
COMMIT PREPARED 'test_prepared_lock'
(5 rows)
-- Test savepoints and sub-xacts. Creating savepoints will create
-- sub-xacts implicitly.
BEGIN;
CREATE TABLE test_prepared_savepoint (a int);
INSERT INTO test_prepared_savepoint VALUES (1);
SAVEPOINT test_savepoint;
INSERT INTO test_prepared_savepoint VALUES (2);
ROLLBACK TO SAVEPOINT test_savepoint;
PREPARE TRANSACTION 'test_prepared_savepoint';
-- should show only 1, not 2
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
------------------------------------------------------------
BEGIN
table public.test_prepared_savepoint: INSERT: a[integer]:1
PREPARE TRANSACTION 'test_prepared_savepoint'
(3 rows)
COMMIT PREPARED 'test_prepared_savepoint';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
------------------------------------------------------------
BEGIN
table public.test_prepared_savepoint: INSERT: a[integer]:1
PREPARE TRANSACTION 'test_prepared_savepoint'
COMMIT PREPARED 'test_prepared_savepoint'
(4 rows)
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
BEGIN;
INSERT INTO test_prepared1 VALUES (20);
PREPARE TRANSACTION 'test_prepared_nodecode';
-- should show nothing
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
COMMIT PREPARED 'test_prepared_nodecode';
-- should be decoded now
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
---------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:20 data[text]:null
COMMIT
(3 rows)
-- Test 8:
-- cleanup and make sure results are also empty
DROP TABLE test_prepared1;
DROP TABLE test_prepared2;
-- show results. There should be nothing to show
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
-- Test streaming of two-phase commits
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
CREATE TABLE stream_test(data text);
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
?column?
----------
msg5
(1 row)
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK TO s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1';
-- should show the inserts after a ROLLBACK
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
streaming message: transactional: 1 prefix: test, sz: 50
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
closing a streamed block for transaction
preparing streamed transaction 'test1'
(24 rows)
COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
-------------------------------------------------------------
BEGIN
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
PREPARE TRANSACTION 'test1'
COMMIT PREPARED 'test1'
(23 rows)
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
?column?
----------
msg5
(1 row)
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1_nodecode';
-- should NOT show inserts after a ROLLBACK
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
streaming message: transactional: 1 prefix: test, sz: 50
(1 row)
COMMIT PREPARED 'test1_nodecode';
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
-------------------------------------------------------------
BEGIN
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
COMMIT
(22 rows)
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
-- Test prepared transactions. When two-phase-commit is enabled, transactions are
-- decoded at PREPARE time rather than at COMMIT PREPARED time.
--
-- Every decoding call below passes 'two-phase-commit' = '1' to the
-- test_decoding plugin, hides xids ('include-xids' = '0') and skips empty
-- transactions ('skip-empty-xacts' = '1').
--
-- NOTE(review): the expected output appears to echo these comments verbatim,
-- so it presumably has to be regenerated whenever a comment changes; confirm.
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
CREATE TABLE test_prepared1(id integer primary key);
CREATE TABLE test_prepared2(id integer primary key);
-- Test that decoding happens at PREPARE time when two-phase-commit is enabled.
-- Decoding after COMMIT PREPARED must have all the commands in the transaction.
BEGIN;
INSERT INTO test_prepared1 VALUES (1);
INSERT INTO test_prepared1 VALUES (2);
-- should show nothing because the xact has not been prepared yet.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
PREPARE TRANSACTION 'test_prepared#1';
-- should show both the above inserts and the PREPARE TRANSACTION.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
COMMIT PREPARED 'test_prepared#1';
-- should show the whole transaction again, now ending with COMMIT PREPARED.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test that rollback of a prepared xact is decoded.
BEGIN;
INSERT INTO test_prepared1 VALUES (3);
PREPARE TRANSACTION 'test_prepared#2';
-- should show the insert and the PREPARE TRANSACTION.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
ROLLBACK PREPARED 'test_prepared#2';
-- should show only the ROLLBACK PREPARED.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
BEGIN;
ALTER TABLE test_prepared1 ADD COLUMN data text;
INSERT INTO test_prepared1 VALUES (4, 'frakbar');
PREPARE TRANSACTION 'test_prepared#3';
-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table
-- NOTE(review): the label literal below is 'test_prepared_1' although the
-- table is test_prepared1 (the CLUSTER test further down uses
-- 'test_prepared1'). It is part of the result rows, so it is left untouched.
SELECT 'test_prepared_1' AS relation, locktype, mode
FROM pg_locks
WHERE locktype = 'relation'
AND relation = 'test_prepared1'::regclass;
-- The insert should show the newly altered column but not the DDL.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test that we decode correctly while an uncommitted prepared xact
-- with ddl exists.
--
-- Use a separate table for the concurrent transaction because the lock from
-- the ALTER will stop us inserting into the other one.
--
INSERT INTO test_prepared2 VALUES (5);
-- should show only the committed insert into test_prepared2.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
COMMIT PREPARED 'test_prepared#3';
-- should show the prepared xact's insert, its PREPARE TRANSACTION and the
-- COMMIT PREPARED.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-- make sure stuff still works
INSERT INTO test_prepared1 VALUES (6);
INSERT INTO test_prepared2 VALUES (7);
-- should show two ordinary transactions, each ending with a plain COMMIT.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-- Check that CLUSTER (an operation that holds an exclusive lock) doesn't
-- block logical decoding.
BEGIN;
INSERT INTO test_prepared1 VALUES (8, 'othercol');
CLUSTER test_prepared1 USING test_prepared1_pkey;
INSERT INTO test_prepared1 VALUES (9, 'othercol2');
PREPARE TRANSACTION 'test_prepared_lock';
-- list the relation-level locks currently recorded on test_prepared1
SELECT 'test_prepared1' AS relation, locktype, mode
FROM pg_locks
WHERE locktype = 'relation'
AND relation = 'test_prepared1'::regclass;
-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
-- call should return within a second.
SET statement_timeout = '1s';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
RESET statement_timeout;
COMMIT PREPARED 'test_prepared_lock';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test savepoints and sub-xacts. Creating savepoints will create
-- sub-xacts implicitly.
BEGIN;
CREATE TABLE test_prepared_savepoint (a int);
INSERT INTO test_prepared_savepoint VALUES (1);
SAVEPOINT test_savepoint;
INSERT INTO test_prepared_savepoint VALUES (2);
ROLLBACK TO SAVEPOINT test_savepoint;
PREPARE TRANSACTION 'test_prepared_savepoint';
-- should show only 1, not 2
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
COMMIT PREPARED 'test_prepared_savepoint';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
BEGIN;
INSERT INTO test_prepared1 VALUES (20);
PREPARE TRANSACTION 'test_prepared_nodecode';
-- should show nothing
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
COMMIT PREPARED 'test_prepared_nodecode';
-- should be decoded now, as an ordinary transaction ending with COMMIT
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-- Cleanup:
-- drop the test tables and make sure results are also empty
DROP TABLE test_prepared1;
DROP TABLE test_prepared2;
-- show results. There should be nothing to show
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot');
-- Test streaming of two-phase commits
--
-- The decoding calls after the initial DDL consumption pass both
-- 'two-phase-commit' = '1' and 'stream-changes' = '1' to the test_decoding
-- plugin.
--
-- NOTE(review): the expected output appears to echo these comments verbatim,
-- so it presumably has to be regenerated whenever a comment changes; confirm.
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
CREATE TABLE stream_test(data text);
-- consume DDL (expected to return no rows)
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
-- a large insert followed by TRUNCATE, both rolled back to s1 just below
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK TO s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1';
-- should show the message and the 20 inserts made after ROLLBACK TO s1 as
-- one streamed block, followed by the prepare of the streamed transaction
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
COMMIT PREPARED 'test1';
-- should show the COMMIT PREPARED and the other changes in the transaction
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
-- a large insert followed by TRUNCATE, both rolled back to s1 just below
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1_nodecode';
-- should show only the streamed message and NOT the inserts made after the
-- ROLLBACK, because this gid is not decoded at prepare time
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
COMMIT PREPARED 'test1_nodecode';
-- should show the inserts, ending with a plain COMMIT rather than a
-- COMMIT PREPARED
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
...@@ -165,7 +165,58 @@ COMMIT 693 ...@@ -165,7 +165,58 @@ COMMIT 693
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo> <keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
$ pg_recvlogical -d postgres --slot=test --drop-slot $ pg_recvlogical -d postgres --slot=test --drop-slot
</programlisting> </programlisting>
</sect1>
<para>
The following example shows the SQL interface that can be used to decode prepared
transactions. Before you use two-phase commit commands, you must set
<varname>max_prepared_transactions</varname> to at least 1. You must also set
the option 'two-phase-commit' to 1 while calling
<function>pg_logical_slot_get_changes</function>. Note that we will stream
the entire transaction after the commit if it is not already decoded.
</para>
<programlisting>
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('5');
postgres=*# PREPARE TRANSACTION 'test_prepared1';
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
lsn | xid | data
-----------+-----+---------------------------------------------------------
0/1689DC0 | 529 | BEGIN 529
0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
(3 rows)
postgres=# COMMIT PREPARED 'test_prepared1';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
lsn | xid | data
-----------+-----+--------------------------------------------
0/1689DC0 | 529 | BEGIN 529
0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
(4 rows)
postgres=# -- you can also roll back a prepared transaction
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('6');
postgres=*# PREPARE TRANSACTION 'test_prepared2';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
lsn | xid | data
-----------+-----+---------------------------------------------------------
0/168A180 | 530 | BEGIN 530
0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6'
0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530
(3 rows)
postgres=# ROLLBACK PREPARED 'test_prepared2';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
lsn | xid | data
-----------+-----+----------------------------------------------
0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
(1 row)
</programlisting>
</sect1>
<sect1 id="logicaldecoding-explanation"> <sect1 id="logicaldecoding-explanation">
<title>Logical Decoding Concepts</title> <title>Logical Decoding Concepts</title>
...@@ -1126,4 +1177,55 @@ stream_commit_cb(...); &lt;-- commit of the streamed transaction ...@@ -1126,4 +1177,55 @@ stream_commit_cb(...); &lt;-- commit of the streamed transaction
</para> </para>
</sect1> </sect1>
<sect1 id="logicaldecoding-two-phase-commits">
<title>Two-phase commit support for Logical Decoding</title>
<para>
With the basic output plugin callbacks (e.g., <function>begin_cb</function>,
<function>change_cb</function>, <function>commit_cb</function> and
<function>message_cb</function>) two-phase commit commands like
<command>PREPARE TRANSACTION</command>, <command>COMMIT PREPARED</command>
and <command>ROLLBACK PREPARED</command> are not decoded. While the
<command>PREPARE TRANSACTION</command> is ignored,
<command>COMMIT PREPARED</command> is decoded as a <command>COMMIT</command>
and <command>ROLLBACK PREPARED</command> is decoded as a
<command>ROLLBACK</command>.
</para>
<para>
To support the streaming of two-phase commands, an output plugin needs to
provide additional callbacks. There are multiple two-phase commit callbacks
that are required (<function>begin_prepare_cb</function>,
<function>prepare_cb</function>, <function>commit_prepared_cb</function>,
<function>rollback_prepared_cb</function> and
<function>stream_prepare_cb</function>) and an optional callback
(<function>filter_prepare_cb</function>).
</para>
<para>
If the output plugin callbacks for decoding two-phase commit commands are
provided, then on <command>PREPARE TRANSACTION</command>, the changes of
that transaction are decoded, passed to the output plugin, and the
<function>prepare_cb</function> callback is invoked. This differs from the
basic decoding setup where changes are only passed to the output plugin
when a transaction is committed. The start of a prepared transaction is
indicated by the <function>begin_prepare_cb</function> callback.
</para>
<para>
When a prepared transaction is rolled back using
<command>ROLLBACK PREPARED</command>, the
<function>rollback_prepared_cb</function> callback is invoked; when the
prepared transaction is committed using <command>COMMIT PREPARED</command>,
the <function>commit_prepared_cb</function> callback is invoked.
</para>
<para>
Optionally the output plugin can specify a name pattern in the
<function>filter_prepare_cb</function> and transactions with gid containing
that name pattern will not be decoded as a two-phase commit transaction.
</para>
</sect1>
</chapter> </chapter>
...@@ -67,13 +67,24 @@ static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf ...@@ -67,13 +67,24 @@ static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid); xl_xact_parsed_commit *parsed, TransactionId xid,
bool two_phase);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_abort *parsed, TransactionId xid); xl_xact_parsed_abort *parsed, TransactionId xid,
bool two_phase);
static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_prepare *parsed);
/* common function to decode tuples */ /* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf, Oid dbId,
RepOriginId origin_id);
/* /*
* Take every XLogReadRecord()ed record and perform the actions required to * Take every XLogReadRecord()ed record and perform the actions required to
* decode it using the output plugin already setup in the logical decoding * decode it using the output plugin already setup in the logical decoding
...@@ -244,6 +255,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -244,6 +255,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_xact_commit *xlrec; xl_xact_commit *xlrec;
xl_xact_parsed_commit parsed; xl_xact_parsed_commit parsed;
TransactionId xid; TransactionId xid;
bool two_phase = false;
xlrec = (xl_xact_commit *) XLogRecGetData(r); xlrec = (xl_xact_commit *) XLogRecGetData(r);
ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
...@@ -253,7 +265,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -253,7 +265,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else else
xid = parsed.twophase_xid; xid = parsed.twophase_xid;
DecodeCommit(ctx, buf, &parsed, xid); /*
* We would like to process the transaction in a two-phase
* manner iff output plugin supports two-phase commits and
* doesn't filter the transaction at prepare time.
*/
if (info == XLOG_XACT_COMMIT_PREPARED)
two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
DecodeCommit(ctx, buf, &parsed, xid, two_phase);
break; break;
} }
case XLOG_XACT_ABORT: case XLOG_XACT_ABORT:
...@@ -262,6 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -262,6 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_xact_abort *xlrec; xl_xact_abort *xlrec;
xl_xact_parsed_abort parsed; xl_xact_parsed_abort parsed;
TransactionId xid; TransactionId xid;
bool two_phase = false;
xlrec = (xl_xact_abort *) XLogRecGetData(r); xlrec = (xl_xact_abort *) XLogRecGetData(r);
ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
...@@ -271,7 +292,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -271,7 +292,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else else
xid = parsed.twophase_xid; xid = parsed.twophase_xid;
DecodeAbort(ctx, buf, &parsed, xid); /*
* We would like to process the transaction in a two-phase
* manner iff output plugin supports two-phase commits and
* doesn't filter the transaction at prepare time.
*/
if (info == XLOG_XACT_ABORT_PREPARED)
two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
DecodeAbort(ctx, buf, &parsed, xid, two_phase);
break; break;
} }
case XLOG_XACT_ASSIGNMENT: case XLOG_XACT_ASSIGNMENT:
...@@ -312,17 +341,30 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -312,17 +341,30 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
} }
break; break;
case XLOG_XACT_PREPARE: case XLOG_XACT_PREPARE:
{
xl_xact_parsed_prepare parsed;
xl_xact_prepare *xlrec;
/* /* ok, parse it */
* Currently decoding ignores PREPARE TRANSACTION and will just xlrec = (xl_xact_prepare *) XLogRecGetData(r);
* decode the transaction when the COMMIT PREPARED is sent or ParsePrepareRecord(XLogRecGetInfo(buf->record),
* throw away the transaction's contents when a ROLLBACK PREPARED xlrec, &parsed);
* is received. In the future we could add code to expose prepared
* transactions in the changestream allowing for a kind of /*
* distributed 2PC. * We would like to process the transaction in a two-phase
*/ * manner iff output plugin supports two-phase commits and
ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); * doesn't filter the transaction at prepare time.
break; */
if (FilterPrepare(ctx, parsed.twophase_gid))
{
ReorderBufferProcessXid(reorder, parsed.twophase_xid,
buf->origptr);
break;
}
DecodePrepare(ctx, buf, &parsed);
break;
}
default: default:
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
} }
...@@ -520,6 +562,32 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -520,6 +562,32 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
} }
} }
/*
 * Decide whether the prepared transaction identified by 'gid' is filtered
 * out of two-phase decoding.
 *
 * Returns true when the PREPARE should be skipped, in which case the
 * transaction is sent as a regular commit later; returns false when the
 * transaction should be decoded at PREPARE time.
 */
static inline bool
FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
{
	if (ctx->twophase)
	{
		/*
		 * Two-phase decoding is enabled, so let the output plugin decide.
		 * The filter_prepare callback is optional; when the plugin does not
		 * supply one, every prepared transaction goes through.
		 */
		if (ctx->callbacks.filter_prepare_cb != NULL)
			return filter_prepare_cb_wrapper(ctx, gid);

		return false;
	}

	/*
	 * Decoding of two-phase transactions at PREPARE time is not enabled:
	 * all two-phase transactions are considered filtered out and will be
	 * applied as regular transactions at COMMIT PREPARED.
	 */
	return true;
}
static inline bool static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{ {
...@@ -582,10 +650,15 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -582,10 +650,15 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* /*
* Consolidated commit record handling between the different form of commit * Consolidated commit record handling between the different form of commit
* records. * records.
*
* 'two_phase' indicates that caller wants to process the transaction in two
* phases, first process prepare if not already done and then process
* commit_prepared.
*/ */
static void static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid) xl_xact_parsed_commit *parsed, TransactionId xid,
bool two_phase)
{ {
XLogRecPtr origin_lsn = InvalidXLogRecPtr; XLogRecPtr origin_lsn = InvalidXLogRecPtr;
TimestampTz commit_time = parsed->xact_time; TimestampTz commit_time = parsed->xact_time;
...@@ -606,15 +679,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, ...@@ -606,15 +679,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* the reorderbuffer to forget the content of the (sub-)transactions * the reorderbuffer to forget the content of the (sub-)transactions
* if not. * if not.
* *
* There can be several reasons we might not be interested in this
* transaction:
* 1) We might not be interested in decoding transactions up to this
* LSN. This can happen because we previously decoded it and now just
* are restarting or if we haven't assembled a consistent snapshot yet.
* 2) The transaction happened in another database.
* 3) The output plugin is not interested in the origin.
* 4) We are doing fast-forwarding
*
* We can't just use ReorderBufferAbort() here, because we need to execute * We can't just use ReorderBufferAbort() here, because we need to execute
* the transaction's invalidations. This currently won't be needed if * the transaction's invalidations. This currently won't be needed if
* we're just skipping over the transaction because currently we only do * we're just skipping over the transaction because currently we only do
...@@ -627,9 +691,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, ...@@ -627,9 +691,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* relevant syscaches. * relevant syscaches.
* --- * ---
*/ */
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
ctx->fast_forward || FilterByOrigin(ctx, origin_id))
{ {
for (i = 0; i < parsed->nsubxacts; i++) for (i = 0; i < parsed->nsubxacts; i++)
{ {
...@@ -647,34 +709,163 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, ...@@ -647,34 +709,163 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
buf->origptr, buf->endptr); buf->origptr, buf->endptr);
} }
/*
* Send the final commit record if the transaction data is already
* decoded, otherwise, process the entire transaction.
*/
if (two_phase)
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
commit_time, origin_id, origin_lsn,
parsed->twophase_gid, true);
}
else
{
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
commit_time, origin_id, origin_lsn);
}
/*
* Update the decoding stats at transaction prepare/commit/abort. It is
* not clear that sending more or less frequently than this would be
* better.
*/
UpdateDecodingStats(ctx);
}
/*
* Decode PREPARE record. Similar logic as in DecodeCommit.
*
* Note that we don't skip prepare even if we have detected a concurrent abort
* because it is quite possible that we had already sent some changes before we
* detect abort in which case we need to abort those changes in the subscriber.
* To abort such changes, we do send the prepare and then the rollback prepared
* which is what happened on the publisher-side as well. Now, we can invent a
* new abort API wherein in such cases we send abort and skip sending prepared
* and rollback prepared but then it is not that straightforward because we
* might have streamed this transaction by that time in which case it is
* handled when the rollback is encountered. It is not impossible to optimize
* the concurrent abort case but it can introduce design complexity w.r.t
* handling different cases so leaving it for now as it doesn't seem worth it.
*/
static void
DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_prepare *parsed)
{
SnapBuild *builder = ctx->snapshot_builder;
XLogRecPtr origin_lsn = parsed->origin_lsn;
TimestampTz prepare_time = parsed->xact_time;
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
int i;
TransactionId xid = parsed->twophase_xid;
if (parsed->origin_timestamp != 0)
prepare_time = parsed->origin_timestamp;
/*
* Remember the prepare info for a txn so that it can be used later in
* commit prepared if required. See ReorderBufferFinishPrepared.
*/
if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
buf->endptr, prepare_time, origin_id,
origin_lsn))
return;
/* We can't start streaming unless a consistent state is reached. */
if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
{
ReorderBufferSkipPrepare(ctx->reorder, xid);
return;
}
/*
* Check whether we need to process this transaction. See
* DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
* transaction.
*
* We can't call ReorderBufferForget as we did in DecodeCommit as the txn
* hasn't yet been committed, removing this txn before a commit might
* result in the computation of an incorrect restart_lsn. See
* SnapBuildProcessRunningXacts. But we need to process cache
* invalidations if there are any for the reasons mentioned in
* DecodeCommit.
*/
if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
{
ReorderBufferSkipPrepare(ctx->reorder, xid);
ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
return;
}
/* Tell the reorderbuffer about the surviving subtransactions. */
for (i = 0; i < parsed->nsubxacts; i++)
{
ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
buf->origptr, buf->endptr);
}
/* replay actions of all transaction + subtransactions in order */ /* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
commit_time, origin_id, origin_lsn);
/* /*
* Update the decoding stats at transaction commit/abort. It is not clear * Update the decoding stats at transaction prepare/commit/abort. It is
* that sending more or less frequently than this would be better. * not clear that sending more or less frequently than this would be
* better.
*/ */
UpdateDecodingStats(ctx); UpdateDecodingStats(ctx);
} }
/* /*
* Get the data from the various forms of abort records and pass it on to * Get the data from the various forms of abort records and pass it on to
* snapbuild.c and reorderbuffer.c * snapbuild.c and reorderbuffer.c.
*
* 'two_phase' indicates that the prepared transaction should be finished.
*/ */
static void static void
DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_abort *parsed, TransactionId xid) xl_xact_parsed_abort *parsed, TransactionId xid,
bool two_phase)
{ {
int i; int i;
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
TimestampTz abort_time = parsed->xact_time;
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
bool skip_xact;
for (i = 0; i < parsed->nsubxacts; i++) if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
{
origin_lsn = parsed->origin_lsn;
abort_time = parsed->origin_timestamp;
}
/*
* Check whether we need to process this transaction. See
* DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
* transaction.
*/
skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
/*
* Send the final rollback record for a prepared transaction unless we
* need to skip it. For non-two-phase xacts, simply forget the xact.
*/
if (two_phase && !skip_xact)
{ {
ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
buf->record->EndRecPtr); abort_time, origin_id, origin_lsn,
parsed->twophase_gid, false);
} }
else
{
for (i = 0; i < parsed->nsubxacts; i++)
{
ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
buf->record->EndRecPtr);
}
ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
}
/* update the decoding stats */ /* update the decoding stats */
UpdateDecodingStats(ctx); UpdateDecodingStats(ctx);
...@@ -1080,3 +1271,24 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) ...@@ -1080,3 +1271,24 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
header->t_infomask2 = xlhdr.t_infomask2; header->t_infomask2 = xlhdr.t_infomask2;
header->t_hoff = xlhdr.t_hoff; header->t_hoff = xlhdr.t_hoff;
} }
/*
 * Report whether the transaction described by the current record should be
 * skipped rather than decoded.
 *
 * Returns true if any of the following holds:
 * 1) We are not interested in decoding transactions up to this LSN. This
 *	  can happen because we previously decoded it and are now just
 *	  restarting, or because we haven't assembled a consistent snapshot yet.
 * 2) The transaction happened in another database.
 * 3) We are doing fast-forwarding.
 * 4) The output plugin is not interested in the origin.
 *
 * The checks run in the order listed, and later checks are not evaluated
 * once one of them matches.
 */
static bool
DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
				  Oid txn_dbid, RepOriginId origin_id)
{
	/* Not interested in anything up to this LSN. */
	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr))
		return true;

	/* A valid database OID that differs from the slot's database. */
	if (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database)
		return true;

	/* Fast-forwarding never decodes transaction contents. */
	if (ctx->fast_forward)
		return true;

	/* Finally, let the origin filter decide. */
	return FilterByOrigin(ctx, origin_id);
}
...@@ -1083,15 +1083,6 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid) ...@@ -1083,15 +1083,6 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
Assert(!ctx->fast_forward); Assert(!ctx->fast_forward);
/*
* Skip if decoding of two-phase transactions at PREPARE time is not
* enabled. In that case, all two-phase transactions are considered
* filtered out and will be applied as regular transactions at COMMIT
* PREPARED.
*/
if (!ctx->twophase)
return true;
/* Push callback + info on the error context stack */ /* Push callback + info on the error context stack */
state.ctx = ctx; state.ctx = ctx;
state.callback_name = "filter_prepare"; state.callback_name = "filter_prepare";
......
...@@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn ...@@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change); char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
bool txn_prepared);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno); TransactionId xid, XLogSegNo segno);
...@@ -422,6 +423,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -422,6 +423,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* free data that's contained */ /* free data that's contained */
if (txn->gid != NULL)
{
pfree(txn->gid);
txn->gid = NULL;
}
if (txn->tuplecid_hash != NULL) if (txn->tuplecid_hash != NULL)
{ {
hash_destroy(txn->tuplecid_hash); hash_destroy(txn->tuplecid_hash);
...@@ -1516,12 +1523,18 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1516,12 +1523,18 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
} }
/* /*
* Discard changes from a transaction (and subtransactions), after streaming * Discard changes from a transaction (and subtransactions), either after
* them. Keep the remaining info - transactions, tuplecids, invalidations and * streaming or decoding them at PREPARE. Keep the remaining info -
* snapshots. * transactions, tuplecids, invalidations and snapshots.
*
* We additionally remove tuplecids after decoding the transaction at prepare
* time as we only need to perform invalidation at rollback or commit prepared.
*
* 'txn_prepared' indicates that we have decoded the transaction at prepare
* time.
*/ */
static void static void
ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{ {
dlist_mutable_iter iter; dlist_mutable_iter iter;
...@@ -1540,7 +1553,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1540,7 +1553,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Assert(rbtxn_is_known_subxact(subtxn)); Assert(rbtxn_is_known_subxact(subtxn));
Assert(subtxn->nsubtxns == 0); Assert(subtxn->nsubtxns == 0);
ReorderBufferTruncateTXN(rb, subtxn); ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
} }
/* cleanup changes in the txn */ /* cleanup changes in the txn */
...@@ -1574,9 +1587,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1574,9 +1587,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
* about the toplevel xact (we send the XID in all messages), but we never * about the toplevel xact (we send the XID in all messages), but we never
* stream XIDs of empty subxacts. * stream XIDs of empty subxacts.
*/ */
if ((!txn->toptxn) || (txn->nentries_mem != 0)) if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
txn->txn_flags |= RBTXN_IS_STREAMED; txn->txn_flags |= RBTXN_IS_STREAMED;
if (txn_prepared)
{
/*
* If this is a prepared txn, cleanup the tuplecids we stored for
* decoding catalog snapshot access. They are always stored in the
* toplevel transaction.
*/
dlist_foreach_modify(iter, &txn->tuplecids)
{
ReorderBufferChange *change;
change = dlist_container(ReorderBufferChange, node, iter.cur);
/* Check we're not mixing changes from different transactions. */
Assert(change->txn == txn);
Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
/* Remove the change from its containing list. */
dlist_delete(&change->node);
ReorderBufferReturnChange(rb, change, true);
}
}
/* /*
* Destroy the (relfilenode, ctid) hashtable, so that we don't leak any * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
* memory. We could also keep the hash table and update it with new ctid * memory. We could also keep the hash table and update it with new ctid
...@@ -1756,9 +1793,10 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) ...@@ -1756,9 +1793,10 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
} }
/* /*
* If the transaction was (partially) streamed, we need to commit it in a * If the transaction was (partially) streamed, we need to prepare or commit
* 'streamed' way. That is, we first stream the remaining part of the * it in a 'streamed' way. That is, we first stream the remaining part of the
* transaction, and then invoke stream_commit message. * transaction, and then invoke stream_prepare or stream_commit message as per
* the case.
*/ */
static void static void
ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
...@@ -1768,29 +1806,49 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1768,29 +1806,49 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferStreamTXN(rb, txn); ReorderBufferStreamTXN(rb, txn);
rb->stream_commit(rb, txn, txn->final_lsn); if (rbtxn_prepared(txn))
{
/*
* Note, we send stream prepare even if a concurrent abort is
* detected. See DecodePrepare for more information.
*/
rb->stream_prepare(rb, txn, txn->final_lsn);
ReorderBufferCleanupTXN(rb, txn); /*
* This is a PREPARED transaction, part of a two-phase commit. The
* full cleanup will happen as part of the COMMIT PREPAREDs, so now
* just truncate txn by removing changes and tuple_cids.
*/
ReorderBufferTruncateTXN(rb, txn, true);
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
else
{
rb->stream_commit(rb, txn, txn->final_lsn);
ReorderBufferCleanupTXN(rb, txn);
}
} }
/* /*
* Set xid to detect concurrent aborts. * Set xid to detect concurrent aborts.
* *
* While streaming an in-progress transaction there is a possibility that the * While streaming an in-progress transaction or decoding a prepared
* (sub)transaction might get aborted concurrently. In such case if the * transaction there is a possibility that the (sub)transaction might get
* (sub)transaction has catalog update then we might decode the tuple using * aborted concurrently. In such case if the (sub)transaction has catalog
* wrong catalog version. For example, suppose there is one catalog tuple with * update then we might decode the tuple using wrong catalog version. For
* (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
* and after that we will have two tuples (xmin: 500, xmax: 501) and * the transaction 501 updates the catalog tuple and after that we will have
* (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
* say 502 updates the same catalog tuple then the first tuple will be changed * aborted and some other transaction say 502 updates the same catalog tuple
* to (xmin: 500, xmax: 502). So, the problem is that when we try to decode * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
* the tuple inserted/updated in 501 after the catalog update, we will see the * problem is that when we try to decode the tuple inserted/updated in 501
* catalog tuple with (xmin: 500, xmax: 502) as visible because it will * after the catalog update, we will see the catalog tuple with (xmin: 500,
* consider that the tuple is deleted by xid 502 which is not visible to our * xmax: 502) as visible because it will consider that the tuple is deleted by
* snapshot. And when we will try to decode with that catalog tuple, it can * xid 502 which is not visible to our snapshot. And when we will try to
* lead to a wrong result or a crash. So, it is necessary to detect * decode with that catalog tuple, it can lead to a wrong result or a crash.
* concurrent aborts to allow streaming of in-progress transactions. * So, it is necessary to detect concurrent aborts to allow streaming of
* in-progress transactions or decoding of prepared transactions.
* *
* For detecting the concurrent abort we set CheckXidAlive to the current * For detecting the concurrent abort we set CheckXidAlive to the current
* (sub)transaction's xid for which this change belongs to. And, during * (sub)transaction's xid for which this change belongs to. And, during
...@@ -1799,7 +1857,10 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1799,7 +1857,10 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
* and discard the already streamed changes on such an error. We might have * and discard the already streamed changes on such an error. We might have
* already streamed some of the changes for the aborted (sub)transaction, but * already streamed some of the changes for the aborted (sub)transaction, but
* that is fine because when we decode the abort we will stream abort message * that is fine because when we decode the abort we will stream abort message
* to truncate the changes in the subscriber. * to truncate the changes in the subscriber. Similarly, for prepared
* transactions, we stop decoding if concurrent abort is detected and then
* rollback the changes when rollback prepared is encountered. See
* DecodePrepare.
*/ */
static inline void static inline void
SetupCheckXidLive(TransactionId xid) SetupCheckXidLive(TransactionId xid)
...@@ -1901,7 +1962,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -1901,7 +1962,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferChange *specinsert) ReorderBufferChange *specinsert)
{ {
/* Discard the changes that we just streamed */ /* Discard the changes that we just streamed */
ReorderBufferTruncateTXN(rb, txn); ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Free all resources allocated for toast reconstruction */ /* Free all resources allocated for toast reconstruction */
ReorderBufferToastReset(rb, txn); ReorderBufferToastReset(rb, txn);
...@@ -1913,15 +1974,19 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -1913,15 +1974,19 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
specinsert = NULL; specinsert = NULL;
} }
/* Stop the stream. */ /*
rb->stream_stop(rb, txn, last_lsn); * For the streaming case, stop the stream and remember the command ID and
* snapshot for the streaming run.
/* Remember the command ID and snapshot for the streaming run */ */
ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); if (rbtxn_is_streamed(txn))
{
rb->stream_stop(rb, txn, last_lsn);
ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
}
} }
/* /*
* Helper function for ReorderBufferCommit and ReorderBufferStreamTXN. * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
* *
* Send data of a transaction (and its subtransactions) to the * Send data of a transaction (and its subtransactions) to the
* output plugin. We iterate over the top and subtransactions (using a k-way * output plugin. We iterate over the top and subtransactions (using a k-way
...@@ -1974,9 +2039,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -1974,9 +2039,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
else else
StartTransactionCommand(); StartTransactionCommand();
/* We only need to send begin/commit for non-streamed transactions. */ /*
* We only need to send begin/begin-prepare for non-streamed
* transactions.
*/
if (!streaming) if (!streaming)
rb->begin(rb, txn); {
if (rbtxn_prepared(txn))
rb->begin_prepare(rb, txn);
else
rb->begin(rb, txn);
}
ReorderBufferIterTXNInit(rb, txn, &iterstate); ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
...@@ -2007,8 +2080,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -2007,8 +2080,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
prev_lsn = change->lsn; prev_lsn = change->lsn;
/* Set the current xid to detect concurrent aborts. */ /*
if (streaming) * Set the current xid to detect concurrent aborts. This is
* required for the cases when we decode the changes before the
* COMMIT record is processed.
*/
if (streaming || rbtxn_prepared(change->txn))
{ {
curtxn = change->txn; curtxn = change->txn;
SetupCheckXidLive(curtxn->xid); SetupCheckXidLive(curtxn->xid);
...@@ -2299,7 +2376,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -2299,7 +2376,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
} }
} }
else else
rb->commit(rb, txn, commit_lsn); {
/*
* Call either PREPARE (for two-phase transactions) or COMMIT (for
* regular ones).
*/
if (rbtxn_prepared(txn))
rb->prepare(rb, txn, commit_lsn);
else
rb->commit(rb, txn, commit_lsn);
}
/* this is just a sanity check against bad output plugin behaviour */ /* this is just a sanity check against bad output plugin behaviour */
if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
...@@ -2333,15 +2419,22 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -2333,15 +2419,22 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
RollbackAndReleaseCurrentSubTransaction(); RollbackAndReleaseCurrentSubTransaction();
/* /*
* If we are streaming the in-progress transaction then discard the * We are here due to one of the four reasons: 1. Decoding an
* changes that we just streamed, and mark the transactions as * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
* streamed (if they contained changes). Otherwise, remove all the * prepared txn that was (partially) streamed. 4. Decoding a committed
* changes and deallocate the ReorderBufferTXN. * txn.
*
* For 1, we allow truncation of txn data by removing the changes
* already streamed but still keeping other things like invalidations,
* snapshot, and tuplecids. For 2 and 3, we indicate
* ReorderBufferTruncateTXN to do more elaborate truncation of txn
* data as the entire transaction has been decoded except for commit.
* For 4, as the entire txn has been decoded, we can fully clean up
* the TXN reorder buffer.
*/ */
if (streaming) if (streaming || rbtxn_prepared(txn))
{ {
ReorderBufferTruncateTXN(rb, txn); ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Reset the CheckXidAlive */ /* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId; CheckXidAlive = InvalidTransactionId;
} }
...@@ -2374,17 +2467,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -2374,17 +2467,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
/* /*
* The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
* abort of the (sub)transaction we are streaming. We need to do the * abort of the (sub)transaction we are streaming or preparing. We
* cleanup and return gracefully on this error, see SetupCheckXidLive. * need to do the cleanup and return gracefully on this error, see
* SetupCheckXidLive.
*/ */
if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK) if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
{ {
/* /*
* This error can only occur when we are sending the data in * This error can occur either when we are sending the data in
* streaming mode and the streaming is not finished yet. * streaming mode and the streaming is not finished yet or when we
* are sending the data out on a PREPARE during a two-phase
* commit.
*/ */
Assert(streaming); Assert(streaming || rbtxn_prepared(txn));
Assert(stream_started); Assert(stream_started || rbtxn_prepared(txn));
/* Cleanup the temporary error state. */ /* Cleanup the temporary error state. */
FlushErrorState(); FlushErrorState();
...@@ -2414,26 +2510,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -2414,26 +2510,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* ReorderBufferCommitChild(), even if previously assigned to the toplevel * ReorderBufferCommitChild(), even if previously assigned to the toplevel
* transaction with ReorderBufferAssignChild. * transaction with ReorderBufferAssignChild.
* *
* This interface is called once a toplevel commit is read for both streamed * This interface is called once a prepare or toplevel commit is read for both
* as well as non-streamed transactions. * streamed as well as non-streamed transactions.
*/ */
void static void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferReplay(ReorderBufferTXN *txn,
ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn) RepOriginId origin_id, XLogRecPtr origin_lsn)
{ {
ReorderBufferTXN *txn;
Snapshot snapshot_now; Snapshot snapshot_now;
CommandId command_id = FirstCommandId; CommandId command_id = FirstCommandId;
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);
/* unknown transaction, nothing to replay */
if (txn == NULL)
return;
txn->final_lsn = commit_lsn; txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn; txn->end_lsn = end_lsn;
txn->commit_time = commit_time; txn->commit_time = commit_time;
...@@ -2463,7 +2552,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ...@@ -2463,7 +2552,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
if (txn->base_snapshot == NULL) if (txn->base_snapshot == NULL)
{ {
Assert(txn->ninvalidations == 0); Assert(txn->ninvalidations == 0);
ReorderBufferCleanupTXN(rb, txn);
/*
* Removing this txn before a commit might result in the computation
* of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
*/
if (!rbtxn_prepared(txn))
ReorderBufferCleanupTXN(rb, txn);
return; return;
} }
...@@ -2474,6 +2569,178 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ...@@ -2474,6 +2569,178 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
command_id, false); command_id, false);
} }
/*
 * ReorderBufferCommit
 *		Replay a transaction once its toplevel commit has been read.
 *
 * The transaction entry for 'xid' is looked up in the reorder buffer and
 * handed to ReorderBufferReplay() together with the commit information
 * (commit_lsn, end_lsn, commit_time, origin_id, origin_lsn).  If the reorder
 * buffer has no entry for the xid there is nothing to replay and the call is
 * a no-op.
 */
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
					TimestampTz commit_time,
					RepOriginId origin_id, XLogRecPtr origin_lsn)
{
	ReorderBufferTXN *committed_txn = ReorderBufferTXNByXid(rb, xid, false,
															NULL,
															InvalidXLogRecPtr,
															false);

	/* Only a transaction known to the reorder buffer can be replayed. */
	if (committed_txn != NULL)
		ReorderBufferReplay(committed_txn, rb, xid,
							commit_lsn, end_lsn, commit_time,
							origin_id, origin_lsn);
}
/*
 * ReorderBufferRememberPrepareInfo
 *		Store the details of a prepare record in the transaction entry.
 *
 * The prepare LSN, end LSN, prepare time and origin information are saved in
 * the txn so that commit prepared can still use them if decoding at prepare
 * time is skipped.
 *
 * Returns true when the information was recorded, false when 'xid' is not
 * known to the reorder buffer (in which case nothing is stored).
 */
bool
ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
								 XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
								 TimestampTz prepare_time,
								 RepOriginId origin_id, XLogRecPtr origin_lsn)
{
	ReorderBufferTXN *prep_txn = ReorderBufferTXNByXid(rb, xid, false, NULL,
													   InvalidXLogRecPtr,
													   false);

	if (prep_txn != NULL)
	{
		/* Position of the prepare record. */
		prep_txn->final_lsn = prepare_lsn;
		prep_txn->end_lsn = end_lsn;

		/* The commit_time field doubles as the prepare time here. */
		prep_txn->commit_time = prepare_time;

		/* Replication origin of the prepare. */
		prep_txn->origin_id = origin_id;
		prep_txn->origin_lsn = origin_lsn;

		return true;
	}

	/* unknown transaction, nothing was remembered */
	return false;
}
/*
 * ReorderBufferSkipPrepare
 *		Note on the transaction that its prepare has been skipped.
 *
 * Sets RBTXN_SKIPPED_PREPARE in the flags of the transaction for 'xid'.  The
 * call does nothing when the reorder buffer has no entry for that xid.
 */
void
ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
{
	ReorderBufferTXN *skipped_txn = ReorderBufferTXNByXid(rb, xid, false, NULL,
														  InvalidXLogRecPtr,
														  false);

	if (skipped_txn != NULL)
		skipped_txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
}
/*
 * ReorderBufferPrepare
 *		Replay a two-phase transaction at prepare time.
 *
 * Marks the transaction for 'xid' as prepared (RBTXN_PREPARE), attaches a
 * copy of 'gid' to it and replays it through ReorderBufferReplay() using the
 * prepare information already stored in the txn (final_lsn, end_lsn,
 * commit_time, origin_id, origin_lsn).  Nothing happens if the xid is not
 * known to the reorder buffer.
 */
void
ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
					 char *gid)
{
	ReorderBufferTXN *prepared_txn = ReorderBufferTXNByXid(rb, xid, false,
														   NULL,
														   InvalidXLogRecPtr,
														   false);

	/* Without an entry for this xid there is nothing to replay. */
	if (prepared_txn == NULL)
		return;

	prepared_txn->txn_flags |= RBTXN_PREPARE;
	prepared_txn->gid = pstrdup(gid);

	/* The prepare information is expected to be present in the txn by now. */
	Assert(prepared_txn->final_lsn != InvalidXLogRecPtr);

	ReorderBufferReplay(prepared_txn, rb, xid,
						prepared_txn->final_lsn,
						prepared_txn->end_lsn,
						prepared_txn->commit_time,
						prepared_txn->origin_id,
						prepared_txn->origin_lsn);
}
/*
 * This is used to handle COMMIT/ROLLBACK PREPARED.
 *
 * rb, xid: the reorder buffer and the xid of the prepared transaction.
 * commit_lsn, end_lsn, commit_time, origin_id, origin_lsn: information of
 *		the COMMIT/ROLLBACK PREPARED record; stored in the txn before the
 *		output callback is invoked.
 * gid: identifier of the prepared transaction; a copy is attached to the txn.
 * is_commit: true invokes the commit_prepared callback, false invokes the
 *		rollback_prepared callback.
 *
 * For a commit whose txn is not yet flagged as prepared, the transaction is
 * first replayed using the prepare information saved in the txn.  Finally
 * the txn's invalidations are executed and the txn is cleaned up.
 */
void
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
ReorderBufferTXN *txn;
XLogRecPtr prepare_end_lsn;
TimestampTz prepare_time;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, false);
/* unknown transaction, nothing to do */
if (txn == NULL)
return;
/*
 * By this time the txn has the prepare record information, remember it to
 * be later used for rollback.  This must happen before end_lsn and
 * commit_time are overwritten with the commit/rollback values below.
 */
prepare_end_lsn = txn->end_lsn;
prepare_time = txn->commit_time;
/*
 * add the gid in the txn
 *
 * NOTE(review): ReorderBufferPrepare() also stores a pstrdup'd gid in
 * txn->gid; if that ran earlier for this txn, the pointer is overwritten
 * here without being freed -- confirm the earlier copy is reclaimed with
 * its memory context.
 */
txn->gid = pstrdup(gid);
/*
 * It is possible that this transaction is not decoded at prepare time
 * either because by that time we didn't have a consistent snapshot or it
 * was decoded earlier but we have restarted. We can't distinguish between
 * those two cases so we send the prepare in both the cases and let
 * downstream decide whether to process or skip it. We don't need to
 * decode the xact for aborts if it is not done already.
 */
if (!rbtxn_prepared(txn) && is_commit)
{
txn->txn_flags |= RBTXN_PREPARE;
/*
 * The prepare info must have been updated in txn even if we skip
 * prepare.
 */
Assert(txn->final_lsn != InvalidXLogRecPtr);
/*
 * By this time the txn has the prepare record information and it is
 * important to use that so that downstream gets the accurate
 * information. If instead, we have passed commit information here
 * then downstream can behave as it has already replayed commit
 * prepared after the restart.
 */
ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
txn->commit_time, txn->origin_id, txn->origin_lsn);
}
/*
 * From here on the txn carries the COMMIT/ROLLBACK PREPARED record's
 * information instead of the prepare record's.
 */
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
/*
 * The rollback callback is given the prepare end LSN and prepare time that
 * were saved above, not the values of the rollback record itself.
 */
if (is_commit)
rb->commit_prepared(rb, txn, commit_lsn);
else
rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
/* cleanup: make sure there's no cache pollution */
ReorderBufferExecuteInvalidations(txn->ninvalidations,
txn->invalidations);
ReorderBufferCleanupTXN(rb, txn);
}
/* /*
* Abort a transaction that possibly has previous changes. Needs to be first * Abort a transaction that possibly has previous changes. Needs to be first
* called for subtransactions and then for the toplevel xid. * called for subtransactions and then for the toplevel xid.
...@@ -2605,6 +2872,39 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) ...@@ -2605,6 +2872,39 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
ReorderBufferCleanupTXN(rb, txn); ReorderBufferCleanupTXN(rb, txn);
} }
/*
 * ReorderBufferInvalidate
 *		Apply the cache invalidations of a transaction that is being skipped.
 *
 * Even when the contents of a transaction are of no interest, it may have
 * manipulated the catalogs, so its invalidation messages still have to be
 * processed to keep the caches up to date.
 *
 * This is a special-purpose variant for prepared transactions: the TXN is
 * deliberately not cleaned up here even though it is skipped.  See
 * DecodePrepare.
 */
void
ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
{
	ReorderBufferTXN *skipped_txn = ReorderBufferTXNByXid(rb, xid, false, NULL,
														  InvalidXLogRecPtr,
														  false);

	/* unknown, nothing to do */
	if (skipped_txn == NULL)
		return;

	/*
	 * No invalidations may be pending unless the transaction has a base
	 * snapshot and actually queued some; in every other case there must be
	 * none at all.
	 */
	if (skipped_txn->base_snapshot == NULL ||
		!(skipped_txn->ninvalidations > 0))
	{
		Assert(skipped_txn->ninvalidations == 0);
		return;
	}

	ReorderBufferImmediateInvalidation(rb, skipped_txn->ninvalidations,
									   skipped_txn->invalidations);
}
/* /*
* Execute invalidations happening outside the context of a decoded * Execute invalidations happening outside the context of a decoded
* transaction. That currently happens either for xid-less commits * transaction. That currently happens either for xid-less commits
......
...@@ -834,6 +834,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) ...@@ -834,6 +834,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid)) if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
continue; continue;
/*
* We don't need to add snapshot to prepared transactions as they
* should not see the new catalog contents.
*/
if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
continue;
elog(DEBUG2, "adding a new snapshot to %u at %X/%X", elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
txn->xid, (uint32) (lsn >> 32), (uint32) lsn); txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
......
...@@ -174,6 +174,8 @@ typedef struct ReorderBufferChange ...@@ -174,6 +174,8 @@ typedef struct ReorderBufferChange
#define RBTXN_IS_STREAMED 0x0010 #define RBTXN_IS_STREAMED 0x0010
#define RBTXN_HAS_TOAST_INSERT 0x0020 #define RBTXN_HAS_TOAST_INSERT 0x0020
#define RBTXN_HAS_SPEC_INSERT 0x0040 #define RBTXN_HAS_SPEC_INSERT 0x0040
#define RBTXN_PREPARE 0x0080
#define RBTXN_SKIPPED_PREPARE 0x0100
/* Does the transaction have catalog changes? */ /* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \ #define rbtxn_has_catalog_changes(txn) \
...@@ -233,6 +235,18 @@ typedef struct ReorderBufferChange ...@@ -233,6 +235,18 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
) )
/* Has this transaction been prepared? */
#define rbtxn_prepared(txn) \
( \
((txn)->txn_flags & RBTXN_PREPARE) != 0 \
)
/* prepare for this transaction skipped? */
#define rbtxn_skip_prepared(txn) \
( \
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
)
typedef struct ReorderBufferTXN typedef struct ReorderBufferTXN
{ {
/* See above */ /* See above */
...@@ -258,10 +272,11 @@ typedef struct ReorderBufferTXN ...@@ -258,10 +272,11 @@ typedef struct ReorderBufferTXN
XLogRecPtr first_lsn; XLogRecPtr first_lsn;
/* ---- /* ----
* LSN of the record that lead to this xact to be committed or * LSN of the record that lead to this xact to be prepared or committed or
* aborted. This can be a * aborted. This can be a
* * plain commit record * * plain commit record
* * plain commit record, of a parent transaction * * plain commit record, of a parent transaction
* * prepared transaction
* * prepared transaction commit * * prepared transaction commit
* * plain abort record * * plain abort record
* * prepared transaction abort * * prepared transaction abort
...@@ -293,7 +308,8 @@ typedef struct ReorderBufferTXN ...@@ -293,7 +308,8 @@ typedef struct ReorderBufferTXN
XLogRecPtr origin_lsn; XLogRecPtr origin_lsn;
/* /*
* Commit time, only known when we read the actual commit record. * Commit or Prepare time, only known when we read the actual commit or
* prepare record.
*/ */
TimestampTz commit_time; TimestampTz commit_time;
...@@ -625,12 +641,18 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho ...@@ -625,12 +641,18 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho
void ReorderBufferCommit(ReorderBuffer *, TransactionId, void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn,
char *gid, bool is_commit);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn); XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn); void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid); void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid);
void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn); void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
void ReorderBufferInvalidate(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
...@@ -644,10 +666,17 @@ void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr l ...@@ -644,10 +666,17 @@ void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr l
void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
SharedInvalidationMessage *invalidations); SharedInvalidationMessage *invalidations);
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
TimestampTz prepare_time,
RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment