Commit cf407f16 authored by Tom Lane's avatar Tom Lane

Refactor crosstab() to build and return a tuplestore instead of using

value-per-call mode.  This should be more efficient in normal usage,
but the real problem with the prior coding was that it returned with
a SPI call still active.  That could cause problems if execution was
interleaved with anything else that might use SPI.
parent 76cc2fe6
/* /*
* $PostgreSQL: pgsql/contrib/tablefunc/tablefunc.c,v 1.56 2008/11/30 23:23:52 tgl Exp $ * $PostgreSQL: pgsql/contrib/tablefunc/tablefunc.c,v 1.57 2008/12/01 01:30:18 tgl Exp $
* *
* *
* tablefunc * tablefunc
...@@ -94,12 +94,6 @@ typedef struct ...@@ -94,12 +94,6 @@ typedef struct
bool use_carry; /* use second generated value */ bool use_carry; /* use second generated value */
} normal_rand_fctx; } normal_rand_fctx;
typedef struct
{
SPITupleTable *spi_tuptable; /* sql results from user query */
char *lastrowid; /* rowid of the last tuple sent */
} crosstab_fctx;
#define xpfree(var_) \ #define xpfree(var_) \
do { \ do { \
if (var_ != NULL) \ if (var_ != NULL) \
...@@ -356,304 +350,254 @@ PG_FUNCTION_INFO_V1(crosstab); ...@@ -356,304 +350,254 @@ PG_FUNCTION_INFO_V1(crosstab);
Datum Datum
crosstab(PG_FUNCTION_ARGS) crosstab(PG_FUNCTION_ARGS)
{ {
FuncCallContext *funcctx; char *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
TupleDesc ret_tupdesc; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
Tuplestorestate *tupstore;
TupleDesc tupdesc;
int call_cntr; int call_cntr;
int max_calls; int max_calls;
AttInMetadata *attinmeta; AttInMetadata *attinmeta;
SPITupleTable *spi_tuptable = NULL; SPITupleTable *spi_tuptable;
TupleDesc spi_tupdesc; TupleDesc spi_tupdesc;
char *lastrowid = NULL; bool firstpass;
crosstab_fctx *fctx; char *lastrowid;
int i; int i;
int num_categories; int num_categories;
bool firstpass = false; MemoryContext per_query_ctx;
MemoryContext oldcontext; MemoryContext oldcontext;
int ret;
int proc;
/* stuff done only on the first call of the function */ /* check to see if caller supports us returning a tuplestore */
if (SRF_IS_FIRSTCALL()) if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
{ ereport(ERROR,
char *sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
TupleDesc tupdesc; errmsg("set-valued function called in context that cannot accept a set")));
int ret; if (!(rsinfo->allowedModes & SFRM_Materialize))
int proc; ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
/* create a function context for cross-call persistence */ errmsg("materialize mode required, but it is not " \
funcctx = SRF_FIRSTCALL_INIT(); "allowed in this context")));
/* Connect to SPI manager */ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
if ((ret = SPI_connect()) < 0)
/* internal error */
elog(ERROR, "crosstab: SPI_connect returned %d", ret);
/* Retrieve the desired rows */ /* Connect to SPI manager */
ret = SPI_execute(sql, true, 0); if ((ret = SPI_connect()) < 0)
proc = SPI_processed; /* internal error */
elog(ERROR, "crosstab: SPI_connect returned %d", ret);
/* Check for qualifying tuples */ /* Retrieve the desired rows */
if ((ret == SPI_OK_SELECT) && (proc > 0)) ret = SPI_execute(sql, true, 0);
{ proc = SPI_processed;
spi_tuptable = SPI_tuptable;
spi_tupdesc = spi_tuptable->tupdesc;
/*----------
* The provided SQL query must always return three columns.
*
* 1. rowname
* the label or identifier for each row in the final result
* 2. category
* the label or identifier for each column in the final result
* 3. values
* the value for each column in the final result
*----------
*/
if (spi_tupdesc->natts != 3)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid source data SQL statement"),
errdetail("The provided SQL must return 3 "
"columns: rowid, category, and values.")));
}
else
{
/* no qualifying tuples */
SPI_finish();
SRF_RETURN_DONE(funcctx);
}
/* get a tuple descriptor for our result type */ /* If no qualifying tuples, fall out early */
switch (get_call_result_type(fcinfo, NULL, &tupdesc)) if (ret != SPI_OK_SELECT || proc <= 0)
{ {
case TYPEFUNC_COMPOSITE: SPI_finish();
/* success */ rsinfo->isDone = ExprEndResult;
break; PG_RETURN_NULL();
case TYPEFUNC_RECORD: }
/* failed to determine actual type of RECORD */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
break;
default:
/* result type isn't composite */
elog(ERROR, "return type must be a row type");
break;
}
/* spi_tuptable = SPI_tuptable;
* Check that return tupdesc is compatible with the data we got from spi_tupdesc = spi_tuptable->tupdesc;
* SPI, at least based on number and type of attributes
*/
if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("return and sql tuple descriptions are " \
"incompatible")));
/* /*----------
* switch to memory context appropriate for multiple function calls * The provided SQL query must always return three columns.
*/ *
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); * 1. rowname
* the label or identifier for each row in the final result
* 2. category
* the label or identifier for each column in the final result
* 3. values
* the value for each column in the final result
*----------
*/
if (spi_tupdesc->natts != 3)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid source data SQL statement"),
errdetail("The provided SQL must return 3 "
"columns: rowid, category, and values.")));
/* make sure we have a persistent copy of the tupdesc */ /* get a tuple descriptor for our result type */
tupdesc = CreateTupleDescCopy(tupdesc); switch (get_call_result_type(fcinfo, NULL, &tupdesc))
{
case TYPEFUNC_COMPOSITE:
/* success */
break;
case TYPEFUNC_RECORD:
/* failed to determine actual type of RECORD */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
break;
default:
/* result type isn't composite */
elog(ERROR, "return type must be a row type");
break;
}
/* /*
* Generate attribute metadata needed later to produce tuples from raw * Check that return tupdesc is compatible with the data we got from
* C strings * SPI, at least based on number and type of attributes
*/ */
attinmeta = TupleDescGetAttInMetadata(tupdesc); if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
funcctx->attinmeta = attinmeta; ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("return and sql tuple descriptions are " \
"incompatible")));
/* allocate memory for user context */ /*
fctx = (crosstab_fctx *) palloc(sizeof(crosstab_fctx)); * switch to long-lived memory context
*/
oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* /* make sure we have a persistent copy of the result tupdesc */
* Save spi data for use across calls tupdesc = CreateTupleDescCopy(tupdesc);
*/
fctx->spi_tuptable = spi_tuptable;
fctx->lastrowid = NULL;
funcctx->user_fctx = fctx;
/* total number of tuples to be returned */ /* initialize our tuplestore in long-lived context */
funcctx->max_calls = proc; tupstore =
tuplestore_begin_heap(rsinfo->allowedModes & SFRM_Materialize_Random,
false, work_mem);
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
firstpass = true;
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
/* /*
* initialize per-call variables * Generate attribute metadata needed later to produce tuples from raw
* C strings
*/ */
call_cntr = funcctx->call_cntr; attinmeta = TupleDescGetAttInMetadata(tupdesc);
max_calls = funcctx->max_calls;
/* user context info */
fctx = (crosstab_fctx *) funcctx->user_fctx;
lastrowid = fctx->lastrowid;
spi_tuptable = fctx->spi_tuptable;
/* the sql tuple */
spi_tupdesc = spi_tuptable->tupdesc;
/* attribute return type and return tuple description */ /* total number of tuples to be examined */
attinmeta = funcctx->attinmeta; max_calls = proc;
ret_tupdesc = attinmeta->tupdesc;
/* the return tuple always must have 1 rowid + num_categories columns */ /* the return tuple always must have 1 rowid + num_categories columns */
num_categories = ret_tupdesc->natts - 1; num_categories = tupdesc->natts - 1;
if (call_cntr < max_calls) /* do when there is more left to send */ firstpass = true;
lastrowid = NULL;
for (call_cntr = 0; call_cntr < max_calls; call_cntr++)
{ {
HeapTuple tuple;
Datum result;
char **values;
bool skip_tuple = false; bool skip_tuple = false;
char **values;
/* allocate and zero space */
values = (char **) palloc0((1 + num_categories) * sizeof(char *));
while (true) /*
* now loop through the sql results and assign each value in
* sequence to the next category
*/
for (i = 0; i < num_categories; i++)
{ {
/* allocate space */ HeapTuple spi_tuple;
values = (char **) palloc((1 + num_categories) * sizeof(char *)); char *rowid;
/* see if we've gone too far already */
if (call_cntr >= max_calls)
break;
/* and make sure it's clear */ /* get the next sql result tuple */
memset(values, '\0', (1 + num_categories) * sizeof(char *)); spi_tuple = spi_tuptable->vals[call_cntr];
/* get the rowid from the current sql result tuple */
rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
/* /*
* now loop through the sql results and assign each value in * If this is the first pass through the values for this
* sequence to the next category * rowid, set the first column to rowid
*/ */
for (i = 0; i < num_categories; i++) if (i == 0)
{ {
HeapTuple spi_tuple; xpstrdup(values[0], rowid);
char *rowid = NULL;
/* see if we've gone too far already */
if (call_cntr >= max_calls)
break;
/* get the next sql result tuple */
spi_tuple = spi_tuptable->vals[call_cntr];
/* get the rowid from the current sql result tuple */
rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
/*
* If this is the first pass through the values for this
* rowid, set the first column to rowid
*/
if (i == 0)
{
xpstrdup(values[0], rowid);
/*
* Check to see if the rowid is the same as that of the
* last tuple sent -- if so, skip this tuple entirely
*/
if (!firstpass && xstreq(lastrowid, rowid))
{
skip_tuple = true;
break;
}
}
/* /*
* If rowid hasn't changed on us, continue building the ouput * Check to see if the rowid is the same as that of the
* tuple. * last tuple sent -- if so, skip this tuple entirely
*/ */
if (xstreq(rowid, values[0])) if (!firstpass && xstreq(lastrowid, rowid))
{
/*
* Get the next category item value, which is always
* attribute number three.
*
* Be careful to assign the value to the array index based
* on which category we are presently processing.
*/
values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
/*
* increment the counter since we consume a row for each
* category, but not for last pass because the API will do
* that for us
*/
if (i < (num_categories - 1))
call_cntr = ++funcctx->call_cntr;
}
else
{ {
/* xpfree(rowid);
* We'll fill in NULLs for the missing values, but we need skip_tuple = true;
* to decrement the counter since this sql result row
* doesn't belong to the current output tuple.
*/
call_cntr = --funcctx->call_cntr;
break; break;
} }
xpfree(rowid);
} }
/* /*
* switch to memory context appropriate for multiple function * If rowid hasn't changed on us, continue building the output
* calls * tuple.
*/ */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); if (xstreq(rowid, values[0]))
xpfree(fctx->lastrowid);
xpstrdup(fctx->lastrowid, values[0]);
lastrowid = fctx->lastrowid;
MemoryContextSwitchTo(oldcontext);
if (!skip_tuple)
{ {
/* build the tuple */ /*
tuple = BuildTupleFromCStrings(attinmeta, values); * Get the next category item value, which is always
* attribute number three.
/* make the tuple into a datum */ *
result = HeapTupleGetDatum(tuple); * Be careful to assign the value to the array index based
* on which category we are presently processing.
/* Clean up */ */
for (i = 0; i < num_categories + 1; i++) values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
if (values[i] != NULL)
xpfree(values[i]);
xpfree(values);
SRF_RETURN_NEXT(funcctx, result); /*
* increment the counter since we consume a row for each
* category, but not for last pass because the outer loop
* will do that for us
*/
if (i < (num_categories - 1))
call_cntr++;
xpfree(rowid);
} }
else else
{ {
/* /*
* Skipping this tuple entirely, but we need to advance the * We'll fill in NULLs for the missing values, but we need
* counter like the API would if we had returned one. * to decrement the counter since this sql result row
* doesn't belong to the current output tuple.
*/ */
call_cntr = ++funcctx->call_cntr; call_cntr--;
xpfree(rowid);
break;
}
}
/* we'll start over at the top */ if (!skip_tuple)
xpfree(values); {
HeapTuple tuple;
/* see if we've gone too far already */ /* build the tuple */
if (call_cntr >= max_calls) tuple = BuildTupleFromCStrings(attinmeta, values);
{
/* release SPI related resources */
SPI_finish();
SRF_RETURN_DONE(funcctx);
}
/* need to reset this before the next tuple is started */ /* switch to appropriate context while storing the tuple */
skip_tuple = false; oldcontext = MemoryContextSwitchTo(per_query_ctx);
} tuplestore_puttuple(tupstore, tuple);
MemoryContextSwitchTo(oldcontext);
heap_freetuple(tuple);
} }
/* Remember current rowid */
xpfree(lastrowid);
xpstrdup(lastrowid, values[0]);
firstpass = false;
/* Clean up */
for (i = 0; i < num_categories + 1; i++)
if (values[i] != NULL)
pfree(values[i]);
pfree(values);
} }
else
/* do when there is no more left */ /* let the caller know we're sending back a tuplestore */
{ rsinfo->returnMode = SFRM_Materialize;
/* release SPI related resources */ rsinfo->setResult = tupstore;
SPI_finish(); rsinfo->setDesc = tupdesc;
SRF_RETURN_DONE(funcctx);
} /* release SPI related resources (and return to caller's context) */
SPI_finish();
return (Datum) 0;
} }
/* /*
...@@ -1613,6 +1557,10 @@ compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc) ...@@ -1613,6 +1557,10 @@ compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
Form_pg_attribute sql_attr; Form_pg_attribute sql_attr;
Oid sql_atttypid; Oid sql_atttypid;
if (ret_tupdesc->natts < 2 ||
sql_tupdesc->natts < 3)
return false;
/* check the rowid types match */ /* check the rowid types match */
ret_atttypid = ret_tupdesc->attrs[0]->atttypid; ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
sql_atttypid = sql_tupdesc->attrs[0]->atttypid; sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment