Commit 9fcf670c authored by Peter Eisentraut's avatar Peter Eisentraut

Fix signal handling in logical replication workers

The logical replication worker processes now use the normal die()
handler for SIGTERM and CHECK_FOR_INTERRUPTS() instead of custom code.
One problem before was that the apply worker would not exit promptly
when a subscription was dropped, which could lead to deadlocks.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: default avatarMasahiko Sawada <sawada.mshk@gmail.com>
parent acbd8375
...@@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void); ...@@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker); static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
/* Flags set by signal handlers */ /* Flags set by signal handlers */
volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t got_SIGTERM = false; static volatile sig_atomic_t got_SIGTERM = false;
static bool on_commit_launcher_wakeup = false; static bool on_commit_launcher_wakeup = false;
...@@ -624,8 +624,8 @@ logicalrep_worker_onexit(int code, Datum arg) ...@@ -624,8 +624,8 @@ logicalrep_worker_onexit(int code, Datum arg)
} }
/* SIGTERM: set flag to exit at next convenient time */ /* SIGTERM: set flag to exit at next convenient time */
void static void
logicalrep_worker_sigterm(SIGNAL_ARGS) logicalrep_launcher_sigterm(SIGNAL_ARGS)
{ {
int save_errno = errno; int save_errno = errno;
...@@ -638,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS) ...@@ -638,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
} }
/* SIGHUP: set flag to reload configuration at next convenient time */ /* SIGHUP: set flag to reload configuration at next convenient time */
void static void
logicalrep_worker_sighup(SIGNAL_ARGS) logicalrep_launcher_sighup(SIGNAL_ARGS)
{ {
int save_errno = errno; int save_errno = errno;
...@@ -799,8 +799,8 @@ ApplyLauncherMain(Datum main_arg) ...@@ -799,8 +799,8 @@ ApplyLauncherMain(Datum main_arg)
before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
/* Establish signal handlers. */ /* Establish signal handlers. */
pqsignal(SIGHUP, logicalrep_worker_sighup); pqsignal(SIGHUP, logicalrep_launcher_sighup);
pqsignal(SIGTERM, logicalrep_worker_sigterm); pqsignal(SIGTERM, logicalrep_launcher_sigterm);
BackgroundWorkerUnblockSignals(); BackgroundWorkerUnblockSignals();
/* Make it easy to identify our processes. */ /* Make it easy to identify our processes. */
......
...@@ -154,10 +154,12 @@ wait_for_sync_status_change(Oid relid, char origstate) ...@@ -154,10 +154,12 @@ wait_for_sync_status_change(Oid relid, char origstate)
int rc; int rc;
char state = origstate; char state = origstate;
while (!got_SIGTERM) for (;;)
{ {
LogicalRepWorker *worker; LogicalRepWorker *worker;
CHECK_FOR_INTERRUPTS();
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(MyLogicalRepWorker->subid, worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
relid, false); relid, false);
...@@ -525,7 +527,7 @@ copy_read_data(void *outbuf, int minread, int maxread) ...@@ -525,7 +527,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
bytesread += avail; bytesread += avail;
} }
while (!got_SIGTERM && maxread > 0 && bytesread < minread) while (maxread > 0 && bytesread < minread)
{ {
pgsocket fd = PGINVALID_SOCKET; pgsocket fd = PGINVALID_SOCKET;
int rc; int rc;
...@@ -579,10 +581,6 @@ copy_read_data(void *outbuf, int minread, int maxread) ...@@ -579,10 +581,6 @@ copy_read_data(void *outbuf, int minread, int maxread)
ResetLatch(&MyProc->procLatch); ResetLatch(&MyProc->procLatch);
} }
/* Check for exit condition. */
if (got_SIGTERM)
proc_exit(0);
return bytesread; return bytesread;
} }
......
...@@ -72,6 +72,8 @@ ...@@ -72,6 +72,8 @@
#include "storage/proc.h" #include "storage/proc.h"
#include "storage/procarray.h" #include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/catcache.h" #include "utils/catcache.h"
#include "utils/datum.h" #include "utils/datum.h"
...@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn); ...@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void reread_subscription(void); static void reread_subscription(void);
/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
/* /*
* Should this worker apply changes for given relation. * Should this worker apply changes for given relation.
* *
...@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* mark as idle, before starting to loop */ /* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
while (!got_SIGTERM) for (;;)
{ {
pgsocket fd = PGINVALID_SOCKET; pgsocket fd = PGINVALID_SOCKET;
int rc; int rc;
...@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimestampTz last_recv_timestamp = GetCurrentTimestamp(); TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false; bool ping_sent = false;
CHECK_FOR_INTERRUPTS();
MemoryContextSwitchTo(ApplyMessageContext); MemoryContextSwitchTo(ApplyMessageContext);
len = walrcv_receive(wrconn, &buf, &fd); len = walrcv_receive(wrconn, &buf, &fd);
...@@ -1437,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) ...@@ -1437,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
MySubscriptionValid = false; MySubscriptionValid = false;
} }
/* SIGHUP: set flag to reload configuration at next convenient time */
static void
logicalrep_worker_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
got_SIGHUP = true;
/* Waken anything waiting on the process latch */
SetLatch(MyLatch);
errno = save_errno;
}
/* Logical Replication Apply worker entry point */ /* Logical Replication Apply worker entry point */
void void
...@@ -1454,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg) ...@@ -1454,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg)
/* Setup signal handling */ /* Setup signal handling */
pqsignal(SIGHUP, logicalrep_worker_sighup); pqsignal(SIGHUP, logicalrep_worker_sighup);
pqsignal(SIGTERM, logicalrep_worker_sigterm); pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals(); BackgroundWorkerUnblockSignals();
/* Initialise stats to a sanish value */ /* Initialise stats to a sanish value */
...@@ -1604,6 +1624,14 @@ ApplyWorkerMain(Datum main_arg) ...@@ -1604,6 +1624,14 @@ ApplyWorkerMain(Datum main_arg)
/* Run the main loop. */ /* Run the main loop. */
LogicalRepApplyLoop(origin_startpos); LogicalRepApplyLoop(origin_startpos);
/* We should only get here if we received SIGTERM */
proc_exit(0); proc_exit(0);
} }
/*
* Is current process a logical replication worker?
*/
bool
IsLogicalWorker(void)
{
return MyLogicalRepWorker != NULL;
}
...@@ -55,6 +55,7 @@ ...@@ -55,6 +55,7 @@
#include "pg_getopt.h" #include "pg_getopt.h"
#include "postmaster/autovacuum.h" #include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h" #include "postmaster/postmaster.h"
#include "replication/logicalworker.h"
#include "replication/slot.h" #include "replication/slot.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "rewrite/rewriteHandler.h" #include "rewrite/rewriteHandler.h"
...@@ -2845,6 +2846,10 @@ ProcessInterrupts(void) ...@@ -2845,6 +2846,10 @@ ProcessInterrupts(void)
ereport(FATAL, ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN), (errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating autovacuum process due to administrator command"))); errmsg("terminating autovacuum process due to administrator command")));
else if (IsLogicalWorker())
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating logical replication worker due to administrator command")));
else if (RecoveryConflictPending && RecoveryConflictRetryable) else if (RecoveryConflictPending && RecoveryConflictRetryable)
{ {
pgstat_report_recovery_conflict(RecoveryConflictReason); pgstat_report_recovery_conflict(RecoveryConflictReason);
......
...@@ -14,4 +14,6 @@ ...@@ -14,4 +14,6 @@
extern void ApplyWorkerMain(Datum main_arg); extern void ApplyWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
#endif /* LOGICALWORKER_H */ #endif /* LOGICALWORKER_H */
...@@ -67,8 +67,6 @@ extern Subscription *MySubscription; ...@@ -67,8 +67,6 @@ extern Subscription *MySubscription;
extern LogicalRepWorker *MyLogicalRepWorker; extern LogicalRepWorker *MyLogicalRepWorker;
extern bool in_remote_transaction; extern bool in_remote_transaction;
extern volatile sig_atomic_t got_SIGHUP;
extern volatile sig_atomic_t got_SIGTERM;
extern void logicalrep_worker_attach(int slot); extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
...@@ -81,8 +79,6 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); ...@@ -81,8 +79,6 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern int logicalrep_sync_worker_count(Oid subid); extern int logicalrep_sync_worker_count(Oid subid);
extern void logicalrep_worker_sighup(SIGNAL_ARGS);
extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
void process_syncing_tables(XLogRecPtr current_lsn); void process_syncing_tables(XLogRecPtr current_lsn);
void invalidate_syncing_table_states(Datum arg, int cacheid, void invalidate_syncing_table_states(Datum arg, int cacheid,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment