Commit 8bdb1332 authored by Amit Kapila

Avoid repeated decoding of prepared transactions after a restart.

In commit a271a1b5, we allowed decoding at prepare time and the prepare
was decoded again if there was a restart after decoding it. It was done
that way because we can't distinguish between the cases where we have not
decoded the prepare because it was prior to consistent snapshot or we have
decoded it earlier but restarted. To distinguish between these two cases,
we have introduced an initial_consistent_point at the slot level which is
an LSN at which we found a consistent point at the time of slot creation.
This is also the point where we have exported a snapshot for the initial
copy. So, prepared transactions prior to this point are sent along with
commit prepared.

This commit bumps SNAPBUILD_VERSION because of the change in SnapBuild. It
will break existing slots, which is fine in a major release.

Author: Ajin Cherian, based on idea by Andres Freund
Reviewed-by: Amit Kapila and Vignesh C
Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
parent 6230912f
...@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two ...@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
COMMIT PREPARED 'test_prepared#1'; COMMIT PREPARED 'test_prepared#1';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data data
---------------------------------------------------- -----------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:1
table public.test_prepared1: INSERT: id[integer]:2
PREPARE TRANSACTION 'test_prepared#1'
COMMIT PREPARED 'test_prepared#1' COMMIT PREPARED 'test_prepared#1'
(5 rows) (1 row)
-- Test that rollback of a prepared xact is decoded. -- Test that rollback of a prepared xact is decoded.
BEGIN; BEGIN;
...@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two ...@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
COMMIT PREPARED 'test_prepared#3'; COMMIT PREPARED 'test_prepared#3';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data data
------------------------------------------------------------------------- -----------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
PREPARE TRANSACTION 'test_prepared#3'
COMMIT PREPARED 'test_prepared#3' COMMIT PREPARED 'test_prepared#3'
(4 rows) (1 row)
-- make sure stuff still works -- make sure stuff still works
INSERT INTO test_prepared1 VALUES (6); INSERT INTO test_prepared1 VALUES (6);
...@@ -158,14 +151,10 @@ RESET statement_timeout; ...@@ -158,14 +151,10 @@ RESET statement_timeout;
COMMIT PREPARED 'test_prepared_lock'; COMMIT PREPARED 'test_prepared_lock';
-- consume the commit -- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data data
--------------------------------------------------------------------------- --------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
PREPARE TRANSACTION 'test_prepared_lock'
COMMIT PREPARED 'test_prepared_lock' COMMIT PREPARED 'test_prepared_lock'
(5 rows) (1 row)
-- Test savepoints and sub-xacts. Creating savepoints will create -- Test savepoints and sub-xacts. Creating savepoints will create
-- sub-xacts implicitly. -- sub-xacts implicitly.
...@@ -188,13 +177,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two ...@@ -188,13 +177,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
COMMIT PREPARED 'test_prepared_savepoint'; COMMIT PREPARED 'test_prepared_savepoint';
-- consume the commit -- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
data data
------------------------------------------------------------ -------------------------------------------
BEGIN
table public.test_prepared_savepoint: INSERT: a[integer]:1
PREPARE TRANSACTION 'test_prepared_savepoint'
COMMIT PREPARED 'test_prepared_savepoint' COMMIT PREPARED 'test_prepared_savepoint'
(4 rows) (1 row)
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time. -- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
BEGIN; BEGIN;
......
...@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two- ...@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
COMMIT PREPARED 'test1'; COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction --should show the COMMIT PREPARED and the other changes in the transaction
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data data
------------------------------------------------------------- -------------------------
BEGIN
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
PREPARE TRANSACTION 'test1'
COMMIT PREPARED 'test1' COMMIT PREPARED 'test1'
(23 rows) (1 row)
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time. -- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
......
...@@ -191,9 +191,6 @@ postgres=# COMMIT PREPARED 'test_prepared1'; ...@@ -191,9 +191,6 @@ postgres=# COMMIT PREPARED 'test_prepared1';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1'); postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
lsn | xid | data lsn | xid | data
-----------+-----+-------------------------------------------- -----------+-----+--------------------------------------------
0/1689DC0 | 529 | BEGIN 529
0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529 0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
(4 row) (1 row)
...@@ -822,10 +819,8 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx ...@@ -822,10 +819,8 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
<parameter>gid</parameter> field, which is part of the <parameter>gid</parameter> field, which is part of the
<parameter>txn</parameter> parameter, can be used in this callback to <parameter>txn</parameter> parameter, can be used in this callback to
check if the plugin has already received this <command>PREPARE</command> check if the plugin has already received this <command>PREPARE</command>
in which case it can skip the remaining changes of the transaction. in which case it can either error out or skip the remaining changes of
This can only happen if the user restarts the decoding after receiving the transaction.
the <command>PREPARE</command> for a transaction but before receiving
the <command>COMMIT PREPARED</command>, say because of some error.
<programlisting> <programlisting>
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn); ReorderBufferTXN *txn);
......
...@@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, ...@@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
if (two_phase) if (two_phase)
{ {
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
commit_time, origin_id, origin_lsn, commit_time, origin_id, origin_lsn,
parsed->twophase_gid, true); parsed->twophase_gid, true);
} }
...@@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, ...@@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
{ {
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
InvalidXLogRecPtr, /* initial_consistent_point; only consulted when is_commit is true */
abort_time, origin_id, origin_lsn, abort_time, origin_id, origin_lsn,
parsed->twophase_gid, false); parsed->twophase_gid, false);
} }
else else
......
...@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options, ...@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder = ReorderBufferAllocate(); ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder = ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
need_full_snapshot); need_full_snapshot, slot->data.initial_consistent_point);
ctx->reorder->private_data = ctx; ctx->reorder->private_data = ctx;
...@@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) ...@@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
SpinLockAcquire(&slot->mutex); SpinLockAcquire(&slot->mutex);
slot->data.confirmed_flush = ctx->reader->EndRecPtr; slot->data.confirmed_flush = ctx->reader->EndRecPtr;
slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
SpinLockRelease(&slot->mutex); SpinLockRelease(&slot->mutex);
} }
......
...@@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, ...@@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
void void
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
XLogRecPtr initial_consistent_point,
TimestampTz commit_time, RepOriginId origin_id, TimestampTz commit_time, RepOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit) XLogRecPtr origin_lsn, char *gid, bool is_commit)
{ {
...@@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, ...@@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
/* /*
* It is possible that this transaction is not decoded at prepare time * It is possible that this transaction is not decoded at prepare time
* either because by that time we didn't have a consistent snapshot or it * either because by that time we didn't have a consistent snapshot or it
* was decoded earlier but we have restarted. We can't distinguish between * was decoded earlier but we have restarted. We only need to send the
* those two cases so we send the prepare in both the cases and let * prepare if it was not decoded earlier. We don't need to decode the xact
* downstream decide whether to process or skip it. We don't need to * for aborts if it is not done already.
* decode the xact for aborts if it is not done already.
*/ */
if (!rbtxn_prepared(txn) && is_commit) if ((txn->final_lsn < initial_consistent_point) && is_commit)
{ {
txn->txn_flags |= RBTXN_PREPARE; txn->txn_flags |= RBTXN_PREPARE;
......
...@@ -164,6 +164,17 @@ struct SnapBuild ...@@ -164,6 +164,17 @@ struct SnapBuild
*/ */
XLogRecPtr start_decoding_at; XLogRecPtr start_decoding_at;
/*
* LSN at which we found a consistent point at the time of slot creation.
* This is also the point where we have exported a snapshot for the
* initial copy.
*
* The prepared transactions that are not covered by the initial snapshot
* need to be sent later along with commit prepared, and they must be
* before this point.
*/
XLogRecPtr initial_consistent_point;
/* /*
* Don't start decoding WAL until the "xl_running_xacts" information * Don't start decoding WAL until the "xl_running_xacts" information
* indicates there are no running xids with an xid smaller than this. * indicates there are no running xids with an xid smaller than this.
...@@ -269,7 +280,8 @@ SnapBuild * ...@@ -269,7 +280,8 @@ SnapBuild *
AllocateSnapshotBuilder(ReorderBuffer *reorder, AllocateSnapshotBuilder(ReorderBuffer *reorder,
TransactionId xmin_horizon, TransactionId xmin_horizon,
XLogRecPtr start_lsn, XLogRecPtr start_lsn,
bool need_full_snapshot) bool need_full_snapshot,
XLogRecPtr initial_consistent_point)
{ {
MemoryContext context; MemoryContext context;
MemoryContext oldcontext; MemoryContext oldcontext;
...@@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, ...@@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
builder->initial_xmin_horizon = xmin_horizon; builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn; builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot; builder->building_full_snapshot = need_full_snapshot;
builder->initial_consistent_point = initial_consistent_point;
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
...@@ -356,6 +369,15 @@ SnapBuildCurrentState(SnapBuild *builder) ...@@ -356,6 +369,15 @@ SnapBuildCurrentState(SnapBuild *builder)
return builder->state; return builder->state;
} }
/*
 * Accessor for the snapshot builder's initial consistent point, i.e. the
 * LSN at which a consistent point was found when the slot was created and
 * at which the snapshot for the initial copy was exported.
 */
XLogRecPtr
SnapBuildInitialConsistentPoint(SnapBuild *builder)
{
	XLogRecPtr	consistent_point = builder->initial_consistent_point;

	return consistent_point;
}
/* /*
* Should the contents of transaction ending at 'ptr' be decoded? * Should the contents of transaction ending at 'ptr' be decoded?
*/ */
...@@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk ...@@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version) offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001 #define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 3 #define SNAPBUILD_VERSION 4
/* /*
* Store/Load a snapshot from disk, depending on the snapshot builder's state. * Store/Load a snapshot from disk, depending on the snapshot builder's state.
......
...@@ -643,6 +643,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId, ...@@ -643,6 +643,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
XLogRecPtr initial_consistent_point,
TimestampTz commit_time, TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn, RepOriginId origin_id, XLogRecPtr origin_lsn,
char *gid, bool is_commit); char *gid, bool is_commit);
......
...@@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData ...@@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData
*/ */
XLogRecPtr confirmed_flush; XLogRecPtr confirmed_flush;
/*
* LSN at which we found a consistent point at the time of slot creation.
* This is also the point where we have exported a snapshot for the
* initial copy.
*/
XLogRecPtr initial_consistent_point;
/* plugin name */ /* plugin name */
NameData plugin; NameData plugin;
} ReplicationSlotPersistentData; } ReplicationSlotPersistentData;
......
...@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void); ...@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
TransactionId xmin_horizon, XLogRecPtr start_lsn, TransactionId xmin_horizon, XLogRecPtr start_lsn,
bool need_full_snapshot); bool need_full_snapshot,
XLogRecPtr initial_consistent_point);
extern void FreeSnapshotBuilder(SnapBuild *cache); extern void FreeSnapshotBuilder(SnapBuild *cache);
extern void SnapBuildSnapDecRefcount(Snapshot snap); extern void SnapBuildSnapDecRefcount(Snapshot snap);
...@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, ...@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
TransactionId xid); TransactionId xid);
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts, TransactionId xid, int nsubxacts,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment