Commit 89fd41b3 authored by Andres Freund's avatar Andres Freund

Fix and improve cache invalidation logic for logical decoding.

There are basically three situations in which logical decoding needs
to perform cache invalidation. During/After replaying a transaction
with catalog changes, when skipping a uninteresting transaction that
performed catalog changes and when erroring out while replaying a
transaction. Unfortunately these three cases were all done slightly
differently - partially because 8de3e410, which greatly simplifies
matters, got committed in the midst of the development of logical
decoding.

The actually problematic case was when logical decoding skipped
transaction commits (and thus processed invalidations). When used via
the SQL interface cache invalidation could access the catalog - bad,
because we didn't set up enough state to allow that correctly. It'd
not be hard to setup sufficient state, but the simpler solution is to
always perform cache invalidation outside a valid transaction.

Also make the different cache invalidation cases look as similar as
possible, to ease code review.

This fixes the assertion failure reported by Antonin Houska in
53EE02D9.7040702@gmail.com. The presented testcase has been expanded
into a regression test.

Backpatch to 9.4, where logical decoding was introduced.
parent 5a2c1840
......@@ -37,7 +37,7 @@ submake-isolation:
submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding
REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact binary prepared
REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
regresscheck: all | submake-regress submake-test_decoding
$(MKDIR_P) regression_output
......
-- test that we can insert the result of a get_changes call into a
-- logged relation. That's really not a good idea in practical terms,
-- but provides a nice test.
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
-- slot works
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
-- create some changes
CREATE TABLE somechange(id serial primary key);
INSERT INTO somechange DEFAULT VALUES;
CREATE TABLE changeresult AS
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT * FROM changeresult;
data
------------------------------------------------
BEGIN
table public.somechange: INSERT: id[integer]:1
COMMIT
(3 rows)
INSERT INTO changeresult
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
INSERT INTO changeresult
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT * FROM changeresult;
data
--------------------------------------------------------------------------------------------------------------------------------------------------
BEGIN
table public.somechange: INSERT: id[integer]:1
COMMIT
BEGIN
table public.changeresult: INSERT: data[text]:'BEGIN'
table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1'
table public.changeresult: INSERT: data[text]:'COMMIT'
COMMIT
BEGIN
table public.changeresult: INSERT: data[text]:'BEGIN'
table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1'
table public.changeresult: INSERT: data[text]:'COMMIT'
COMMIT
BEGIN
table public.changeresult: INSERT: data[text]:'BEGIN'
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
table public.changeresult: INSERT: data[text]:'COMMIT'
COMMIT
(20 rows)
DROP TABLE changeresult;
DROP TABLE somechange;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
BEGIN
table public.changeresult: INSERT: data[text]:'BEGIN'
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
table public.changeresult: INSERT: data[text]:'COMMIT'
table public.changeresult: INSERT: data[text]:'BEGIN'
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
table public.changeresult: INSERT: data[text]:'COMMIT'
COMMIT
(14 rows)
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)
-- test that we can insert the result of a get_changes call into a
-- logged relation. That's really not a good idea in practical terms,
-- but provides a nice test.
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
-- slot works
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- create some changes
CREATE TABLE somechange(id serial primary key);
INSERT INTO somechange DEFAULT VALUES;
CREATE TABLE changeresult AS
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT * FROM changeresult;
INSERT INTO changeresult
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
INSERT INTO changeresult
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT * FROM changeresult;
DROP TABLE changeresult;
DROP TABLE somechange;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
......@@ -1264,8 +1264,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
volatile CommandId command_id = FirstCommandId;
volatile Snapshot snapshot_now = NULL;
volatile bool txn_started = false;
volatile bool subtxn_started = false;
volatile bool using_subtxn = false;
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);
......@@ -1305,7 +1304,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
PG_TRY();
{
txn_started = false;
/*
* Decoding needs access to syscaches et al., which in turn use
......@@ -1317,16 +1315,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
* When we're called via the SQL SRF there's already a transaction
* started, so start an explicit subtransaction there.
*/
if (IsTransactionOrTransactionBlock())
{
using_subtxn = IsTransactionOrTransactionBlock();
if (using_subtxn)
BeginInternalSubTransaction("replay");
subtxn_started = true;
}
else
{
StartTransactionCommand();
txn_started = true;
}
rb->begin(rb, txn);
......@@ -1489,22 +1483,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
elog(ERROR, "output plugin used XID %u",
GetCurrentTransactionId());
/* make sure there's no cache pollution */
ReorderBufferExecuteInvalidations(rb, txn);
/* cleanup */
TeardownHistoricSnapshot(false);
/*
* Abort subtransaction or the transaction as a whole has the right
* Aborting the current (sub-)transaction as a whole has the right
* semantics. We want all locks acquired in here to be released, not
* reassigned to the parent and we do not want any database access
* have persistent effects.
*/
if (subtxn_started)
AbortCurrentTransaction();
/* make sure there's no cache pollution */
ReorderBufferExecuteInvalidations(rb, txn);
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
else if (txn_started)
AbortCurrentTransaction();
if (snapshot_now->copied)
ReorderBufferFreeSnap(rb, snapshot_now);
......@@ -1520,20 +1514,21 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
TeardownHistoricSnapshot(true);
if (snapshot_now->copied)
ReorderBufferFreeSnap(rb, snapshot_now);
if (subtxn_started)
RollbackAndReleaseCurrentSubTransaction();
else if (txn_started)
AbortCurrentTransaction();
/*
* Invalidations in an aborted transactions aren't allowed to do
* catalog access, so we don't need to still have the snapshot setup.
* Force cache invalidation to happen outside of a valid transaction
* to prevent catalog access as we just caught an error.
*/
AbortCurrentTransaction();
/* make sure there's no cache pollution */
ReorderBufferExecuteInvalidations(rb, txn);
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
if (snapshot_now->copied)
ReorderBufferFreeSnap(rb, snapshot_now);
/* remove potential on-disk data, and deallocate */
ReorderBufferCleanupTXN(rb, txn);
......@@ -1645,20 +1640,24 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
{
/* setup snapshot to perform the invalidations in */
SetupHistoricSnapshot(txn->base_snapshot, txn->tuplecid_hash);
PG_TRY();
{
ReorderBufferExecuteInvalidations(rb, txn);
TeardownHistoricSnapshot(false);
}
PG_CATCH();
{
/* cleanup */
TeardownHistoricSnapshot(true);
PG_RE_THROW();
}
PG_END_TRY();
bool use_subtxn = IsTransactionOrTransactionBlock();
if (use_subtxn)
BeginInternalSubTransaction("replay");
/*
* Force invalidations to happen outside of a valid transaction - that
* way entries will just be marked as invalid without accessing the
* catalog. That's advantageous because we don't need to setup the
* full state necessary for catalog access.
*/
if (use_subtxn)
AbortCurrentTransaction();
ReorderBufferExecuteInvalidations(rb, txn);
if (use_subtxn)
RollbackAndReleaseCurrentSubTransaction();
}
else
Assert(txn->ninvalidations == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment