Commit 7259736a authored by Amit Kapila's avatar Amit Kapila

Implement streaming mode in ReorderBuffer.

Instead of serializing the transaction to disk after reaching the
logical_decoding_work_mem limit in memory, we consume the changes we have
in memory and invoke stream API methods added by commit 45fdc973.
However, if the transaction contains an incomplete toast or speculative
insert, we still spill to disk, because we cannot yet generate the complete
tuple to stream. As soon as we get the complete tuple, we stream the
transaction, including the serialized changes.

We can do this incremental processing thanks to having assignments
(associating subxact with toplevel xacts) in WAL right away, and
thanks to logging the invalidation messages at each command end. These
features are added by commits 0bead9af and c55040cc respectively.

Now that we can stream in-progress transactions, the concurrent aborts
may cause failures when the output plugin consults catalogs (both system
and user-defined).

We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK
sqlerrcode from system table scan APIs to the backend or WALSender
decoding a specific uncommitted transaction. On receipt of such a
sqlerrcode, the decoding logic aborts the decoding of the current
transaction and continues with the decoding of other transactions.

We have a ReorderBufferTXN pointer in each ReorderBufferChange by which we
know which xact it belongs to.  The output plugin can use this to decide
which changes to discard in case of stream_abort_cb (e.g. when a subxact
gets discarded).

We also provide a new option via SQL APIs to fetch the changes being
streamed.

Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke
Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
parent 0a7d771f
...@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" ...@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \ decoding_into_rel binary prepared replorigin time messages \
spill slot truncate spill slot truncate stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top oldest_xmin snapshot_transfer subxact_without_top
......
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
CREATE TABLE stream_test(data text);
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
-- streaming test with sub-transaction
BEGIN;
savepoint s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
?column?
----------
msg5
(1 row)
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
rollback to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
data
----------------------------------------------------------
opening a streamed block for transaction
streaming message: transactional: 1 prefix: test, sz: 50
closing a streamed block for transaction
aborting streamed (sub)transaction
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
(27 rows)
-- streaming test for toast changes
ALTER TABLE stream_test ALTER COLUMN data set storage external;
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
data
------------------------------------------
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
(13 rows)
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
...@@ -25,3 +25,9 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc ...@@ -25,3 +25,9 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
COMMIT COMMIT
(9 rows) (9 rows)
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
CREATE TABLE stream_test(data text);
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- streaming test with sub-transaction
BEGIN;
savepoint s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
rollback to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
-- streaming test for toast changes
ALTER TABLE stream_test ALTER COLUMN data set storage external;
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
...@@ -11,3 +11,4 @@ TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE; ...@@ -11,3 +11,4 @@ TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
TRUNCATE tab1, tab2; TRUNCATE tab1, tab2;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot');
...@@ -122,6 +122,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ...@@ -122,6 +122,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
{ {
ListCell *option; ListCell *option;
TestDecodingData *data; TestDecodingData *data;
bool enable_streaming = false;
data = palloc0(sizeof(TestDecodingData)); data = palloc0(sizeof(TestDecodingData));
data->context = AllocSetContextCreate(ctx->context, data->context = AllocSetContextCreate(ctx->context,
...@@ -212,6 +213,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ...@@ -212,6 +213,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"", errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname))); strVal(elem->arg), elem->defname)));
} }
else if (strcmp(elem->defname, "stream-changes") == 0)
{
if (elem->arg == NULL)
continue;
else if (!parse_bool(strVal(elem->arg), &enable_streaming))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else else
{ {
ereport(ERROR, ereport(ERROR,
...@@ -221,6 +232,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ...@@ -221,6 +232,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
elem->arg ? strVal(elem->arg) : "(null)"))); elem->arg ? strVal(elem->arg) : "(null)")));
} }
} }
ctx->streaming &= enable_streaming;
} }
/* cleanup this plugin's resources */ /* cleanup this plugin's resources */
......
...@@ -433,9 +433,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ...@@ -433,9 +433,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
ALTER TABLE user_catalog_table SET (user_catalog_table = true); ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
</programlisting> </programlisting>
Any actions leading to transaction ID assignment are prohibited. That, among others, Note that access to user catalog tables or regular system catalog tables
includes writing to tables, performing DDL changes, and in the output plugins has to be done via the <literal>systable_*</literal>
calling <literal>pg_current_xact_id()</literal>. scan APIs only. Access via the <literal>heap_*</literal> scan APIs will
error out. Additionally, any actions leading to transaction ID assignment
are prohibited. That, among others, includes writing to tables, performing
DDL changes, and calling <literal>pg_current_xact_id()</literal>.
</para> </para>
</sect2> </sect2>
......
...@@ -39,4 +39,26 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'i ...@@ -39,4 +39,26 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'i
</programlisting> </programlisting>
</para> </para>
<para>
We can also get the changes of an in-progress transaction; the typical
output might be:
<programlisting>
postgres[33712]=#* SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'stream-changes', '1');
lsn | xid | data
-----------+-----+--------------------------------------------------
0/16B21F8 | 503 | opening a streamed block for transaction TXN 503
0/16B21F8 | 503 | streaming change for TXN 503
0/16B2300 | 503 | streaming change for TXN 503
0/16B2408 | 503 | streaming change for TXN 503
0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503
0/16B21F8 | 503 | opening a streamed block for transaction TXN 503
0/16BECA8 | 503 | streaming change for TXN 503
0/16BEDB0 | 503 | streaming change for TXN 503
0/16BEEB8 | 503 | streaming change for TXN 503
0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503
(10 rows)
</programlisting>
</para>
</sect1> </sect1>
...@@ -1299,6 +1299,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction) ...@@ -1299,6 +1299,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg_internal("only heap AM is supported"))); errmsg_internal("only heap AM is supported")));
/*
* We don't expect direct calls to heap_getnext with valid CheckXidAlive
* for catalog or regular tables. See detailed comments in xact.c where
* these variables are declared. Normally we have such a check at tableam
* level API but this is called from many places so we need to ensure it
* here.
*/
if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
elog(ERROR, "unexpected heap_getnext call during logical decoding");
/* Note: no locking manipulations needed */ /* Note: no locking manipulations needed */
if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE) if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE)
...@@ -1956,6 +1966,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, ...@@ -1956,6 +1966,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
{ {
xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
bufflags |= REGBUF_KEEP_DATA; bufflags |= REGBUF_KEEP_DATA;
if (IsToastRelation(relation))
xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION;
} }
XLogBeginInsert(); XLogBeginInsert();
......
...@@ -1571,8 +1571,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, ...@@ -1571,8 +1571,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
htup, buffer, htup, buffer,
&cmin, &cmax); &cmin, &cmax);
/*
* If we haven't resolved the combocid to cmin/cmax, that means we
* have not decoded the combocid yet. That means the cmin is
* definitely in the future, and we're not supposed to see the tuple
* yet.
*
* XXX This only applies to decoding of in-progress transactions. In
* regular logical decoding we only execute this code at commit time,
* at which point we should have seen all relevant combocids. So
* ideally, we should error out in this case but in practice, this
* won't happen. If we are too worried about this then we can add an
* elog inside ResolveCminCmaxDuringDecoding.
*
* XXX For the streaming case, we can track the largest combocid
* assigned, and error out based on this (when unable to resolve
* combocid below that observed maximum value).
*/
if (!resolved) if (!resolved)
elog(ERROR, "could not resolve cmin/cmax of catalog tuple"); return false;
Assert(cmin != InvalidCommandId); Assert(cmin != InvalidCommandId);
...@@ -1642,10 +1659,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, ...@@ -1642,10 +1659,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
htup, buffer, htup, buffer,
&cmin, &cmax); &cmin, &cmax);
if (!resolved) /*
elog(ERROR, "could not resolve combocid to cmax"); * If we haven't resolved the combocid to cmin/cmax, that means we
* have not decoded the combocid yet. That means the cmax is
Assert(cmax != InvalidCommandId); * definitely in the future, and we're still supposed to see the
* tuple.
*
* XXX This only applies to decoding of in-progress transactions. In
* regular logical decoding we only execute this code at commit time,
* at which point we should have seen all relevant combocids. So
* ideally, we should error out in this case but in practice, this
* won't happen. If we are too worried about this then we can add an
* elog inside ResolveCminCmaxDuringDecoding.
*
* XXX For the streaming case, we can track the largest combocid
* assigned, and error out based on this (when unable to resolve
* combocid below that observed maximum value).
*/
if (!resolved || cmax == InvalidCommandId)
return true;
if (cmax >= snapshot->curcid) if (cmax >= snapshot->curcid)
return true; /* deleted after scan started */ return true; /* deleted after scan started */
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/procarray.h"
#include "utils/acl.h" #include "utils/acl.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
...@@ -429,9 +430,36 @@ systable_beginscan(Relation heapRelation, ...@@ -429,9 +430,36 @@ systable_beginscan(Relation heapRelation,
sysscan->iscan = NULL; sysscan->iscan = NULL;
} }
/*
* If CheckXidAlive is set then set a flag to indicate that system table
* scan is in-progress. See detailed comments in xact.c where these
* variables are declared.
*/
if (TransactionIdIsValid(CheckXidAlive))
bsysscan = true;
return sysscan; return sysscan;
} }
/*
 * HandleConcurrentAbort - Handle concurrent abort of the CheckXidAlive.
 *
 * Does nothing when CheckXidAlive is not set.  Otherwise, raises an ERROR
 * with ERRCODE_TRANSACTION_ROLLBACK if CheckXidAlive is neither in progress
 * nor committed, i.e. it must have aborted.
 *
 * We can't directly use TransactionIdDidAbort as after crash such transaction
 * might not have been marked as aborted.  See detailed comments in xact.c
 * where the variable is declared.
 */
static inline void
HandleConcurrentAbort(void)
{
	if (TransactionIdIsValid(CheckXidAlive) &&
		!TransactionIdIsInProgress(CheckXidAlive) &&
		!TransactionIdDidCommit(CheckXidAlive))
		ereport(ERROR,
				(errcode(ERRCODE_TRANSACTION_ROLLBACK),
				 errmsg("transaction aborted during system catalog scan")));
}
/* /*
* systable_getnext --- get next tuple in a heap-or-index scan * systable_getnext --- get next tuple in a heap-or-index scan
* *
...@@ -481,6 +509,12 @@ systable_getnext(SysScanDesc sysscan) ...@@ -481,6 +509,12 @@ systable_getnext(SysScanDesc sysscan)
} }
} }
/*
* Handle the concurrent abort while fetching the catalog tuple during
* logical streaming of a transaction.
*/
HandleConcurrentAbort();
return htup; return htup;
} }
...@@ -517,6 +551,12 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup) ...@@ -517,6 +551,12 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
sysscan->slot, sysscan->slot,
freshsnap); freshsnap);
/*
* Handle the concurrent abort while fetching the catalog tuple during
* logical streaming of a transaction.
*/
HandleConcurrentAbort();
return result; return result;
} }
...@@ -545,6 +585,13 @@ systable_endscan(SysScanDesc sysscan) ...@@ -545,6 +585,13 @@ systable_endscan(SysScanDesc sysscan)
if (sysscan->snapshot) if (sysscan->snapshot)
UnregisterSnapshot(sysscan->snapshot); UnregisterSnapshot(sysscan->snapshot);
/*
* Reset the bsysscan flag at the end of the systable scan. See
* detailed comments in xact.c where these variables are declared.
*/
if (TransactionIdIsValid(CheckXidAlive))
bsysscan = false;
pfree(sysscan); pfree(sysscan);
} }
...@@ -643,6 +690,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction) ...@@ -643,6 +690,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
if (htup && sysscan->iscan->xs_recheck) if (htup && sysscan->iscan->xs_recheck)
elog(ERROR, "system catalog scans with lossy index conditions are not implemented"); elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
/*
* Handle the concurrent abort while fetching the catalog tuple during
* logical streaming of a transaction.
*/
HandleConcurrentAbort();
return htup; return htup;
} }
......
...@@ -248,6 +248,14 @@ table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid) ...@@ -248,6 +248,14 @@ table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
Relation rel = scan->rs_rd; Relation rel = scan->rs_rd;
const TableAmRoutine *tableam = rel->rd_tableam; const TableAmRoutine *tableam = rel->rd_tableam;
/*
* We don't expect direct calls to table_tuple_get_latest_tid with valid
* CheckXidAlive for catalog or regular tables. See detailed comments in
* xact.c where these variables are declared.
*/
if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
elog(ERROR, "unexpected table_tuple_get_latest_tid call during logical decoding");
/* /*
* Since this can be called with user-supplied TID, don't trust the input * Since this can be called with user-supplied TID, don't trust the input
* too much. * too much.
......
...@@ -82,6 +82,19 @@ bool XactDeferrable; ...@@ -82,6 +82,19 @@ bool XactDeferrable;
int synchronous_commit = SYNCHRONOUS_COMMIT_ON; int synchronous_commit = SYNCHRONOUS_COMMIT_ON;
/*
 * CheckXidAlive is a xid value pointing to a possibly ongoing (sub)
 * transaction.  Currently, it is used in logical decoding.  It's possible
 * that such transactions can get aborted while the decoding is ongoing, in
 * which case we skip decoding that particular transaction.  To ensure that,
 * we check whether the CheckXidAlive is aborted after fetching the tuple
 * from system tables.  We also ensure that during logical decoding we never
 * directly access the tableam or heap APIs, because we are checking for the
 * concurrent aborts only in systable_* APIs.
 *
 * bsysscan records whether a systable_* scan is currently in progress while
 * CheckXidAlive is set.  The tableam and heap scan entry points raise an
 * error when CheckXidAlive is valid but bsysscan is false.
 */
TransactionId CheckXidAlive = InvalidTransactionId;
bool bsysscan = false;
/* /*
* When running as a parallel worker, we place only a single * When running as a parallel worker, we place only a single
* TransactionStateData on the parallel worker's state stack, and the XID * TransactionStateData on the parallel worker's state stack, and the XID
...@@ -2680,6 +2693,9 @@ AbortTransaction(void) ...@@ -2680,6 +2693,9 @@ AbortTransaction(void)
/* Forget about any active REINDEX. */ /* Forget about any active REINDEX. */
ResetReindexState(s->nestingLevel); ResetReindexState(s->nestingLevel);
/* Reset logical streaming state. */
ResetLogicalStreamingState();
/* If in parallel mode, clean up workers and exit parallel mode. */ /* If in parallel mode, clean up workers and exit parallel mode. */
if (IsInParallelMode()) if (IsInParallelMode())
{ {
...@@ -4982,6 +4998,9 @@ AbortSubTransaction(void) ...@@ -4982,6 +4998,9 @@ AbortSubTransaction(void)
/* Forget about any active REINDEX. */ /* Forget about any active REINDEX. */
ResetReindexState(s->nestingLevel); ResetReindexState(s->nestingLevel);
/* Reset logical streaming state. */
ResetLogicalStreamingState();
/* Exit from parallel mode, if necessary. */ /* Exit from parallel mode, if necessary. */
if (IsInParallelMode()) if (IsInParallelMode())
{ {
......
...@@ -724,7 +724,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -724,7 +724,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true; change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change,
xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
} }
/* /*
...@@ -791,7 +793,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -791,7 +793,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true; change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
} }
/* /*
...@@ -848,7 +851,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -848,7 +851,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true; change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
} }
/* /*
...@@ -884,7 +888,7 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -884,7 +888,7 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
memcpy(change->data.truncate.relids, xlrec->relids, memcpy(change->data.truncate.relids, xlrec->relids,
xlrec->nrelids * sizeof(Oid)); xlrec->nrelids * sizeof(Oid));
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
buf->origptr, change); buf->origptr, change, false);
} }
/* /*
...@@ -984,7 +988,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -984,7 +988,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = false; change->data.tp.clear_toast_afterwards = false;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
buf->origptr, change); buf->origptr, change, false);
/* move to the next xl_multi_insert_tuple entry */ /* move to the next xl_multi_insert_tuple entry */
data += datalen; data += datalen;
...@@ -1022,7 +1026,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -1022,7 +1026,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true; change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
} }
......
...@@ -1442,3 +1442,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) ...@@ -1442,3 +1442,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
SpinLockRelease(&MyReplicationSlot->mutex); SpinLockRelease(&MyReplicationSlot->mutex);
} }
} }
/*
 * ResetLogicalStreamingState
 *
 * Invoked from the (sub)transaction abort paths to discard the logical
 * streaming bookkeeping: afterwards no system table scan is flagged as
 * active and no xid is being watched for a concurrent abort.
 */
void
ResetLogicalStreamingState(void)
{
	bsysscan = false;
	CheckXidAlive = InvalidTransactionId;
}
...@@ -67,6 +67,7 @@ ...@@ -67,6 +67,7 @@
#define XLH_INSERT_LAST_IN_MULTI (1<<1) #define XLH_INSERT_LAST_IN_MULTI (1<<1)
#define XLH_INSERT_IS_SPECULATIVE (1<<2) #define XLH_INSERT_IS_SPECULATIVE (1<<2)
#define XLH_INSERT_CONTAINS_NEW_TUPLE (1<<3) #define XLH_INSERT_CONTAINS_NEW_TUPLE (1<<3)
#define XLH_INSERT_ON_TOAST_RELATION (1<<4)
/* /*
* xl_heap_update flag values, 8 bits are available. * xl_heap_update flag values, 8 bits are available.
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "access/relscan.h" #include "access/relscan.h"
#include "access/sdir.h" #include "access/sdir.h"
#include "access/xact.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/snapshot.h" #include "utils/snapshot.h"
...@@ -903,6 +904,15 @@ static inline bool ...@@ -903,6 +904,15 @@ static inline bool
table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot) table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
{ {
slot->tts_tableOid = RelationGetRelid(sscan->rs_rd); slot->tts_tableOid = RelationGetRelid(sscan->rs_rd);
/*
* We don't expect direct calls to table_scan_getnextslot with valid
* CheckXidAlive for catalog or regular tables. See detailed comments in
* xact.c where these variables are declared.
*/
if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
elog(ERROR, "unexpected table_scan_getnextslot call during logical decoding");
return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot); return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
} }
...@@ -1017,6 +1027,13 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan, ...@@ -1017,6 +1027,13 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
TupleTableSlot *slot, TupleTableSlot *slot,
bool *call_again, bool *all_dead) bool *call_again, bool *all_dead)
{ {
/*
* We don't expect direct calls to table_index_fetch_tuple with valid
* CheckXidAlive for catalog or regular tables. See detailed comments in
* xact.c where these variables are declared.
*/
if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
elog(ERROR, "unexpected table_index_fetch_tuple call during logical decoding");
return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot, return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot,
slot, call_again, slot, call_again,
...@@ -1056,6 +1073,14 @@ table_tuple_fetch_row_version(Relation rel, ...@@ -1056,6 +1073,14 @@ table_tuple_fetch_row_version(Relation rel,
Snapshot snapshot, Snapshot snapshot,
TupleTableSlot *slot) TupleTableSlot *slot)
{ {
/*
* We don't expect direct calls to table_tuple_fetch_row_version with
* valid CheckXidAlive for catalog or regular tables. See detailed
* comments in xact.c where these variables are declared.
*/
if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
elog(ERROR, "unexpected table_tuple_fetch_row_version call during logical decoding");
return rel->rd_tableam->tuple_fetch_row_version(rel, tid, snapshot, slot); return rel->rd_tableam->tuple_fetch_row_version(rel, tid, snapshot, slot);
} }
...@@ -1713,6 +1738,14 @@ static inline bool ...@@ -1713,6 +1738,14 @@ static inline bool
table_scan_bitmap_next_block(TableScanDesc scan, table_scan_bitmap_next_block(TableScanDesc scan,
struct TBMIterateResult *tbmres) struct TBMIterateResult *tbmres)
{ {
/*
* We don't expect direct calls to table_scan_bitmap_next_block with valid
* CheckXidAlive for catalog or regular tables. See detailed comments in
* xact.c where these variables are declared.
*/
if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding");
return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan, return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan,
tbmres); tbmres);
} }
...@@ -1730,6 +1763,14 @@ table_scan_bitmap_next_tuple(TableScanDesc scan, ...@@ -1730,6 +1763,14 @@ table_scan_bitmap_next_tuple(TableScanDesc scan,
struct TBMIterateResult *tbmres, struct TBMIterateResult *tbmres,
TupleTableSlot *slot) TupleTableSlot *slot)
{ {
/*
* We don't expect direct calls to table_scan_bitmap_next_tuple with valid
* CheckXidAlive for catalog or regular tables. See detailed comments in
* xact.c where these variables are declared.
*/
if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
elog(ERROR, "unexpected table_scan_bitmap_next_tuple call during logical decoding");
return scan->rs_rd->rd_tableam->scan_bitmap_next_tuple(scan, return scan->rs_rd->rd_tableam->scan_bitmap_next_tuple(scan,
tbmres, tbmres,
slot); slot);
...@@ -1748,6 +1789,13 @@ static inline bool ...@@ -1748,6 +1789,13 @@ static inline bool
table_scan_sample_next_block(TableScanDesc scan, table_scan_sample_next_block(TableScanDesc scan,
struct SampleScanState *scanstate) struct SampleScanState *scanstate)
{ {
/*
* We don't expect direct calls to table_scan_sample_next_block with valid
* CheckXidAlive for catalog or regular tables. See detailed comments in
* xact.c where these variables are declared.
*/
if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
elog(ERROR, "unexpected table_scan_sample_next_block call during logical decoding");
return scan->rs_rd->rd_tableam->scan_sample_next_block(scan, scanstate); return scan->rs_rd->rd_tableam->scan_sample_next_block(scan, scanstate);
} }
...@@ -1764,6 +1812,13 @@ table_scan_sample_next_tuple(TableScanDesc scan, ...@@ -1764,6 +1812,13 @@ table_scan_sample_next_tuple(TableScanDesc scan,
struct SampleScanState *scanstate, struct SampleScanState *scanstate,
TupleTableSlot *slot) TupleTableSlot *slot)
{ {
/*
* We don't expect direct calls to table_scan_sample_next_tuple with valid
* CheckXidAlive for catalog or regular tables. See detailed comments in
* xact.c where these variables are declared.
*/
if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
elog(ERROR, "unexpected table_scan_sample_next_tuple call during logical decoding");
return scan->rs_rd->rd_tableam->scan_sample_next_tuple(scan, scanstate, return scan->rs_rd->rd_tableam->scan_sample_next_tuple(scan, scanstate,
slot); slot);
} }
......
...@@ -81,6 +81,10 @@ typedef enum ...@@ -81,6 +81,10 @@ typedef enum
/* Synchronous commit level */ /* Synchronous commit level */
extern int synchronous_commit; extern int synchronous_commit;
/* used during logical streaming of a transaction */
extern TransactionId CheckXidAlive;
extern bool bsysscan;
/* /*
* Miscellaneous flag bits to record events which occur on the top level * Miscellaneous flag bits to record events which occur on the top level
* transaction. These flags are only persisted in MyXactFlags and are intended * transaction. These flags are only persisted in MyXactFlags and are intended
......
...@@ -121,5 +121,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, ...@@ -121,5 +121,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void);
#endif #endif
...@@ -162,6 +162,9 @@ typedef struct ReorderBufferChange ...@@ -162,6 +162,9 @@ typedef struct ReorderBufferChange
#define RBTXN_HAS_CATALOG_CHANGES 0x0001 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
#define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SUBXACT 0x0002
#define RBTXN_IS_SERIALIZED 0x0004 #define RBTXN_IS_SERIALIZED 0x0004
#define RBTXN_IS_STREAMED 0x0008
#define RBTXN_HAS_TOAST_INSERT 0x0010
#define RBTXN_HAS_SPEC_INSERT 0x0020
/* Does the transaction have catalog changes? */ /* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \ #define rbtxn_has_catalog_changes(txn) \
...@@ -181,6 +184,40 @@ typedef struct ReorderBufferChange ...@@ -181,6 +184,40 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
) )
/*
 * Do this transaction's changes include a toast insert that is not
 * accompanied by its main table insert?
 */
#define rbtxn_has_toast_insert(txn) \
	(((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0)
/*
 * Do this transaction's changes include a speculative insert that is not
 * accompanied by a speculative confirm?
 */
#define rbtxn_has_spec_insert(txn) \
	(((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0)
/*
 * Does this transaction have an incomplete change, i.e. a toast insert
 * without its main table insert or a speculative insert without its
 * speculative confirm?
 */
#define rbtxn_has_incomplete_tuple(txn) \
	(rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn))
/*
 * Has this transaction already been streamed downstream?
 *
 * An explicit flag is kept because the answer cannot be deduced from
 * nentries and nentries_mem.  For example, when all changes are in
 * subtransactions the toplevel transaction has nentries == 0, which says
 * nothing about streaming.  The flag is maintained only for the toplevel
 * transaction.
 */
#define rbtxn_is_streamed(txn) \
	(((txn)->txn_flags & RBTXN_IS_STREAMED) != 0)
typedef struct ReorderBufferTXN typedef struct ReorderBufferTXN
{ {
/* See above */ /* See above */
...@@ -248,6 +285,13 @@ typedef struct ReorderBufferTXN ...@@ -248,6 +285,13 @@ typedef struct ReorderBufferTXN
XLogRecPtr base_snapshot_lsn; XLogRecPtr base_snapshot_lsn;
dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */ dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
/*
* Snapshot/CID from the previous streaming run. Only valid for already
* streamed transactions (NULL/InvalidCommandId otherwise).
*/
Snapshot snapshot_now;
CommandId command_id;
/* /*
* How many ReorderBufferChange's do we have in this txn. * How many ReorderBufferChange's do we have in this txn.
* *
...@@ -313,6 +357,12 @@ typedef struct ReorderBufferTXN ...@@ -313,6 +357,12 @@ typedef struct ReorderBufferTXN
* Size of this transaction (changes currently in memory, in bytes). * Size of this transaction (changes currently in memory, in bytes).
*/ */
Size size; Size size;
/* Size of top-transaction including sub-transactions. */
Size total_size;
/* If we have detected a concurrent abort, ignore future changes. */
bool concurrent_abort;
} ReorderBufferTXN; } ReorderBufferTXN;
/* so we can define the callbacks used inside struct ReorderBuffer itself */ /* so we can define the callbacks used inside struct ReorderBuffer itself */
...@@ -484,12 +534,14 @@ void ReorderBufferFree(ReorderBuffer *); ...@@ -484,12 +534,14 @@ void ReorderBufferFree(ReorderBuffer *);
ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool);
Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids); Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids); void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids);
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
XLogRecPtr lsn, ReorderBufferChange *,
bool toast_insert);
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
bool transactional, const char *prefix, bool transactional, const char *prefix,
Size message_size, const char *message); Size message_size, const char *message);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment