Commit 0ba5181c authored by Amit Kapila's avatar Amit Kapila

Skip empty transaction stream in test_decoding.

We were decoding empty transactions via streaming APIs added in commit
45fdc973 even when the user used the option 'skip-empty-xacts'. The APIs
makes no effort to skip empty xacts under the assumption that we will
never try to stream such transactions. However, that is not true because
we can pick to stream a transaction that has change messages for
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT and we don't send such messages to
downstream rather they are just to update the internal state. So, we need
to skip such xacts when plugin uses the option 'skip-empty-xacts'.

Diagnosed-By: Amit Kapila
Author: Dilip Kumar
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/CAA4eK1+OqgFNZkf7=ETe_y5ntjgDk3T0wcdkd4Sot_u1hySGfw@mail.gmail.com
parent 9f1cf97b
...@@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ ...@@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \ decoding_into_rel binary prepared replorigin time messages \
spill slot truncate stream spill slot truncate stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
......
Parsed test spec with 2 sessions
starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
step s0_begin: BEGIN;
step s0_ddl: CREATE TABLE stream_test1(data text);
step s1_ddl: CREATE TABLE stream_test(data text);
step s1_begin: BEGIN;
step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
step s1_commit: COMMIT;
step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
opening a streamed block for transaction
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
?column?
stop
...@@ -29,10 +29,7 @@ COMMIT; ...@@ -29,10 +29,7 @@ COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data data
---------------------------------------------------------- ----------------------------------------------------------
opening a streamed block for transaction
streaming message: transactional: 1 prefix: test, sz: 50 streaming message: transactional: 1 prefix: test, sz: 50
closing a streamed block for transaction
aborting streamed (sub)transaction
opening a streamed block for transaction opening a streamed block for transaction
streaming change for transaction streaming change for transaction
streaming change for transaction streaming change for transaction
...@@ -56,7 +53,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl ...@@ -56,7 +53,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
streaming change for transaction streaming change for transaction
closing a streamed block for transaction closing a streamed block for transaction
committing streamed transaction committing streamed transaction
(27 rows) (24 rows)
-- streaming test for toast changes -- streaming test for toast changes
ALTER TABLE stream_test ALTER COLUMN data set storage external; ALTER TABLE stream_test ALTER COLUMN data set storage external;
......
# Test decoding of in-progress transaction containing dml and a concurrent
# transaction with ddl operation. The transaction containing ddl operation
# should not get streamed as it doesn't have any changes.
setup
{
SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
}
teardown
{
DROP TABLE IF EXISTS stream_test;
DROP TABLE IF EXISTS stream_test1;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
session "s0"
setup { SET synchronous_commit=on; }
step "s0_begin" { BEGIN; }
step "s0_ddl" {CREATE TABLE stream_test1(data text);}
# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
# the currently running s0_ddl and we want to test that s0_ddl should not get
# streamed when user asked to skip-empty-xacts.
session "s1"
setup { SET synchronous_commit=on; }
step "s1_ddl" { CREATE TABLE stream_test(data text); }
step "s1_begin" { BEGIN; }
step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
step "s1_commit" { COMMIT; }
step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
...@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ...@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
Size sz, const char *message); Size sz, const char *message);
static void pg_decode_stream_start(LogicalDecodingContext *ctx, static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn); ReorderBufferTXN *txn);
static void pg_output_stream_start(LogicalDecodingContext *ctx,
TestDecodingData *data,
ReorderBufferTXN *txn,
bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn); ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
...@@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx, ...@@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true); OutputPluginWrite(ctx, true);
} }
/*
* We never try to stream any empty xact so we don't need any special handling
* for skip_empty_xacts in streaming mode APIs.
*/
static void static void
pg_decode_stream_start(LogicalDecodingContext *ctx, pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) ReorderBufferTXN *txn)
{ {
TestDecodingData *data = ctx->output_plugin_private; TestDecodingData *data = ctx->output_plugin_private;
OutputPluginPrepareWrite(ctx, true); data->xact_wrote_changes = false;
if (data->skip_empty_xacts)
return;
pg_output_stream_start(ctx, data, txn, true);
}
static void
pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
{
OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids) if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid); appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
else else
appendStringInfo(ctx->out, "opening a streamed block for transaction"); appendStringInfo(ctx->out, "opening a streamed block for transaction");
OutputPluginWrite(ctx, true); OutputPluginWrite(ctx, last_write);
} }
/*
* We never try to stream any empty xact so we don't need any special handling
* for skip_empty_xacts in streaming mode APIs.
*/
static void static void
pg_decode_stream_stop(LogicalDecodingContext *ctx, pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) ReorderBufferTXN *txn)
{ {
TestDecodingData *data = ctx->output_plugin_private; TestDecodingData *data = ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true); OutputPluginPrepareWrite(ctx, true);
if (data->include_xids) if (data->include_xids)
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid); appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
...@@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx, ...@@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true); OutputPluginWrite(ctx, true);
} }
/*
* We never try to stream any empty xact so we don't need any special handling
* for skip_empty_xacts in streaming mode APIs.
*/
static void static void
pg_decode_stream_abort(LogicalDecodingContext *ctx, pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, ReorderBufferTXN *txn,
...@@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, ...@@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
{ {
TestDecodingData *data = ctx->output_plugin_private; TestDecodingData *data = ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true); OutputPluginPrepareWrite(ctx, true);
if (data->include_xids) if (data->include_xids)
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid); appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
...@@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, ...@@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true); OutputPluginWrite(ctx, true);
} }
/*
* We never try to stream any empty xact so we don't need any special handling
* for skip_empty_xacts in streaming mode APIs.
*/
static void static void
pg_decode_stream_commit(LogicalDecodingContext *ctx, pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, ReorderBufferTXN *txn,
...@@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx, ...@@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
{ {
TestDecodingData *data = ctx->output_plugin_private; TestDecodingData *data = ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true); OutputPluginPrepareWrite(ctx, true);
if (data->include_xids) if (data->include_xids)
...@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, ...@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
{ {
TestDecodingData *data = ctx->output_plugin_private; TestDecodingData *data = ctx->output_plugin_private;
/* output stream start if we haven't yet */
if (data->skip_empty_xacts && !data->xact_wrote_changes)
{
pg_output_stream_start(ctx, data, txn, false);
}
data->xact_wrote_changes = true;
OutputPluginPrepareWrite(ctx, true); OutputPluginPrepareWrite(ctx, true);
if (data->include_xids) if (data->include_xids)
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
...@@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ...@@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{ {
TestDecodingData *data = ctx->output_plugin_private; TestDecodingData *data = ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
{
pg_output_stream_start(ctx, data, txn, false);
}
data->xact_wrote_changes = true;
OutputPluginPrepareWrite(ctx, true); OutputPluginPrepareWrite(ctx, true);
if (data->include_xids) if (data->include_xids)
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid); appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment