Commit ac4645c0 authored by Amit Kapila's avatar Amit Kapila

Allow pgoutput to send logical decoding messages.

The output plugin accepts a new parameter (messages) that controls if
logical decoding messages are written into the replication stream. It is
useful for those clients that use pgoutput as an output plugin and needs
to process messages that were written by pg_logical_emit_message().

Although logical streaming replication protocol supports logical
decoding messages now, logical replication does not use this feature yet.

Author: David Pirotte, Euler Taveira
Reviewed-by: Euler Taveira, Andres Freund, Ashutosh Bapat, Amit Kapila
Discussion: https://postgr.es/m/CADK3HHJ-+9SO7KuRLH=9Wa1rAo60Yreq1GFNkH_kd0=CdaWM+A@mail.gmail.com
parent 531737dd
...@@ -6433,6 +6433,82 @@ Begin ...@@ -6433,6 +6433,82 @@ Begin
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term>
Message
</term>
<listitem>
<para>
<variablelist>
<varlistentry>
<term>
Byte1('M')
</term>
<listitem>
<para>
Identifies the message as a logical decoding message.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Int32
</term>
<listitem>
<para>
Xid of the transaction. The XID is sent only when user has
requested streaming of in-progress transactions.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Int8
</term>
<listitem>
<para>
Flags; Either 0 for no flags or 1 if the logical decoding
message is transactional.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Int64
</term>
<listitem>
<para>
The LSN of the logical decoding message.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
String
</term>
<listitem>
<para>
The prefix of the logical decoding message.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Byte<replaceable>n</replaceable>
</term>
<listitem>
<para>
The content of the logical decoding message.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term> <term>
Commit Commit
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
*/ */
#define LOGICALREP_IS_REPLICA_IDENTITY 1 #define LOGICALREP_IS_REPLICA_IDENTITY 1
#define MESSAGE_TRANSACTIONAL (1<<0)
#define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1) #define TRUNCATE_RESTART_SEQS (1<<1)
...@@ -361,6 +362,33 @@ logicalrep_read_truncate(StringInfo in, ...@@ -361,6 +362,33 @@ logicalrep_read_truncate(StringInfo in,
return relids; return relids;
} }
/*
* Write MESSAGE to stream
*/
void
logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
bool transactional, const char *prefix, Size sz,
const char *message)
{
uint8 flags = 0;
pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
/* encode and send message flags */
if (transactional)
flags |= MESSAGE_TRANSACTIONAL;
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
pq_sendint8(out, flags);
pq_sendint64(out, lsn);
pq_sendstring(out, prefix);
pq_sendint32(out, sz);
pq_sendbytes(out, message, sz);
}
/* /*
* Write relation description to the output stream. * Write relation description to the output stream.
*/ */
......
...@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s) ...@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s)
apply_handle_origin(s); apply_handle_origin(s);
return; return;
case LOGICAL_REP_MSG_MESSAGE:
/*
* Logical replication does not use generic logical messages yet.
* Although, it could be used by other applications that use this
* output plugin.
*/
return;
case LOGICAL_REP_MSG_STREAM_START: case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s); apply_handle_stream_start(s);
return; return;
......
...@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ...@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
static void pgoutput_truncate(LogicalDecodingContext *ctx, static void pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferTXN *txn, int nrelations, Relation relations[],
ReorderBufferChange *change); ReorderBufferChange *change);
static void pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id); RepOriginId origin_id);
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
...@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) ...@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->begin_cb = pgoutput_begin_txn; cb->begin_cb = pgoutput_begin_txn;
cb->change_cb = pgoutput_change; cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate; cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
cb->commit_cb = pgoutput_commit_txn; cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter; cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown; cb->shutdown_cb = pgoutput_shutdown;
...@@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) ...@@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_abort_cb = pgoutput_stream_abort; cb->stream_abort_cb = pgoutput_stream_abort;
cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change; cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
cb->stream_truncate_cb = pgoutput_truncate; cb->stream_truncate_cb = pgoutput_truncate;
} }
...@@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data) ...@@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data)
bool protocol_version_given = false; bool protocol_version_given = false;
bool publication_names_given = false; bool publication_names_given = false;
bool binary_option_given = false; bool binary_option_given = false;
bool messages_option_given = false;
bool streaming_given = false; bool streaming_given = false;
data->binary = false; data->binary = false;
data->streaming = false; data->streaming = false;
data->messages = false;
foreach(lc, options) foreach(lc, options)
{ {
...@@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data) ...@@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data)
data->binary = defGetBoolean(defel); data->binary = defGetBoolean(defel);
} }
else if (strcmp(defel->defname, "messages") == 0)
{
if (messages_option_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
messages_option_given = true;
data->messages = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "streaming") == 0) else if (strcmp(defel->defname, "streaming") == 0)
{ {
if (streaming_given) if (streaming_given)
...@@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ...@@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextReset(data->context); MemoryContextReset(data->context);
} }
static void
pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
const char *message)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TransactionId xid = InvalidTransactionId;
if (!data->messages)
return;
/*
* Remember the xid for the message in streaming mode. See
* pgoutput_change.
*/
if (in_streaming)
xid = txn->xid;
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_message(ctx->out,
xid,
message_lsn,
transactional,
prefix,
sz,
message);
OutputPluginWrite(ctx, true);
}
/* /*
* Currently we always forward. * Currently we always forward.
*/ */
......
...@@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType ...@@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType
LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_TRUNCATE = 'T',
LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_RELATION = 'R',
LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_TYPE = 'Y',
LOGICAL_REP_MSG_MESSAGE = 'M',
LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_START = 'S',
LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_END = 'E',
LOGICAL_REP_MSG_STREAM_COMMIT = 'c', LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
...@@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, ...@@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
bool cascade, bool restart_seqs); bool cascade, bool restart_seqs);
extern List *logicalrep_read_truncate(StringInfo in, extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs); bool *cascade, bool *restart_seqs);
extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
bool transactional, const char *prefix, Size sz, const char *message);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid, extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel); Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
......
...@@ -26,6 +26,7 @@ typedef struct PGOutputData ...@@ -26,6 +26,7 @@ typedef struct PGOutputData
List *publications; List *publications;
bool binary; bool binary;
bool streaming; bool streaming;
bool messages;
} PGOutputData; } PGOutputData;
#endif /* PGOUTPUT_H */ #endif /* PGOUTPUT_H */
# Tests that logical decoding messages
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 5;
# Create publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_test (a int primary key)");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_test (a int primary key)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test");
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);
# Ensure a transactional logical decoding message shows up on the slot
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
$node_publisher->safe_psql('postgres',
"SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
);
my $result = $node_publisher->safe_psql(
'postgres', qq(
SELECT get_byte(data, 0)
FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
'proto_version', '1',
'publication_names', 'tap_pub',
'messages', 'true')
));
# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
is($result, qq(66
77
67),
'messages on slot are B M C with message option');
$result = $node_publisher->safe_psql(
'postgres', qq(
SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
'proto_version', '1',
'publication_names', 'tap_pub',
'messages', 'true')
OFFSET 1 LIMIT 1
));
is($result, qq(1|pgoutput),
"flag transactional is set to 1 and prefix is pgoutput");
$result = $node_publisher->safe_psql(
'postgres', qq(
SELECT get_byte(data, 0)
FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
'proto_version', '1',
'publication_names', 'tap_pub')
));
# 66 67 == B C == BEGIN COMMIT
is($result, qq(66
67),
'option messages defaults to false so message (M) is not available on slot');
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
$node_publisher->wait_for_catchup('tap_sub');
# ensure a non-transactional logical decoding message shows up on the slot
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
my $message_lsn = $node_publisher->safe_psql('postgres',
"SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')");
$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");
$result = $node_publisher->safe_psql(
'postgres', qq(
SELECT get_byte(data, 0), get_byte(data, 1)
FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
'proto_version', '1',
'publication_names', 'tap_pub',
'messages', 'true')
WHERE lsn = '$message_lsn' AND xid = 0
));
is($result, qq(77|0), 'non-transactional message on slot is M');
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
$node_publisher->wait_for_catchup('tap_sub');
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
# wait for the replication connection to drop from the publisher
$node_publisher->poll_query_until('postgres',
'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0);
# Ensure a non-transactional logical decoding message shows up on the slot when
# it is emitted within an aborted transaction. The message won't emit until
# something advances the LSN, hence, we intentionally forces the server to
# switch to a new WAL file.
$node_publisher->safe_psql(
'postgres', qq(
BEGIN;
SELECT pg_logical_emit_message(false, 'pgoutput',
'a non-transactional message is available even if the transaction is aborted 1');
INSERT INTO tab_test VALUES (3);
SELECT pg_logical_emit_message(true, 'pgoutput',
'a transactional message is not available if the transaction is aborted');
SELECT pg_logical_emit_message(false, 'pgoutput',
'a non-transactional message is available even if the transaction is aborted 2');
ROLLBACK;
SELECT pg_switch_wal();
));
$result = $node_publisher->safe_psql(
'postgres', qq(
SELECT get_byte(data, 0), get_byte(data, 1)
FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
'proto_version', '1',
'publication_names', 'tap_pub',
'messages', 'true')
));
is($result, qq(77|0
77|0),
'non-transactional message on slot from aborted transaction is M');
$node_subscriber->stop('fast');
$node_publisher->stop('fast');
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment