Commit f64ea6dc authored by Amit Kapila's avatar Amit Kapila

Add a xid argument to the filter_prepare callback for output plugins.

Along with gid, this provides a different way to identify the transaction.
The users that use xid in some way to prepare the transactions can use it
to filter prepare transactions. The later commands COMMIT PREPARED or
ROLLBACK PREPARED carries both identifiers, providing an output plugin the
choice of what to use.

Author: Markus Wanner
Reviewed-by: Vignesh C, Amit Kapila
Discussion: https://postgr.es/m/ee280000-7355-c4dc-e47b-2436e7be959c@enterprisedb.com
parent bc2797eb
...@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ...@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
bool transactional, const char *prefix, bool transactional, const char *prefix,
Size sz, const char *message); Size sz, const char *message);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid); const char *gid);
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn); ReorderBufferTXN *txn);
...@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ...@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
* substring, then we filter it out. * substring, then we filter it out.
*/ */
static bool static bool
pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid) pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
const char *gid)
{ {
if (strstr(gid, "_nodecode") != NULL) if (strstr(gid, "_nodecode") != NULL)
return true; return true;
......
...@@ -794,20 +794,25 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, ...@@ -794,20 +794,25 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
<command>COMMIT PREPARED</command> time. To signal that <command>COMMIT PREPARED</command> time. To signal that
decoding should be skipped, return <literal>true</literal>; decoding should be skipped, return <literal>true</literal>;
<literal>false</literal> otherwise. When the callback is not <literal>false</literal> otherwise. When the callback is not
defined, <literal>false</literal> is assumed (i.e. nothing is defined, <literal>false</literal> is assumed (i.e. no filtering, all
filtered). transactions using two-phase commit are decoded in two phases as well).
<programlisting> <programlisting>
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid); const char *gid);
</programlisting> </programlisting>
The <parameter>ctx</parameter> parameter has the same contents as for the The <parameter>ctx</parameter> parameter has the same contents as for
other callbacks. The <parameter>gid</parameter> is the identifier that later the other callbacks. The parameters <parameter>xid</parameter>
identifies this transaction for <command>COMMIT PREPARED</command> or and <parameter>gid</parameter> provide two different ways to identify
<command>ROLLBACK PREPARED</command>. the transaction. The later <command>COMMIT PREPARED</command> or
<command>ROLLBACK PREPARED</command> carries both identifiers,
providing an output plugin the choice of what to use.
</para> </para>
<para> <para>
The callback has to provide the same static answer for a given The callback may be invoked multiple times per transaction to decode
<parameter>gid</parameter> every time it is called. and must provide the same static answer for a given pair of
<parameter>xid</parameter> and <parameter>gid</parameter> every time
it is called.
</para> </para>
</sect3> </sect3>
...@@ -1219,9 +1224,11 @@ stream_commit_cb(...); &lt;-- commit of the streamed transaction ...@@ -1219,9 +1224,11 @@ stream_commit_cb(...); &lt;-- commit of the streamed transaction
</para> </para>
<para> <para>
Optionally the output plugin can specify a name pattern in the Optionally the output plugin can define filtering rules via
<function>filter_prepare_cb</function> and transactions with gid containing <function>filter_prepare_cb</function> to decode only specific transaction
that name pattern will not be decoded as a two-phase commit transaction. in two phases. This can be achieved by pattern matching on the
<parameter>gid</parameter> or via lookups using the
<parameter>xid</parameter>.
</para> </para>
<para> <para>
......
...@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, ...@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
/* helper functions for decoding transactions */ /* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid); static inline bool FilterPrepare(LogicalDecodingContext *ctx,
TransactionId xid, const char *gid);
static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx, static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf, Oid dbId, XLogRecordBuffer *buf, Oid dbId,
RepOriginId origin_id); RepOriginId origin_id);
...@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* doesn't filter the transaction at prepare time. * doesn't filter the transaction at prepare time.
*/ */
if (info == XLOG_XACT_COMMIT_PREPARED) if (info == XLOG_XACT_COMMIT_PREPARED)
two_phase = !(FilterPrepare(ctx, parsed.twophase_gid)); two_phase = !(FilterPrepare(ctx, xid,
parsed.twophase_gid));
DecodeCommit(ctx, buf, &parsed, xid, two_phase); DecodeCommit(ctx, buf, &parsed, xid, two_phase);
break; break;
...@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* doesn't filter the transaction at prepare time. * doesn't filter the transaction at prepare time.
*/ */
if (info == XLOG_XACT_ABORT_PREPARED) if (info == XLOG_XACT_ABORT_PREPARED)
two_phase = !(FilterPrepare(ctx, parsed.twophase_gid)); two_phase = !(FilterPrepare(ctx, xid,
parsed.twophase_gid));
DecodeAbort(ctx, buf, &parsed, xid, two_phase); DecodeAbort(ctx, buf, &parsed, xid, two_phase);
break; break;
...@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* manner iff output plugin supports two-phase commits and * manner iff output plugin supports two-phase commits and
* doesn't filter the transaction at prepare time. * doesn't filter the transaction at prepare time.
*/ */
if (FilterPrepare(ctx, parsed.twophase_gid)) if (FilterPrepare(ctx, parsed.twophase_xid,
parsed.twophase_gid))
{ {
ReorderBufferProcessXid(reorder, parsed.twophase_xid, ReorderBufferProcessXid(reorder, parsed.twophase_xid,
buf->origptr); buf->origptr);
...@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* this transaction as a regular commit later. * this transaction as a regular commit later.
*/ */
static inline bool static inline bool
FilterPrepare(LogicalDecodingContext *ctx, const char *gid) FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
const char *gid)
{ {
/* /*
* Skip if decoding of two-phase transactions at PREPARE time is not * Skip if decoding of two-phase transactions at PREPARE time is not
...@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid) ...@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
if (ctx->callbacks.filter_prepare_cb == NULL) if (ctx->callbacks.filter_prepare_cb == NULL)
return false; return false;
return filter_prepare_cb_wrapper(ctx, gid); return filter_prepare_cb_wrapper(ctx, xid, gid);
} }
static inline bool static inline bool
......
...@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ...@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
} }
bool bool
filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid) filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
const char *gid)
{ {
LogicalErrorCallbackState state; LogicalErrorCallbackState state;
ErrorContextCallback errcallback; ErrorContextCallback errcallback;
...@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid) ...@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
ctx->accept_writes = false; ctx->accept_writes = false;
/* do the actual work: call callback */ /* do the actual work: call callback */
ret = ctx->callbacks.filter_prepare_cb(ctx, gid); ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
/* Pop the error context stack */ /* Pop the error context stack */
error_context_stack = errcallback.previous; error_context_stack = errcallback.previous;
......
...@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, ...@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn); XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid); extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
TransactionId xid, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void); extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx); extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
......
...@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); ...@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
* and sent as usual transaction. * and sent as usual transaction.
*/ */
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid); const char *gid);
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment