Commit fd5942c1 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Use the regular main processing loop also in walsenders.

The regular backend's main loop handles signal handling and error recovery
better than the current WAL sender command loop does. For example, if the
client hangs and a SIGTERM is received before starting streaming, the
walsender will now terminate immediately, rather than hang until the
connection times out.
parent 1997f34d
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "libpq/libpq.h" #include "libpq/libpq.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "replication/basebackup.h" #include "replication/basebackup.h"
#include "replication/walsender.h" #include "replication/walsender.h"
...@@ -30,7 +31,6 @@ ...@@ -30,7 +31,6 @@
#include "storage/ipc.h" #include "storage/ipc.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/elog.h" #include "utils/elog.h"
#include "utils/memutils.h"
#include "utils/ps_status.h" #include "utils/ps_status.h"
typedef struct typedef struct
...@@ -370,19 +370,10 @@ void ...@@ -370,19 +370,10 @@ void
SendBaseBackup(BaseBackupCmd *cmd) SendBaseBackup(BaseBackupCmd *cmd)
{ {
DIR *dir; DIR *dir;
MemoryContext backup_context;
MemoryContext old_context;
basebackup_options opt; basebackup_options opt;
parse_basebackup_options(cmd->options, &opt); parse_basebackup_options(cmd->options, &opt);
backup_context = AllocSetContextCreate(CurrentMemoryContext,
"Streaming base backup context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
old_context = MemoryContextSwitchTo(backup_context);
WalSndSetState(WALSNDSTATE_BACKUP); WalSndSetState(WALSNDSTATE_BACKUP);
if (update_process_title) if (update_process_title)
...@@ -403,9 +394,6 @@ SendBaseBackup(BaseBackupCmd *cmd) ...@@ -403,9 +394,6 @@ SendBaseBackup(BaseBackupCmd *cmd)
perform_base_backup(&opt, dir); perform_base_backup(&opt, dir);
FreeDir(dir); FreeDir(dir);
MemoryContextSwitchTo(old_context);
MemoryContextDelete(backup_context);
} }
static void static void
...@@ -606,7 +594,7 @@ sendDir(char *path, int basepathlen, bool sizeonly) ...@@ -606,7 +594,7 @@ sendDir(char *path, int basepathlen, bool sizeonly)
* error in that case. The error handler further up will call * error in that case. The error handler further up will call
* do_pg_abort_backup() for us. * do_pg_abort_backup() for us.
*/ */
if (walsender_shutdown_requested || walsender_ready_to_stop) if (ProcDiePending || walsender_ready_to_stop)
ereport(ERROR, ereport(ERROR,
(errmsg("shutdown requested, aborting active base backup"))); (errmsg("shutdown requested, aborting active base backup")));
......
This diff is collapsed.
...@@ -192,6 +192,7 @@ static int InteractiveBackend(StringInfo inBuf); ...@@ -192,6 +192,7 @@ static int InteractiveBackend(StringInfo inBuf);
static int interactive_getc(void); static int interactive_getc(void);
static int SocketBackend(StringInfo inBuf); static int SocketBackend(StringInfo inBuf);
static int ReadCommand(StringInfo inBuf); static int ReadCommand(StringInfo inBuf);
static void forbidden_in_wal_sender(char firstchar);
static List *pg_rewrite_query(Query *query); static List *pg_rewrite_query(Query *query);
static bool check_log_statement(List *stmt_list); static bool check_log_statement(List *stmt_list);
static int errdetail_execute(List *raw_parsetree_list); static int errdetail_execute(List *raw_parsetree_list);
...@@ -3720,12 +3721,9 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -3720,12 +3721,9 @@ PostgresMain(int argc, char *argv[], const char *username)
if (IsUnderPostmaster && Log_disconnections) if (IsUnderPostmaster && Log_disconnections)
on_proc_exit(log_disconnections, 0); on_proc_exit(log_disconnections, 0);
/* If this is a WAL sender process, we're done with initialization. */ /* Perform initialization specific to a WAL sender process. */
if (am_walsender) if (am_walsender)
{ InitWalSender();
WalSenderMain(); /* does not return */
abort();
}
/* /*
* process any libraries that should be preloaded at backend start (this * process any libraries that should be preloaded at backend start (this
...@@ -3835,6 +3833,9 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -3835,6 +3833,9 @@ PostgresMain(int argc, char *argv[], const char *username)
*/ */
AbortCurrentTransaction(); AbortCurrentTransaction();
if (am_walsender)
WalSndErrorCleanup();
/* /*
* Now return to normal top-level context and clear ErrorContext for * Now return to normal top-level context and clear ErrorContext for
* next time. * next time.
...@@ -3969,7 +3970,10 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -3969,7 +3970,10 @@ PostgresMain(int argc, char *argv[], const char *username)
query_string = pq_getmsgstring(&input_message); query_string = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message); pq_getmsgend(&input_message);
exec_simple_query(query_string); if (am_walsender)
exec_replication_command(query_string);
else
exec_simple_query(query_string);
send_ready_for_query = true; send_ready_for_query = true;
} }
...@@ -3982,6 +3986,8 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -3982,6 +3986,8 @@ PostgresMain(int argc, char *argv[], const char *username)
int numParams; int numParams;
Oid *paramTypes = NULL; Oid *paramTypes = NULL;
forbidden_in_wal_sender(firstchar);
/* Set statement_timestamp() */ /* Set statement_timestamp() */
SetCurrentStatementStartTimestamp(); SetCurrentStatementStartTimestamp();
...@@ -4004,6 +4010,8 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -4004,6 +4010,8 @@ PostgresMain(int argc, char *argv[], const char *username)
break; break;
case 'B': /* bind */ case 'B': /* bind */
forbidden_in_wal_sender(firstchar);
/* Set statement_timestamp() */ /* Set statement_timestamp() */
SetCurrentStatementStartTimestamp(); SetCurrentStatementStartTimestamp();
...@@ -4019,6 +4027,8 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -4019,6 +4027,8 @@ PostgresMain(int argc, char *argv[], const char *username)
const char *portal_name; const char *portal_name;
int max_rows; int max_rows;
forbidden_in_wal_sender(firstchar);
/* Set statement_timestamp() */ /* Set statement_timestamp() */
SetCurrentStatementStartTimestamp(); SetCurrentStatementStartTimestamp();
...@@ -4031,6 +4041,8 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -4031,6 +4041,8 @@ PostgresMain(int argc, char *argv[], const char *username)
break; break;
case 'F': /* fastpath function call */ case 'F': /* fastpath function call */
forbidden_in_wal_sender(firstchar);
/* Set statement_timestamp() */ /* Set statement_timestamp() */
SetCurrentStatementStartTimestamp(); SetCurrentStatementStartTimestamp();
...@@ -4078,6 +4090,8 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -4078,6 +4090,8 @@ PostgresMain(int argc, char *argv[], const char *username)
int close_type; int close_type;
const char *close_target; const char *close_target;
forbidden_in_wal_sender(firstchar);
close_type = pq_getmsgbyte(&input_message); close_type = pq_getmsgbyte(&input_message);
close_target = pq_getmsgstring(&input_message); close_target = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message); pq_getmsgend(&input_message);
...@@ -4120,6 +4134,8 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -4120,6 +4134,8 @@ PostgresMain(int argc, char *argv[], const char *username)
int describe_type; int describe_type;
const char *describe_target; const char *describe_target;
forbidden_in_wal_sender(firstchar);
/* Set statement_timestamp() (needed for xact) */ /* Set statement_timestamp() (needed for xact) */
SetCurrentStatementStartTimestamp(); SetCurrentStatementStartTimestamp();
...@@ -4201,6 +4217,29 @@ PostgresMain(int argc, char *argv[], const char *username) ...@@ -4201,6 +4217,29 @@ PostgresMain(int argc, char *argv[], const char *username)
} /* end of input-reading loop */ } /* end of input-reading loop */
} }
/*
* Throw an error if we're a WAL sender process.
*
* This is used to forbid anything else than simple query protocol messages
* in a WAL sender process. 'firstchar' specifies what kind of a forbidden
* message was received, and is used to construct the error message.
*/
static void
forbidden_in_wal_sender(char firstchar)
{
if (am_walsender)
{
if (firstchar == 'F')
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("fastpath function calls not supported in a replication connection")));
else
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("extended query protocol not supported in a replication connection")));
}
}
/* /*
* Obtain platform stack depth limit (in bytes) * Obtain platform stack depth limit (in bytes)
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
/* global state */ /* global state */
extern bool am_walsender; extern bool am_walsender;
extern bool am_cascading_walsender; extern bool am_cascading_walsender;
extern volatile sig_atomic_t walsender_shutdown_requested;
extern volatile sig_atomic_t walsender_ready_to_stop; extern volatile sig_atomic_t walsender_ready_to_stop;
extern bool wake_wal_senders; extern bool wake_wal_senders;
...@@ -27,7 +26,9 @@ extern bool wake_wal_senders; ...@@ -27,7 +26,9 @@ extern bool wake_wal_senders;
extern int max_wal_senders; extern int max_wal_senders;
extern int replication_timeout; extern int replication_timeout;
extern void WalSenderMain(void) __attribute__((noreturn)); extern void InitWalSender(void);
extern void exec_replication_command(const char *query_string);
extern void WalSndErrorCleanup(void);
extern void WalSndSignals(void); extern void WalSndSignals(void);
extern Size WalSndShmemSize(void); extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void); extern void WalSndShmemInit(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment