Commit 9a895462 authored by Fujii Masao's avatar Fujii Masao

Enhance pg_stat_wal_receiver view to display host and port of sender server.

Previously there was no way in the standby side to find out the host and port
of the sender server that the walreceiver was currently connected to when
multiple hosts and ports were specified in primary_conninfo. For that purpose,
this patch adds sender_host and sender_port columns into pg_stat_wal_receiver
view. They report the host and port that the active replication connection
currently uses.

Bump catalog version.

Author: Haribabu Kommi
Reviewed-by: Michael Paquier and me

Discussion: https://postgr.es/m/CAJrrPGcV_aq8=cdqkFhVDJKEnDQ70yRTTdY9RODzMnXNrCz2Ow@mail.gmail.com
parent 11002f8a
...@@ -2031,6 +2031,25 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i ...@@ -2031,6 +2031,25 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<entry><type>text</type></entry> <entry><type>text</type></entry>
<entry>Replication slot name used by this WAL receiver</entry> <entry>Replication slot name used by this WAL receiver</entry>
</row> </row>
<row>
<entry><structfield>sender_host</structfield></entry>
<entry><type>text</type></entry>
<entry>
Host of the <productname>PostgreSQL</productname> instance
this WAL receiver is connected to. This can be a host name,
an IP address, or a directory path if the connection is via
Unix socket. (The path case can be distinguished because it
will always be an absolute path, beginning with <literal>/</literal>.)
</entry>
</row>
<row>
<entry><structfield>sender_port</structfield></entry>
<entry><type>integer</type></entry>
<entry>
Port number of the <productname>PostgreSQL</productname> instance
this WAL receiver is connected to.
</entry>
</row>
<row> <row>
<entry><structfield>conninfo</structfield></entry> <entry><structfield>conninfo</structfield></entry>
<entry><type>text</type></entry> <entry><type>text</type></entry>
......
...@@ -752,6 +752,8 @@ CREATE VIEW pg_stat_wal_receiver AS ...@@ -752,6 +752,8 @@ CREATE VIEW pg_stat_wal_receiver AS
s.latest_end_lsn, s.latest_end_lsn,
s.latest_end_time, s.latest_end_time,
s.slot_name, s.slot_name,
s.sender_host,
s.sender_port,
s.conninfo s.conninfo
FROM pg_stat_get_wal_receiver() s FROM pg_stat_get_wal_receiver() s
WHERE s.pid IS NOT NULL; WHERE s.pid IS NOT NULL;
......
...@@ -53,6 +53,8 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo, ...@@ -53,6 +53,8 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo,
char **err); char **err);
static void libpqrcv_check_conninfo(const char *conninfo); static void libpqrcv_check_conninfo(const char *conninfo);
static char *libpqrcv_get_conninfo(WalReceiverConn *conn); static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
char **sender_host, int *sender_port);
static char *libpqrcv_identify_system(WalReceiverConn *conn, static char *libpqrcv_identify_system(WalReceiverConn *conn,
TimeLineID *primary_tli, TimeLineID *primary_tli,
int *server_version); int *server_version);
...@@ -82,6 +84,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { ...@@ -82,6 +84,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
libpqrcv_connect, libpqrcv_connect,
libpqrcv_check_conninfo, libpqrcv_check_conninfo,
libpqrcv_get_conninfo, libpqrcv_get_conninfo,
libpqrcv_get_senderinfo,
libpqrcv_identify_system, libpqrcv_identify_system,
libpqrcv_readtimelinehistoryfile, libpqrcv_readtimelinehistoryfile,
libpqrcv_startstreaming, libpqrcv_startstreaming,
...@@ -282,6 +285,29 @@ libpqrcv_get_conninfo(WalReceiverConn *conn) ...@@ -282,6 +285,29 @@ libpqrcv_get_conninfo(WalReceiverConn *conn)
return retval; return retval;
} }
/*
* Provides information of sender this WAL receiver is connected to.
*/
static void
libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
int *sender_port)
{
char *ret = NULL;
*sender_host = NULL;
*sender_port = 0;
Assert(conn->streamConn != NULL);
ret = PQhost(conn->streamConn);
if (ret && strlen(ret) != 0)
*sender_host = pstrdup(ret);
ret = PQport(conn->streamConn);
if (ret && strlen(ret) != 0)
*sender_port = atoi(ret);
}
/* /*
* Check that primary's system identifier matches ours, and fetch the current * Check that primary's system identifier matches ours, and fetch the current
* timeline ID of the primary. * timeline ID of the primary.
......
...@@ -52,6 +52,7 @@ ...@@ -52,6 +52,7 @@
#include "access/xlog_internal.h" #include "access/xlog_internal.h"
#include "catalog/pg_authid.h" #include "catalog/pg_authid.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h" #include "funcapi.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
#include "libpq/pqsignal.h" #include "libpq/pqsignal.h"
...@@ -199,6 +200,8 @@ WalReceiverMain(void) ...@@ -199,6 +200,8 @@ WalReceiverMain(void)
TimestampTz now; TimestampTz now;
bool ping_sent; bool ping_sent;
char *err; char *err;
char *sender_host = NULL;
int sender_port = 0;
/* /*
* WalRcv should be set up already (if we are a backend, we inherit this * WalRcv should be set up already (if we are a backend, we inherit this
...@@ -308,19 +311,30 @@ WalReceiverMain(void) ...@@ -308,19 +311,30 @@ WalReceiverMain(void)
/* /*
* Save user-visible connection string. This clobbers the original * Save user-visible connection string. This clobbers the original
* conninfo, for security. * conninfo, for security. Also save host and port of the sender server
* this walreceiver is connected to.
*/ */
tmp_conninfo = walrcv_get_conninfo(wrconn); tmp_conninfo = walrcv_get_conninfo(wrconn);
walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
memset(walrcv->conninfo, 0, MAXCONNINFO); memset(walrcv->conninfo, 0, MAXCONNINFO);
if (tmp_conninfo) if (tmp_conninfo)
strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO); strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
memset(walrcv->sender_host, 0, NI_MAXHOST);
if (sender_host)
strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
walrcv->sender_port = sender_port;
walrcv->ready_to_display = true; walrcv->ready_to_display = true;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
if (tmp_conninfo) if (tmp_conninfo)
pfree(tmp_conninfo); pfree(tmp_conninfo);
if (sender_host)
pfree(sender_host);
first_stream = true; first_stream = true;
for (;;) for (;;)
{ {
...@@ -1402,6 +1416,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) ...@@ -1402,6 +1416,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
TimestampTz last_receipt_time; TimestampTz last_receipt_time;
XLogRecPtr latest_end_lsn; XLogRecPtr latest_end_lsn;
TimestampTz latest_end_time; TimestampTz latest_end_time;
char sender_host[NI_MAXHOST];
int sender_port = 0;
char slotname[NAMEDATALEN]; char slotname[NAMEDATALEN];
char conninfo[MAXCONNINFO]; char conninfo[MAXCONNINFO];
...@@ -1419,6 +1435,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) ...@@ -1419,6 +1435,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
latest_end_lsn = WalRcv->latestWalEnd; latest_end_lsn = WalRcv->latestWalEnd;
latest_end_time = WalRcv->latestWalEndTime; latest_end_time = WalRcv->latestWalEndTime;
strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname)); strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
sender_port = WalRcv->sender_port;
strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo)); strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
SpinLockRelease(&WalRcv->mutex); SpinLockRelease(&WalRcv->mutex);
...@@ -1482,10 +1500,18 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) ...@@ -1482,10 +1500,18 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
nulls[10] = true; nulls[10] = true;
else else
values[10] = CStringGetTextDatum(slotname); values[10] = CStringGetTextDatum(slotname);
if (*conninfo == '\0') if (*sender_host == '\0')
nulls[11] = true; nulls[11] = true;
else else
values[11] = CStringGetTextDatum(conninfo); values[11] = CStringGetTextDatum(sender_host);
if (sender_port == 0)
nulls[12] = true;
else
values[12] = Int32GetDatum(sender_port);
if (*conninfo == '\0')
nulls[13] = true;
else
values[13] = CStringGetTextDatum(conninfo);
} }
/* Returns the record as Datum */ /* Returns the record as Datum */
......
...@@ -53,6 +53,6 @@ ...@@ -53,6 +53,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 201803291 #define CATALOG_VERSION_NO 201803311
#endif #endif
...@@ -2919,7 +2919,7 @@ DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 ...@@ -2919,7 +2919,7 @@ DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0
DESCR("statistics: information about progress of backends running maintenance command"); DESCR("statistics: information about progress of backends running maintenance command");
DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication"); DESCR("statistics: information about currently active replication");
DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ )); DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25,23,25}" "{o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
DESCR("statistics: information about WAL receiver"); DESCR("statistics: information about WAL receiver");
DATA(insert OID = 6118 ( pg_stat_get_subscription PGNSP PGUID 12 1 0 0 0 f f f f f s r 1 0 2249 "26" "{26,26,26,23,3220,1184,1184,3220,1184}" "{i,o,o,o,o,o,o,o,o}" "{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}" _null_ _null_ pg_stat_get_subscription _null_ _null_ _null_ )); DATA(insert OID = 6118 ( pg_stat_get_subscription PGNSP PGUID 12 1 0 0 0 f f f f f s r 1 0 2249 "26" "{26,26,26,23,3220,1184,1184,3220,1184}" "{i,o,o,o,o,o,o,o,o}" "{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}" _null_ _null_ pg_stat_get_subscription _null_ _null_ _null_ ));
DESCR("statistics: information about subscription"); DESCR("statistics: information about subscription");
......
...@@ -108,6 +108,13 @@ typedef struct ...@@ -108,6 +108,13 @@ typedef struct
*/ */
char conninfo[MAXCONNINFO]; char conninfo[MAXCONNINFO];
/*
* Host name (this can be a host name, an IP address, or a directory
* path) and port number of the active replication connection.
*/
char sender_host[NI_MAXHOST];
int sender_port;
/* /*
* replication slot name; is also used for walreceiver to connect with the * replication slot name; is also used for walreceiver to connect with the
* primary * primary
...@@ -197,6 +204,9 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logica ...@@ -197,6 +204,9 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logica
char **err); char **err);
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo); typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn); typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
char **sender_host,
int *sender_port);
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
TimeLineID *primary_tli, TimeLineID *primary_tli,
int *server_version); int *server_version);
...@@ -227,6 +237,7 @@ typedef struct WalReceiverFunctionsType ...@@ -227,6 +237,7 @@ typedef struct WalReceiverFunctionsType
walrcv_connect_fn walrcv_connect; walrcv_connect_fn walrcv_connect;
walrcv_check_conninfo_fn walrcv_check_conninfo; walrcv_check_conninfo_fn walrcv_check_conninfo;
walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_conninfo_fn walrcv_get_conninfo;
walrcv_get_senderinfo_fn walrcv_get_senderinfo;
walrcv_identify_system_fn walrcv_identify_system; walrcv_identify_system_fn walrcv_identify_system;
walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
walrcv_startstreaming_fn walrcv_startstreaming; walrcv_startstreaming_fn walrcv_startstreaming;
...@@ -246,6 +257,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; ...@@ -246,6 +257,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_check_conninfo(conninfo) WalReceiverFunctions->walrcv_check_conninfo(conninfo)
#define walrcv_get_conninfo(conn) \ #define walrcv_get_conninfo(conn) \
WalReceiverFunctions->walrcv_get_conninfo(conn) WalReceiverFunctions->walrcv_get_conninfo(conn)
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_identify_system(conn, primary_tli, server_version) \ #define walrcv_identify_system(conn, primary_tli, server_version) \
WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version) WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
......
...@@ -1972,8 +1972,10 @@ pg_stat_wal_receiver| SELECT s.pid, ...@@ -1972,8 +1972,10 @@ pg_stat_wal_receiver| SELECT s.pid,
s.latest_end_lsn, s.latest_end_lsn,
s.latest_end_time, s.latest_end_time,
s.slot_name, s.slot_name,
s.sender_host,
s.sender_port,
s.conninfo s.conninfo
FROM pg_stat_get_wal_receiver() s(pid, status, receive_start_lsn, receive_start_tli, received_lsn, received_tli, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, slot_name, conninfo) FROM pg_stat_get_wal_receiver() s(pid, status, receive_start_lsn, receive_start_tli, received_lsn, received_tli, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, slot_name, sender_host, sender_port, conninfo)
WHERE (s.pid IS NOT NULL); WHERE (s.pid IS NOT NULL);
pg_stat_xact_all_tables| SELECT c.oid AS relid, pg_stat_xact_all_tables| SELECT c.oid AS relid,
n.nspname AS schemaname, n.nspname AS schemaname,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment