Commit 4c8e20f8 authored by Magnus Hagander's avatar Magnus Hagander

Track walsender state in shared memory and expose in pg_stat_replication

parent 47a5f3e9
......@@ -298,8 +298,8 @@ postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re
<entry><structname>pg_stat_replication</><indexterm><primary>pg_stat_replication</primary></indexterm></entry>
<entry>One row per WAL sender process, showing process <acronym>ID</>,
user OID, user name, application name, client's address and port number,
time at which the server process began execution, and transaction log
location.
time at which the server process began execution, current WAL sender
state and transaction log location.
</entry>
</row>
......
......@@ -501,6 +501,7 @@ CREATE VIEW pg_stat_replication AS
S.client_addr,
S.client_port,
S.backend_start,
W.state,
W.sent_location
FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
pg_stat_get_wal_senders() AS W
......
......@@ -24,6 +24,7 @@
#include "libpq/pqformat.h"
#include "nodes/pg_list.h"
#include "replication/basebackup.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
......@@ -115,6 +116,8 @@ SendBaseBackup(const char *options)
ALLOCSET_DEFAULT_MAXSIZE);
old_context = MemoryContextSwitchTo(backup_context);
WalSndSetState(WALSNDSTATE_BACKUP);
if (backup_label == NULL)
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
......
......@@ -179,6 +179,7 @@ WalSndHandshake(void)
{
int firstchar;
WalSndSetState(WALSNDSTATE_STARTUP);
set_ps_display("idle", false);
/* Wait for a command to arrive */
......@@ -482,6 +483,9 @@ WalSndLoop(void)
if (!XLogSend(output_message, &caughtup))
break;
}
/* Update our state to indicate if we're behind or not */
WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
}
/*
......@@ -533,6 +537,7 @@ InitWalSnd(void)
*/
walsnd->pid = MyProcPid;
MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
walsnd->state = WALSNDSTATE_STARTUP;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
OwnLatch((Latch *) &walsnd->latch);
......@@ -960,6 +965,45 @@ WalSndWakeup(void)
SetLatch(&WalSndCtl->walsnds[i].latch);
}
/* Set state for current walsender (only called in walsender) */
void
WalSndSetState(WalSndState state)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = MyWalSnd;
Assert(am_walsender);
if (walsnd->state == state)
return;
SpinLockAcquire(&walsnd->mutex);
walsnd->state = state;
SpinLockRelease(&walsnd->mutex);
}
/*
* Return a string constant representing the state. This is used
* in system views, and should *not* be translated.
*/
static const char *
WalSndGetStateString(WalSndState state)
{
switch (state)
{
case WALSNDSTATE_STARTUP:
return "STARTUP";
case WALSNDSTATE_BACKUP:
return "BACKUP";
case WALSNDSTATE_CATCHUP:
return "CATCHUP";
case WALSNDSTATE_STREAMING:
return "STREAMING";
}
return "UNKNOWN";
}
/*
* Returns activity of walsenders, including pids and xlog locations sent to
* standby servers.
......@@ -967,7 +1011,7 @@ WalSndWakeup(void)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_WAL_SENDERS_COLS 2
#define PG_STAT_GET_WAL_SENDERS_COLS 3
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
......@@ -1021,7 +1065,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(walsnd->pid);
values[1] = CStringGetTextDatum(sent_location);
values[1] = CStringGetTextDatum(WalSndGetStateString(walsnd->state));
values[2] = CStringGetTextDatum(sent_location);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
......
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201101081
#define CATALOG_VERSION_NO 201101111
#endif
......@@ -3075,7 +3075,7 @@ DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f
DESCR("statistics: currently active backend IDs");
DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
DESCR("statistics: information about currently active backends");
DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25}" "{o,o}" "{procpid,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25}" "{o,o,o}" "{procpid,state,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
DESCR("statistics: current backend PID");
......
......@@ -16,12 +16,22 @@
#include "storage/latch.h"
#include "storage/spin.h"
typedef enum WalSndState
{
WALSNDSTATE_STARTUP = 0,
WALSNDSTATE_BACKUP,
WALSNDSTATE_CATCHUP,
WALSNDSTATE_STREAMING
} WalSndState;
/*
* Each walsender has a WalSnd struct in shared memory.
*/
typedef struct WalSnd
{
pid_t pid; /* this walsender's process id, or 0 */
WalSndState state; /* this walsender's state */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
slock_t mutex; /* locks shared variables shown above */
......@@ -53,6 +63,7 @@ extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
extern void WalSndWakeup(void);
extern void WalSndSetState(WalSndState state);
extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment