Commit be65eddd authored by Andres Freund's avatar Andres Freund

Add required database and origin filtering for logical messages.

Logical messages, added in 3fe3511d, during decoding failed to filter
messages emitted in other databases and messages emitted "under" a
replication origin the output plugin isn't interested in.

Add tests to verify that both types of filtering actually work. While
touching message.sql remove hunk obsoleted by d25379eb.

Bump XLOG_PAGE_MAGIC because xl_logical_message changed and because
3fe3511d had omitted doing so. 3fe3511d additionally didn't bump
catversion, but 7a542700 has done so since.

Author: Petr Jelinek
Reported-By: Andres Freund
Discussion: 20160406142513.wotqy3ba3kanr423@alap3.anarazel.de
parent 80abbeba
-- predictability -- predictability
SET synchronous_commit = on; SET synchronous_commit = on;
SET client_encoding = 'utf8';
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column? ?column?
---------- ----------
...@@ -71,9 +70,30 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for ...@@ -71,9 +70,30 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for
message: transactional: 1 prefix: test, sz: 11 content:czechtastic message: transactional: 1 prefix: test, sz: 11 content:czechtastic
(7 rows) (7 rows)
SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); -- test db filtering
\set prevdb :DBNAME
\c template1
SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
?column? ?column?
---------- ----------
init otherdb1
(1 row)
SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
?column?
----------
otherdb2
(1 row)
\c :prevdb
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
cleanup
(1 row) (1 row)
...@@ -59,6 +59,12 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); ...@@ -59,6 +59,12 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
-- ensure we prevent duplicate setup -- ensure we prevent duplicate setup
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
ERROR: cannot setup replication origin when one is already setup ERROR: cannot setup replication origin when one is already setup
SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
?column?
----------
(1 row)
BEGIN; BEGIN;
-- setup transaction origin -- setup transaction origin
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00'); SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
......
-- predictability -- predictability
SET synchronous_commit = on; SET synchronous_commit = on;
SET client_encoding = 'utf8';
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
...@@ -22,4 +21,14 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic'); ...@@ -22,4 +21,14 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); -- test db filtering
\set prevdb :DBNAME
\c template1
SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
\c :prevdb
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
...@@ -31,6 +31,8 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); ...@@ -31,6 +31,8 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
-- ensure we prevent duplicate setup -- ensure we prevent duplicate setup
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
BEGIN; BEGIN;
-- setup transaction origin -- setup transaction origin
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00'); SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
......
...@@ -464,6 +464,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -464,6 +464,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
} }
} }
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
if (ctx->callbacks.filter_by_origin_cb == NULL)
return false;
return filter_by_origin_cb_wrapper(ctx, origin_id);
}
/* /*
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
*/ */
...@@ -474,6 +483,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -474,6 +483,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogReaderState *r = buf->record; XLogReaderState *r = buf->record;
TransactionId xid = XLogRecGetXid(r); TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
RepOriginId origin_id = XLogRecGetOrigin(r);
Snapshot snapshot; Snapshot snapshot;
xl_logical_message *message; xl_logical_message *message;
...@@ -488,6 +498,10 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -488,6 +498,10 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
message = (xl_logical_message *) XLogRecGetData(r); message = (xl_logical_message *) XLogRecGetData(r);
if (message->dbId != ctx->slot->data.database ||
FilterByOrigin(ctx, origin_id))
return;
if (message->transactional && if (message->transactional &&
!SnapBuildProcessChange(builder, xid, buf->origptr)) !SnapBuildProcessChange(builder, xid, buf->origptr))
return; return;
...@@ -504,15 +518,6 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -504,15 +518,6 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
message->message + message->prefix_size); message->message + message->prefix_size);
} }
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
if (ctx->callbacks.filter_by_origin_cb == NULL)
return false;
return filter_by_origin_cb_wrapper(ctx, origin_id);
}
/* /*
* Consolidated commit record handling between the different form of commit * Consolidated commit record handling between the different form of commit
* records. * records.
......
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
#include "postgres.h" #include "postgres.h"
#include "miscadmin.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
...@@ -60,6 +62,7 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size, ...@@ -60,6 +62,7 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
GetCurrentTransactionId(); GetCurrentTransactionId();
} }
xlrec.dbId = MyDatabaseId;
xlrec.transactional = transactional; xlrec.transactional = transactional;
xlrec.prefix_size = strlen(prefix) + 1; xlrec.prefix_size = strlen(prefix) + 1;
xlrec.message_size = size; xlrec.message_size = size;
...@@ -69,6 +72,9 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size, ...@@ -69,6 +72,9 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
XLogRegisterData((char *) prefix, xlrec.prefix_size); XLogRegisterData((char *) prefix, xlrec.prefix_size);
XLogRegisterData((char *) message, size); XLogRegisterData((char *) message, size);
/* allow origin filtering */
XLogIncludeOrigin();
return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
} }
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
/* /*
* Each page of XLOG file has a header like this: * Each page of XLOG file has a header like this:
*/ */
#define XLOG_PAGE_MAGIC 0xD090 /* can be used as WAL version indicator */ #define XLOG_PAGE_MAGIC 0xD091 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData typedef struct XLogPageHeaderData
{ {
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
*/ */
typedef struct xl_logical_message typedef struct xl_logical_message
{ {
Oid dbId; /* database Oid emitted from */
bool transactional; /* is message transactional? */ bool transactional; /* is message transactional? */
Size prefix_size; /* length of prefix */ Size prefix_size; /* length of prefix */
Size message_size; /* size of the message */ Size message_size; /* size of the message */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment