Commit 45fdc973 authored by Amit Kapila's avatar Amit Kapila

Extend the logical decoding output plugin API with stream methods.

This adds seven methods to the output plugin API, adding support for
streaming changes of large in-progress transactions.

* stream_start
* stream_stop
* stream_abort
* stream_commit
* stream_change
* stream_message
* stream_truncate

Most of this is a simple extension of the existing methods, with
the semantic difference that the transaction (or subtransaction)
is incomplete and may be aborted later (which is something the
regular API does not really need to deal with).

This also extends the 'test_decoding' plugin, implementing these
new stream methods.

The stream_start/start_stop are used to demarcate a chunk of changes
streamed for a particular toplevel transaction.

This commit simply adds these new APIs and the upcoming patch to "allow
the streaming mode in ReorderBuffer" will use these APIs.

Author: Tomas Vondra, Dilip Kumar, Amit Kapila
Reviewed-by: Amit Kapila
Tested-by: Neha Sharma and Mahendra Singh Thalor
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
parent 13838740
...@@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ...@@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn, ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix, bool transactional, const char *prefix,
Size sz, const char *message); Size sz, const char *message);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void pg_decode_stream_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
static void pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change);
void void
_PG_init(void) _PG_init(void)
...@@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) ...@@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter; cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown; cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message; cb->message_cb = pg_decode_message;
cb->stream_start_cb = pg_decode_stream_start;
cb->stream_stop_cb = pg_decode_stream_stop;
cb->stream_abort_cb = pg_decode_stream_abort;
cb->stream_commit_cb = pg_decode_stream_commit;
cb->stream_change_cb = pg_decode_stream_change;
cb->stream_message_cb = pg_decode_stream_message;
cb->stream_truncate_cb = pg_decode_stream_truncate;
} }
...@@ -540,3 +569,150 @@ pg_decode_message(LogicalDecodingContext *ctx, ...@@ -540,3 +569,150 @@ pg_decode_message(LogicalDecodingContext *ctx,
appendBinaryStringInfo(ctx->out, message, sz); appendBinaryStringInfo(ctx->out, message, sz);
OutputPluginWrite(ctx, true); OutputPluginWrite(ctx, true);
} }
/*
* We never try to stream any empty xact so we don't need any special handling
* for skip_empty_xacts in streaming mode APIs.
*/
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "opening a streamed block for transaction");
OutputPluginWrite(ctx, true);
}
/*
* We never try to stream any empty xact so we don't need any special handling
* for skip_empty_xacts in streaming mode APIs.
*/
static void
pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "closing a streamed block for transaction");
OutputPluginWrite(ctx, true);
}
/*
* We never try to stream any empty xact so we don't need any special handling
* for skip_empty_xacts in streaming mode APIs.
*/
static void
pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "aborting streamed (sub)transaction");
OutputPluginWrite(ctx, true);
}
/*
* We never try to stream any empty xact so we don't need any special handling
* for skip_empty_xacts in streaming mode APIs.
*/
static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "committing streamed transaction");
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
timestamptz_to_str(txn->commit_time));
OutputPluginWrite(ctx, true);
}
/*
* In streaming mode, we don't display the changes as the transaction can abort
* at a later point in time. We don't want users to see the changes until the
* transaction is committed.
*/
static void
pg_decode_stream_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change)
{
TestDecodingData *data = ctx->output_plugin_private;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "streaming change for transaction");
OutputPluginWrite(ctx, true);
}
/*
* In streaming mode, we don't display the contents for transactional messages
* as the transaction can abort at a later point in time. We don't want users to
* see the message contents until the transaction is committed.
*/
static void
pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
const char *prefix, Size sz, const char *message)
{
OutputPluginPrepareWrite(ctx, true);
if (transactional)
{
appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
transactional, prefix, sz);
}
else
{
appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
transactional, prefix, sz);
appendBinaryStringInfo(ctx->out, message, sz);
}
OutputPluginWrite(ctx, true);
}
/*
* In streaming mode, we don't display the detailed information of Truncate.
* See pg_decode_stream_change.
*/
static void
pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change)
{
TestDecodingData *data = ctx->output_plugin_private;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "streaming truncate for transaction");
OutputPluginWrite(ctx, true);
}
...@@ -389,6 +389,13 @@ typedef struct OutputPluginCallbacks ...@@ -389,6 +389,13 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb; LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks; } OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
...@@ -401,6 +408,15 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ...@@ -401,6 +408,15 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
If <function>truncate_cb</function> is not set but a If <function>truncate_cb</function> is not set but a
<command>TRUNCATE</command> is to be decoded, the action will be ignored. <command>TRUNCATE</command> is to be decoded, the action will be ignored.
</para> </para>
<para>
An output plugin may also define functions to support streaming of large,
in-progress transactions. The <function>stream_start_cb</function>,
<function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
<function>stream_commit_cb</function> and <function>stream_change_cb</function>
are required, while <function>stream_message_cb</function> and
<function>stream_truncate_cb</function> are optional.
</para>
</sect2> </sect2>
<sect2 id="logicaldecoding-capabilities"> <sect2 id="logicaldecoding-capabilities">
...@@ -679,6 +695,117 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, ...@@ -679,6 +695,117 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
</para> </para>
</sect3> </sect3>
<sect3 id="logicaldecoding-output-plugin-stream-start">
<title>Stream Start Callback</title>
<para>
The <function>stream_start_cb</function> callback is called when opening
a block of streamed changes from an in-progress transaction.
<programlisting>
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
</programlisting>
</para>
</sect3>
<sect3 id="logicaldecoding-output-plugin-stream-stop">
<title>Stream Stop Callback</title>
<para>
The <function>stream_stop_cb</function> callback is called when closing
a block of streamed changes from an in-progress transaction.
<programlisting>
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
</programlisting>
</para>
</sect3>
<sect3 id="logicaldecoding-output-plugin-stream-abort">
<title>Stream Abort Callback</title>
<para>
The <function>stream_abort_cb</function> callback is called to abort
a previously streamed transaction.
<programlisting>
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
</programlisting>
</para>
</sect3>
<sect3 id="logicaldecoding-output-plugin-stream-commit">
<title>Stream Commit Callback</title>
<para>
The <function>stream_commit_cb</function> callback is called to commit
a previously streamed transaction.
<programlisting>
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
</programlisting>
</para>
</sect3>
<sect3 id="logicaldecoding-output-plugin-stream-change">
<title>Stream Change Callback</title>
<para>
The <function>stream_change_cb</function> callback is called when sending
a change in a block of streamed changes (demarcated by
<function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
The actual changes are not displayed as the transaction can abort at a later
point in time and we don't decode changes for aborted transactions.
<programlisting>
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
</programlisting>
</para>
</sect3>
<sect3 id="logicaldecoding-output-plugin-stream-message">
<title>Stream Message Callback</title>
<para>
The <function>stream_message_cb</function> callback is called when sending
a generic message in a block of streamed changes (demarcated by
<function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
The message contents for transactional messages are not displayed as the transaction
can abort at a later point in time and we don't decode changes for aborted
transactions.
<programlisting>
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
</programlisting>
</para>
</sect3>
<sect3 id="logicaldecoding-output-plugin-stream-truncate">
<title>Stream Truncate Callback</title>
<para>
The <function>stream_truncate_cb</function> callback is called for a
<command>TRUNCATE</command> command in a block of streamed changes
(demarcated by <function>stream_start_cb</function> and
<function>stream_stop_cb</function> calls).
<programlisting>
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
</programlisting>
The parameters are analogous to the <function>stream_change_cb</function>
callback. However, because <command>TRUNCATE</command> actions on
tables connected by foreign keys need to be executed together, this
callback receives an array of relations instead of just a single one.
See the description of the <xref linkend="sql-truncate"/> statement for
details.
</para>
</sect3>
</sect2> </sect2>
<sect2 id="logicaldecoding-output-plugin-output"> <sect2 id="logicaldecoding-output-plugin-output">
...@@ -747,4 +874,95 @@ OutputPluginWrite(ctx, true); ...@@ -747,4 +874,95 @@ OutputPluginWrite(ctx, true);
</para> </para>
</note> </note>
</sect1> </sect1>
<sect1 id="logicaldecoding-streaming">
<title>Streaming of Large Transactions for Logical Decoding</title>
<para>
The basic output plugin callbacks (e.g. <function>begin_cb</function>,
<function>change_cb</function>, <function>commit_cb</function> and
<function>message_cb</function>) are only invoked when the transaction
actually commits. The changes are still decoded from the transaction
log, but are only passed to the output plugin at commit (and discarded
if the transaction aborts).
</para>
<para>
This means that while the decoding happens incrementally, and may spill
to disk to keep memory usage under control, all the decoded changes have
to be transmitted when the transaction finally commits (or more precisely,
when the commit is decoded from the transaction log). Depending on the
size of the transaction and network bandwidth, the transfer time may
significantly increase the apply lag.
</para>
<para>
To reduce the apply lag caused by large transactions, an output plugin
may provide additional callback to support incremental streaming of
in-progress transactions. There are multiple required streaming callbacks
(<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
<function>stream_abort_cb</function>, <function>stream_commit_cb</function>
and <function>stream_change_cb</function>) and two optional callbacks
(<function>stream_message_cb</function>) and (<function>stream_truncate_cb</function>).
</para>
<para>
When streaming an in-progress transaction, the changes (and messages) are
streamed in blocks demarcated by <function>stream_start_cb</function>
and <function>stream_stop_cb</function> callbacks. Once all the decoded
changes are transmitted, the transaction is committed using the
<function>stream_commit_cb</function> callback (or possibly aborted using
the <function>stream_abort_cb</function> callback).
</para>
<para>
One example sequence of streaming callback calls for one transaction may
look like this:
<programlisting>
stream_start_cb(...); &lt;-- start of first block of changes
stream_change_cb(...);
stream_change_cb(...);
stream_message_cb(...);
stream_change_cb(...);
...
stream_change_cb(...);
stream_stop_cb(...); &lt;-- end of first block of changes
stream_start_cb(...); &lt;-- start of second block of changes
stream_change_cb(...);
stream_change_cb(...);
stream_change_cb(...);
...
stream_message_cb(...);
stream_change_cb(...);
stream_stop_cb(...); &lt;-- end of second block of changes
stream_commit_cb(...); &lt;-- commit of the streamed transaction
</programlisting>
</para>
<para>
The actual sequence of callback calls may be more complicated, of course.
There may be blocks for multiple streamed transactions, some of the
transactions may get aborted, etc.
</para>
<para>
Similar to spill-to-disk behavior, streaming is triggered when the total
amount of changes decoded from the WAL (for all in-progress transactions)
exceeds limit defined by <varname>logical_decoding_work_mem</varname> setting.
At that point the largest toplevel transaction (measured by amount of memory
currently used for decoded changes) is selected and streamed. However, in
some cases we still have to spill to the disk even if streaming is enabled
because if we cross the memory limit but we still have not decoded the
complete tuple e.g. only decoded toast table insert but not the main table
insert.
</para>
<para>
Even when streaming large transactions, the changes are still applied in
commit order, preserving the same guarantees as the non-streaming mode.
</para>
</sect1>
</chapter> </chapter>
...@@ -65,6 +65,23 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ...@@ -65,6 +65,23 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional, XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message); const char *prefix, Size message_size, const char *message);
/* streaming callbacks */
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr first_lsn);
static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr last_lsn);
static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
/* /*
...@@ -189,6 +206,39 @@ StartupDecodingContext(List *output_plugin_options, ...@@ -189,6 +206,39 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper; ctx->reorder->message = message_cb_wrapper;
/*
* To support streaming, we require start/stop/abort/commit/change
* callbacks. The message and truncate callbacks are optional, similar to
* regular output plugins. We however enable streaming when at least one
* of the methods is enabled so that we can easily identify missing
* methods.
*
* We decide it here, but only check it later in the wrappers.
*/
ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
(ctx->callbacks.stream_stop_cb != NULL) ||
(ctx->callbacks.stream_abort_cb != NULL) ||
(ctx->callbacks.stream_commit_cb != NULL) ||
(ctx->callbacks.stream_change_cb != NULL) ||
(ctx->callbacks.stream_message_cb != NULL) ||
(ctx->callbacks.stream_truncate_cb != NULL);
/*
* streaming callbacks
*
* stream_message and stream_truncate callbacks are optional, so we do not
* fail with ERROR when missing, but the wrappers simply do nothing. We
* must set the ReorderBuffer callbacks to something, otherwise the calls
* from there will crash (we don't want to move the checks there).
*/
ctx->reorder->stream_start = stream_start_cb_wrapper;
ctx->reorder->stream_stop = stream_stop_cb_wrapper;
ctx->reorder->stream_abort = stream_abort_cb_wrapper;
ctx->reorder->stream_commit = stream_commit_cb_wrapper;
ctx->reorder->stream_change = stream_change_cb_wrapper;
ctx->reorder->stream_message = stream_message_cb_wrapper;
ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
ctx->out = makeStringInfo(); ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write; ctx->prepare_write = prepare_write;
ctx->write = do_write; ctx->write = do_write;
...@@ -866,6 +916,307 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ...@@ -866,6 +916,307 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous; error_context_stack = errcallback.previous;
} }
static void
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr first_lsn)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
Assert(!ctx->fast_forward);
/* We're only supposed to call this when streaming is supported. */
Assert(ctx->streaming);
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "stream_start";
state.report_location = first_lsn;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
/*
* report this message's lsn so replies from clients can give an up2date
* answer. This won't ever be enough (and shouldn't be!) to confirm
* receipt of this transaction, but it might allow another transaction's
* commit to be confirmed with one message.
*/
ctx->write_location = first_lsn;
/* in streaming mode, stream_start_cb is required */
if (ctx->callbacks.stream_start_cb == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical streaming requires a stream_start_cb callback")));
ctx->callbacks.stream_start_cb(ctx, txn);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
static void
stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr last_lsn)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
Assert(!ctx->fast_forward);
/* We're only supposed to call this when streaming is supported. */
Assert(ctx->streaming);
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "stream_stop";
state.report_location = last_lsn;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
/*
* report this message's lsn so replies from clients can give an up2date
* answer. This won't ever be enough (and shouldn't be!) to confirm
* receipt of this transaction, but it might allow another transaction's
* commit to be confirmed with one message.
*/
ctx->write_location = last_lsn;
/* in streaming mode, stream_stop_cb is required */
if (ctx->callbacks.stream_stop_cb == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical streaming requires a stream_stop_cb callback")));
ctx->callbacks.stream_stop_cb(ctx, txn);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
static void
stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr abort_lsn)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
Assert(!ctx->fast_forward);
/* We're only supposed to call this when streaming is supported. */
Assert(ctx->streaming);
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "stream_abort";
state.report_location = abort_lsn;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = abort_lsn;
/* in streaming mode, stream_abort_cb is required */
if (ctx->callbacks.stream_abort_cb == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical streaming requires a stream_abort_cb callback")));
ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
static void
stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
Assert(!ctx->fast_forward);
/* We're only supposed to call this when streaming is supported. */
Assert(ctx->streaming);
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "stream_commit";
state.report_location = txn->final_lsn;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn;
/* in streaming mode, stream_abort_cb is required */
if (ctx->callbacks.stream_commit_cb == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical streaming requires a stream_commit_cb callback")));
ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
static void
stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
Assert(!ctx->fast_forward);
/* We're only supposed to call this when streaming is supported. */
Assert(ctx->streaming);
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "stream_change";
state.report_location = change->lsn;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
/*
* report this change's lsn so replies from clients can give an up2date
* answer. This won't ever be enough (and shouldn't be!) to confirm
* receipt of this transaction, but it might allow another transaction's
* commit to be confirmed with one message.
*/
ctx->write_location = change->lsn;
/* in streaming mode, stream_change_cb is required */
if (ctx->callbacks.stream_change_cb == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical streaming requires a stream_change_cb callback")));
ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
static void
stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
Assert(!ctx->fast_forward);
/* We're only supposed to call this when streaming is supported. */
Assert(ctx->streaming);
/* this callback is optional */
if (ctx->callbacks.stream_message_cb == NULL)
return;
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "stream_message";
state.report_location = message_lsn;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
/* do the actual work: call callback */
ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
message_size, message);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
static void
stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
Assert(!ctx->fast_forward);
/* We're only supposed to call this when streaming is supported. */
Assert(ctx->streaming);
/* this callback is optional */
if (!ctx->callbacks.stream_truncate_cb)
return;
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "stream_truncate";
state.report_location = change->lsn;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
/*
* report this change's lsn so replies from clients can give an up2date
* answer. This won't ever be enough (and shouldn't be!) to confirm
* receipt of this transaction, but it might allow another transaction's
* commit to be confirmed with one message.
*/
ctx->write_location = change->lsn;
ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
/* /*
* Set the required catalog xmin horizon for historic snapshots in the current * Set the required catalog xmin horizon for historic snapshots in the current
* replication slot. * replication slot.
......
...@@ -79,6 +79,11 @@ typedef struct LogicalDecodingContext ...@@ -79,6 +79,11 @@ typedef struct LogicalDecodingContext
*/ */
void *output_writer_private; void *output_writer_private;
/*
* Does the output plugin support streaming, and is it enabled?
*/
bool streaming;
/* /*
* State for writing output. * State for writing output.
*/ */
......
...@@ -99,6 +99,67 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct ...@@ -99,6 +99,67 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
*/ */
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
/*
* Called when starting to stream a block of changes from in-progress
* transaction (may be called repeatedly, if it's streamed in multiple
* chunks).
*/
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
/*
* Called when stopping to stream a block of changes from in-progress
* transaction to a remote node (may be called repeatedly, if it's streamed
* in multiple chunks).
*/
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
/*
* Called to discard changes streamed to remote node from in-progress
* transaction.
*/
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
/*
* Called to apply changes streamed to remote node from in-progress
* transaction.
*/
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
/*
* Callback for streaming individual changes from in-progress transactions.
*/
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
/*
* Callback for streaming generic logical decoding messages from in-progress
* transactions.
*/
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
/*
* Callback for streaming truncates from in-progress transactions.
*/
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
/* /*
* Output plugin callbacks * Output plugin callbacks
*/ */
...@@ -112,6 +173,14 @@ typedef struct OutputPluginCallbacks ...@@ -112,6 +173,14 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb; LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeShutdownCB shutdown_cb;
/* streaming of changes */
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks; } OutputPluginCallbacks;
/* Functions in replication/logical/logical.c */ /* Functions in replication/logical/logical.c */
......
...@@ -348,6 +348,54 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, ...@@ -348,6 +348,54 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz, const char *prefix, Size sz,
const char *message); const char *message);
/* start streaming transaction callback signature */
typedef void (*ReorderBufferStreamStartCB) (
ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr first_lsn);
/* stop streaming transaction callback signature */
typedef void (*ReorderBufferStreamStopCB) (
ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr last_lsn);
/* discard streamed transaction callback signature */
typedef void (*ReorderBufferStreamAbortCB) (
ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
/* commit streamed transaction callback signature */
typedef void (*ReorderBufferStreamCommitCB) (
ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
/* stream change callback signature */
typedef void (*ReorderBufferStreamChangeCB) (
ReorderBuffer *rb,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
/* stream message callback signature */
typedef void (*ReorderBufferStreamMessageCB) (
ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix, Size sz,
const char *message);
/* stream truncate callback signature */
typedef void (*ReorderBufferStreamTruncateCB) (
ReorderBuffer *rb,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
struct ReorderBuffer struct ReorderBuffer
{ {
/* /*
...@@ -386,6 +434,17 @@ struct ReorderBuffer ...@@ -386,6 +434,17 @@ struct ReorderBuffer
ReorderBufferCommitCB commit; ReorderBufferCommitCB commit;
ReorderBufferMessageCB message; ReorderBufferMessageCB message;
/*
* Callbacks to be called when streaming a transaction.
*/
ReorderBufferStreamStartCB stream_start;
ReorderBufferStreamStopCB stream_stop;
ReorderBufferStreamAbortCB stream_abort;
ReorderBufferStreamCommitCB stream_commit;
ReorderBufferStreamChangeCB stream_change;
ReorderBufferStreamMessageCB stream_message;
ReorderBufferStreamTruncateCB stream_truncate;
/* /*
* Pointer that will be passed untouched to the callbacks. * Pointer that will be passed untouched to the callbacks.
*/ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment