Commit f49a80c4 authored by Alvaro Herrera's avatar Alvaro Herrera

Fix "base" snapshot handling in logical decoding

Two closely related bugs are fixed.  First, xmin of logical slots was
advanced too early.  During xl_running_xacts processing, xmin of the
slot was set to the oldest running xid in the record, but that's wrong:
actually, snapshots which will be used for not-yet-replayed transactions
might consider older txns as running too, so we need to keep xmin back
for them.  The problem wasn't noticed earlier because DDL which allows
to delete tuple (set xmax) while some another not-yet-committed
transaction looks at it is pretty rare, if not unique: e.g. all forms of
ALTER TABLE which change schema acquire ACCESS EXCLUSIVE lock
conflicting with any inserts. The included test case (test_decoding's
oldest_xmin) uses ALTER of a composite type, which doesn't have such
interlocking.

To deal with this, we must be able to quickly retrieve oldest xmin
(oldest running xid among all assigned snapshots) from ReorderBuffer. To
fix, add another list of ReorderBufferTXNs to the reorderbuffer, where
transactions are sorted by base-snapshot-LSN.  This is slightly
different from the existing (sorted by first-LSN) list, because a
transaction can have an earlier LSN but a later Xmin, if its first
record does not obtain an xmin (eg. xl_xact_assignment).  Note this new
list doesn't fully replace the existing txn list: we still need that one
to prevent WAL recycling.

The second issue concerns SnapBuilder snapshots and subtransactions.
SnapBuildDistributeNewCatalogSnapshot never assigned a snapshot to a
transaction that is known to be a subtxn, which is good in the common
case that the top-level transaction already has one (no point in doing
so), but a bug otherwise.  To fix, arrange to transfer the snapshot from
the subtxn to its top-level txn as soon as the kinship gets known.
test_decoding's snapshot_transfer verifies this.

Also, fix a minor memory leak: refcount of toplevel's old base snapshot
was not decremented when the snapshot is transferred from child.

Liberally sprinkle code comments, and rewrite a few existing ones.  This
part is my (Álvaro's) contribution to this commit, as I had to write all
those comments in order to understand the existing code and Arseny's
patch.
Reported-by: default avatarArseny Sher <a.sher@postgrespro.ru>
Diagnosed-by: default avatarArseny Sher <a.sher@postgrespro.ru>
Co-authored-by: default avatarArseny Sher <a.sher@postgrespro.ru>
Co-authored-by: default avatarÁlvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: default avatarAntonin Houska <ah@cybertec.at>
Discussion: https://postgr.es/m/87lgdyz1wj.fsf@ars-thinkpad
parent 4d54543e
...@@ -50,7 +50,8 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install ...@@ -50,7 +50,8 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install
$(pg_regress_installcheck) \ $(pg_regress_installcheck) \
$(REGRESSCHECKS) $(REGRESSCHECKS)
ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer
isolationcheck: | submake-isolation submake-test_decoding temp-install isolationcheck: | submake-isolation submake-test_decoding temp-install
$(pg_isolation_regress_check) \ $(pg_isolation_regress_check) \
......
Parsed test spec with 2 sessions
starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_get_changes s1_commit s0_vacuum s0_get_changes
step s0_begin: BEGIN;
step s0_getxid: SELECT txid_current() IS NULL;
?column?
f
step s1_begin: BEGIN;
step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3));
step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos;
step s0_commit: COMMIT;
step s0_checkpoint: CHECKPOINT;
step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
step s1_commit: COMMIT;
step s0_vacuum: VACUUM FULL;
step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
BEGIN
table public.harvest: INSERT: fruits[basket]:'(1,2,3)'
COMMIT
?column?
stop
Parsed test spec with 2 sessions
starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub0 s0_commit s0_get_changes
step s0_begin: BEGIN;
step s0_begin_sub0: SAVEPOINT s0;
step s0_log_assignment: SELECT txid_current() IS NULL;
?column?
f
step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0);
step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int;
step s0_insert: INSERT INTO harvest VALUES (1, 2, 3);
step s0_end_sub0: RELEASE SAVEPOINT s0;
step s0_commit: COMMIT;
step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
BEGIN
table public.dummy: INSERT: i[integer]:0
table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3
COMMIT
?column?
stop
starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_begin_sub1 s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub1 s0_end_sub0 s0_commit s0_get_changes
step s0_begin: BEGIN;
step s0_begin_sub0: SAVEPOINT s0;
step s0_log_assignment: SELECT txid_current() IS NULL;
?column?
f
step s0_begin_sub1: SAVEPOINT s1;
step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0);
step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int;
step s0_insert: INSERT INTO harvest VALUES (1, 2, 3);
step s0_end_sub1: RELEASE SAVEPOINT s1;
step s0_end_sub0: RELEASE SAVEPOINT s0;
step s0_commit: COMMIT;
step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
BEGIN
table public.dummy: INSERT: i[integer]:0
table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3
COMMIT
?column?
stop
# Test advancement of the slot's oldest xmin
setup
{
SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
DROP TYPE IF EXISTS basket;
CREATE TYPE basket AS (apples integer, pears integer, mangos integer);
DROP TABLE IF EXISTS harvest;
CREATE TABLE harvest(fruits basket);
}
teardown
{
DROP TABLE IF EXISTS harvest;
DROP TYPE IF EXISTS basket;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
session "s0"
step "s0_begin" { BEGIN; }
step "s0_getxid" { SELECT txid_current() IS NULL; }
step "s0_alter" { ALTER TYPE basket DROP ATTRIBUTE mangos; }
step "s0_commit" { COMMIT; }
step "s0_checkpoint" { CHECKPOINT; }
step "s0_vacuum" { VACUUM FULL; }
step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
session "s1"
step "s1_begin" { BEGIN; }
step "s1_insert" { INSERT INTO harvest VALUES ((1, 2, 3)); }
step "s1_commit" { COMMIT; }
# Checkpoint with following get_changes forces to advance xmin. ALTER of a
# composite type is a rare form of DDL which allows T1 to see the tuple which
# will be removed (xmax set) before T1 commits. That is, interlocking doesn't
# forbid modifying catalog after someone read it (and didn't commit yet).
permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes"
# Test snapshot transfer from subxact to top-level and receival of later snaps.
setup
{
SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
DROP TABLE IF EXISTS dummy;
CREATE TABLE dummy(i int);
DROP TABLE IF EXISTS harvest;
CREATE TABLE harvest(apples int, pears int);
}
teardown
{
DROP TABLE IF EXISTS harvest;
DROP TABLE IF EXISTS dummy;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
session "s0"
step "s0_begin" { BEGIN; }
step "s0_begin_sub0" { SAVEPOINT s0; }
step "s0_log_assignment" { SELECT txid_current() IS NULL; }
step "s0_begin_sub1" { SAVEPOINT s1; }
step "s0_sub_get_base_snap" { INSERT INTO dummy VALUES (0); }
step "s0_insert" { INSERT INTO harvest VALUES (1, 2, 3); }
step "s0_end_sub0" { RELEASE SAVEPOINT s0; }
step "s0_end_sub1" { RELEASE SAVEPOINT s1; }
step "s0_insert2" { INSERT INTO harvest VALUES (1, 2, 3, 4); }
step "s0_commit" { COMMIT; }
step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
session "s1"
step "s1_produce_new_snap" { ALTER TABLE harvest ADD COLUMN mangos int; }
# start top-level without base snap, get base snap in subxact, then create new
# snap and make sure it is queued.
permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub0" "s0_commit" "s0_get_changes"
# In previous test, we firstly associated subxact with xact and only then got
# base snap; now nest one more subxact to get snap first and only then (at
# commit) associate it with toplevel.
permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_begin_sub1" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub1" "s0_end_sub0" "s0_commit" "s0_get_changes"
...@@ -165,6 +165,8 @@ static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); ...@@ -165,6 +165,8 @@ static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
TransactionId xid, bool create, bool *is_new, TransactionId xid, bool create, bool *is_new,
XLogRecPtr lsn, bool create_as_top); XLogRecPtr lsn, bool create_as_top);
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
ReorderBufferTXN *subtxn);
static void AssertTXNLsnOrder(ReorderBuffer *rb); static void AssertTXNLsnOrder(ReorderBuffer *rb);
...@@ -271,6 +273,7 @@ ReorderBufferAllocate(void) ...@@ -271,6 +273,7 @@ ReorderBufferAllocate(void)
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
dlist_init(&buffer->toplevel_by_lsn); dlist_init(&buffer->toplevel_by_lsn);
dlist_init(&buffer->txns_by_base_snapshot_lsn);
/* /*
* Ensure there's no stale data from prior uses of this slot, in case some * Ensure there's no stale data from prior uses of this slot, in case some
...@@ -462,7 +465,6 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, ...@@ -462,7 +465,6 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
bool found; bool found;
Assert(TransactionIdIsValid(xid)); Assert(TransactionIdIsValid(xid));
Assert(!create || lsn != InvalidXLogRecPtr);
/* /*
* Check the one-entry lookup cache first * Check the one-entry lookup cache first
...@@ -506,6 +508,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, ...@@ -506,6 +508,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
{ {
/* initialize the new entry, if creation was requested */ /* initialize the new entry, if creation was requested */
Assert(ent != NULL); Assert(ent != NULL);
Assert(lsn != InvalidXLogRecPtr);
ent->txn = ReorderBufferGetTXN(rb); ent->txn = ReorderBufferGetTXN(rb);
ent->txn->xid = xid; ent->txn->xid = xid;
...@@ -607,43 +610,80 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, ...@@ -607,43 +610,80 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
} }
} }
/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
*
* Other LSN-related invariants are checked too.
*
* No-op if assertions are not in use.
*/
static void static void
AssertTXNLsnOrder(ReorderBuffer *rb) AssertTXNLsnOrder(ReorderBuffer *rb)
{ {
#ifdef USE_ASSERT_CHECKING #ifdef USE_ASSERT_CHECKING
dlist_iter iter; dlist_iter iter;
XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
dlist_foreach(iter, &rb->toplevel_by_lsn) dlist_foreach(iter, &rb->toplevel_by_lsn)
{ {
ReorderBufferTXN *cur_txn; ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
iter.cur);
cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur); /* start LSN must be set */
Assert(cur_txn->first_lsn != InvalidXLogRecPtr); Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
/* If there is an end LSN, it must be higher than start LSN */
if (cur_txn->end_lsn != InvalidXLogRecPtr) if (cur_txn->end_lsn != InvalidXLogRecPtr)
Assert(cur_txn->first_lsn <= cur_txn->end_lsn); Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
/* Current initial LSN must be strictly higher than previous */
if (prev_first_lsn != InvalidXLogRecPtr) if (prev_first_lsn != InvalidXLogRecPtr)
Assert(prev_first_lsn < cur_txn->first_lsn); Assert(prev_first_lsn < cur_txn->first_lsn);
/* known-as-subtxn txns must not be listed */
Assert(!cur_txn->is_known_as_subxact); Assert(!cur_txn->is_known_as_subxact);
prev_first_lsn = cur_txn->first_lsn; prev_first_lsn = cur_txn->first_lsn;
} }
dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
{
ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
base_snapshot_node,
iter.cur);
/* base snapshot (and its LSN) must be set */
Assert(cur_txn->base_snapshot != NULL);
Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
/* current LSN must be strictly higher than previous */
if (prev_base_snap_lsn != InvalidXLogRecPtr)
Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
/* known-as-subtxn txns must not be listed */
Assert(!cur_txn->is_known_as_subxact);
prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
}
#endif #endif
} }
/*
* ReorderBufferGetOldestTXN
* Return oldest transaction in reorderbuffer
*/
ReorderBufferTXN * ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer *rb) ReorderBufferGetOldestTXN(ReorderBuffer *rb)
{ {
ReorderBufferTXN *txn; ReorderBufferTXN *txn;
AssertTXNLsnOrder(rb);
if (dlist_is_empty(&rb->toplevel_by_lsn)) if (dlist_is_empty(&rb->toplevel_by_lsn))
return NULL; return NULL;
AssertTXNLsnOrder(rb);
txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn); txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
Assert(!txn->is_known_as_subxact); Assert(!txn->is_known_as_subxact);
...@@ -651,12 +691,44 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb) ...@@ -651,12 +691,44 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb)
return txn; return txn;
} }
/*
* ReorderBufferGetOldestXmin
* Return oldest Xmin in reorderbuffer
*
* Returns oldest possibly running Xid from the point of view of snapshots
* used in the transactions kept by reorderbuffer, or InvalidTransactionId if
* there are none.
*
* Since snapshots are assigned monotonically, this equals the Xmin of the
* base snapshot with minimal base_snapshot_lsn.
*/
TransactionId
ReorderBufferGetOldestXmin(ReorderBuffer *rb)
{
ReorderBufferTXN *txn;
AssertTXNLsnOrder(rb);
if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
return InvalidTransactionId;
txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
&rb->txns_by_base_snapshot_lsn);
return txn->base_snapshot->xmin;
}
void void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr) ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{ {
rb->current_restart_decoding_lsn = ptr; rb->current_restart_decoding_lsn = ptr;
} }
/*
* ReorderBufferAssignChild
*
* Make note that we know that subxid is a subtransaction of xid, seen as of
* the given lsn.
*/
void void
ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
TransactionId subxid, XLogRecPtr lsn) TransactionId subxid, XLogRecPtr lsn)
...@@ -669,32 +741,107 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, ...@@ -669,32 +741,107 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true); txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false); subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
if (new_sub) if (new_top && !new_sub)
elog(ERROR, "subtransaction logged without previous top-level txn record");
if (!new_sub)
{
if (subtxn->is_known_as_subxact)
{
/* already associated, nothing to do */
return;
}
else
{ {
/* /*
* we assign subtransactions to top level transaction even if we don't * We already saw this transaction, but initially added it to the list
* have data for it yet, assignment records frequently reference xids * of top-level txns. Now that we know it's not top-level, remove
* that have not yet produced any records. Knowing those aren't top * it from there.
* level xids allows us to make processing cheaper in some places.
*/ */
dlist_push_tail(&txn->subtxns, &subtxn->node); dlist_delete(&subtxn->node);
txn->nsubtxns++;
} }
else if (!subtxn->is_known_as_subxact) }
{
subtxn->is_known_as_subxact = true; subtxn->is_known_as_subxact = true;
subtxn->toplevel_xid = xid;
Assert(subtxn->nsubtxns == 0); Assert(subtxn->nsubtxns == 0);
/* remove from lsn order list of top-level transactions */ /* add to subtransaction list */
dlist_delete(&subtxn->node);
/* add to toplevel transaction */
dlist_push_tail(&txn->subtxns, &subtxn->node); dlist_push_tail(&txn->subtxns, &subtxn->node);
txn->nsubtxns++; txn->nsubtxns++;
/* Possibly transfer the subtxn's snapshot to its top-level txn. */
ReorderBufferTransferSnapToParent(txn, subtxn);
/* Verify LSN-ordering invariant */
AssertTXNLsnOrder(rb);
}
/*
* ReorderBufferTransferSnapToParent
* Transfer base snapshot from subtxn to top-level txn, if needed
*
* This is done if the top-level txn doesn't have a base snapshot, or if the
* subtxn's base snapshot has an earlier LSN than the top-level txn's base
* snapshot's LSN. This can happen if there are no changes in the toplevel
* txn but there are some in the subtxn, or the first change in subtxn has
* earlier LSN than first change in the top-level txn and we learned about
* their kinship only now.
*
* The subtransaction's snapshot is cleared regardless of the transfer
* happening, since it's not needed anymore in either case.
*
* We do this as soon as we become aware of their kinship, to avoid queueing
* extra snapshots to txns known-as-subtxns -- only top-level txns will
* receive further snapshots.
*/
static void
ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
ReorderBufferTXN *subtxn)
{
Assert(subtxn->toplevel_xid == txn->xid);
if (subtxn->base_snapshot != NULL)
{
if (txn->base_snapshot == NULL ||
subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
{
/*
* If the toplevel transaction already has a base snapshot but
* it's newer than the subxact's, purge it.
*/
if (txn->base_snapshot != NULL)
{
SnapBuildSnapDecRefcount(txn->base_snapshot);
dlist_delete(&txn->base_snapshot_node);
} }
else if (new_top)
/*
* The snapshot is now the top transaction's; transfer it, and
* adjust the list position of the top transaction in the list by
* moving it to where the subtransaction is.
*/
txn->base_snapshot = subtxn->base_snapshot;
txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
dlist_insert_before(&subtxn->base_snapshot_node,
&txn->base_snapshot_node);
/*
* The subtransaction doesn't have a snapshot anymore (so it
* mustn't be in the list.)
*/
subtxn->base_snapshot = NULL;
subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
dlist_delete(&subtxn->base_snapshot_node);
}
else
{ {
elog(ERROR, "existing subxact assigned to unknown toplevel xact"); /* Base snap of toplevel is fine, so subxact's is not needed */
SnapBuildSnapDecRefcount(subtxn->base_snapshot);
dlist_delete(&subtxn->base_snapshot_node);
subtxn->base_snapshot = NULL;
subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
}
} }
} }
...@@ -707,7 +854,6 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, ...@@ -707,7 +854,6 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
TransactionId subxid, XLogRecPtr commit_lsn, TransactionId subxid, XLogRecPtr commit_lsn,
XLogRecPtr end_lsn) XLogRecPtr end_lsn)
{ {
ReorderBufferTXN *txn;
ReorderBufferTXN *subtxn; ReorderBufferTXN *subtxn;
subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL, subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
...@@ -719,42 +865,14 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, ...@@ -719,42 +865,14 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
if (!subtxn) if (!subtxn)
return; return;
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
if (txn == NULL)
elog(ERROR, "subxact logged without previous toplevel record");
/*
* Pass our base snapshot to the parent transaction if it doesn't have
* one, or ours is older. That can happen if there are no changes in the
* toplevel transaction but in one of the child transactions. This allows
* the parent to simply use its base snapshot initially.
*/
if (subtxn->base_snapshot != NULL &&
(txn->base_snapshot == NULL ||
txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
{
txn->base_snapshot = subtxn->base_snapshot;
txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
subtxn->base_snapshot = NULL;
subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
}
subtxn->final_lsn = commit_lsn; subtxn->final_lsn = commit_lsn;
subtxn->end_lsn = end_lsn; subtxn->end_lsn = end_lsn;
if (!subtxn->is_known_as_subxact) /*
{ * Assign this subxact as a child of the toplevel xact (no-op if already
subtxn->is_known_as_subxact = true; * done.)
Assert(subtxn->nsubtxns == 0); */
ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
/* remove from lsn order list of top-level transactions */
dlist_delete(&subtxn->node);
/* add to subtransaction list */
dlist_push_tail(&txn->subtxns, &subtxn->node);
txn->nsubtxns++;
}
} }
...@@ -1078,11 +1196,13 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1078,11 +1196,13 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferReturnChange(rb, change); ReorderBufferReturnChange(rb, change);
} }
/*
* Cleanup the base snapshot, if set.
*/
if (txn->base_snapshot != NULL) if (txn->base_snapshot != NULL)
{ {
SnapBuildSnapDecRefcount(txn->base_snapshot); SnapBuildSnapDecRefcount(txn->base_snapshot);
txn->base_snapshot = NULL; dlist_delete(&txn->base_snapshot_node);
txn->base_snapshot_lsn = InvalidXLogRecPtr;
} }
/* /*
...@@ -1257,17 +1377,17 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) ...@@ -1257,17 +1377,17 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
} }
/* /*
* Perform the replay of a transaction and it's non-aborted subtransactions. * Perform the replay of a transaction and its non-aborted subtransactions.
* *
* Subtransactions previously have to be processed by * Subtransactions previously have to be processed by
* ReorderBufferCommitChild(), even if previously assigned to the toplevel * ReorderBufferCommitChild(), even if previously assigned to the toplevel
* transaction with ReorderBufferAssignChild. * transaction with ReorderBufferAssignChild.
* *
* We currently can only decode a transaction's contents in when their commit * We currently can only decode a transaction's contents when its commit
* record is read because that's currently the only place where we know about * record is read because that's the only place where we know about cache
* cache invalidations. Thus, once a toplevel commit is read, we iterate over * invalidations. Thus, once a toplevel commit is read, we iterate over the top
* the top and subtransactions (using a k-way merge) and replay the changes in * and subtransactions (using a k-way merge) and replay the changes in lsn
* lsn order. * order.
*/ */
void void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
...@@ -1295,10 +1415,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ...@@ -1295,10 +1415,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
txn->origin_lsn = origin_lsn; txn->origin_lsn = origin_lsn;
/* /*
* If this transaction didn't have any real changes in our database, it's * If this transaction has no snapshot, it didn't make any changes to the
* OK not to have a snapshot. Note that ReorderBufferCommitChild will have * database, so there's nothing to decode. Note that
* transferred its snapshot to this transaction if it had one and the * ReorderBufferCommitChild will have transferred any snapshots from
* toplevel tx didn't. * subtransactions if there were any.
*/ */
if (txn->base_snapshot == NULL) if (txn->base_snapshot == NULL)
{ {
...@@ -1861,12 +1981,10 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, ...@@ -1861,12 +1981,10 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
} }
/* /*
* Setup the base snapshot of a transaction. The base snapshot is the snapshot * Set up the transaction's base snapshot.
* that is used to decode all changes until either this transaction modifies
* the catalog or another catalog modifying transaction commits.
* *
* Needs to be called before any changes are added with * If we know that xid is a subtransaction, set the base snapshot on the
* ReorderBufferQueueChange(). * top-level transaction instead.
*/ */
void void
ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
...@@ -1875,12 +1993,23 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, ...@@ -1875,12 +1993,23 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
ReorderBufferTXN *txn; ReorderBufferTXN *txn;
bool is_new; bool is_new;
AssertArg(snap != NULL);
/*
* Fetch the transaction to operate on. If we know it's a subtransaction,
* operate on its top-level transaction instead.
*/
txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true); txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
if (txn->is_known_as_subxact)
txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
NULL, InvalidXLogRecPtr, false);
Assert(txn->base_snapshot == NULL); Assert(txn->base_snapshot == NULL);
Assert(snap != NULL);
txn->base_snapshot = snap; txn->base_snapshot = snap;
txn->base_snapshot_lsn = lsn; txn->base_snapshot_lsn = lsn;
dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
AssertTXNLsnOrder(rb);
} }
/* /*
...@@ -1999,25 +2128,26 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) ...@@ -1999,25 +2128,26 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
} }
/* /*
* Have we already added the first snapshot? * ReorderBufferXidHasBaseSnapshot
* Have we already set the base snapshot for the given txn/subtxn?
*/ */
bool bool
ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
{ {
ReorderBufferTXN *txn; ReorderBufferTXN *txn;
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, txn = ReorderBufferTXNByXid(rb, xid, false,
false); NULL, InvalidXLogRecPtr, false);
/* transaction isn't known yet, ergo no snapshot */ /* transaction isn't known yet, ergo no snapshot */
if (txn == NULL) if (txn == NULL)
return false; return false;
/* /* a known subtxn? operate on top-level txn instead */
* TODO: It would be a nice improvement if we would check the toplevel if (txn->is_known_as_subxact)
* transaction in subtransactions, but we'd need to keep track of a bit txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
* more state. NULL, InvalidXLogRecPtr, false);
*/
return txn->base_snapshot != NULL; return txn->base_snapshot != NULL;
} }
......
...@@ -830,9 +830,9 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) ...@@ -830,9 +830,9 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
* all. We'll add a snapshot when the first change gets queued. * all. We'll add a snapshot when the first change gets queued.
* *
* NB: This works correctly even for subtransactions because * NB: This works correctly even for subtransactions because
* ReorderBufferCommitChild() takes care to pass the parent the base * ReorderBufferAssignChild() takes care to transfer the base snapshot
* snapshot, and while iterating the changequeue we'll get the change * to the top-level transaction, and while iterating the changequeue
* from the subtxn. * we'll get the change from the subtxn.
*/ */
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid)) if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
continue; continue;
...@@ -1074,7 +1074,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, ...@@ -1074,7 +1074,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
/* refcount of the snapshot builder for the new snapshot */ /* refcount of the snapshot builder for the new snapshot */
SnapBuildSnapIncRefcount(builder->snapshot); SnapBuildSnapIncRefcount(builder->snapshot);
/* add a new Snapshot to all currently running transactions */ /* add a new catalog snapshot to all currently running transactions */
SnapBuildDistributeNewCatalogSnapshot(builder, lsn); SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
} }
} }
...@@ -1094,6 +1094,7 @@ void ...@@ -1094,6 +1094,7 @@ void
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{ {
ReorderBufferTXN *txn; ReorderBufferTXN *txn;
TransactionId xmin;
/* /*
* If we're not consistent yet, inspect the record to see whether it * If we're not consistent yet, inspect the record to see whether it
...@@ -1126,15 +1127,21 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact ...@@ -1126,15 +1127,21 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
/* Remove transactions we don't need to keep track off anymore */ /* Remove transactions we don't need to keep track off anymore */
SnapBuildPurgeCommittedTxn(builder); SnapBuildPurgeCommittedTxn(builder);
elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
builder->xmin, builder->xmax,
running->oldestRunningXid);
/* /*
* Increase shared memory limits, so vacuum can work on tuples we * Advance the xmin limit for the current replication slot, to allow
* prevented from being pruned till now. * vacuum to clean up the tuples this slot has been protecting.
*/ *
LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid); * The reorderbuffer might have an xmin among the currently running
* snapshots; use it if so. If not, we need only consider the snapshots
* we'll produce later, which can't be less than the oldest running xid in
* the record we're reading now.
*/
xmin = ReorderBufferGetOldestXmin(builder->reorder);
if (xmin == InvalidTransactionId)
xmin = running->oldestRunningXid;
elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
LogicalIncreaseXminForSlot(lsn, xmin);
/* /*
* Also tell the slot where we can restart decoding from. We don't want to * Also tell the slot where we can restart decoding from. We don't want to
......
...@@ -160,10 +160,9 @@ typedef struct ReorderBufferTXN ...@@ -160,10 +160,9 @@ typedef struct ReorderBufferTXN
/* did the TX have catalog changes */ /* did the TX have catalog changes */
bool has_catalog_changes; bool has_catalog_changes;
/* /* Do we know this is a subxact? Xid of top-level txn if so */
* Do we know this is a subxact?
*/
bool is_known_as_subxact; bool is_known_as_subxact;
TransactionId toplevel_xid;
/* /*
* LSN of the first data carrying, WAL record with knowledge about this * LSN of the first data carrying, WAL record with knowledge about this
...@@ -209,10 +208,13 @@ typedef struct ReorderBufferTXN ...@@ -209,10 +208,13 @@ typedef struct ReorderBufferTXN
TimestampTz commit_time; TimestampTz commit_time;
/* /*
* Base snapshot or NULL. * The base snapshot is used to decode all changes until either this
* transaction modifies the catalog, or another catalog-modifying
* transaction commits.
*/ */
Snapshot base_snapshot; Snapshot base_snapshot;
XLogRecPtr base_snapshot_lsn; XLogRecPtr base_snapshot_lsn;
dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
/* /*
* How many ReorderBufferChange's do we have in this txn. * How many ReorderBufferChange's do we have in this txn.
...@@ -279,7 +281,7 @@ typedef struct ReorderBufferTXN ...@@ -279,7 +281,7 @@ typedef struct ReorderBufferTXN
* Position in one of three lists: * Position in one of three lists:
* * list of subtransactions if we are *known* to be subxact * * list of subtransactions if we are *known* to be subxact
* * list of toplevel xacts (can be an as-yet unknown subxact) * * list of toplevel xacts (can be an as-yet unknown subxact)
* * list of preallocated ReorderBufferTXNs * * list of preallocated ReorderBufferTXNs (if unused)
* --- * ---
*/ */
dlist_node node; dlist_node node;
...@@ -337,6 +339,15 @@ struct ReorderBuffer ...@@ -337,6 +339,15 @@ struct ReorderBuffer
*/ */
dlist_head toplevel_by_lsn; dlist_head toplevel_by_lsn;
/*
* Transactions and subtransactions that have a base snapshot, ordered by
* LSN of the record which caused us to first obtain the base snapshot.
* This is not the same as toplevel_by_lsn, because we only set the base
* snapshot on the first logical-decoding-relevant record (eg. heap
* writes), whereas the initial LSN could be set by other operations.
*/
dlist_head txns_by_base_snapshot_lsn;
/* /*
* one-entry sized cache for by_txn. Very frequently the same txn gets * one-entry sized cache for by_txn. Very frequently the same txn gets
* looked up over and over again. * looked up over and over again.
...@@ -422,6 +433,7 @@ bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); ...@@ -422,6 +433,7 @@ bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment