Commit 5a991ef8 authored by Robert Haas's avatar Robert Haas

Allow logical decoding via the walsender interface.

In order for this to work, walsenders need the optional ability to
connect to a database, so the "replication" keyword now allows true
or false, for backward-compatibility, and the new value "database"
(which causes the "dbname" parameter to be respected).

walsender needs to loop not only when idle but also when sending
decoded data to the user and when waiting for more xlog data to decode.
This means that there are now three separate loops inside walsender.c;
although some refactoring has been done here, this is still a bit ugly.

Andres Freund, with contributions from Álvaro Herrera, and further
review by me.
parent cb9a0c79
...@@ -1302,10 +1302,13 @@ ...@@ -1302,10 +1302,13 @@
<para> <para>
To initiate streaming replication, the frontend sends the To initiate streaming replication, the frontend sends the
<literal>replication</> parameter in the startup message. This tells the <literal>replication</> parameter in the startup message. A boolean value
backend to go into walsender mode, wherein a small set of replication commands of <literal>true</> tells the backend to go into walsender mode, wherein a
can be issued instead of SQL statements. Only the simple query protocol can be small set of replication commands can be issued instead of SQL statements. Only
used in walsender mode. the simple query protocol can be used in walsender mode.
Passing <literal>database</> as the value instructs walsender to connect to
the database specified in the <literal>dbname</> parameter, which will allow
the connection to be used for logical replication from that database.
The commands accepted in walsender mode are: The commands accepted in walsender mode are:
...@@ -1315,7 +1318,7 @@ The commands accepted in walsender mode are: ...@@ -1315,7 +1318,7 @@ The commands accepted in walsender mode are:
<listitem> <listitem>
<para> <para>
Requests the server to identify itself. Server replies with a result Requests the server to identify itself. Server replies with a result
set of a single row, containing three fields: set of a single row, containing four fields:
</para> </para>
<para> <para>
...@@ -1357,6 +1360,17 @@ The commands accepted in walsender mode are: ...@@ -1357,6 +1360,17 @@ The commands accepted in walsender mode are:
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term>
dbname
</term>
<listitem>
<para>
Database connected to or NULL.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
</para> </para>
</listitem> </listitem>
......
...@@ -1884,10 +1884,23 @@ retry1: ...@@ -1884,10 +1884,23 @@ retry1:
port->cmdline_options = pstrdup(valptr); port->cmdline_options = pstrdup(valptr);
else if (strcmp(nameptr, "replication") == 0) else if (strcmp(nameptr, "replication") == 0)
{ {
if (!parse_bool(valptr, &am_walsender)) /*
* Due to backward compatibility concerns the replication
* parameter is a hybrid beast which allows the value to be
* either boolean or the string 'database'. The latter
* connects to a specific database which is e.g. required for
* logical decoding while.
*/
if (strcmp(valptr, "database") == 0)
{
am_walsender = true;
am_db_walsender = true;
}
else if (!parse_bool(valptr, &am_walsender))
ereport(FATAL, ereport(FATAL,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for boolean option \"replication\""))); errmsg("invalid value for parameter \"replication\""),
errhint("Valid values are: false, 0, true, 1, database.")));
} }
else else
{ {
...@@ -1968,8 +1981,15 @@ retry1: ...@@ -1968,8 +1981,15 @@ retry1:
if (strlen(port->user_name) >= NAMEDATALEN) if (strlen(port->user_name) >= NAMEDATALEN)
port->user_name[NAMEDATALEN - 1] = '\0'; port->user_name[NAMEDATALEN - 1] = '\0';
/* Walsender is not related to a particular database */ /*
if (am_walsender) * Normal walsender backends, e.g. for streaming replication, are not
* connected to a particular database. But walsenders used for logical
* replication need to connect to a specific database. We allow streaming
* replication commands to be issued even if connected to a database as it
* can make sense to first make a basebackup and then stream changes
* starting from that.
*/
if (am_walsender && !am_db_walsender)
port->database_name[0] = '\0'; port->database_name[0] = '\0';
/* /*
......
...@@ -131,7 +131,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli) ...@@ -131,7 +131,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
"the primary server: %s", "the primary server: %s",
PQerrorMessage(streamConn)))); PQerrorMessage(streamConn))));
} }
if (PQnfields(res) != 3 || PQntuples(res) != 1) if (PQnfields(res) < 3 || PQntuples(res) != 1)
{ {
int ntuples = PQntuples(res); int ntuples = PQntuples(res);
int nfields = PQnfields(res); int nfields = PQnfields(res);
...@@ -139,8 +139,8 @@ libpqrcv_identify_system(TimeLineID *primary_tli) ...@@ -139,8 +139,8 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("invalid response from primary server"), (errmsg("invalid response from primary server"),
errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.", errdetail("Could not identify system: Got %d rows and %d fields, expected %d rows and %d or more fields.",
ntuples, nfields))); ntuples, nfields, 3, 1)));
} }
primary_sysid = PQgetvalue(res, 0, 0); primary_sysid = PQgetvalue(res, 0, 0);
*primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
......
...@@ -73,13 +73,17 @@ Node *replication_parse_result; ...@@ -73,13 +73,17 @@ Node *replication_parse_result;
%token K_WAL %token K_WAL
%token K_TIMELINE %token K_TIMELINE
%token K_PHYSICAL %token K_PHYSICAL
%token K_LOGICAL
%token K_SLOT %token K_SLOT
%type <node> command %type <node> command
%type <node> base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history %type <node> base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system timeline_history
%type <list> base_backup_opt_list %type <list> base_backup_opt_list
%type <defelt> base_backup_opt %type <defelt> base_backup_opt
%type <uintval> opt_timeline %type <uintval> opt_timeline
%type <list> plugin_options plugin_opt_list
%type <defelt> plugin_opt_elem
%type <node> plugin_opt_arg
%type <str> opt_slot %type <str> opt_slot
%% %%
...@@ -98,6 +102,7 @@ command: ...@@ -98,6 +102,7 @@ command:
identify_system identify_system
| base_backup | base_backup
| start_replication | start_replication
| start_logical_replication
| create_replication_slot | create_replication_slot
| drop_replication_slot | drop_replication_slot
| timeline_history | timeline_history
...@@ -165,8 +170,8 @@ base_backup_opt: ...@@ -165,8 +170,8 @@ base_backup_opt:
} }
; ;
/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL */
create_replication_slot: create_replication_slot:
/* CREATE_REPLICATION_SLOT slot PHYSICAL */
K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL
{ {
CreateReplicationSlotCmd *cmd; CreateReplicationSlotCmd *cmd;
...@@ -175,9 +180,19 @@ create_replication_slot: ...@@ -175,9 +180,19 @@ create_replication_slot:
cmd->slotname = $2; cmd->slotname = $2;
$$ = (Node *) cmd; $$ = (Node *) cmd;
} }
/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_LOGICAL;
cmd->slotname = $2;
cmd->plugin = $4;
$$ = (Node *) cmd;
}
; ;
/* DROP_REPLICATION_SLOT SLOT slot */ /* DROP_REPLICATION_SLOT slot */
drop_replication_slot: drop_replication_slot:
K_DROP_REPLICATION_SLOT IDENT K_DROP_REPLICATION_SLOT IDENT
{ {
...@@ -205,19 +220,19 @@ start_replication: ...@@ -205,19 +220,19 @@ start_replication:
} }
; ;
opt_timeline: /* START_REPLICATION SLOT slot LOGICAL %X/%X options */
K_TIMELINE UCONST start_logical_replication:
K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
{ {
if ($2 <= 0) StartReplicationCmd *cmd;
ereport(ERROR, cmd = makeNode(StartReplicationCmd);
(errcode(ERRCODE_SYNTAX_ERROR), cmd->kind = REPLICATION_KIND_LOGICAL;;
(errmsg("invalid timeline %u", $2)))); cmd->slotname = $3;
$$ = $2; cmd->startpoint = $5;
cmd->options = $6;
$$ = (Node *) cmd;
} }
| /* EMPTY */
{ $$ = 0; }
; ;
/* /*
* TIMELINE_HISTORY %d * TIMELINE_HISTORY %d
*/ */
...@@ -250,6 +265,46 @@ opt_slot: ...@@ -250,6 +265,46 @@ opt_slot:
{ $$ = NULL; } { $$ = NULL; }
; ;
opt_timeline:
K_TIMELINE UCONST
{
if ($2 <= 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errmsg("invalid timeline %u", $2))));
$$ = $2;
}
| /* EMPTY */ { $$ = 0; }
;
plugin_options:
'(' plugin_opt_list ')' { $$ = $2; }
| /* EMPTY */ { $$ = NIL; }
;
plugin_opt_list:
plugin_opt_elem
{
$$ = list_make1($1);
}
| plugin_opt_list ',' plugin_opt_elem
{
$$ = lappend($1, $3);
}
;
plugin_opt_elem:
IDENT plugin_opt_arg
{
$$ = makeDefElem($1, $2);
}
;
plugin_opt_arg:
SCONST { $$ = (Node *) makeString($1); }
| /* EMPTY */ { $$ = NULL; }
;
%% %%
#include "repl_scanner.c" #include "repl_scanner.c"
...@@ -94,6 +94,7 @@ CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } ...@@ -94,6 +94,7 @@ CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
PHYSICAL { return K_PHYSICAL; } PHYSICAL { return K_PHYSICAL; }
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; } SLOT { return K_SLOT; }
"," { return ','; } "," { return ','; }
......
This diff is collapsed.
...@@ -729,11 +729,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, ...@@ -729,11 +729,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS), (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("remaining connection slots are reserved for non-replication superuser connections"))); errmsg("remaining connection slots are reserved for non-replication superuser connections")));
/* /* Check replication permissions needed for walsender processes. */
* If walsender, we don't want to connect to any particular database. Just
* finish the backend startup by processing any options from the startup
* packet, and we're done.
*/
if (am_walsender) if (am_walsender)
{ {
Assert(!bootstrap); Assert(!bootstrap);
...@@ -742,7 +738,16 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, ...@@ -742,7 +738,16 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
ereport(FATAL, ereport(FATAL,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser or replication role to start walsender"))); errmsg("must be superuser or replication role to start walsender")));
}
/*
* If this is a plain walsender only supporting physical replication, we
* don't want to connect to any particular database. Just finish the
* backend startup by processing any options from the startup packet, and
* we're done.
*/
if (am_walsender && !am_db_walsender)
{
/* process any options passed in the startup packet */ /* process any options passed in the startup packet */
if (MyProcPort != NULL) if (MyProcPort != NULL)
process_startup_options(MyProcPort, am_superuser); process_startup_options(MyProcPort, am_superuser);
......
...@@ -1639,10 +1639,10 @@ BaseBackup(void) ...@@ -1639,10 +1639,10 @@ BaseBackup(void)
progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
if (PQntuples(res) != 1 || PQnfields(res) != 3) if (PQntuples(res) != 1 || PQnfields(res) < 3)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 3); progname, PQntuples(res), PQnfields(res), 1, 3);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
......
...@@ -275,10 +275,10 @@ StreamLog(void) ...@@ -275,10 +275,10 @@ StreamLog(void)
progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
if (PQntuples(res) != 1 || PQnfields(res) != 3) if (PQntuples(res) != 1 || PQnfields(res) < 3)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 3); progname, PQntuples(res), PQnfields(res), 1, 3);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
......
...@@ -563,10 +563,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -563,10 +563,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res); PQclear(res);
return false; return false;
} }
if (PQnfields(res) != 3 || PQntuples(res) != 1) if (PQntuples(res) != 1 || PQnfields(res) < 3)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 3); progname, PQntuples(res), PQnfields(res), 1, 3);
PQclear(res); PQclear(res);
return false; return false;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
/* global state */ /* global state */
extern bool am_walsender; extern bool am_walsender;
extern bool am_cascading_walsender; extern bool am_cascading_walsender;
extern bool am_db_walsender;
extern bool wake_wal_senders; extern bool wake_wal_senders;
/* user-settable parameters */ /* user-settable parameters */
......
...@@ -1909,6 +1909,7 @@ WalRcvData ...@@ -1909,6 +1909,7 @@ WalRcvData
WalRcvState WalRcvState
WalSnd WalSnd
WalSndCtlData WalSndCtlData
WalSndSendDataCallback
WalSndState WalSndState
WholeRowVarExprState WholeRowVarExprState
WindowAgg WindowAgg
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment