Commit d75288fb authored by Fujii Masao's avatar Fujii Masao

Make archiver process an auxiliary process.

This commit changes WAL archiver process so that it's treated as
an auxiliary process and can use shared memory. This is an infrastructure
patch required for upcoming shared-memory based stats collector patch
series. These patch series basically need any processes including archiver
that can report the statistics to access to shared memory. Since this patch
itself is useful to simplify the code and when users monitor the status of
archiver, it's committed separately in advance.

This commit simplifies the code for WAL archiving. For example, previously
backends need to signal to archiver via postmaster when they notify
archiver that there are some WAL files to archive. On the other hand,
this commit removes that signal to postmaster and enables backends to
notify archier directly using shared latch.

Also, as the side of this change, the information about archiver process
becomes viewable at pg_stat_activity view.

Author: Kyotaro Horiguchi
Reviewed-by: Andres Freund, Álvaro Herrera, Julien Rouhaud, Tomas Vondra, Arthur Zakirov, Fujii Masao
Discussion: https://postgr.es/m/20180629.173418.190173462.horiguchi.kyotaro@lab.ntt.co.jp
parent 0ea71c93
...@@ -935,6 +935,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ...@@ -935,6 +935,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<literal>logical replication worker</literal>, <literal>logical replication worker</literal>,
<literal>parallel worker</literal>, <literal>background writer</literal>, <literal>parallel worker</literal>, <literal>background writer</literal>,
<literal>client backend</literal>, <literal>checkpointer</literal>, <literal>client backend</literal>, <literal>checkpointer</literal>,
<literal>archiver</literal>,
<literal>startup</literal>, <literal>walreceiver</literal>, <literal>startup</literal>, <literal>walreceiver</literal>,
<literal>walsender</literal> and <literal>walwriter</literal>. <literal>walsender</literal> and <literal>walwriter</literal>.
In addition, background workers registered by extensions may have In addition, background workers registered by extensions may have
......
...@@ -25,11 +25,11 @@ ...@@ -25,11 +25,11 @@
#include "common/archive.h" #include "common/archive.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "postmaster/startup.h" #include "postmaster/startup.h"
#include "postmaster/pgarch.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/lwlock.h" #include "storage/lwlock.h"
#include "storage/pmsignal.h"
/* /*
* Attempt to retrieve the specified file from off-line archival storage. * Attempt to retrieve the specified file from off-line archival storage.
...@@ -491,7 +491,7 @@ XLogArchiveNotify(const char *xlog) ...@@ -491,7 +491,7 @@ XLogArchiveNotify(const char *xlog)
/* Notify archiver that it's got something to do */ /* Notify archiver that it's got something to do */
if (IsUnderPostmaster) if (IsUnderPostmaster)
SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER); PgArchWakeup();
} }
/* /*
......
...@@ -317,6 +317,9 @@ AuxiliaryProcessMain(int argc, char *argv[]) ...@@ -317,6 +317,9 @@ AuxiliaryProcessMain(int argc, char *argv[])
case StartupProcess: case StartupProcess:
MyBackendType = B_STARTUP; MyBackendType = B_STARTUP;
break; break;
case ArchiverProcess:
MyBackendType = B_ARCHIVER;
break;
case BgWriterProcess: case BgWriterProcess:
MyBackendType = B_BG_WRITER; MyBackendType = B_BG_WRITER;
break; break;
...@@ -437,30 +440,29 @@ AuxiliaryProcessMain(int argc, char *argv[]) ...@@ -437,30 +440,29 @@ AuxiliaryProcessMain(int argc, char *argv[])
proc_exit(1); /* should never return */ proc_exit(1); /* should never return */
case StartupProcess: case StartupProcess:
/* don't set signals, startup process has its own agenda */
StartupProcessMain(); StartupProcessMain();
proc_exit(1); /* should never return */ proc_exit(1);
case ArchiverProcess:
PgArchiverMain();
proc_exit(1);
case BgWriterProcess: case BgWriterProcess:
/* don't set signals, bgwriter has its own agenda */
BackgroundWriterMain(); BackgroundWriterMain();
proc_exit(1); /* should never return */ proc_exit(1);
case CheckpointerProcess: case CheckpointerProcess:
/* don't set signals, checkpointer has its own agenda */
CheckpointerMain(); CheckpointerMain();
proc_exit(1); /* should never return */ proc_exit(1);
case WalWriterProcess: case WalWriterProcess:
/* don't set signals, walwriter has its own agenda */
InitXLOGAccess(); InitXLOGAccess();
WalWriterMain(); WalWriterMain();
proc_exit(1); /* should never return */ proc_exit(1);
case WalReceiverProcess: case WalReceiverProcess:
/* don't set signals, walreceiver has its own agenda */
WalReceiverMain(); WalReceiverMain();
proc_exit(1); /* should never return */ proc_exit(1);
default: default:
elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType); elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType);
......
...@@ -38,16 +38,13 @@ ...@@ -38,16 +38,13 @@
#include "libpq/pqsignal.h" #include "libpq/pqsignal.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h" #include "postmaster/interrupt.h"
#include "postmaster/pgarch.h" #include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
#include "storage/dsm.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "storage/pg_shmem.h"
#include "storage/pmsignal.h" #include "storage/pmsignal.h"
#include "storage/procsignal.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/ps_status.h" #include "utils/ps_status.h"
...@@ -73,153 +70,99 @@ ...@@ -73,153 +70,99 @@
*/ */
#define NUM_ORPHAN_CLEANUP_RETRIES 3 #define NUM_ORPHAN_CLEANUP_RETRIES 3
/* Shared memory area for archiver process */
typedef struct PgArchData
{
int pgprocno; /* pgprocno of archiver process */
} PgArchData;
/* ---------- /* ----------
* Local data * Local data
* ---------- * ----------
*/ */
static time_t last_pgarch_start_time;
static time_t last_sigterm_time = 0; static time_t last_sigterm_time = 0;
static PgArchData *PgArch = NULL;
/* /*
* Flags set by interrupt handlers for later service in the main loop. * Flags set by interrupt handlers for later service in the main loop.
*/ */
static volatile sig_atomic_t wakened = false;
static volatile sig_atomic_t ready_to_stop = false; static volatile sig_atomic_t ready_to_stop = false;
/* ---------- /* ----------
* Local function forward declarations * Local function forward declarations
* ---------- * ----------
*/ */
#ifdef EXEC_BACKEND
static pid_t pgarch_forkexec(void);
#endif
NON_EXEC_STATIC void PgArchiverMain(int argc, char *argv[]) pg_attribute_noreturn();
static void pgarch_waken(SIGNAL_ARGS);
static void pgarch_waken_stop(SIGNAL_ARGS); static void pgarch_waken_stop(SIGNAL_ARGS);
static void pgarch_MainLoop(void); static void pgarch_MainLoop(void);
static void pgarch_ArchiverCopyLoop(void); static void pgarch_ArchiverCopyLoop(void);
static bool pgarch_archiveXlog(char *xlog); static bool pgarch_archiveXlog(char *xlog);
static bool pgarch_readyXlog(char *xlog); static bool pgarch_readyXlog(char *xlog);
static void pgarch_archiveDone(char *xlog); static void pgarch_archiveDone(char *xlog);
static void pgarch_die(int code, Datum arg);
/* Report shared memory space needed by PgArchShmemInit */
/* ------------------------------------------------------------ Size
* Public functions called from postmaster follow PgArchShmemSize(void)
* ------------------------------------------------------------
*/
/*
* pgarch_start
*
* Called from postmaster at startup or after an existing archiver
* died. Attempt to fire up a fresh archiver process.
*
* Returns PID of child process, or 0 if fail.
*
* Note: if fail, we will be called again from the postmaster main loop.
*/
int
pgarch_start(void)
{ {
time_t curtime; Size size = 0;
pid_t pgArchPid;
/* size = add_size(size, sizeof(PgArchData));
* Do nothing if no archiver needed
*/
if (!XLogArchivingActive())
return 0;
/*
* Do nothing if too soon since last archiver start. This is a safety
* valve to protect against continuous respawn attempts if the archiver is
* dying immediately at launch. Note that since we will be re-called from
* the postmaster main loop, we will get another chance later.
*/
curtime = time(NULL);
if ((unsigned int) (curtime - last_pgarch_start_time) <
(unsigned int) PGARCH_RESTART_INTERVAL)
return 0;
last_pgarch_start_time = curtime;
#ifdef EXEC_BACKEND return size;
switch ((pgArchPid = pgarch_forkexec())) }
#else
switch ((pgArchPid = fork_process()))
#endif
{
case -1:
ereport(LOG,
(errmsg("could not fork archiver: %m")));
return 0;
#ifndef EXEC_BACKEND
case 0:
/* in postmaster child ... */
InitPostmasterChild();
/* Close the postmaster's sockets */
ClosePostmasterPorts(false);
/* Drop our connection to postmaster's shared memory, as well */ /* Allocate and initialize archiver-related shared memory */
dsm_detach_all(); void
PGSharedMemoryDetach(); PgArchShmemInit(void)
{
bool found;
PgArchiverMain(0, NULL); PgArch = (PgArchData *)
break; ShmemInitStruct("Archiver Data", PgArchShmemSize(), &found);
#endif
default: if (!found)
return (int) pgArchPid; {
/* First time through, so initialize */
MemSet(PgArch, 0, PgArchShmemSize());
PgArch->pgprocno = INVALID_PGPROCNO;
} }
/* shouldn't get here */
return 0;
} }
/* ------------------------------------------------------------
* Local functions called by archiver follow
* ------------------------------------------------------------
*/
#ifdef EXEC_BACKEND
/* /*
* pgarch_forkexec() - * PgArchCanRestart
* *
* Format up the arglist for, then fork and exec, archive process * Return true and archiver is allowed to restart if enough time has
* passed since it was launched last to reach PGARCH_RESTART_INTERVAL.
* Otherwise return false.
*
* This is a safety valve to protect against continuous respawn attempts if the
* archiver is dying immediately at launch. Note that since we will retry to
* launch the archiver from the postmaster main loop, we will get another
* chance later.
*/ */
static pid_t bool
pgarch_forkexec(void) PgArchCanRestart(void)
{ {
char *av[10]; static time_t last_pgarch_start_time = 0;
int ac = 0; time_t curtime = time(NULL);
av[ac++] = "postgres";
av[ac++] = "--forkarch";
av[ac++] = NULL; /* filled in by postmaster_forkexec */
av[ac] = NULL; /*
Assert(ac < lengthof(av)); * Return false and don't restart archiver if too soon since last archiver
* start.
*/
if ((unsigned int) (curtime - last_pgarch_start_time) <
(unsigned int) PGARCH_RESTART_INTERVAL)
return false;
return postmaster_forkexec(ac, av); last_pgarch_start_time = curtime;
return true;
} }
#endif /* EXEC_BACKEND */
/* /* Main entry point for archiver process */
* PgArchiverMain void
* PgArchiverMain(void)
* The argc/argv parameters are valid only in EXEC_BACKEND case. However,
* since we don't use 'em, it hardly matters...
*/
NON_EXEC_STATIC void
PgArchiverMain(int argc, char *argv[])
{ {
/* /*
* Ignore all signals usually bound to some action in the postmaster, * Ignore all signals usually bound to some action in the postmaster,
...@@ -231,33 +174,51 @@ PgArchiverMain(int argc, char *argv[]) ...@@ -231,33 +174,51 @@ PgArchiverMain(int argc, char *argv[])
/* SIGQUIT handler was already set up by InitPostmasterChild */ /* SIGQUIT handler was already set up by InitPostmasterChild */
pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, pgarch_waken); pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGUSR2, pgarch_waken_stop); pqsignal(SIGUSR2, pgarch_waken_stop);
/* Reset some signals that are accepted by postmaster but not here */ /* Reset some signals that are accepted by postmaster but not here */
pqsignal(SIGCHLD, SIG_DFL); pqsignal(SIGCHLD, SIG_DFL);
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig); PG_SETMASK(&UnBlockSig);
MyBackendType = B_ARCHIVER; /* We shouldn't be launched unnecessarily. */
init_ps_display(NULL); Assert(XLogArchivingActive());
/* Arrange to clean up at archiver exit */
on_shmem_exit(pgarch_die, 0);
/*
* Advertise our pgprocno so that backends can use our latch to wake us up
* while we're sleeping.
*/
PgArch->pgprocno = MyProc->pgprocno;
pgarch_MainLoop(); pgarch_MainLoop();
exit(0); proc_exit(0);
} }
/* SIGUSR1 signal handler for archiver process */ /*
static void * Wake up the archiver
pgarch_waken(SIGNAL_ARGS) */
void
PgArchWakeup(void)
{ {
int save_errno = errno; int arch_pgprocno = PgArch->pgprocno;
/* set flag that there is work to be done */ /*
wakened = true; * We don't acquire ProcArrayLock here. It's actually fine because
SetLatch(MyLatch); * procLatch isn't ever freed, so we just can potentially set the wrong
* process' (or no process') latch. Even in that case the archiver will
errno = save_errno; * be relaunched shortly and will start archiving.
*/
if (arch_pgprocno != INVALID_PGPROCNO)
SetLatch(&ProcGlobal->allProcs[arch_pgprocno].procLatch);
} }
/* SIGUSR2 signal handler for archiver process */ /* SIGUSR2 signal handler for archiver process */
static void static void
pgarch_waken_stop(SIGNAL_ARGS) pgarch_waken_stop(SIGNAL_ARGS)
...@@ -282,14 +243,6 @@ pgarch_MainLoop(void) ...@@ -282,14 +243,6 @@ pgarch_MainLoop(void)
pg_time_t last_copy_time = 0; pg_time_t last_copy_time = 0;
bool time_to_stop; bool time_to_stop;
/*
* We run the copy loop immediately upon entry, in case there are
* unarchived files left over from a previous database run (or maybe the
* archiver died unexpectedly). After that we wait for a signal or
* timeout before doing more.
*/
wakened = true;
/* /*
* There shouldn't be anything for the archiver to do except to wait for a * There shouldn't be anything for the archiver to do except to wait for a
* signal ... however, the archiver exists to protect our data, so she * signal ... however, the archiver exists to protect our data, so she
...@@ -328,12 +281,8 @@ pgarch_MainLoop(void) ...@@ -328,12 +281,8 @@ pgarch_MainLoop(void)
} }
/* Do what we're here for */ /* Do what we're here for */
if (wakened || time_to_stop)
{
wakened = false;
pgarch_ArchiverCopyLoop(); pgarch_ArchiverCopyLoop();
last_copy_time = time(NULL); last_copy_time = time(NULL);
}
/* /*
* Sleep until a signal is received, or until a poll is forced by * Sleep until a signal is received, or until a poll is forced by
...@@ -354,13 +303,9 @@ pgarch_MainLoop(void) ...@@ -354,13 +303,9 @@ pgarch_MainLoop(void)
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
timeout * 1000L, timeout * 1000L,
WAIT_EVENT_ARCHIVER_MAIN); WAIT_EVENT_ARCHIVER_MAIN);
if (rc & WL_TIMEOUT)
wakened = true;
if (rc & WL_POSTMASTER_DEATH) if (rc & WL_POSTMASTER_DEATH)
time_to_stop = true; time_to_stop = true;
} }
else
wakened = true;
} }
/* /*
...@@ -744,3 +689,15 @@ pgarch_archiveDone(char *xlog) ...@@ -744,3 +689,15 @@ pgarch_archiveDone(char *xlog)
StatusFilePath(rlogdone, xlog, ".done"); StatusFilePath(rlogdone, xlog, ".done");
(void) durable_rename(rlogready, rlogdone, WARNING); (void) durable_rename(rlogready, rlogdone, WARNING);
} }
/*
* pgarch_die
*
* Exit-time cleanup handler
*/
static void
pgarch_die(int code, Datum arg)
{
PgArch->pgprocno = INVALID_PGPROCNO;
}
...@@ -443,9 +443,10 @@ static void InitPostmasterDeathWatchHandle(void); ...@@ -443,9 +443,10 @@ static void InitPostmasterDeathWatchHandle(void);
* even during recovery. * even during recovery.
*/ */
#define PgArchStartupAllowed() \ #define PgArchStartupAllowed() \
((XLogArchivingActive() && pmState == PM_RUN) || \ (((XLogArchivingActive() && pmState == PM_RUN) || \
(XLogArchivingAlways() && \ (XLogArchivingAlways() && \
(pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY))) (pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY))) && \
PgArchCanRestart())
#ifdef EXEC_BACKEND #ifdef EXEC_BACKEND
...@@ -548,6 +549,7 @@ static void ShmemBackendArrayRemove(Backend *bn); ...@@ -548,6 +549,7 @@ static void ShmemBackendArrayRemove(Backend *bn);
#endif /* EXEC_BACKEND */ #endif /* EXEC_BACKEND */
#define StartupDataBase() StartChildProcess(StartupProcess) #define StartupDataBase() StartChildProcess(StartupProcess)
#define StartArchiver() StartChildProcess(ArchiverProcess)
#define StartBackgroundWriter() StartChildProcess(BgWriterProcess) #define StartBackgroundWriter() StartChildProcess(BgWriterProcess)
#define StartCheckpointer() StartChildProcess(CheckpointerProcess) #define StartCheckpointer() StartChildProcess(CheckpointerProcess)
#define StartWalWriter() StartChildProcess(WalWriterProcess) #define StartWalWriter() StartChildProcess(WalWriterProcess)
...@@ -1792,7 +1794,7 @@ ServerLoop(void) ...@@ -1792,7 +1794,7 @@ ServerLoop(void)
/* If we have lost the archiver, try to start a new one. */ /* If we have lost the archiver, try to start a new one. */
if (PgArchPID == 0 && PgArchStartupAllowed()) if (PgArchPID == 0 && PgArchStartupAllowed())
PgArchPID = pgarch_start(); PgArchPID = StartArchiver();
/* If we need to signal the autovacuum launcher, do so now */ /* If we need to signal the autovacuum launcher, do so now */
if (avlauncher_needs_signal) if (avlauncher_needs_signal)
...@@ -3007,7 +3009,7 @@ reaper(SIGNAL_ARGS) ...@@ -3007,7 +3009,7 @@ reaper(SIGNAL_ARGS)
if (!IsBinaryUpgrade && AutoVacuumingActive() && AutoVacPID == 0) if (!IsBinaryUpgrade && AutoVacuumingActive() && AutoVacPID == 0)
AutoVacPID = StartAutoVacLauncher(); AutoVacPID = StartAutoVacLauncher();
if (PgArchStartupAllowed() && PgArchPID == 0) if (PgArchStartupAllowed() && PgArchPID == 0)
PgArchPID = pgarch_start(); PgArchPID = StartArchiver();
if (PgStatPID == 0) if (PgStatPID == 0)
PgStatPID = pgstat_start(); PgStatPID = pgstat_start();
...@@ -3142,20 +3144,22 @@ reaper(SIGNAL_ARGS) ...@@ -3142,20 +3144,22 @@ reaper(SIGNAL_ARGS)
} }
/* /*
* Was it the archiver? If so, just try to start a new one; no need * Was it the archiver? If exit status is zero (normal) or one (FATAL
* to force reset of the rest of the system. (If fail, we'll try * exit), we assume everything is all right just like normal backends
* again in future cycles of the main loop.). Unless we were waiting * and just try to restart a new one so that we immediately retry
* for it to shut down; don't restart it in that case, and * archiving remaining files. (If fail, we'll try again in future
* cycles of the postmaster's main loop.) Unless we were waiting for
* it to shut down; don't restart it in that case, and
* PostmasterStateMachine() will advance to the next shutdown step. * PostmasterStateMachine() will advance to the next shutdown step.
*/ */
if (pid == PgArchPID) if (pid == PgArchPID)
{ {
PgArchPID = 0; PgArchPID = 0;
if (!EXIT_STATUS_0(exitstatus)) if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
LogChildExit(LOG, _("archiver process"), HandleChildCrash(pid, exitstatus,
pid, exitstatus); _("archiver process"));
if (PgArchStartupAllowed()) if (PgArchStartupAllowed())
PgArchPID = pgarch_start(); PgArchPID = StartArchiver();
continue; continue;
} }
...@@ -3403,7 +3407,7 @@ CleanupBackend(int pid, ...@@ -3403,7 +3407,7 @@ CleanupBackend(int pid,
/* /*
* HandleChildCrash -- cleanup after failed backend, bgwriter, checkpointer, * HandleChildCrash -- cleanup after failed backend, bgwriter, checkpointer,
* walwriter, autovacuum, or background worker. * walwriter, autovacuum, archiver or background worker.
* *
* The objectives here are to clean up our local state about the child * The objectives here are to clean up our local state about the child
* process, and to signal all other remaining children to quickdie. * process, and to signal all other remaining children to quickdie.
...@@ -3609,19 +3613,16 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) ...@@ -3609,19 +3613,16 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
signal_child(AutoVacPID, (SendStop ? SIGSTOP : SIGQUIT)); signal_child(AutoVacPID, (SendStop ? SIGSTOP : SIGQUIT));
} }
/* /* Take care of the archiver too */
* Force a power-cycle of the pgarch process too. (This isn't absolutely if (pid == PgArchPID)
* necessary, but it seems like a good idea for robustness, and it PgArchPID = 0;
* simplifies the state-machine logic in the case where a shutdown request else if (PgArchPID != 0 && take_action)
* arrives during crash processing.)
*/
if (PgArchPID != 0 && take_action)
{ {
ereport(DEBUG2, ereport(DEBUG2,
(errmsg_internal("sending %s to process %d", (errmsg_internal("sending %s to process %d",
"SIGQUIT", (SendStop ? "SIGSTOP" : "SIGQUIT"),
(int) PgArchPID))); (int) PgArchPID)));
signal_child(PgArchPID, SIGQUIT); signal_child(PgArchPID, (SendStop ? SIGSTOP : SIGQUIT));
} }
/* /*
...@@ -3804,12 +3805,11 @@ PostmasterStateMachine(void) ...@@ -3804,12 +3805,11 @@ PostmasterStateMachine(void)
* (including autovac workers), no bgworkers (including unconnected * (including autovac workers), no bgworkers (including unconnected
* ones), and no walwriter, autovac launcher or bgwriter. If we are * ones), and no walwriter, autovac launcher or bgwriter. If we are
* doing crash recovery or an immediate shutdown then we expect the * doing crash recovery or an immediate shutdown then we expect the
* checkpointer to exit as well, otherwise not. The archiver, stats, * checkpointer to exit as well, otherwise not. The stats and
* and syslogger processes are disregarded since they are not * syslogger processes are disregarded since they are not connected to
* connected to shared memory; we also disregard dead_end children * shared memory; we also disregard dead_end children here. Walsenders
* here. Walsenders are also disregarded, they will be terminated * and archiver are also disregarded, they will be terminated later
* later after writing the checkpoint record, like the archiver * after writing the checkpoint record.
* process.
*/ */
if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 && if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 &&
StartupPID == 0 && StartupPID == 0 &&
...@@ -3912,6 +3912,7 @@ PostmasterStateMachine(void) ...@@ -3912,6 +3912,7 @@ PostmasterStateMachine(void)
Assert(CheckpointerPID == 0); Assert(CheckpointerPID == 0);
Assert(WalWriterPID == 0); Assert(WalWriterPID == 0);
Assert(AutoVacPID == 0); Assert(AutoVacPID == 0);
Assert(PgArchPID == 0);
/* syslogger is not considered here */ /* syslogger is not considered here */
pmState = PM_NO_CHILDREN; pmState = PM_NO_CHILDREN;
} }
...@@ -5037,12 +5038,6 @@ SubPostmasterMain(int argc, char *argv[]) ...@@ -5037,12 +5038,6 @@ SubPostmasterMain(int argc, char *argv[])
StartBackgroundWorker(); StartBackgroundWorker();
} }
if (strcmp(argv[1], "--forkarch") == 0)
{
/* Do not want to attach to shared memory */
PgArchiverMain(argc, argv); /* does not return */
}
if (strcmp(argv[1], "--forkcol") == 0) if (strcmp(argv[1], "--forkcol") == 0)
{ {
/* Do not want to attach to shared memory */ /* Do not want to attach to shared memory */
...@@ -5140,7 +5135,7 @@ sigusr1_handler(SIGNAL_ARGS) ...@@ -5140,7 +5135,7 @@ sigusr1_handler(SIGNAL_ARGS)
*/ */
Assert(PgArchPID == 0); Assert(PgArchPID == 0);
if (XLogArchivingAlways()) if (XLogArchivingAlways())
PgArchPID = pgarch_start(); PgArchPID = StartArchiver();
/* /*
* If we aren't planning to enter hot standby mode later, treat * If we aren't planning to enter hot standby mode later, treat
...@@ -5194,16 +5189,6 @@ sigusr1_handler(SIGNAL_ARGS) ...@@ -5194,16 +5189,6 @@ sigusr1_handler(SIGNAL_ARGS)
if (StartWorkerNeeded || HaveCrashedWorker) if (StartWorkerNeeded || HaveCrashedWorker)
maybe_start_bgworkers(); maybe_start_bgworkers();
if (CheckPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER) &&
PgArchPID != 0)
{
/*
* Send SIGUSR1 to archiver process, to wake it up and begin archiving
* next WAL file.
*/
signal_child(PgArchPID, SIGUSR1);
}
/* Tell syslogger to rotate logfile if requested */ /* Tell syslogger to rotate logfile if requested */
if (SysLoggerPID != 0) if (SysLoggerPID != 0)
{ {
...@@ -5445,6 +5430,10 @@ StartChildProcess(AuxProcType type) ...@@ -5445,6 +5430,10 @@ StartChildProcess(AuxProcType type)
ereport(LOG, ereport(LOG,
(errmsg("could not fork startup process: %m"))); (errmsg("could not fork startup process: %m")));
break; break;
case ArchiverProcess:
ereport(LOG,
(errmsg("could not fork archiver process: %m")));
break;
case BgWriterProcess: case BgWriterProcess:
ereport(LOG, ereport(LOG,
(errmsg("could not fork background writer process: %m"))); (errmsg("could not fork background writer process: %m")));
......
...@@ -144,6 +144,7 @@ CreateSharedMemoryAndSemaphores(void) ...@@ -144,6 +144,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, ReplicationOriginShmemSize()); size = add_size(size, ReplicationOriginShmemSize());
size = add_size(size, WalSndShmemSize()); size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize()); size = add_size(size, WalRcvShmemSize());
size = add_size(size, PgArchShmemSize());
size = add_size(size, ApplyLauncherShmemSize()); size = add_size(size, ApplyLauncherShmemSize());
size = add_size(size, SnapMgrShmemSize()); size = add_size(size, SnapMgrShmemSize());
size = add_size(size, BTreeShmemSize()); size = add_size(size, BTreeShmemSize());
...@@ -258,6 +259,7 @@ CreateSharedMemoryAndSemaphores(void) ...@@ -258,6 +259,7 @@ CreateSharedMemoryAndSemaphores(void)
ReplicationOriginShmemInit(); ReplicationOriginShmemInit();
WalSndShmemInit(); WalSndShmemInit();
WalRcvShmemInit(); WalRcvShmemInit();
PgArchShmemInit();
ApplyLauncherShmemInit(); ApplyLauncherShmemInit();
/* /*
......
...@@ -417,6 +417,7 @@ typedef enum ...@@ -417,6 +417,7 @@ typedef enum
BootstrapProcess, BootstrapProcess,
StartupProcess, StartupProcess,
BgWriterProcess, BgWriterProcess,
ArchiverProcess,
CheckpointerProcess, CheckpointerProcess,
WalWriterProcess, WalWriterProcess,
WalReceiverProcess, WalReceiverProcess,
...@@ -429,6 +430,7 @@ extern AuxProcType MyAuxProcType; ...@@ -429,6 +430,7 @@ extern AuxProcType MyAuxProcType;
#define AmBootstrapProcess() (MyAuxProcType == BootstrapProcess) #define AmBootstrapProcess() (MyAuxProcType == BootstrapProcess)
#define AmStartupProcess() (MyAuxProcType == StartupProcess) #define AmStartupProcess() (MyAuxProcType == StartupProcess)
#define AmBackgroundWriterProcess() (MyAuxProcType == BgWriterProcess) #define AmBackgroundWriterProcess() (MyAuxProcType == BgWriterProcess)
#define AmArchiverProcess() (MyAuxProcType == ArchiverProcess)
#define AmCheckpointerProcess() (MyAuxProcType == CheckpointerProcess) #define AmCheckpointerProcess() (MyAuxProcType == CheckpointerProcess)
#define AmWalWriterProcess() (MyAuxProcType == WalWriterProcess) #define AmWalWriterProcess() (MyAuxProcType == WalWriterProcess)
#define AmWalReceiverProcess() (MyAuxProcType == WalReceiverProcess) #define AmWalReceiverProcess() (MyAuxProcType == WalReceiverProcess)
......
...@@ -26,14 +26,10 @@ ...@@ -26,14 +26,10 @@
#define MAX_XFN_CHARS 40 #define MAX_XFN_CHARS 40
#define VALID_XFN_CHARS "0123456789ABCDEF.history.backup.partial" #define VALID_XFN_CHARS "0123456789ABCDEF.history.backup.partial"
/* ---------- extern Size PgArchShmemSize(void);
* Functions called from postmaster extern void PgArchShmemInit(void);
* ---------- extern bool PgArchCanRestart(void);
*/ extern void PgArchiverMain(void) pg_attribute_noreturn();
extern int pgarch_start(void); extern void PgArchWakeup(void);
#ifdef EXEC_BACKEND
extern void PgArchiverMain(int argc, char *argv[]) pg_attribute_noreturn();
#endif
#endif /* _PGARCH_H */ #endif /* _PGARCH_H */
...@@ -34,7 +34,6 @@ typedef enum ...@@ -34,7 +34,6 @@ typedef enum
{ {
PMSIGNAL_RECOVERY_STARTED, /* recovery has started */ PMSIGNAL_RECOVERY_STARTED, /* recovery has started */
PMSIGNAL_BEGIN_HOT_STANDBY, /* begin Hot Standby */ PMSIGNAL_BEGIN_HOT_STANDBY, /* begin Hot Standby */
PMSIGNAL_WAKEN_ARCHIVER, /* send a NOTIFY signal to xlog archiver */
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */ PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */ PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */ PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
......
...@@ -370,11 +370,11 @@ extern PGPROC *PreparedXactProcs; ...@@ -370,11 +370,11 @@ extern PGPROC *PreparedXactProcs;
* We set aside some extra PGPROC structures for auxiliary processes, * We set aside some extra PGPROC structures for auxiliary processes,
* ie things that aren't full-fledged backends but need shmem access. * ie things that aren't full-fledged backends but need shmem access.
* *
* Background writer, checkpointer and WAL writer run during normal operation. * Background writer, checkpointer, WAL writer and archiver run during normal
* Startup process and WAL receiver also consume 2 slots, but WAL writer is * operation. Startup process and WAL receiver also consume 2 slots, but WAL
* launched only after startup has exited, so we only need 4 slots. * writer is launched only after startup has exited, so we only need 5 slots.
*/ */
#define NUM_AUXILIARY_PROCS 4 #define NUM_AUXILIARY_PROCS 5
/* configurable options */ /* configurable options */
extern PGDLLIMPORT int DeadlockTimeout; extern PGDLLIMPORT int DeadlockTimeout;
......
...@@ -1572,6 +1572,7 @@ PGresAttValue ...@@ -1572,6 +1572,7 @@ PGresAttValue
PGresParamDesc PGresParamDesc
PGresult PGresult
PGresult_data PGresult_data
PgArchData
PHANDLE PHANDLE
PLAINTREE PLAINTREE
PLUID_AND_ATTRIBUTES PLUID_AND_ATTRIBUTES
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment