Commit a9ed875f authored by Tom Lane's avatar Tom Lane

Code review for tqueue.c: fix memory leaks, speed it up, other fixes.

When doing record typmod remapping, tqueue.c did fresh catalog lookups
for each tuple it processed, which was pretty horrible performance-wise
(it seemed to about halve the already none-too-quick speed of bulk reads
in parallel mode).  Worse, it insisted on putting bits of that data into
TopMemoryContext, from where it never freed them, causing a
session-lifespan memory leak.  (I suppose this was coded with the idea
that the sender process would quit after finishing the query ---
but the receiver uses the same code.)

Restructure to avoid repetitive catalog lookups and to keep that data
in a query-lifespan context, in or below the context where the
TQueueDestReceiver or TupleQueueReader itself lives.

Fix some other bugs such as continuing to use a tupledesc after
releasing our refcount on it.  Clean up cavalier datatype choices
(typmods are int32, please, not int, and certainly not Oid).  Improve
comments and error message wording.
parent f9e439b1
...@@ -3,16 +3,25 @@ ...@@ -3,16 +3,25 @@
* tqueue.c * tqueue.c
* Use shm_mq to send & receive tuples between parallel backends * Use shm_mq to send & receive tuples between parallel backends
* *
* Most of the complexity in this module arises from transient RECORD types,
* which all have type RECORDOID and are distinguished by typmod numbers
* that are managed per-backend (see src/backend/utils/cache/typcache.c).
* The sender's set of RECORD typmod assignments probably doesn't match the
* receiver's. To deal with this, we make the sender send a description
* of each transient RECORD type appearing in the data it sends. The
* receiver finds or creates a matching type in its own typcache, and then
* maps the sender's typmod for that type to its own typmod.
*
* A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
* under the hood, writes tuples from the executor to a shm_mq. If * under the hood, writes tuples from the executor to a shm_mq. If
* necessary, it also writes control messages describing transient * necessary, it also writes control messages describing transient
* record types used within the tuple. * record types used within the tuple.
* *
* A TupleQueueReader reads tuples, and if any are sent control messages, * A TupleQueueReader reads tuples, and control messages if any are sent,
* from a shm_mq and returns the tuples. If transient record types are * from a shm_mq and returns the tuples. If transient record types are
* in use, it registers those types based on the received control messages * in use, it registers those types locally based on the control messages
* and rewrites the typemods sent by the remote side to the corresponding * and rewrites the typmods sent by the remote side to the corresponding
* local record typemods. * local record typmods.
* *
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
...@@ -38,82 +47,178 @@ ...@@ -38,82 +47,178 @@
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/typcache.h" #include "utils/typcache.h"
/*
* The data transferred through the shm_mq is divided into messages.
* One-byte messages are mode-switch messages, telling the receiver to switch
* between "control" and "data" modes. (We always start up in "data" mode.)
* Otherwise, when in "data" mode, each message is a tuple. When in "control"
* mode, each message defines one transient-typmod-to-tupledesc mapping to
* let us interpret future tuples. Both of those cases certainly require
* more than one byte, so no confusion is possible.
*/
#define TUPLE_QUEUE_MODE_CONTROL 'c' /* mode-switch message contents */
#define TUPLE_QUEUE_MODE_DATA 'd'
/*
* Both the sender and receiver build trees of TupleRemapInfo nodes to help
* them identify which (sub) fields of transmitted tuples are composite and
* may thus need remap processing. We might need to look within arrays and
* ranges, not only composites, to find composite sub-fields. A NULL
* TupleRemapInfo pointer indicates that it is known that the described field
* is not composite and has no composite substructure.
*
* Note that we currently have to look at each composite field at runtime,
* even if we believe it's of a named composite type (i.e., not RECORD).
* This is because we allow the actual value to be a compatible transient
* RECORD type. That's grossly inefficient, and it would be good to get
* rid of the requirement, but it's not clear what would need to change.
*
* Also, we allow the top-level tuple structure, as well as the actual
* structure of composite subfields, to change from one tuple to the next
* at runtime. This may well be entirely historical, but it's mostly free
* to support given the previous requirement; and other places in the system
* also permit this, so it's not entirely clear if we could drop it.
*/
typedef enum typedef enum
{ {
TQUEUE_REMAP_NONE, /* no special processing required */
TQUEUE_REMAP_ARRAY, /* array */ TQUEUE_REMAP_ARRAY, /* array */
TQUEUE_REMAP_RANGE, /* range */ TQUEUE_REMAP_RANGE, /* range */
TQUEUE_REMAP_RECORD /* composite type, named or anonymous */ TQUEUE_REMAP_RECORD /* composite type, named or transient */
} RemapClass; } TupleRemapClass;
typedef struct typedef struct TupleRemapInfo TupleRemapInfo;
typedef struct ArrayRemapInfo
{ {
int natts; int16 typlen; /* array element type's storage properties */
RemapClass mapping[FLEXIBLE_ARRAY_MEMBER]; bool typbyval;
} RemapInfo; char typalign;
TupleRemapInfo *element_remap; /* array element type's remap info */
} ArrayRemapInfo;
typedef struct typedef struct RangeRemapInfo
{ {
DestReceiver pub; TypeCacheEntry *typcache; /* range type's typcache entry */
shm_mq_handle *handle; TupleRemapInfo *bound_remap; /* range bound type's remap info */
MemoryContext tmpcontext; } RangeRemapInfo;
HTAB *recordhtab;
char mode; typedef struct RecordRemapInfo
TupleDesc tupledesc; {
RemapInfo *remapinfo; /* Original (remote) type ID info last seen for this composite field */
Oid rectypid;
int32 rectypmod;
/* Local RECORD typmod, or -1 if unset; not used on sender side */
int32 localtypmod;
/* If no fields of the record require remapping, these are NULL: */
TupleDesc tupledesc; /* copy of record's tupdesc */
TupleRemapInfo **field_remap; /* each field's remap info */
} RecordRemapInfo;
struct TupleRemapInfo
{
TupleRemapClass remapclass;
union
{
ArrayRemapInfo arr;
RangeRemapInfo rng;
RecordRemapInfo rec;
} u;
};
/*
* DestReceiver object's private contents
*
* queue and tupledesc are pointers to data supplied by DestReceiver's caller.
* The recordhtab and remap info are owned by the DestReceiver and are kept
* in mycontext. tmpcontext is a tuple-lifespan context to hold cruft
* created while traversing each tuple to find record subfields.
*/
typedef struct TQueueDestReceiver
{
DestReceiver pub; /* public fields */
shm_mq_handle *queue; /* shm_mq to send to */
MemoryContext mycontext; /* context containing TQueueDestReceiver */
MemoryContext tmpcontext; /* per-tuple context, if needed */
HTAB *recordhtab; /* table of transmitted typmods, if needed */
char mode; /* current message mode */
TupleDesc tupledesc; /* current top-level tuple descriptor */
TupleRemapInfo **field_remapinfo; /* current top-level remap info */
} TQueueDestReceiver; } TQueueDestReceiver;
typedef struct RecordTypemodMap /*
* Hash table entries for mapping remote to local typmods.
*/
typedef struct RecordTypmodMap
{ {
int remotetypmod; int32 remotetypmod; /* hash key (must be first!) */
int localtypmod; int32 localtypmod;
} RecordTypemodMap; } RecordTypmodMap;
/*
* TupleQueueReader object's private contents
*
* queue and tupledesc are pointers to data supplied by reader's caller.
* The typmodmap and remap info are owned by the TupleQueueReader and
* are kept in mycontext.
*
* "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
*/
struct TupleQueueReader struct TupleQueueReader
{ {
shm_mq_handle *queue; shm_mq_handle *queue; /* shm_mq to receive from */
char mode; MemoryContext mycontext; /* context containing TupleQueueReader */
TupleDesc tupledesc; HTAB *typmodmap; /* RecordTypmodMap hashtable, if needed */
RemapInfo *remapinfo; char mode; /* current message mode */
HTAB *typmodmap; TupleDesc tupledesc; /* current top-level tuple descriptor */
TupleRemapInfo **field_remapinfo; /* current top-level remap info */
}; };
#define TUPLE_QUEUE_MODE_CONTROL 'c' /* Local function prototypes */
#define TUPLE_QUEUE_MODE_DATA 'd' static void TQExamine(TQueueDestReceiver *tqueue,
TupleRemapInfo *remapinfo,
static void tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype, Datum value);
Datum value); static void TQExamineArray(TQueueDestReceiver *tqueue,
static void tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value); ArrayRemapInfo *remapinfo,
static void tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value); Datum value);
static void tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value); static void TQExamineRange(TQueueDestReceiver *tqueue,
static void tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod, RangeRemapInfo *remapinfo,
TupleDesc tupledesc); Datum value);
static void TQExamineRecord(TQueueDestReceiver *tqueue,
RecordRemapInfo *remapinfo,
Datum value);
static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod,
TupleDesc tupledesc);
static void TupleQueueHandleControlMessage(TupleQueueReader *reader, static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
Size nbytes, char *data); Size nbytes, char *data);
static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader, static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
Size nbytes, HeapTupleHeader data); Size nbytes, HeapTupleHeader data);
static HeapTuple TupleQueueRemapTuple(TupleQueueReader *reader, static HeapTuple TQRemapTuple(TupleQueueReader *reader,
TupleDesc tupledesc, RemapInfo *remapinfo, TupleDesc tupledesc,
HeapTuple tuple); TupleRemapInfo **field_remapinfo,
static Datum TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, HeapTuple tuple);
Datum value); static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
static Datum TupleQueueRemapArray(TupleQueueReader *reader, Datum value); Datum value, bool *changed);
static Datum TupleQueueRemapRange(TupleQueueReader *reader, Datum value); static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
static Datum TupleQueueRemapRecord(TupleQueueReader *reader, Datum value); Datum value, bool *changed);
static RemapClass GetRemapClass(Oid typeid); static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
static RemapInfo *BuildRemapInfo(TupleDesc tupledesc); Datum value, bool *changed);
static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
Datum value, bool *changed);
static TupleRemapInfo *BuildTupleRemapInfo(Oid typid, MemoryContext mycontext);
static TupleRemapInfo *BuildArrayRemapInfo(Oid elemtypid,
MemoryContext mycontext);
static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid,
MemoryContext mycontext);
static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc,
MemoryContext mycontext);
/* /*
* Receive a tuple. * Receive a tuple from a query, and send it to the designated shm_mq.
* *
* This is, at core, pretty simple: just send the tuple to the designated * Returns TRUE if successful, FALSE if shm_mq has been detached.
* shm_mq. The complicated part is that if the tuple contains transient
* record types (see lookup_rowtype_tupdesc), we need to send control
* information to the shm_mq receiver so that those typemods can be correctly
* interpreted, as they are merely held in a backend-local cache. Worse, the
* record type may not at the top level: we could have a range over an array
* type over a range type over a range type over an array type over a record,
* or something like that.
*/ */
static bool static bool
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
...@@ -124,43 +229,49 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) ...@@ -124,43 +229,49 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
shm_mq_result result; shm_mq_result result;
/* /*
* Test to see whether the tupledesc has changed; if so, set up for the * If first time through, compute remapping info for the top-level fields.
* new tupledesc. This is a strange test both because the executor really * On later calls, if the tupledesc has changed, set up for the new
* tupledesc. (This is a strange test both because the executor really
* shouldn't change the tupledesc, and also because it would be unsafe if * shouldn't change the tupledesc, and also because it would be unsafe if
* the old tupledesc could be freed and a new one allocated at the same * the old tupledesc could be freed and a new one allocated at the same
* address. But since some very old code in printtup.c uses a similar * address. But since some very old code in printtup.c uses a similar
* test, we adopt it here as well. * approach, we adopt it here as well.)
*
* Here and elsewhere in this module, when replacing remapping info we
* pfree the top-level object because that's easy, but we don't bother to
* recursively free any substructure. This would lead to query-lifespan
* memory leaks if the mapping info actually changed frequently, but since
* we don't expect that to happen, it doesn't seem worth expending code to
* prevent it.
*/ */
if (tqueue->tupledesc != tupledesc) if (tqueue->tupledesc != tupledesc)
{ {
if (tqueue->remapinfo != NULL) /* Is it worth trying to free substructure of the remap tree? */
pfree(tqueue->remapinfo); if (tqueue->field_remapinfo != NULL)
tqueue->remapinfo = BuildRemapInfo(tupledesc); pfree(tqueue->field_remapinfo);
tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc,
tqueue->mycontext);
tqueue->tupledesc = tupledesc; tqueue->tupledesc = tupledesc;
} }
tuple = ExecMaterializeSlot(slot);
/* /*
* When, because of the types being transmitted, no record typemod mapping * When, because of the types being transmitted, no record typmod mapping
* can be needed, we can skip a good deal of work. * can be needed, we can skip a good deal of work.
*/ */
if (tqueue->remapinfo != NULL) if (tqueue->field_remapinfo != NULL)
{ {
RemapInfo *remapinfo = tqueue->remapinfo; TupleRemapInfo **remapinfo = tqueue->field_remapinfo;
AttrNumber i; int i;
MemoryContext oldcontext = NULL; MemoryContext oldcontext = NULL;
/* Deform the tuple so we can examine it, if not done already. */ /* Deform the tuple so we can examine fields, if not done already. */
slot_getallattrs(slot); slot_getallattrs(slot);
/* Iterate over each attribute and search it for transient typemods. */ /* Iterate over each attribute and search it for transient typmods. */
Assert(slot->tts_tupleDescriptor->natts == remapinfo->natts); for (i = 0; i < tupledesc->natts; i++)
for (i = 0; i < remapinfo->natts; ++i)
{ {
/* Ignore nulls and types that don't need special handling. */ /* Ignore nulls and types that don't need special handling. */
if (slot->tts_isnull[i] || if (slot->tts_isnull[i] || remapinfo[i] == NULL)
remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
continue; continue;
/* Switch to temporary memory context to avoid leaking. */ /* Switch to temporary memory context to avoid leaking. */
...@@ -168,16 +279,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) ...@@ -168,16 +279,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{ {
if (tqueue->tmpcontext == NULL) if (tqueue->tmpcontext == NULL)
tqueue->tmpcontext = tqueue->tmpcontext =
AllocSetContextCreate(TopMemoryContext, AllocSetContextCreate(tqueue->mycontext,
"tqueue temporary context", "tqueue sender temp context",
ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext); oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
} }
/* Invoke the appropriate walker function. */ /* Examine the value. */
tqueueWalk(tqueue, remapinfo->mapping[i], slot->tts_values[i]); TQExamine(tqueue, remapinfo[i], slot->tts_values[i]);
} }
/* If we used the temp context, reset it and restore prior context. */ /* If we used the temp context, reset it and restore prior context. */
...@@ -191,217 +302,232 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) ...@@ -191,217 +302,232 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
if (tqueue->mode != TUPLE_QUEUE_MODE_DATA) if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
{ {
tqueue->mode = TUPLE_QUEUE_MODE_DATA; tqueue->mode = TUPLE_QUEUE_MODE_DATA;
shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false); shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
} }
} }
/* Send the tuple itself. */ /* Send the tuple itself. */
result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false); tuple = ExecMaterializeSlot(slot);
result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
/* Check for failure. */
if (result == SHM_MQ_DETACHED) if (result == SHM_MQ_DETACHED)
return false; return false;
else if (result != SHM_MQ_SUCCESS) else if (result != SHM_MQ_SUCCESS)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not send tuples"))); errmsg("could not send tuple to shared-memory queue")));
return true; return true;
} }
/* /*
* Invoke the appropriate walker function based on the given RemapClass. * Examine the given datum and send any necessary control messages for
* transient record types contained in it.
*
* remapinfo is previously-computed remapping info about the datum's type.
*
* This function just dispatches based on the remap class.
*/ */
static void static void
tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype, Datum value) TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
{ {
/* This is recursive, so it could be driven to stack overflow. */
check_stack_depth(); check_stack_depth();
switch (walktype) switch (remapinfo->remapclass)
{ {
case TQUEUE_REMAP_NONE:
break;
case TQUEUE_REMAP_ARRAY: case TQUEUE_REMAP_ARRAY:
tqueueWalkArray(tqueue, value); TQExamineArray(tqueue, &remapinfo->u.arr, value);
break; break;
case TQUEUE_REMAP_RANGE: case TQUEUE_REMAP_RANGE:
tqueueWalkRange(tqueue, value); TQExamineRange(tqueue, &remapinfo->u.rng, value);
break; break;
case TQUEUE_REMAP_RECORD: case TQUEUE_REMAP_RECORD:
tqueueWalkRecord(tqueue, value); TQExamineRecord(tqueue, &remapinfo->u.rec, value);
break; break;
} }
} }
/* /*
* Walk a record and send control messages for transient record types * Examine a record datum and send any necessary control messages for
* contained therein. * transient record types contained in it.
*/ */
static void static void
tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value) TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo,
Datum value)
{ {
HeapTupleHeader tup; HeapTupleHeader tup;
Oid typeid; Oid typid;
Oid typmod; int32 typmod;
TupleDesc tupledesc; TupleDesc tupledesc;
RemapInfo *remapinfo;
/* Extract typmod from tuple. */ /* Extract type OID and typmod from tuple. */
tup = DatumGetHeapTupleHeader(value); tup = DatumGetHeapTupleHeader(value);
typeid = HeapTupleHeaderGetTypeId(tup); typid = HeapTupleHeaderGetTypeId(tup);
typmod = HeapTupleHeaderGetTypMod(tup); typmod = HeapTupleHeaderGetTypMod(tup);
/* Look up tuple descriptor in typecache. */
tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
/* /*
* If this is a transient record time, send its TupleDesc as a control * If first time through, or if this isn't the same composite type as last
* message. (tqueueSendTypemodInfo is smart enough to do this only once * time, consider sending a control message, and then look up the
* per typmod.) * necessary information for examining the fields.
*/ */
if (typeid == RECORDOID) if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
tqueueSendTypmodInfo(tqueue, typmod, tupledesc); {
/* Free any old data. */
if (remapinfo->tupledesc != NULL)
FreeTupleDesc(remapinfo->tupledesc);
/* Is it worth trying to free substructure of the remap tree? */
if (remapinfo->field_remap != NULL)
pfree(remapinfo->field_remap);
/* /* Look up tuple descriptor in typcache. */
* Build the remap information for this tupledesc. We might want to think tupledesc = lookup_rowtype_tupdesc(typid, typmod);
* about keeping a cache of this information keyed by typeid and typemod,
* but let's keep it simple for now. /*
*/ * If this is a transient record type, send the tupledesc in a control
remapinfo = BuildRemapInfo(tupledesc); * message. (TQSendRecordInfo is smart enough to do this only once
* per typmod.)
*/
if (typid == RECORDOID)
TQSendRecordInfo(tqueue, typmod, tupledesc);
/* Figure out whether fields need recursive processing. */
remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
tqueue->mycontext);
if (remapinfo->field_remap != NULL)
{
/*
* We need to inspect the record contents, so save a copy of the
* tupdesc. (We could possibly just reference the typcache's
* copy, but then it's problematic when to release the refcount.)
*/
MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext);
remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
MemoryContextSwitchTo(oldcontext);
}
else
{
/* No fields of the record require remapping. */
remapinfo->tupledesc = NULL;
}
remapinfo->rectypid = typid;
remapinfo->rectypmod = typmod;
/* Release reference count acquired by lookup_rowtype_tupdesc. */
DecrTupleDescRefCount(tupledesc);
}
/* /*
* If remapping is required, deform the tuple and process each field. When * If field remapping is required, deform the tuple and examine each
* BuildRemapInfo is null, the data types are such that there can be no * field.
* transient record types here, so we can skip all this work.
*/ */
if (remapinfo != NULL) if (remapinfo->field_remap != NULL)
{ {
Datum *values; Datum *values;
bool *isnull; bool *isnull;
HeapTupleData tdata; HeapTupleData tdata;
AttrNumber i; int i;
/* Deform the tuple so we can check each column within. */ /* Deform the tuple so we can check each column within. */
values = palloc(tupledesc->natts * sizeof(Datum)); tupledesc = remapinfo->tupledesc;
isnull = palloc(tupledesc->natts * sizeof(bool)); values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
tdata.t_len = HeapTupleHeaderGetDatumLength(tup); tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
ItemPointerSetInvalid(&(tdata.t_self)); ItemPointerSetInvalid(&(tdata.t_self));
tdata.t_tableOid = InvalidOid; tdata.t_tableOid = InvalidOid;
tdata.t_data = tup; tdata.t_data = tup;
heap_deform_tuple(&tdata, tupledesc, values, isnull); heap_deform_tuple(&tdata, tupledesc, values, isnull);
/* Recursively check each non-NULL attribute. */ /* Recursively check each interesting non-NULL attribute. */
for (i = 0; i < tupledesc->natts; ++i) for (i = 0; i < tupledesc->natts; i++)
if (!isnull[i]) {
tqueueWalk(tqueue, remapinfo->mapping[i], values[i]); if (!isnull[i] && remapinfo->field_remap[i])
} TQExamine(tqueue, remapinfo->field_remap[i], values[i]);
}
/* Release reference count acquired by lookup_rowtype_tupdesc. */ /* Need not clean up, since we're in a short-lived context. */
DecrTupleDescRefCount(tupledesc); }
} }
/* /*
* Walk a record and send control messages for transient record types * Examine an array datum and send any necessary control messages for
* contained therein. * transient record types contained in it.
*/ */
static void static void
tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value) TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo,
Datum value)
{ {
ArrayType *arr = DatumGetArrayTypeP(value); ArrayType *arr = DatumGetArrayTypeP(value);
Oid typeid = ARR_ELEMTYPE(arr); Oid typid = ARR_ELEMTYPE(arr);
RemapClass remapclass;
int16 typlen;
bool typbyval;
char typalign;
Datum *elem_values; Datum *elem_values;
bool *elem_nulls; bool *elem_nulls;
int num_elems; int num_elems;
int i; int i;
remapclass = GetRemapClass(typeid);
/*
* If the elements of the array don't need to be walked, we shouldn't have
* been called in the first place: GetRemapClass should have returned NULL
* when asked about this array type.
*/
Assert(remapclass != TQUEUE_REMAP_NONE);
/* Deconstruct the array. */ /* Deconstruct the array. */
get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign); deconstruct_array(arr, typid, remapinfo->typlen,
deconstruct_array(arr, typeid, typlen, typbyval, typalign, remapinfo->typbyval, remapinfo->typalign,
&elem_values, &elem_nulls, &num_elems); &elem_values, &elem_nulls, &num_elems);
/* Walk each element. */ /* Examine each element. */
for (i = 0; i < num_elems; ++i) for (i = 0; i < num_elems; i++)
{
if (!elem_nulls[i]) if (!elem_nulls[i])
tqueueWalk(tqueue, remapclass, elem_values[i]); TQExamine(tqueue, remapinfo->element_remap, elem_values[i]);
}
} }
/* /*
* Walk a range type and send control messages for transient record types * Examine a range datum and send any necessary control messages for
* contained therein. * transient record types contained in it.
*/ */
static void static void
tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value) TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo,
Datum value)
{ {
RangeType *range = DatumGetRangeType(value); RangeType *range = DatumGetRangeType(value);
Oid typeid = RangeTypeGetOid(range);
RemapClass remapclass;
TypeCacheEntry *typcache;
RangeBound lower; RangeBound lower;
RangeBound upper; RangeBound upper;
bool empty; bool empty;
/* /* Extract the lower and upper bounds. */
* Extract the lower and upper bounds. It might be worth implementing range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
* some caching scheme here so that we don't look up the same typeids in
* the type cache repeatedly, but for now let's keep it simple.
*/
typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
if (typcache->rngelemtype == NULL)
elog(ERROR, "type %u is not a range type", typeid);
range_deserialize(typcache, range, &lower, &upper, &empty);
/* Nothing to do for an empty range. */ /* Nothing to do for an empty range. */
if (empty) if (empty)
return; return;
/* /* Examine each bound, if present. */
* If the range bounds don't need to be walked, we shouldn't have been
* called in the first place: GetRemapClass should have returned NULL when
* asked about this range type.
*/
remapclass = GetRemapClass(typcache->rngelemtype->type_id);
Assert(remapclass != TQUEUE_REMAP_NONE);
/* Walk each bound, if present. */
if (!upper.infinite) if (!upper.infinite)
tqueueWalk(tqueue, remapclass, upper.val); TQExamine(tqueue, remapinfo->bound_remap, upper.val);
if (!lower.infinite) if (!lower.infinite)
tqueueWalk(tqueue, remapclass, lower.val); TQExamine(tqueue, remapinfo->bound_remap, lower.val);
} }
/* /*
* Send tuple descriptor information for a transient typemod, unless we've * Send tuple descriptor information for a transient typmod, unless we've
* already done so previously. * already done so previously.
*/ */
static void static void
tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod, TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
TupleDesc tupledesc)
{ {
StringInfoData buf; StringInfoData buf;
bool found; bool found;
AttrNumber i; int i;
/* Initialize hash table if not done yet. */ /* Initialize hash table if not done yet. */
if (tqueue->recordhtab == NULL) if (tqueue->recordhtab == NULL)
{ {
HASHCTL ctl; HASHCTL ctl;
ctl.keysize = sizeof(int); MemSet(&ctl, 0, sizeof(ctl));
ctl.entrysize = sizeof(int); /* Hash table entries are just typmods */
ctl.hcxt = TopMemoryContext; ctl.keysize = sizeof(int32);
tqueue->recordhtab = hash_create("tqueue record hashtable", ctl.entrysize = sizeof(int32);
ctl.hcxt = tqueue->mycontext;
tqueue->recordhtab = hash_create("tqueue sender record type hashtable",
100, &ctl, 100, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
} }
...@@ -411,25 +537,30 @@ tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod, ...@@ -411,25 +537,30 @@ tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod,
if (found) if (found)
return; return;
elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod);
/* If message queue is in data mode, switch to control mode. */ /* If message queue is in data mode, switch to control mode. */
if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL) if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
{ {
tqueue->mode = TUPLE_QUEUE_MODE_CONTROL; tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false); shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
} }
/* Assemble a control message. */ /* Assemble a control message. */
initStringInfo(&buf); initStringInfo(&buf);
appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int)); appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32));
appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int)); appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool));
sizeof(bool)); for (i = 0; i < tupledesc->natts; i++)
for (i = 0; i < tupledesc->natts; ++i) {
appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i], appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
sizeof(FormData_pg_attribute)); sizeof(FormData_pg_attribute));
}
/* Send control message. */ /* Send control message. */
shm_mq_send(tqueue->handle, buf.len, buf.data, false); shm_mq_send(tqueue->queue, buf.len, buf.data, false);
/* We assume it's OK to leak buf because we're in a short-lived context. */
} }
/* /*
...@@ -449,7 +580,7 @@ tqueueShutdownReceiver(DestReceiver *self) ...@@ -449,7 +580,7 @@ tqueueShutdownReceiver(DestReceiver *self)
{ {
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
shm_mq_detach(shm_mq_get_queue(tqueue->handle)); shm_mq_detach(shm_mq_get_queue(tqueue->queue));
} }
/* /*
...@@ -464,8 +595,9 @@ tqueueDestroyReceiver(DestReceiver *self) ...@@ -464,8 +595,9 @@ tqueueDestroyReceiver(DestReceiver *self)
MemoryContextDelete(tqueue->tmpcontext); MemoryContextDelete(tqueue->tmpcontext);
if (tqueue->recordhtab != NULL) if (tqueue->recordhtab != NULL)
hash_destroy(tqueue->recordhtab); hash_destroy(tqueue->recordhtab);
if (tqueue->remapinfo != NULL) /* Is it worth trying to free substructure of the remap tree? */
pfree(tqueue->remapinfo); if (tqueue->field_remapinfo != NULL)
pfree(tqueue->field_remapinfo);
pfree(self); pfree(self);
} }
...@@ -484,11 +616,14 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle) ...@@ -484,11 +616,14 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle)
self->pub.rShutdown = tqueueShutdownReceiver; self->pub.rShutdown = tqueueShutdownReceiver;
self->pub.rDestroy = tqueueDestroyReceiver; self->pub.rDestroy = tqueueDestroyReceiver;
self->pub.mydest = DestTupleQueue; self->pub.mydest = DestTupleQueue;
self->handle = handle; self->queue = handle;
self->mycontext = CurrentMemoryContext;
self->tmpcontext = NULL; self->tmpcontext = NULL;
self->recordhtab = NULL; self->recordhtab = NULL;
self->mode = TUPLE_QUEUE_MODE_DATA; self->mode = TUPLE_QUEUE_MODE_DATA;
self->remapinfo = NULL; /* Top-level tupledesc is not known yet */
self->tupledesc = NULL;
self->field_remapinfo = NULL;
return (DestReceiver *) self; return (DestReceiver *) self;
} }
...@@ -502,9 +637,11 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc) ...@@ -502,9 +637,11 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader)); TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
reader->queue = handle; reader->queue = handle;
reader->mycontext = CurrentMemoryContext;
reader->typmodmap = NULL;
reader->mode = TUPLE_QUEUE_MODE_DATA; reader->mode = TUPLE_QUEUE_MODE_DATA;
reader->tupledesc = tupledesc; reader->tupledesc = tupledesc;
reader->remapinfo = BuildRemapInfo(tupledesc); reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
return reader; return reader;
} }
...@@ -516,8 +653,11 @@ void ...@@ -516,8 +653,11 @@ void
DestroyTupleQueueReader(TupleQueueReader *reader) DestroyTupleQueueReader(TupleQueueReader *reader)
{ {
shm_mq_detach(shm_mq_get_queue(reader->queue)); shm_mq_detach(shm_mq_get_queue(reader->queue));
if (reader->remapinfo != NULL) if (reader->typmodmap != NULL)
pfree(reader->remapinfo); hash_destroy(reader->typmodmap);
/* Is it worth trying to free substructure of the remap tree? */
if (reader->field_remapinfo != NULL)
pfree(reader->field_remapinfo);
pfree(reader); pfree(reader);
} }
...@@ -567,14 +707,7 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) ...@@ -567,14 +707,7 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Assert(result == SHM_MQ_SUCCESS); Assert(result == SHM_MQ_SUCCESS);
/* /*
* OK, we got a message. Process it. * We got a message (see message spec at top of file). Process it.
*
* One-byte messages are mode switch messages, so that we can switch
* between "control" and "data" mode. Otherwise, when in "data" mode,
* each message is a tuple. When in "control" mode, each message
* provides a transient-typmod-to-tupledesc mapping to let us
* interpret future tuples. Both of those cases certainly require
* more than one byte, so no confusion is possible.
*/ */
if (nbytes == 1) if (nbytes == 1)
{ {
...@@ -592,7 +725,7 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) ...@@ -592,7 +725,7 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
TupleQueueHandleControlMessage(reader, nbytes, data); TupleQueueHandleControlMessage(reader, nbytes, data);
} }
else else
elog(ERROR, "invalid mode: %d", (int) reader->mode); elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
} }
} }
...@@ -606,220 +739,306 @@ TupleQueueHandleDataMessage(TupleQueueReader *reader, ...@@ -606,220 +739,306 @@ TupleQueueHandleDataMessage(TupleQueueReader *reader,
{ {
HeapTupleData htup; HeapTupleData htup;
/*
* Set up a dummy HeapTupleData pointing to the data from the shm_mq
* (which had better be sufficiently aligned).
*/
ItemPointerSetInvalid(&htup.t_self); ItemPointerSetInvalid(&htup.t_self);
htup.t_tableOid = InvalidOid; htup.t_tableOid = InvalidOid;
htup.t_len = nbytes; htup.t_len = nbytes;
htup.t_data = data; htup.t_data = data;
return TupleQueueRemapTuple(reader, reader->tupledesc, reader->remapinfo, /*
&htup); * Either just copy the data into a regular palloc'd tuple, or remap it,
* as required.
*/
return TQRemapTuple(reader,
reader->tupledesc,
reader->field_remapinfo,
&htup);
} }
/* /*
* Remap tuple typmods per control information received from remote side. * Copy the given tuple, remapping any transient typmods contained in it.
*/ */
static HeapTuple static HeapTuple
TupleQueueRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc, TQRemapTuple(TupleQueueReader *reader,
RemapInfo *remapinfo, HeapTuple tuple) TupleDesc tupledesc,
TupleRemapInfo **field_remapinfo,
HeapTuple tuple)
{ {
Datum *values; Datum *values;
bool *isnull; bool *isnull;
bool changed = false;
int i; int i;
/* /*
* If no remapping is necessary, just copy the tuple into a single * If no remapping is necessary, just copy the tuple into a single
* palloc'd chunk, as caller will expect. * palloc'd chunk, as caller will expect.
*/ */
if (remapinfo == NULL) if (field_remapinfo == NULL)
return heap_copytuple(tuple); return heap_copytuple(tuple);
/* Deform tuple so we can remap record typmods for individual attrs. */ /* Deform tuple so we can remap record typmods for individual attrs. */
values = palloc(tupledesc->natts * sizeof(Datum)); values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
isnull = palloc(tupledesc->natts * sizeof(bool)); isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
heap_deform_tuple(tuple, tupledesc, values, isnull); heap_deform_tuple(tuple, tupledesc, values, isnull);
Assert(tupledesc->natts == remapinfo->natts);
/* Recursively check each non-NULL attribute. */ /* Recursively process each interesting non-NULL attribute. */
for (i = 0; i < tupledesc->natts; ++i) for (i = 0; i < tupledesc->natts; i++)
{ {
if (isnull[i] || remapinfo->mapping[i] == TQUEUE_REMAP_NONE) if (isnull[i] || field_remapinfo[i] == NULL)
continue; continue;
values[i] = TupleQueueRemap(reader, remapinfo->mapping[i], values[i]); values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed);
} }
/* Reform the modified tuple. */ /* Reconstruct the modified tuple, if anything was modified. */
return heap_form_tuple(tupledesc, values, isnull); if (changed)
return heap_form_tuple(tupledesc, values, isnull);
else
return heap_copytuple(tuple);
} }
/* /*
* Remap a value based on the specified remap class. * Process the given datum and replace any transient record typmods
* contained in it. Set *changed to TRUE if we actually changed the datum.
*
* remapinfo is previously-computed remapping info about the datum's type.
*
* This function just dispatches based on the remap class.
*/ */
static Datum static Datum
TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, Datum value) TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
Datum value, bool *changed)
{ {
/* This is recursive, so it could be driven to stack overflow. */
check_stack_depth(); check_stack_depth();
switch (remapclass) switch (remapinfo->remapclass)
{ {
case TQUEUE_REMAP_NONE:
/* caller probably shouldn't have called us at all, but... */
return value;
case TQUEUE_REMAP_ARRAY: case TQUEUE_REMAP_ARRAY:
return TupleQueueRemapArray(reader, value); return TQRemapArray(reader, &remapinfo->u.arr, value, changed);
case TQUEUE_REMAP_RANGE: case TQUEUE_REMAP_RANGE:
return TupleQueueRemapRange(reader, value); return TQRemapRange(reader, &remapinfo->u.rng, value, changed);
case TQUEUE_REMAP_RECORD: case TQUEUE_REMAP_RECORD:
return TupleQueueRemapRecord(reader, value); return TQRemapRecord(reader, &remapinfo->u.rec, value, changed);
} }
elog(ERROR, "unknown remap class: %d", (int) remapclass); elog(ERROR, "unrecognized tqueue remap class: %d",
(int) remapinfo->remapclass);
return (Datum) 0; return (Datum) 0;
} }
/* /*
* Remap an array. * Process the given array datum and replace any transient record typmods
* contained in it. Set *changed to TRUE if we actually changed the datum.
*/ */
static Datum static Datum
TupleQueueRemapArray(TupleQueueReader *reader, Datum value) TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
Datum value, bool *changed)
{ {
ArrayType *arr = DatumGetArrayTypeP(value); ArrayType *arr = DatumGetArrayTypeP(value);
Oid typeid = ARR_ELEMTYPE(arr); Oid typid = ARR_ELEMTYPE(arr);
RemapClass remapclass; bool element_changed = false;
int16 typlen;
bool typbyval;
char typalign;
Datum *elem_values; Datum *elem_values;
bool *elem_nulls; bool *elem_nulls;
int num_elems; int num_elems;
int i; int i;
remapclass = GetRemapClass(typeid);
/*
* If the elements of the array don't need to be walked, we shouldn't have
* been called in the first place: GetRemapClass should have returned NULL
* when asked about this array type.
*/
Assert(remapclass != TQUEUE_REMAP_NONE);
/* Deconstruct the array. */ /* Deconstruct the array. */
get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign); deconstruct_array(arr, typid, remapinfo->typlen,
deconstruct_array(arr, typeid, typlen, typbyval, typalign, remapinfo->typbyval, remapinfo->typalign,
&elem_values, &elem_nulls, &num_elems); &elem_values, &elem_nulls, &num_elems);
/* Remap each element. */ /* Remap each element. */
for (i = 0; i < num_elems; ++i) for (i = 0; i < num_elems; i++)
{
if (!elem_nulls[i]) if (!elem_nulls[i])
elem_values[i] = TupleQueueRemap(reader, remapclass, elem_values[i] = TQRemap(reader,
elem_values[i]); remapinfo->element_remap,
elem_values[i],
/* Reconstruct and return the array. */ &element_changed);
arr = construct_md_array(elem_values, elem_nulls, }
ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
typeid, typlen, typbyval, typalign); if (element_changed)
return PointerGetDatum(arr); {
/* Reconstruct and return the array. */
*changed = true;
arr = construct_md_array(elem_values, elem_nulls,
ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
typid, remapinfo->typlen,
remapinfo->typbyval, remapinfo->typalign);
return PointerGetDatum(arr);
}
/* Else just return the value as-is. */
return value;
} }
/* /*
* Remap a range type. * Process the given range datum and replace any transient record typmods
* contained in it. Set *changed to TRUE if we actually changed the datum.
*/ */
static Datum static Datum
TupleQueueRemapRange(TupleQueueReader *reader, Datum value) TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
Datum value, bool *changed)
{ {
RangeType *range = DatumGetRangeType(value); RangeType *range = DatumGetRangeType(value);
Oid typeid = RangeTypeGetOid(range); bool bound_changed = false;
RemapClass remapclass;
TypeCacheEntry *typcache;
RangeBound lower; RangeBound lower;
RangeBound upper; RangeBound upper;
bool empty; bool empty;
/* /* Extract the lower and upper bounds. */
* Extract the lower and upper bounds. As in tqueueWalkRange, some range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
* caching might be a good idea here.
*/
typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
if (typcache->rngelemtype == NULL)
elog(ERROR, "type %u is not a range type", typeid);
range_deserialize(typcache, range, &lower, &upper, &empty);
/* Nothing to do for an empty range. */ /* Nothing to do for an empty range. */
if (empty) if (empty)
return value; return value;
/*
* If the range bounds don't need to be walked, we shouldn't have been
* called in the first place: GetRemapClass should have returned NULL when
* asked about this range type.
*/
remapclass = GetRemapClass(typcache->rngelemtype->type_id);
Assert(remapclass != TQUEUE_REMAP_NONE);
/* Remap each bound, if present. */ /* Remap each bound, if present. */
if (!upper.infinite) if (!upper.infinite)
upper.val = TupleQueueRemap(reader, remapclass, upper.val); upper.val = TQRemap(reader, remapinfo->bound_remap,
upper.val, &bound_changed);
if (!lower.infinite) if (!lower.infinite)
lower.val = TupleQueueRemap(reader, remapclass, lower.val); lower.val = TQRemap(reader, remapinfo->bound_remap,
lower.val, &bound_changed);
/* And reserialize. */ if (bound_changed)
range = range_serialize(typcache, &lower, &upper, empty); {
return RangeTypeGetDatum(range); /* Reserialize. */
*changed = true;
range = range_serialize(remapinfo->typcache, &lower, &upper, empty);
return RangeTypeGetDatum(range);
}
/* Else just return the value as-is. */
return value;
} }
/* /*
* Remap a record. * Process the given record datum and replace any transient record typmods
* contained in it. Set *changed to TRUE if we actually changed the datum.
*/ */
static Datum static Datum
TupleQueueRemapRecord(TupleQueueReader *reader, Datum value) TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
Datum value, bool *changed)
{ {
HeapTupleHeader tup; HeapTupleHeader tup;
Oid typeid; Oid typid;
int typmod; int32 typmod;
RecordTypemodMap *mapent; bool changed_typmod;
TupleDesc tupledesc; TupleDesc tupledesc;
RemapInfo *remapinfo;
HeapTupleData htup;
HeapTuple atup;
/* Fetch type OID and typemod. */ /* Extract type OID and typmod from tuple. */
tup = DatumGetHeapTupleHeader(value); tup = DatumGetHeapTupleHeader(value);
typeid = HeapTupleHeaderGetTypeId(tup); typid = HeapTupleHeaderGetTypeId(tup);
typmod = HeapTupleHeaderGetTypMod(tup); typmod = HeapTupleHeaderGetTypMod(tup);
/*
* If first time through, or if this isn't the same composite type as last
* time, identify the required typmod mapping, and then look up the
* necessary information for processing the fields.
*/
if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
{
/* Free any old data. */
if (remapinfo->tupledesc != NULL)
FreeTupleDesc(remapinfo->tupledesc);
/* Is it worth trying to free substructure of the remap tree? */
if (remapinfo->field_remap != NULL)
pfree(remapinfo->field_remap);
/* If transient record type, look up matching local typmod. */
if (typid == RECORDOID)
{
RecordTypmodMap *mapent;
Assert(reader->typmodmap != NULL);
mapent = hash_search(reader->typmodmap, &typmod,
HASH_FIND, NULL);
if (mapent == NULL)
elog(ERROR, "tqueue received unrecognized remote typmod %d",
typmod);
remapinfo->localtypmod = mapent->localtypmod;
}
else
remapinfo->localtypmod = -1;
/* Look up tuple descriptor in typcache. */
tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod);
/* Figure out whether fields need recursive processing. */
remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
reader->mycontext);
if (remapinfo->field_remap != NULL)
{
/*
* We need to inspect the record contents, so save a copy of the
* tupdesc. (We could possibly just reference the typcache's
* copy, but then it's problematic when to release the refcount.)
*/
MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext);
remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
MemoryContextSwitchTo(oldcontext);
}
else
{
/* No fields of the record require remapping. */
remapinfo->tupledesc = NULL;
}
remapinfo->rectypid = typid;
remapinfo->rectypmod = typmod;
/* Release reference count acquired by lookup_rowtype_tupdesc. */
DecrTupleDescRefCount(tupledesc);
}
/* If transient record, replace remote typmod with local typmod. */ /* If transient record, replace remote typmod with local typmod. */
if (typeid == RECORDOID) if (typid == RECORDOID && typmod != remapinfo->localtypmod)
{ {
Assert(reader->typmodmap != NULL); typmod = remapinfo->localtypmod;
mapent = hash_search(reader->typmodmap, &typmod, changed_typmod = true;
HASH_FIND, NULL);
if (mapent == NULL)
elog(ERROR, "found unrecognized remote typmod %d", typmod);
typmod = mapent->localtypmod;
} }
else
changed_typmod = false;
/* /*
* Fetch tupledesc and compute remap info. We should probably cache this * If we need to change the typmod, or if there are any potentially
* so that we don't have to keep recomputing it. * remappable fields, replace the tuple.
*/ */
tupledesc = lookup_rowtype_tupdesc(typeid, typmod); if (changed_typmod || remapinfo->field_remap != NULL)
remapinfo = BuildRemapInfo(tupledesc); {
DecrTupleDescRefCount(tupledesc); HeapTupleData htup;
HeapTuple atup;
/* For now, assume we always need to change the tuple in this case. */
*changed = true;
/* Copy tuple, possibly remapping contained fields. */
ItemPointerSetInvalid(&htup.t_self);
htup.t_tableOid = InvalidOid;
htup.t_len = HeapTupleHeaderGetDatumLength(tup);
htup.t_data = tup;
atup = TQRemapTuple(reader,
remapinfo->tupledesc,
remapinfo->field_remap,
&htup);
/* Apply the correct labeling for a local Datum. */
HeapTupleHeaderSetTypeId(atup->t_data, typid);
HeapTupleHeaderSetTypMod(atup->t_data, typmod);
HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
/* And return the results. */
return HeapTupleHeaderGetDatum(atup->t_data);
}
/* Remap tuple. */ /* Else just return the value as-is. */
ItemPointerSetInvalid(&htup.t_self); return value;
htup.t_tableOid = InvalidOid;
htup.t_len = HeapTupleHeaderGetDatumLength(tup);
htup.t_data = tup;
atup = TupleQueueRemapTuple(reader, tupledesc, remapinfo, &htup);
HeapTupleHeaderSetTypeId(atup->t_data, typeid);
HeapTupleHeaderSetTypMod(atup->t_data, typmod);
HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
/* And return the results. */
return HeapTupleHeaderGetDatum(atup->t_data);
} }
/* /*
...@@ -833,57 +1052,54 @@ static void ...@@ -833,57 +1052,54 @@ static void
TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes, TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
char *data) char *data)
{ {
int32 remotetypmod;
int natts; int natts;
int remotetypmod;
bool hasoid; bool hasoid;
char *buf = data; Size offset = 0;
int rc = 0;
int i;
Form_pg_attribute *attrs; Form_pg_attribute *attrs;
MemoryContext oldcontext;
TupleDesc tupledesc; TupleDesc tupledesc;
RecordTypemodMap *mapent; RecordTypmodMap *mapent;
bool found; bool found;
int i;
/* Extract remote typmod. */ /* Extract remote typmod. */
memcpy(&remotetypmod, &buf[rc], sizeof(int)); memcpy(&remotetypmod, &data[offset], sizeof(int32));
rc += sizeof(int); offset += sizeof(int32);
/* Extract attribute count. */ /* Extract attribute count. */
memcpy(&natts, &buf[rc], sizeof(int)); memcpy(&natts, &data[offset], sizeof(int));
rc += sizeof(int); offset += sizeof(int);
/* Extract hasoid flag. */ /* Extract hasoid flag. */
memcpy(&hasoid, &buf[rc], sizeof(bool)); memcpy(&hasoid, &data[offset], sizeof(bool));
rc += sizeof(bool); offset += sizeof(bool);
/* Extract attribute details. */ /* Extract attribute details. The tupledesc made here is just transient. */
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
attrs = palloc(natts * sizeof(Form_pg_attribute)); attrs = palloc(natts * sizeof(Form_pg_attribute));
for (i = 0; i < natts; ++i) for (i = 0; i < natts; i++)
{ {
attrs[i] = palloc(sizeof(FormData_pg_attribute)); attrs[i] = palloc(sizeof(FormData_pg_attribute));
memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute)); memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute));
rc += sizeof(FormData_pg_attribute); offset += sizeof(FormData_pg_attribute);
} }
MemoryContextSwitchTo(oldcontext);
/* We should have read the whole message. */ /* We should have read the whole message. */
Assert(rc == nbytes); Assert(offset == nbytes);
/* Construct TupleDesc. */ /* Construct TupleDesc, and assign a local typmod. */
tupledesc = CreateTupleDesc(natts, hasoid, attrs); tupledesc = CreateTupleDesc(natts, hasoid, attrs);
tupledesc = BlessTupleDesc(tupledesc); tupledesc = BlessTupleDesc(tupledesc);
/* Create map if it doesn't exist already. */ /* Create mapping hashtable if it doesn't exist already. */
if (reader->typmodmap == NULL) if (reader->typmodmap == NULL)
{ {
HASHCTL ctl; HASHCTL ctl;
ctl.keysize = sizeof(int); MemSet(&ctl, 0, sizeof(ctl));
ctl.entrysize = sizeof(RecordTypemodMap); ctl.keysize = sizeof(int32);
ctl.hcxt = CurTransactionContext; ctl.entrysize = sizeof(RecordTypmodMap);
reader->typmodmap = hash_create("typmodmap hashtable", ctl.hcxt = reader->mycontext;
reader->typmodmap = hash_create("tqueue receiver record type hashtable",
100, &ctl, 100, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
} }
...@@ -892,139 +1108,171 @@ TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes, ...@@ -892,139 +1108,171 @@ TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER, mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
&found); &found);
if (found) if (found)
elog(ERROR, "duplicate message for typmod %d", elog(ERROR, "duplicate tqueue control message for typmod %d",
remotetypmod); remotetypmod);
mapent->localtypmod = tupledesc->tdtypmod; mapent->localtypmod = tupledesc->tdtypmod;
elog(DEBUG3, "mapping remote typmod %d to local typmod %d",
remotetypmod, tupledesc->tdtypmod); elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d",
remotetypmod, mapent->localtypmod);
} }
/* /*
* Build a mapping indicating what remapping class applies to each attribute * Build remap info for the specified data type, storing it in mycontext.
* described by a tupledesc. * Returns NULL if neither the type nor any subtype could require remapping.
*/ */
static RemapInfo * static TupleRemapInfo *
BuildRemapInfo(TupleDesc tupledesc) BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
{ {
RemapInfo *remapinfo; HeapTuple tup;
Size size; Form_pg_type typ;
AttrNumber i;
bool noop = true; /* This is recursive, so it could be driven to stack overflow. */
check_stack_depth();
size = offsetof(RemapInfo, mapping) + restart:
sizeof(RemapClass) * tupledesc->natts; tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
remapinfo = MemoryContextAllocZero(TopMemoryContext, size); if (!HeapTupleIsValid(tup))
remapinfo->natts = tupledesc->natts; elog(ERROR, "cache lookup failed for type %u", typid);
for (i = 0; i < tupledesc->natts; ++i) typ = (Form_pg_type) GETSTRUCT(tup);
/* Look through domains to underlying base type. */
if (typ->typtype == TYPTYPE_DOMAIN)
{ {
Form_pg_attribute attr = tupledesc->attrs[i]; typid = typ->typbasetype;
ReleaseSysCache(tup);
goto restart;
}
if (attr->attisdropped) /* If it's a true array type, deal with it that way. */
{ if (OidIsValid(typ->typelem) && typ->typlen == -1)
remapinfo->mapping[i] = TQUEUE_REMAP_NONE; {
continue; typid = typ->typelem;
} ReleaseSysCache(tup);
return BuildArrayRemapInfo(typid, mycontext);
}
remapinfo->mapping[i] = GetRemapClass(attr->atttypid); /* Similarly, deal with ranges appropriately. */
if (remapinfo->mapping[i] != TQUEUE_REMAP_NONE) if (typ->typtype == TYPTYPE_RANGE)
noop = false; {
ReleaseSysCache(tup);
return BuildRangeRemapInfo(typid, mycontext);
} }
if (noop) /*
* If it's a composite type (including RECORD), set up for remapping. We
* don't attempt to determine the status of subfields here, since we do
* not have enough information yet; just mark everything invalid.
*/
if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID)
{ {
pfree(remapinfo); TupleRemapInfo *remapinfo;
remapinfo = NULL;
remapinfo = (TupleRemapInfo *)
MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
remapinfo->remapclass = TQUEUE_REMAP_RECORD;
remapinfo->u.rec.rectypid = InvalidOid;
remapinfo->u.rec.rectypmod = -1;
remapinfo->u.rec.localtypmod = -1;
remapinfo->u.rec.tupledesc = NULL;
remapinfo->u.rec.field_remap = NULL;
ReleaseSysCache(tup);
return remapinfo;
} }
/* Nothing else can possibly need remapping attention. */
ReleaseSysCache(tup);
return NULL;
}
static TupleRemapInfo *
BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext)
{
TupleRemapInfo *remapinfo;
TupleRemapInfo *element_remapinfo;
/* See if element type requires remapping. */
element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext);
/* If not, the array doesn't either. */
if (element_remapinfo == NULL)
return NULL;
/* OK, set up to remap the array. */
remapinfo = (TupleRemapInfo *)
MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
remapinfo->remapclass = TQUEUE_REMAP_ARRAY;
get_typlenbyvalalign(elemtypid,
&remapinfo->u.arr.typlen,
&remapinfo->u.arr.typbyval,
&remapinfo->u.arr.typalign);
remapinfo->u.arr.element_remap = element_remapinfo;
return remapinfo;
}
static TupleRemapInfo *
BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext)
{
TupleRemapInfo *remapinfo;
TupleRemapInfo *bound_remapinfo;
TypeCacheEntry *typcache;
/*
* Get range info from the typcache. We assume this pointer will stay
* valid for the duration of the query.
*/
typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
if (typcache->rngelemtype == NULL)
elog(ERROR, "type %u is not a range type", rngtypid);
/* See if range bound type requires remapping. */
bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id,
mycontext);
/* If not, the range doesn't either. */
if (bound_remapinfo == NULL)
return NULL;
/* OK, set up to remap the range. */
remapinfo = (TupleRemapInfo *)
MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
remapinfo->remapclass = TQUEUE_REMAP_RANGE;
remapinfo->u.rng.typcache = typcache;
remapinfo->u.rng.bound_remap = bound_remapinfo;
return remapinfo; return remapinfo;
} }
/* /*
* Determine the remap class assocociated with a particular data type. * Build remap info for fields of the type described by the given tupdesc.
* * Returns an array of TupleRemapInfo pointers, or NULL if no field
* Transient record types need to have the typmod applied on the sending side * requires remapping. Data is allocated in mycontext.
* replaced with a value on the receiving side that has the same meaning.
*
* Arrays, range types, and all record types (including named composite types)
* need to searched for transient record values buried within them.
* Surprisingly, a walker is required even when the indicated type is a
* composite type, because the actual value may be a compatible transient
* record type.
*/ */
static RemapClass static TupleRemapInfo **
GetRemapClass(Oid typeid) BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
{ {
RemapClass forceResult = TQUEUE_REMAP_NONE; TupleRemapInfo **remapinfo;
RemapClass innerResult = TQUEUE_REMAP_NONE; bool noop = true;
int i;
for (;;) /* Recursively determine the remapping status of each field. */
remapinfo = (TupleRemapInfo **)
MemoryContextAlloc(mycontext,
tupledesc->natts * sizeof(TupleRemapInfo *));
for (i = 0; i < tupledesc->natts; i++)
{ {
HeapTuple tup; Form_pg_attribute attr = tupledesc->attrs[i];
Form_pg_type typ;
/* Simple cases. */
if (typeid == RECORDOID)
{
innerResult = TQUEUE_REMAP_RECORD;
break;
}
if (typeid == RECORDARRAYOID)
{
innerResult = TQUEUE_REMAP_ARRAY;
break;
}
/* Otherwise, we need a syscache lookup to figure it out. */
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeid));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for type %u", typeid);
typ = (Form_pg_type) GETSTRUCT(tup);
/* Look through domains to underlying base type. */
if (typ->typtype == TYPTYPE_DOMAIN)
{
typeid = typ->typbasetype;
ReleaseSysCache(tup);
continue;
}
/*
* Look through arrays to underlying base type, but the final return
* value must be either TQUEUE_REMAP_ARRAY or TQUEUE_REMAP_NONE. (If
* this is an array of integers, for example, we don't need to walk
* it.)
*/
if (OidIsValid(typ->typelem) && typ->typlen == -1)
{
typeid = typ->typelem;
ReleaseSysCache(tup);
if (forceResult == TQUEUE_REMAP_NONE)
forceResult = TQUEUE_REMAP_ARRAY;
continue;
}
/* if (attr->attisdropped)
* Similarly, look through ranges to the underlying base type, but the
* final return value must be either TQUEUE_REMAP_RANGE or
* TQUEUE_REMAP_NONE.
*/
if (typ->typtype == TYPTYPE_RANGE)
{ {
ReleaseSysCache(tup); remapinfo[i] = NULL;
if (forceResult == TQUEUE_REMAP_NONE)
forceResult = TQUEUE_REMAP_RANGE;
typeid = get_range_subtype(typeid);
continue; continue;
} }
remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext);
if (remapinfo[i] != NULL)
noop = false;
}
/* Walk composite types. Nothing else needs special handling. */ /* If no fields require remapping, report that by returning NULL. */
if (typ->typtype == TYPTYPE_COMPOSITE) if (noop)
innerResult = TQUEUE_REMAP_RECORD; {
ReleaseSysCache(tup); pfree(remapinfo);
break; remapinfo = NULL;
} }
if (innerResult != TQUEUE_REMAP_NONE && forceResult != TQUEUE_REMAP_NONE) return remapinfo;
return forceResult;
return innerResult;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment