Commit eb4da3e3 authored by Peter Eisentraut's avatar Peter Eisentraut

Add option to control snapshot export to CREATE_REPLICATION_SLOT

We used to export snapshots unconditionally in CREATE_REPLICATION_SLOT
in the replication protocol, but several upcoming patches want more
control over what happens.

Suppress snapshot export in pg_recvlogical, which neither needs nor can
use the exported snapshot.  Since snapshot exporting can fail this
improves reliability.

This also paves the way for allowing the creation of replication slots
on standbys, which cannot export snapshots because they cannot allocate
new XIDs.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
parent 71504026
...@@ -271,8 +271,9 @@ $ pg_recvlogical -d postgres --slot test --drop-slot ...@@ -271,8 +271,9 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
<sect2> <sect2>
<title>Exported Snapshots</title> <title>Exported Snapshots</title>
<para> <para>
When a new replication slot is created using the streaming replication interface, When a new replication slot is created using the streaming replication
a snapshot is exported interface (see <xref linkend="protocol-replication-create-slot">), a
snapshot is exported
(see <xref linkend="functions-snapshot-synchronization">), which will show (see <xref linkend="functions-snapshot-synchronization">), which will show
exactly the state of the database after which all changes will be exactly the state of the database after which all changes will be
included in the change stream. This can be used to create a new replica by included in the change stream. This can be used to create a new replica by
...@@ -282,6 +283,12 @@ $ pg_recvlogical -d postgres --slot test --drop-slot ...@@ -282,6 +283,12 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
database's state at that point in time, which afterwards can be updated database's state at that point in time, which afterwards can be updated
using the slot's contents without losing any changes. using the slot's contents without losing any changes.
</para> </para>
<para>
Creation of a snapshot is not always possible. In particular, it will
fail when connected to a hot standby. Applications that do not require
snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</>
option.
</para>
</sect2> </sect2>
</sect1> </sect1>
......
...@@ -1486,8 +1486,8 @@ The commands accepted in walsender mode are: ...@@ -1486,8 +1486,8 @@ The commands accepted in walsender mode are:
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry> <varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> } <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> [ <literal>EXPORT_SNAPSHOT</> | <literal>NOEXPORT_SNAPSHOT</> ] }
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm> <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
</term> </term>
<listitem> <listitem>
...@@ -1538,6 +1538,21 @@ The commands accepted in walsender mode are: ...@@ -1538,6 +1538,21 @@ The commands accepted in walsender mode are:
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><literal>EXPORT_SNAPSHOT</></term>
<term><literal>NOEXPORT_SNAPSHOT</></term>
<listitem>
<para>
Decides what to do with the snapshot created during logical slot
initialization. <literal>EXPORT_SNAPSHOT</>, which is the default,
will export the snapshot for use in other sessions. This option can't
be used inside a transaction. <literal>NOEXPORT_SNAPSHOT</> will
just use the snapshot for logical decoding as normal but won't do
anything else with it.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
<para> <para>
......
...@@ -314,7 +314,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) ...@@ -314,7 +314,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
PG_TRY(); PG_TRY();
{ {
walrcv_create_slot(wrconn, slotname, false, &lsn); /*
* Create permanent slot for the subscription. We won't use the
* initial snapshot for anything, so no need to export it.
*/
walrcv_create_slot(wrconn, slotname, false, false, &lsn);
ereport(NOTICE, ereport(NOTICE,
(errmsg("created replication slot \"%s\" on publisher", (errmsg("created replication slot \"%s\" on publisher",
slotname))); slotname)));
......
...@@ -68,6 +68,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, ...@@ -68,6 +68,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
static char *libpqrcv_create_slot(WalReceiverConn *conn, static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname, const char *slotname,
bool temporary, bool temporary,
bool export_snapshot,
XLogRecPtr *lsn); XLogRecPtr *lsn);
static bool libpqrcv_command(WalReceiverConn *conn, static bool libpqrcv_command(WalReceiverConn *conn,
const char *cmd, char **err); const char *cmd, char **err);
...@@ -720,7 +721,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) ...@@ -720,7 +721,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
*/ */
static char * static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
bool temporary, XLogRecPtr *lsn) bool temporary, bool export_snapshot, XLogRecPtr *lsn)
{ {
PGresult *res; PGresult *res;
StringInfoData cmd; StringInfoData cmd;
...@@ -728,13 +729,19 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, ...@@ -728,13 +729,19 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
initStringInfo(&cmd); initStringInfo(&cmd);
appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname); appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
if (temporary) if (temporary)
appendStringInfo(&cmd, "TEMPORARY "); appendStringInfo(&cmd, " TEMPORARY");
if (conn->logical) if (conn->logical)
appendStringInfo(&cmd, "LOGICAL pgoutput"); {
appendStringInfo(&cmd, " LOGICAL pgoutput");
if (export_snapshot)
appendStringInfo(&cmd, " EXPORT_SNAPSHOT");
else
appendStringInfo(&cmd, " NOEXPORT_SNAPSHOT");
}
res = libpqrcv_PQexec(conn->streamConn, cmd.data); res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data); pfree(cmd.data);
......
...@@ -79,6 +79,8 @@ Node *replication_parse_result; ...@@ -79,6 +79,8 @@ Node *replication_parse_result;
%token K_SLOT %token K_SLOT
%token K_RESERVE_WAL %token K_RESERVE_WAL
%token K_TEMPORARY %token K_TEMPORARY
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%type <node> command %type <node> command
%type <node> base_backup start_replication start_logical_replication %type <node> base_backup start_replication start_logical_replication
...@@ -91,7 +93,9 @@ Node *replication_parse_result; ...@@ -91,7 +93,9 @@ Node *replication_parse_result;
%type <defelt> plugin_opt_elem %type <defelt> plugin_opt_elem
%type <node> plugin_opt_arg %type <node> plugin_opt_arg
%type <str> opt_slot var_name %type <str> opt_slot var_name
%type <boolval> opt_reserve_wal opt_temporary %type <boolval> opt_temporary
%type <list> create_slot_opt_list
%type <defelt> create_slot_opt
%% %%
...@@ -202,18 +206,18 @@ base_backup_opt: ...@@ -202,18 +206,18 @@ base_backup_opt:
create_replication_slot: create_replication_slot:
/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
{ {
CreateReplicationSlotCmd *cmd; CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd); cmd = makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_PHYSICAL; cmd->kind = REPLICATION_KIND_PHYSICAL;
cmd->slotname = $2; cmd->slotname = $2;
cmd->temporary = $3; cmd->temporary = $3;
cmd->reserve_wal = $5; cmd->options = $5;
$$ = (Node *) cmd; $$ = (Node *) cmd;
} }
/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
{ {
CreateReplicationSlotCmd *cmd; CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd); cmd = makeNode(CreateReplicationSlotCmd);
...@@ -221,10 +225,36 @@ create_replication_slot: ...@@ -221,10 +225,36 @@ create_replication_slot:
cmd->slotname = $2; cmd->slotname = $2;
cmd->temporary = $3; cmd->temporary = $3;
cmd->plugin = $5; cmd->plugin = $5;
cmd->options = $6;
$$ = (Node *) cmd; $$ = (Node *) cmd;
} }
; ;
create_slot_opt_list:
create_slot_opt_list create_slot_opt
{ $$ = lappend($1, $2); }
| /* EMPTY */
{ $$ = NIL; }
;
create_slot_opt:
K_EXPORT_SNAPSHOT
{
$$ = makeDefElem("export_snapshot",
(Node *)makeInteger(TRUE), -1);
}
| K_NOEXPORT_SNAPSHOT
{
$$ = makeDefElem("export_snapshot",
(Node *)makeInteger(FALSE), -1);
}
| K_RESERVE_WAL
{
$$ = makeDefElem("reserve_wal",
(Node *)makeInteger(TRUE), -1);
}
;
/* DROP_REPLICATION_SLOT slot */ /* DROP_REPLICATION_SLOT slot */
drop_replication_slot: drop_replication_slot:
K_DROP_REPLICATION_SLOT IDENT K_DROP_REPLICATION_SLOT IDENT
...@@ -291,11 +321,6 @@ opt_physical: ...@@ -291,11 +321,6 @@ opt_physical:
| /* EMPTY */ | /* EMPTY */
; ;
opt_reserve_wal:
K_RESERVE_WAL { $$ = true; }
| /* EMPTY */ { $$ = false; }
;
opt_temporary: opt_temporary:
K_TEMPORARY { $$ = true; } K_TEMPORARY { $$ = true; }
| /* EMPTY */ { $$ = false; } | /* EMPTY */ { $$ = false; }
......
...@@ -100,6 +100,8 @@ RESERVE_WAL { return K_RESERVE_WAL; } ...@@ -100,6 +100,8 @@ RESERVE_WAL { return K_RESERVE_WAL; }
LOGICAL { return K_LOGICAL; } LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; } SLOT { return K_SLOT; }
TEMPORARY { return K_TEMPORARY; } TEMPORARY { return K_TEMPORARY; }
EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
"," { return ','; } "," { return ','; }
";" { return ';'; } ";" { return ';'; }
......
...@@ -51,6 +51,7 @@ ...@@ -51,6 +51,7 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h" #include "funcapi.h"
#include "libpq/libpq.h" #include "libpq/libpq.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
...@@ -737,6 +738,48 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req ...@@ -737,6 +738,48 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
return count; return count;
} }
/*
* Process extra options given to CREATE_REPLICATION_SLOT.
*/
static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool *reserve_wal,
bool *export_snapshot)
{
ListCell *lc;
bool snapshot_action_given = false;
bool reserve_wal_given = false;
/* Parse options */
foreach (lc, cmd->options)
{
DefElem *defel = (DefElem *) lfirst(lc);
if (strcmp(defel->defname, "export_snapshot") == 0)
{
if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
snapshot_action_given = true;
*export_snapshot = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "reserve_wal") == 0)
{
if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
reserve_wal_given = true;
*reserve_wal = true;
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
}
/* /*
* Create a new replication slot. * Create a new replication slot.
*/ */
...@@ -746,6 +789,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ...@@ -746,6 +789,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
const char *snapshot_name = NULL; const char *snapshot_name = NULL;
char xpos[MAXFNAMELEN]; char xpos[MAXFNAMELEN];
char *slot_name; char *slot_name;
bool reserve_wal = false;
bool export_snapshot = true;
DestReceiver *dest; DestReceiver *dest;
TupOutputState *tstate; TupOutputState *tstate;
TupleDesc tupdesc; TupleDesc tupdesc;
...@@ -754,6 +799,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ...@@ -754,6 +799,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(!MyReplicationSlot); Assert(!MyReplicationSlot);
parseCreateReplSlotOptions(cmd, &reserve_wal, &export_snapshot);
/* setup state for XLogReadPage */ /* setup state for XLogReadPage */
sendTimeLineIsHistoric = false; sendTimeLineIsHistoric = false;
sendTimeLine = ThisTimeLineID; sendTimeLine = ThisTimeLineID;
...@@ -799,9 +846,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ...@@ -799,9 +846,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
DecodingContextFindStartpoint(ctx); DecodingContextFindStartpoint(ctx);
/* /*
* Export a plain (not of the snapbuild.c type) snapshot to the user * Export the snapshot if we've been asked to do so.
* that can be imported into another session. *
* NB. We will convert the snapbuild.c kind of snapshot to normal
* snapshot when doing this.
*/ */
if (export_snapshot)
snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
/* don't need the decoding context anymore */ /* don't need the decoding context anymore */
...@@ -810,7 +860,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ...@@ -810,7 +860,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
if (!cmd->temporary) if (!cmd->temporary)
ReplicationSlotPersist(); ReplicationSlotPersist();
} }
else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal) else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
{ {
ReplicationSlotReserveWal(); ReplicationSlotReserveWal();
......
...@@ -338,8 +338,13 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, ...@@ -338,8 +338,13 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL", appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
slot_name); slot_name);
else else
{
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
slot_name, plugin); slot_name, plugin);
if (PQserverVersion(conn) >= 100000)
/* pg_recvlogical doesn't use an exported snapshot, so suppress */
appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
}
res = PQexec(conn, query->data); res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
......
...@@ -56,7 +56,7 @@ typedef struct CreateReplicationSlotCmd ...@@ -56,7 +56,7 @@ typedef struct CreateReplicationSlotCmd
ReplicationKind kind; ReplicationKind kind;
char *plugin; char *plugin;
bool temporary; bool temporary;
bool reserve_wal; List *options;
} CreateReplicationSlotCmd; } CreateReplicationSlotCmd;
......
...@@ -183,7 +183,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer, ...@@ -183,7 +183,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
int nbytes); int nbytes);
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname, bool temporary, const char *slotname, bool temporary,
XLogRecPtr *lsn); bool export_snapshot, XLogRecPtr *lsn);
typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd, typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd,
char **err); char **err);
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
...@@ -224,8 +224,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; ...@@ -224,8 +224,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \ #define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
#define walrcv_create_slot(conn, slotname, temporary, lsn) \ #define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn) WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn)
#define walrcv_command(conn, cmd, err) \ #define walrcv_command(conn, cmd, err) \
WalReceiverFunctions->walrcv_command(conn, cmd, err) WalReceiverFunctions->walrcv_command(conn, cmd, err)
#define walrcv_disconnect(conn) \ #define walrcv_disconnect(conn) \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment