Commit 26ad194c authored by Peter Eisentraut's avatar Peter Eisentraut

Support configuration reload in logical replication workers

Author: Michael Paquier <michael.paquier@gmail.com>
Reviewed-by: default avatarPetr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: default avatarFujii Masao <masao.fujii@gmail.com>
parent c0a8ae7b
...@@ -75,7 +75,10 @@ LogicalRepCtxStruct *LogicalRepCtx; ...@@ -75,7 +75,10 @@ LogicalRepCtxStruct *LogicalRepCtx;
static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void); static void logicalrep_worker_detach(void);
bool got_SIGTERM = false; /* Flags set by signal handlers */
volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t got_SIGTERM = false;
static bool on_commit_launcher_wakeup = false; static bool on_commit_launcher_wakeup = false;
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
...@@ -495,10 +498,28 @@ logicalrep_worker_onexit(int code, Datum arg) ...@@ -495,10 +498,28 @@ logicalrep_worker_onexit(int code, Datum arg)
void void
logicalrep_worker_sigterm(SIGNAL_ARGS) logicalrep_worker_sigterm(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGTERM = true; got_SIGTERM = true;
/* Waken anything waiting on the process latch */ /* Waken anything waiting on the process latch */
SetLatch(MyLatch); SetLatch(MyLatch);
errno = save_errno;
}
/* SIGHUP: set flag to reload configuration at next convenient time */
void
logicalrep_worker_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
got_SIGHUP = true;
/* Waken anything waiting on the process latch */
SetLatch(MyLatch);
errno = save_errno;
} }
/* /*
...@@ -637,6 +658,7 @@ ApplyLauncherMain(Datum main_arg) ...@@ -637,6 +658,7 @@ ApplyLauncherMain(Datum main_arg)
(errmsg("logical replication launcher started"))); (errmsg("logical replication launcher started")));
/* Establish signal handlers. */ /* Establish signal handlers. */
pqsignal(SIGHUP, logicalrep_worker_sighup);
pqsignal(SIGTERM, logicalrep_worker_sigterm); pqsignal(SIGTERM, logicalrep_worker_sigterm);
BackgroundWorkerUnblockSignals(); BackgroundWorkerUnblockSignals();
...@@ -728,6 +750,12 @@ ApplyLauncherMain(Datum main_arg) ...@@ -728,6 +750,12 @@ ApplyLauncherMain(Datum main_arg)
if (rc & WL_POSTMASTER_DEATH) if (rc & WL_POSTMASTER_DEATH)
proc_exit(1); proc_exit(1);
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
ResetLatch(&MyProc->procLatch); ResetLatch(&MyProc->procLatch);
} }
......
...@@ -1138,6 +1138,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -1138,6 +1138,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (rc & WL_POSTMASTER_DEATH) if (rc & WL_POSTMASTER_DEATH)
proc_exit(1); proc_exit(1);
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
if (rc & WL_TIMEOUT) if (rc & WL_TIMEOUT)
{ {
/* /*
...@@ -1441,6 +1447,7 @@ ApplyWorkerMain(Datum main_arg) ...@@ -1441,6 +1447,7 @@ ApplyWorkerMain(Datum main_arg)
logicalrep_worker_attach(worker_slot); logicalrep_worker_attach(worker_slot);
/* Setup signal handling */ /* Setup signal handling */
pqsignal(SIGHUP, logicalrep_worker_sighup);
pqsignal(SIGTERM, logicalrep_worker_sigterm); pqsignal(SIGTERM, logicalrep_worker_sigterm);
BackgroundWorkerUnblockSignals(); BackgroundWorkerUnblockSignals();
......
...@@ -56,7 +56,8 @@ extern Subscription *MySubscription; ...@@ -56,7 +56,8 @@ extern Subscription *MySubscription;
extern LogicalRepWorker *MyLogicalRepWorker; extern LogicalRepWorker *MyLogicalRepWorker;
extern bool in_remote_transaction; extern bool in_remote_transaction;
extern bool got_SIGTERM; extern volatile sig_atomic_t got_SIGHUP;
extern volatile sig_atomic_t got_SIGTERM;
extern void logicalrep_worker_attach(int slot); extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
...@@ -69,6 +70,7 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); ...@@ -69,6 +70,7 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern int logicalrep_sync_worker_count(Oid subid); extern int logicalrep_sync_worker_count(Oid subid);
extern void logicalrep_worker_sighup(SIGNAL_ARGS);
extern void logicalrep_worker_sigterm(SIGNAL_ARGS); extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
void process_syncing_tables(XLogRecPtr current_lsn); void process_syncing_tables(XLogRecPtr current_lsn);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment