Commit cc5f8136 authored by Andres Freund

Add support for coordinating record typmods among parallel workers.

Tuples can have type RECORDOID and a typmod number that identifies a blessed
TupleDesc in a backend-private cache.  To support the sharing of such tuples
through shared memory and temporary files, provide a typmod registry in
shared memory.

To achieve that, introduce per-session DSM segments, created on demand when a
backend first runs a parallel query.  The per-session DSM segment has a
table-of-contents just like the per-query DSM segment, and initially the
contents are a shared record typmod registry and a DSA area to provide the
space it needs to grow.

State relating to the current session is accessed via a Session object
reached through global variable CurrentSession that may require significant
redesign further down the road as we figure out what else needs to be shared
or remodelled.

Author: Thomas Munro
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/CAEepm=0ZtQ-SpsgCyzzYpsXS6e=kZWqk3g5Ygn3MDV7A8dabUA@mail.gmail.com
parent 9b6cb465
......@@ -13,6 +13,6 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = bufmask.o heaptuple.o indextuple.o printsimple.o printtup.o \
reloptions.o scankey.o tupconvert.o tupdesc.o
reloptions.o scankey.o session.o tupconvert.o tupdesc.o
include $(top_srcdir)/src/backend/common.mk
/*-------------------------------------------------------------------------
*
* session.c
* Encapsulation of user session.
*
* This is intended to contain data that needs to be shared between backends
* performing work for a client session. In particular such a session is
* shared between the leader and worker processes for parallel queries. At
* some later point it might also become useful infrastructure for separating
* backends from client connections, e.g. for the purpose of pooling.
*
* Currently this infrastructure is used to share:
* - typmod registry for ephemeral row-types, i.e. BlessTupleDesc etc.
*
* Portions Copyright (c) 2017, PostgreSQL Global Development Group
*
* src/backend/access/common/session.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/session.h"
#include "storage/lwlock.h"
#include "storage/shm_toc.h"
#include "utils/memutils.h"
#include "utils/typcache.h"
/* Magic number for per-session DSM TOC. */
#define SESSION_MAGIC 0xabb0fbc9
/*
* We want to create a DSA area to store shared state that has the same
* lifetime as a session. So far, it's only used to hold the shared record
* type registry. We don't want it to have to create any DSM segments just
* yet in common cases, so we'll give it enough space to hold a very small
* SharedRecordTypmodRegistry.
*/
#define SESSION_DSA_SIZE 0x30000
/*
* Magic numbers for state sharing in the per-session DSM area.
*/
#define SESSION_KEY_DSA UINT64CONST(0xFFFFFFFFFFFF0001)
#define SESSION_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF0002)
/* This backend's current session. */
Session *CurrentSession = NULL;
/*
* Set up CurrentSession to point to an empty Session object.
*/
void
InitializeSession(void)
{
CurrentSession = MemoryContextAllocZero(TopMemoryContext, sizeof(Session));
}
/*
 * Return the handle of the per-session DSM segment, creating and populating
 * the segment first if this backend does not have one yet.  Worker processes
 * use the returned handle to attach to the segment.
 *
 * Unlike the per-context DSM segment, this segment and its contents are
 * reused for future parallel queries.
 *
 * Returns DSM_HANDLE_INVALID if a segment can't be allocated due to lack of
 * resources.
 */
dsm_handle
GetSessionDsmHandle(void)
{
	shm_toc_estimator toc_estimator;
	MemoryContext caller_context;
	dsm_segment *session_seg;
	dsm_handle	result = DSM_HANDLE_INVALID;
	size_t		registry_bytes;
	size_t		segment_bytes;

	/*
	 * A session-scope segment created by an earlier call is simply handed
	 * out again; the same segment serves the rest of this backend's
	 * lifetime.
	 */
	if (CurrentSession->segment != NULL)
		return dsm_segment_handle(CurrentSession->segment);

	/* Nothing exists yet, so build it with TopMemoryContext current. */
	caller_context = MemoryContextSwitchTo(TopMemoryContext);

	shm_toc_initialize_estimator(&toc_estimator);

	/* One key and one chunk for the per-session DSA area ... */
	shm_toc_estimate_keys(&toc_estimator, 1);
	shm_toc_estimate_chunk(&toc_estimator, SESSION_DSA_SIZE);

	/* ... and one key and one chunk for the record typmod registry. */
	registry_bytes = SharedRecordTypmodRegistryEstimate();
	shm_toc_estimate_keys(&toc_estimator, 1);
	shm_toc_estimate_chunk(&toc_estimator, registry_bytes);

	/* Ask for a segment of the estimated size; NULL means none available. */
	segment_bytes = shm_toc_estimate(&toc_estimator);
	session_seg = dsm_create(segment_bytes, DSM_CREATE_NULL_IF_MAXSEGMENTS);

	if (session_seg != NULL)
	{
		shm_toc    *session_toc;
		dsa_area   *session_area;
		void	   *area_mem;
		void	   *registry_mem;

		/* Lay a table of contents over the new segment. */
		session_toc = shm_toc_create(SESSION_MAGIC,
									 dsm_segment_address(session_seg),
									 segment_bytes);

		/* Carve out the DSA area's space, create the area, publish it. */
		area_mem = shm_toc_allocate(session_toc, SESSION_DSA_SIZE);
		session_area = dsa_create_in_place(area_mem,
										   SESSION_DSA_SIZE,
										   LWTRANCHE_SESSION_DSA,
										   session_seg);
		shm_toc_insert(session_toc, SESSION_KEY_DSA, area_mem);

		/* Likewise for the session-scoped shared record typmod registry. */
		registry_mem = shm_toc_allocate(session_toc, registry_bytes);
		SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *)
									   registry_mem,
									   session_seg,
									   session_area);
		shm_toc_insert(session_toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY,
					   registry_mem);

		/*
		 * Everything is set up, so pin the mappings to keep them for the
		 * rest of this backend's life.  Had we failed before this point,
		 * cleanup callbacks for anything installed above (currently only
		 * SharedRecordTypmodRegistry) would run when the DSM segment is
		 * detached by CurrentResourceOwner, so CurrentSession is never left
		 * in a broken state.
		 */
		dsm_pin_mapping(session_seg);
		dsa_pin_mapping(session_area);

		/* Only now expose the segment and area through CurrentSession. */
		CurrentSession->segment = session_seg;
		CurrentSession->area = session_area;

		result = dsm_segment_handle(session_seg);
	}

	MemoryContextSwitchTo(caller_context);

	return result;
}
/*
 * Attach this backend to the per-session DSM segment identified by 'handle',
 * as provided by a parallel leader, and to the DSA area and shared record
 * typmod registry stored inside it.
 *
 * Raises an ERROR if dsm_attach() cannot attach to the segment.
 */
void
AttachSession(dsm_handle handle)
{
	MemoryContext caller_context = MemoryContextSwitchTo(TopMemoryContext);
	dsm_segment *session_seg;
	shm_toc    *session_toc;
	dsa_area   *session_area;
	void	   *area_mem;
	void	   *registry_mem;

	/* Map the leader's segment into this process. */
	session_seg = dsm_attach(handle);
	if (session_seg == NULL)
		elog(ERROR, "could not attach to per-session DSM segment");
	session_toc = shm_toc_attach(SESSION_MAGIC,
								 dsm_segment_address(session_seg));

	/* Find the DSA area through the table of contents and attach to it. */
	area_mem = shm_toc_lookup(session_toc, SESSION_KEY_DSA, false);
	session_area = dsa_attach_in_place(area_mem, session_seg);

	/*
	 * Publish the segment and area in CurrentSession before attaching to the
	 * registry below.
	 */
	CurrentSession->segment = session_seg;
	CurrentSession->area = session_area;

	/* Find the shared record typmod registry and attach to it. */
	registry_mem = shm_toc_lookup(session_toc,
								  SESSION_KEY_RECORD_TYPMOD_REGISTRY,
								  false);
	SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *)
									 registry_mem);

	/* Stay attached until end of backend or DetachSession(). */
	dsm_pin_mapping(session_seg);
	dsa_pin_mapping(session_area);

	MemoryContextSwitchTo(caller_context);
}
/*
 * Detach from the current session's DSM segment and DSA area, and reset the
 * corresponding CurrentSession fields to NULL.
 *
 * It's not strictly necessary to do this explicitly since we'll detach
 * automatically at backend exit, but if we ever reuse parallel workers it
 * will become important for workers to detach from one session before
 * attaching to another.  Note that this runs detach hooks.
 *
 * Only the 'segment' and 'area' fields are cleared here; the remaining
 * Session fields are not touched by this function.
 *
 * NOTE(review): both fields are passed on without a NULL check, so this
 * presumably expects a prior successful GetSessionDsmHandle() or
 * AttachSession() -- confirm against callers.
 */
void
DetachSession(void)
{
/* Runs detach hooks. */
dsm_detach(CurrentSession->segment);
CurrentSession->segment = NULL;
/* The area pointer is read only after the segment detach has completed. */
dsa_detach(CurrentSession->area);
CurrentSession->area = NULL;
}
......@@ -184,6 +184,22 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc)
return desc;
}
/*
 * TupleDescCopy
 *		Copy the tuple descriptor 'src' into the caller-supplied memory
 *		at 'dst'.  That memory may be shared memory mapped at any
 *		address, and must be large enough to hold TupleDescSize(src)
 *		bytes.
 *
 * !!! Constraints and defaults are not copied !!!
 */
void
TupleDescCopy(TupleDesc dst, TupleDesc src)
{
	size_t		nbytes = TupleDescSize(src);

	/* Flat byte-for-byte copy of the descriptor. */
	memcpy(dst, src, nbytes);

	/*
	 * Override the two fields that must not be inherited from the source:
	 * the copy carries no constraints, and its reference count is reset to
	 * -1 (presumably "not reference-counted" -- confirm against tupdesc.h).
	 */
	dst->tdrefcount = -1;
	dst->constr = NULL;
}
/*
* TupleDescCopyEntry
* This function copies a single attribute structure from one tuple
......
......@@ -15,6 +15,7 @@
#include "postgres.h"
#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
......@@ -36,6 +37,7 @@
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
#include "utils/typcache.h"
/*
......@@ -51,8 +53,9 @@
#define PARALLEL_MAGIC 0x50477c7c
/*
* Magic numbers for parallel state sharing. Higher-level code should use
* smaller values, leaving these very large ones for use by this module.
* Magic numbers for per-context parallel state sharing. Higher-level code
* should use smaller values, leaving these very large ones for use by this
* module.
*/
#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
......@@ -63,6 +66,7 @@
#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
......@@ -197,6 +201,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
Size segsize = 0;
int i;
FixedParallelState *fps;
dsm_handle session_dsm_handle = DSM_HANDLE_INVALID;
Snapshot transaction_snapshot = GetTransactionSnapshot();
Snapshot active_snapshot = GetActiveSnapshot();
......@@ -211,6 +216,21 @@ InitializeParallelDSM(ParallelContext *pcxt)
* Normally, the user will have requested at least one worker process, but
* if by chance they have not, we can skip a bunch of things here.
*/
if (pcxt->nworkers > 0)
{
/* Get (or create) the per-session DSM segment's handle. */
session_dsm_handle = GetSessionDsmHandle();
/*
* If we weren't able to create a per-session DSM segment, then we can
* continue but we can't safely launch any workers because their
* record typmods would be incompatible so they couldn't exchange
* tuples.
*/
if (session_dsm_handle == DSM_HANDLE_INVALID)
pcxt->nworkers = 0;
}
if (pcxt->nworkers > 0)
{
/* Estimate space for various kinds of state sharing. */
......@@ -226,8 +246,9 @@ InitializeParallelDSM(ParallelContext *pcxt)
shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
tstatelen = EstimateTransactionStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
/* If you add more chunks here, you probably need to add keys. */
shm_toc_estimate_keys(&pcxt->estimator, 6);
shm_toc_estimate_keys(&pcxt->estimator, 7);
/* Estimate space need for error queues. */
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
......@@ -295,6 +316,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
char *asnapspace;
char *tstatespace;
char *error_queue_space;
char *session_dsm_handle_space;
char *entrypointstate;
Size lnamelen;
......@@ -322,6 +344,13 @@ InitializeParallelDSM(ParallelContext *pcxt)
SerializeSnapshot(active_snapshot, asnapspace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
/* Provide the handle for per-session segment. */
session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
sizeof(dsm_handle));
*(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
session_dsm_handle_space);
/* Serialize transaction state. */
tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
SerializeTransactionState(tstatelen, tstatespace);
......@@ -938,6 +967,7 @@ ParallelWorkerMain(Datum main_arg)
char *asnapspace;
char *tstatespace;
StringInfoData msgbuf;
char *session_dsm_handle_space;
/* Set flag to indicate that we're initializing a parallel worker. */
InitializingParallelWorker = true;
......@@ -1064,6 +1094,11 @@ ParallelWorkerMain(Datum main_arg)
combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
RestoreComboCIDState(combocidspace);
/* Attach to the per-session DSM segment and contained objects. */
session_dsm_handle_space =
shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
AttachSession(*(dsm_handle *) session_dsm_handle_space);
/* Restore transaction snapshot. */
tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
......@@ -1110,6 +1145,9 @@ ParallelWorkerMain(Datum main_arg)
/* Shut down the parallel-worker transaction. */
EndParallelWorkerTransaction();
/* Detach from the per-session DSM segment. */
DetachSession();
/* Report success. */
pq_putmessage('X', NULL, 0);
}
......
......@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
if (LWLockTrancheArray == NULL)
{
LWLockTranchesAllocated = 64;
LWLockTranchesAllocated = 128;
LWLockTrancheArray = (char **)
MemoryContextAllocZero(TopMemoryContext,
LWLockTranchesAllocated * sizeof(char *));
......@@ -510,6 +510,12 @@ RegisterLWLockTranches(void)
"predicate_lock_manager");
LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
"parallel_query_dsa");
LWLockRegisterTranche(LWTRANCHE_SESSION_DSA,
"session_dsa");
LWLockRegisterTranche(LWTRANCHE_SESSION_RECORD_TABLE,
"session_record_table");
LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
"session_typmod_table");
LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
/* Register named tranches. */
......
This diff is collapsed.
......@@ -21,6 +21,7 @@
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/session.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "access/xlog.h"
......@@ -1027,6 +1028,9 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
/* initialize client encoding */
InitializeClientEncoding();
/* Initialize this backend's session state. */
InitializeSession();
/* report this backend in the PgBackendStatus array */
if (!bootstrap)
pgstat_bestart();
......
/*-------------------------------------------------------------------------
*
* session.h
* Encapsulation of user session.
*
* Copyright (c) 2017, PostgreSQL Global Development Group
*
* src/include/access/session.h
*
*-------------------------------------------------------------------------
*/
#ifndef SESSION_H
#define SESSION_H
#include "lib/dshash.h"
/* Defined in typcache.c */
typedef struct SharedRecordTypmodRegistry SharedRecordTypmodRegistry;
/*
 * A struct encapsulating some elements of a user's session. For now this
 * manages state that applies to parallel query, but in principle it could
 * include other things that are currently global variables.
 *
 * InitializeSession() allocates the struct zero-filled, so every field starts
 * out NULL. 'segment' and 'area' are filled in by GetSessionDsmHandle() or
 * AttachSession() and reset to NULL by DetachSession().
 */
typedef struct Session
{
dsm_segment *segment; /* The session-scoped DSM segment. */
dsa_area *area; /* The session-scoped DSA area. */
/*
 * State managed by typcache.c. session.c itself never assigns these three
 * fields.
 */
SharedRecordTypmodRegistry *shared_typmod_registry;
dshash_table *shared_record_table;
dshash_table *shared_typmod_table;
} Session;
extern void InitializeSession(void);
extern dsm_handle GetSessionDsmHandle(void);
extern void AttachSession(dsm_handle handle);
extern void DetachSession(void);
/* The current session, or NULL for none. */
extern Session *CurrentSession;
#endif /* SESSION_H */
......@@ -92,6 +92,12 @@ extern TupleDesc CreateTupleDescCopy(TupleDesc tupdesc);
extern TupleDesc CreateTupleDescCopyConstr(TupleDesc tupdesc);
#define TupleDescSize(src) \
(offsetof(struct tupleDesc, attrs) + \
(src)->natts * sizeof(FormData_pg_attribute))
extern void TupleDescCopy(TupleDesc dst, TupleDesc src);
extern void TupleDescCopyEntry(TupleDesc dst, AttrNumber dstAttno,
TupleDesc src, AttrNumber srcAttno);
......
......@@ -212,6 +212,9 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_LOCK_MANAGER,
LWTRANCHE_PREDICATE_LOCK_MANAGER,
LWTRANCHE_PARALLEL_QUERY_DSA,
LWTRANCHE_SESSION_DSA,
LWTRANCHE_SESSION_RECORD_TABLE,
LWTRANCHE_SESSION_TYPMOD_TABLE,
LWTRANCHE_TBM,
LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds;
......
......@@ -18,6 +18,8 @@
#include "access/tupdesc.h"
#include "fmgr.h"
#include "storage/dsm.h"
#include "utils/dsa.h"
/* DomainConstraintCache is an opaque struct known only within typcache.c */
......@@ -143,6 +145,7 @@ typedef struct DomainConstraintRef
MemoryContextCallback callback; /* used to release refcount when done */
} DomainConstraintRef;
typedef struct SharedRecordTypmodRegistry SharedRecordTypmodRegistry;
extern TypeCacheEntry *lookup_type_cache(Oid type_id, int flags);
......@@ -164,4 +167,11 @@ extern void assign_record_type_typmod(TupleDesc tupDesc);
extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2);
extern size_t SharedRecordTypmodRegistryEstimate(void);
extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *,
dsm_segment *segment, dsa_area *area);
extern void SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *);
#endif /* TYPCACHE_H */
......@@ -2016,6 +2016,10 @@ SharedInvalRelmapMsg
SharedInvalSmgrMsg
SharedInvalSnapshotMsg
SharedInvalidationMessage
SharedRecordTableKey
SharedRecordTableEntry
SharedRecordTypmodRegistry
SharedTypmodTableEntry
ShellTypeInfo
ShippableCacheEntry
ShippableCacheKey
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment