Commit 175ff659 authored by Robert Haas's avatar Robert Haas

Fix possible crash reading pg_stat_activity.

With the old code, a backend that read pg_stat_activity without ever
having executed a parallel query might see a backend in the midst of
executing one waiting on a DSA LWLock, resulting in a crash.  The
solution is for backends to register the tranche at startup time, not
the first time a parallel query is executed.

Report by Andreas Seltenreich.  Patch by me, reviewed by Thomas Munro.
parent 82f8107b
......@@ -486,7 +486,6 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
pei->area = dsa_create_in_place(area_space, dsa_minsize,
LWTRANCHE_PARALLEL_QUERY_DSA,
"parallel_query_dsa",
pcxt->seg);
}
......
......@@ -508,6 +508,8 @@ RegisterLWLockTranches(void)
LWLockRegisterTranche(LWTRANCHE_LOCK_MANAGER, "lock_manager");
LWLockRegisterTranche(LWTRANCHE_PREDICATE_LOCK_MANAGER,
"predicate_lock_manager");
LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
"parallel_query_dsa");
/* Register named tranches. */
for (i = 0; i < NamedLWLockTrancheRequests; i++)
......
......@@ -276,11 +276,6 @@ static char dsa_size_class_map[] = {
*/
#define DSA_FULLNESS_CLASSES 4
/*
* Maximum length of a DSA name.
*/
#define DSA_MAXLEN 64
/*
* A dsa_area_pool represents a set of objects of a given size class.
*
......@@ -326,7 +321,6 @@ typedef struct
Size freed_segment_counter;
/* The LWLock tranche ID. */
int lwlock_tranche_id;
char lwlock_tranche_name[DSA_MAXLEN];
/* The general lock (protects everything except object pools). */
LWLock lock;
} dsa_area_control;
......@@ -405,7 +399,7 @@ static void unlink_segment(dsa_area *area, dsa_segment_map *segment_map);
static dsa_segment_map *get_best_segment(dsa_area *area, Size npages);
static dsa_segment_map *make_new_segment(dsa_area *area, Size requested_pages);
static dsa_area *create_internal(void *place, size_t size,
int tranche_id, const char *tranche_name,
int tranche_id,
dsm_handle control_handle,
dsm_segment *control_segment);
static dsa_area *attach_internal(void *place, dsm_segment *segment,
......@@ -419,12 +413,10 @@ static void check_for_freed_segments(dsa_area *area);
* We can't allocate a LWLock tranche_id within this function, because tranche
* IDs are a scarce resource; there are only 64k available, using low numbers
* when possible matters, and we have no provision for recycling them. So,
* we require the caller to provide one. The caller must also provide the
* tranche name, so that we can distinguish LWLocks belonging to different
* DSAs.
* we require the caller to provide one.
*/
dsa_area *
dsa_create(int tranche_id, const char *tranche_name)
dsa_create(int tranche_id)
{
dsm_segment *segment;
dsa_area *area;
......@@ -446,7 +438,7 @@ dsa_create(int tranche_id, const char *tranche_name)
/* Create a new DSA area with the control objet in this segment. */
area = create_internal(dsm_segment_address(segment),
DSA_INITIAL_SEGMENT_SIZE,
tranche_id, tranche_name,
tranche_id,
dsm_segment_handle(segment), segment);
/* Clean up when the control segment detaches. */
......@@ -474,12 +466,11 @@ dsa_create(int tranche_id, const char *tranche_name)
*/
dsa_area *
dsa_create_in_place(void *place, size_t size,
int tranche_id, const char *tranche_name,
dsm_segment *segment)
int tranche_id, dsm_segment *segment)
{
dsa_area *area;
area = create_internal(place, size, tranche_id, tranche_name,
area = create_internal(place, size, tranche_id,
DSM_HANDLE_INVALID, NULL);
/*
......@@ -1139,7 +1130,7 @@ dsa_minimum_size(void)
*/
static dsa_area *
create_internal(void *place, size_t size,
int tranche_id, const char *tranche_name,
int tranche_id,
dsm_handle control_handle,
dsm_segment *control_segment)
{
......@@ -1192,7 +1183,6 @@ create_internal(void *place, size_t size,
control->refcnt = 1;
control->freed_segment_counter = 0;
control->lwlock_tranche_id = tranche_id;
strlcpy(control->lwlock_tranche_name, tranche_name, DSA_MAXLEN);
/*
* Create the dsa_area object that this backend will use to access the
......@@ -1204,8 +1194,6 @@ create_internal(void *place, size_t size,
area->mapping_pinned = false;
memset(area->segment_maps, 0, sizeof(dsa_segment_map) * DSA_MAX_SEGMENTS);
area->high_segment_index = 0;
LWLockRegisterTranche(control->lwlock_tranche_id,
control->lwlock_tranche_name);
LWLockInitialize(&control->lock, control->lwlock_tranche_id);
for (i = 0; i < DSA_NUM_SIZE_CLASSES; ++i)
LWLockInitialize(DSA_SCLASS_LOCK(area, i),
......@@ -1262,8 +1250,6 @@ attach_internal(void *place, dsm_segment *segment, dsa_handle handle)
memset(&area->segment_maps[0], 0,
sizeof(dsa_segment_map) * DSA_MAX_SEGMENTS);
area->high_segment_index = 0;
LWLockRegisterTranche(control->lwlock_tranche_id,
control->lwlock_tranche_name);
/* Set up the segment map for this process's mapping. */
segment_map = &area->segment_maps[0];
......
......@@ -90,10 +90,9 @@ typedef dsm_handle dsa_handle;
extern void dsa_startup(void);
extern dsa_area *dsa_create(int tranche_id, const char *tranche_name);
extern dsa_area *dsa_create(int tranche_id);
extern dsa_area *dsa_create_in_place(void *place, Size size,
int tranche_id, const char *tranche_name,
dsm_segment *segment);
int tranche_id, dsm_segment *segment);
extern dsa_area *dsa_attach(dsa_handle handle);
extern dsa_area *dsa_attach_in_place(void *place, dsm_segment *segment);
extern void dsa_release_in_place(void *place);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment