Commit b025f32e authored by Michael Paquier's avatar Michael Paquier

Add leader_pid to pg_stat_activity

This new field tracks the PID of the group leader used with parallel
query.  For parallel workers and the leader, the value is set to the
PID of the group leader.  So, for the group leader, the value is the
same as its own PID.  Note that this reflects what PGPROC stores in
shared memory, so as leader_pid is NULL if a backend has never been
involved in parallel query.  If the backend is using parallel query or
has used it at least once, the value is set until the backend exits.

Author: Julien Rouhaud
Reviewed-by: Sergei Kornilov, Guillaume Lelarge, Michael Paquier, Tomas
Vondra
Discussion: https://postgr.es/m/CAOBaU_Yy5bt0vTPZ2_LUM6cUcGeqmYNoJ8-Rgto+c2+w3defYA@mail.gmail.com
parent bf6cc19e
......@@ -622,6 +622,18 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><type>integer</type></entry>
<entry>Process ID of this backend</entry>
</row>
<row>
<entry><structfield>leader_pid</structfield></entry>
<entry><type>integer</type></entry>
<entry>
Process ID of the parallel group leader if this process is or
has been involved in parallel query, or null. This field is set
when a process wants to cooperate with parallel workers, and
remains set as long as the process exists. For a parallel group leader,
this field is set to its own process ID. For a parallel worker,
this field is set to the process ID of the parallel group leader.
</entry>
</row>
<row>
<entry><structfield>usesysid</structfield></entry>
<entry><type>oid</type></entry>
......
......@@ -741,6 +741,7 @@ CREATE VIEW pg_stat_activity AS
S.datid AS datid,
D.datname AS datname,
S.pid,
S.leader_pid,
S.usesysid,
U.rolname AS usename,
S.application_name,
......
......@@ -547,7 +547,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
Datum
pg_stat_get_activity(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_ACTIVITY_COLS 29
#define PG_STAT_GET_ACTIVITY_COLS 30
int num_backends = pgstat_fetch_stat_numbackends();
int curr_backend;
int pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
......@@ -686,33 +686,40 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
values[5] = CStringGetTextDatum(clipped_activity);
pfree(clipped_activity);
proc = BackendPidGetProc(beentry->st_procpid);
if (proc != NULL)
{
uint32 raw_wait_event;
/* leader_pid */
nulls[29] = true;
raw_wait_event = UINT32_ACCESS_ONCE(proc->wait_event_info);
wait_event_type = pgstat_get_wait_event_type(raw_wait_event);
wait_event = pgstat_get_wait_event(raw_wait_event);
proc = BackendPidGetProc(beentry->st_procpid);
}
else if (beentry->st_backendType != B_BACKEND)
if (proc == NULL && (beentry->st_backendType != B_BACKEND))
{
/*
* For an auxiliary process, retrieve process info from
* AuxiliaryProcs stored in shared-memory.
*/
proc = AuxiliaryPidGetProc(beentry->st_procpid);
}
/*
* If a PGPROC entry was retrieved, display wait events and lock
* group leader information if any. To avoid extra overhead, no
* extra lock is being held, so there is no guarantee of
* consistency across multiple rows.
*/
if (proc != NULL)
{
uint32 raw_wait_event;
PGPROC *leader;
raw_wait_event =
UINT32_ACCESS_ONCE(proc->wait_event_info);
wait_event_type =
pgstat_get_wait_event_type(raw_wait_event);
raw_wait_event = UINT32_ACCESS_ONCE(proc->wait_event_info);
wait_event_type = pgstat_get_wait_event_type(raw_wait_event);
wait_event = pgstat_get_wait_event(raw_wait_event);
leader = proc->lockGroupLeader;
if (leader)
{
values[29] = Int32GetDatum(leader->pid);
nulls[29] = false;
}
}
......@@ -908,6 +915,7 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
nulls[26] = true;
nulls[27] = true;
nulls[28] = true;
nulls[29] = true;
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
......
......@@ -5175,9 +5175,9 @@
proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => 'int4',
proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool}',
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc}',
proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4}',
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid}',
prosrc => 'pg_stat_get_activity' },
{ oid => '3318',
descr => 'statistics: information about progress of backends running maintenance command',
......
......@@ -1730,6 +1730,7 @@ pg_shmem_allocations| SELECT pg_get_shmem_allocations.name,
pg_stat_activity| SELECT s.datid,
d.datname,
s.pid,
s.leader_pid,
s.usesysid,
u.rolname AS usename,
s.application_name,
......@@ -1747,7 +1748,7 @@ pg_stat_activity| SELECT s.datid,
s.backend_xmin,
s.query,
s.backend_type
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
LEFT JOIN pg_database d ON ((s.datid = d.oid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_all_indexes| SELECT c.oid AS relid,
......@@ -1851,7 +1852,7 @@ pg_stat_gssapi| SELECT s.pid,
s.gss_auth AS gss_authenticated,
s.gss_princ AS principal,
s.gss_enc AS encrypted
FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
WHERE (s.client_port IS NOT NULL);
pg_stat_progress_analyze| SELECT s.pid,
s.datid,
......@@ -1984,7 +1985,7 @@ pg_stat_replication| SELECT s.pid,
w.spill_txns,
w.spill_count,
w.spill_bytes
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_ssl| SELECT s.pid,
......@@ -1996,7 +1997,7 @@ pg_stat_ssl| SELECT s.pid,
s.ssl_client_dn AS client_dn,
s.ssl_client_serial AS client_serial,
s.ssl_issuer_dn AS issuer_dn
FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
WHERE (s.client_port IS NOT NULL);
pg_stat_subscription| SELECT su.oid AS subid,
su.subname,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment