Commit c40489e4 authored by Alvaro Herrera's avatar Alvaro Herrera

Fix logical replication slot initialization

This was broken in commit 9c7d06d6, which inadvertently gave the
wrong value to fast_forward in one StartupDecodingContext call.  Fix by
flipping the value.  Add a test for the obvious error, namely trying to
initialize a replication slot with an nonexistent output plugin.

While at it, move the CreateDecodingContext call earlier, so that any
errors are reported before sending the CopyBoth message.

Author: Dave Cramer <davecramer@gmail.com>
Reviewed-by: default avatarAndres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CADK3HHLVkeRe1v4P02-5hj55H3_yJg3AEtpXyEY5T3wuzO2jSg@mail.gmail.com
parent 91bc213d
...@@ -30,6 +30,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'tes ...@@ -30,6 +30,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'tes
init init
(1 row) (1 row)
SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
ERROR: could not access file "nonexistent": No such file or directory
-- here we want to start a new session and wait till old one is gone -- here we want to start a new session and wait till old one is gone
select pg_backend_pid() as oldpid \gset select pg_backend_pid() as oldpid \gset
\c - \c -
......
...@@ -9,6 +9,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test ...@@ -9,6 +9,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true);
SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
-- here we want to start a new session and wait till old one is gone -- here we want to start a new session and wait till old one is gone
select pg_backend_pid() as oldpid \gset select pg_backend_pid() as oldpid \gset
\c - \c -
......
...@@ -312,7 +312,7 @@ CreateInitDecodingContext(char *plugin, ...@@ -312,7 +312,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave(); ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
need_full_snapshot, true, need_full_snapshot, false,
read_page, prepare_write, do_write, read_page, prepare_write, do_write,
update_progress); update_progress);
......
...@@ -1068,6 +1068,19 @@ StartLogicalReplication(StartReplicationCmd *cmd) ...@@ -1068,6 +1068,19 @@ StartLogicalReplication(StartReplicationCmd *cmd)
got_STOPPING = true; got_STOPPING = true;
} }
/*
* Create our decoding context, making it start at the previously ack'ed
* position.
*
* Do this before sending CopyBoth, so that any errors are reported early.
*/
logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false,
logical_read_xlog_page,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
WalSndSetState(WALSNDSTATE_CATCHUP); WalSndSetState(WALSNDSTATE_CATCHUP);
/* Send a CopyBothResponse message, and start streaming */ /* Send a CopyBothResponse message, and start streaming */
...@@ -1077,16 +1090,6 @@ StartLogicalReplication(StartReplicationCmd *cmd) ...@@ -1077,16 +1090,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
pq_endmessage(&buf); pq_endmessage(&buf);
pq_flush(); pq_flush();
/*
* Initialize position to the last ack'ed one, then the xlog records begin
* to be shipped from that position.
*/
logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
false,
logical_read_xlog_page,
WalSndPrepareWrite,
WalSndWriteData,
WalSndUpdateProgress);
/* Start reading WAL from the oldest required WAL. */ /* Start reading WAL from the oldest required WAL. */
logical_startptr = MyReplicationSlot->data.restart_lsn; logical_startptr = MyReplicationSlot->data.restart_lsn;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment