Commit f5206262 authored by Tom Lane's avatar Tom Lane

Try to instill some sanity in plperl's function result processing.

Get rid of static variables for SETOF result, don't crash when called
from non-FROM context, eliminate dead code, etc.
parent 90f6f4b4
......@@ -33,7 +33,7 @@
* ENHANCEMENTS, OR MODIFICATIONS.
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.61 2004/11/21 22:13:37 tgl Exp $
* $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.62 2004/11/22 20:31:53 tgl Exp $
*
**********************************************************************/
......@@ -83,8 +83,8 @@ typedef struct plperl_proc_desc
bool lanpltrusted;
bool fn_retistuple; /* true, if function returns tuple */
bool fn_retisset; /* true, if function returns set */
Oid ret_oid; /* Oid of returning type */
FmgrInfo result_in_func;
Oid result_oid; /* Oid of result type */
FmgrInfo result_in_func; /* I/O function and arg for result type */
Oid result_typioparam;
int nargs;
FmgrInfo arg_out_func[FUNC_MAX_ARGS];
......@@ -101,9 +101,6 @@ static int plperl_firstcall = 1;
static bool plperl_safe_init_done = false;
static PerlInterpreter *plperl_interp = NULL;
static HV *plperl_proc_hash = NULL;
static AV *g_column_keys = NULL;
static SV *srf_perlret = NULL; /* keep returned value */
static int g_attr_num = 0;
/* this is saved and restored by plperl_call_handler */
static plperl_proc_desc *plperl_current_prodesc = NULL;
......@@ -163,27 +160,7 @@ plperl_init(void)
return;
/************************************************************
* Free the proc hash table
************************************************************/
if (plperl_proc_hash != NULL)
{
hv_undef(plperl_proc_hash);
SvREFCNT_dec((SV *) plperl_proc_hash);
plperl_proc_hash = NULL;
}
/************************************************************
* Destroy the existing Perl interpreter
************************************************************/
if (plperl_interp != NULL)
{
perl_destruct(plperl_interp);
perl_free(plperl_interp);
plperl_interp = NULL;
}
/************************************************************
* Now recreate a new Perl interpreter
* Create the Perl interpreter
************************************************************/
plperl_init_interp();
......@@ -217,8 +194,7 @@ plperl_init_all(void)
static void
plperl_init_interp(void)
{
char *embedding[3] = {
static char *embedding[3] = {
"", "-e",
/*
......@@ -238,7 +214,7 @@ plperl_init_interp(void)
perl_run(plperl_interp);
/************************************************************
* Initialize the proc and query hash tables
* Initialize the procedure hash table
************************************************************/
plperl_proc_hash = newHV();
}
......@@ -269,7 +245,6 @@ plperl_safe_init(void)
;
SV *res;
float safe_version;
res = eval_pv(safe_module, FALSE); /* TRUE = croak if failure */
......@@ -415,54 +390,6 @@ plperl_trigger_build_args(FunctionCallInfo fcinfo)
}
/**********************************************************************
* check return value from plperl function
**********************************************************************/
static int
plperl_is_set(SV *sv)
{
int i = 0;
int len = 0;
int set = 0;
int other = 0;
AV *input_av;
SV **val;
if (SvTYPE(sv) != SVt_RV)
return 0;
if (SvTYPE(SvRV(sv)) == SVt_PVHV)
return 0;
if (SvTYPE(SvRV(sv)) == SVt_PVAV)
{
input_av = (AV *) SvRV(sv);
len = av_len(input_av) + 1;
for (i = 0; i < len; i++)
{
val = av_fetch(input_av, i, FALSE);
if (SvTYPE(*val) == SVt_RV)
set = 1;
else
other = 1;
}
}
if (len == 0)
return 1;
if (set && !other)
return 1;
if (!set && other)
return 0;
if (set && other)
elog(ERROR, "plperl: check your return value structure");
if (!set && !other)
elog(ERROR, "plperl: check your return value structure");
return 0; /* for compiler */
}
/**********************************************************************
* extract a list of keys from a hash
**********************************************************************/
......@@ -505,7 +432,6 @@ plperl_get_key(AV *keys, int index)
* extract a value for a given key from a hash
*
* return NULL on error or if we got an undef
*
**********************************************************************/
static char *
plperl_get_elem(HV *hash, char *key)
......@@ -516,6 +442,28 @@ plperl_get_elem(HV *hash, char *key)
return SvTYPE(*svp) == SVt_NULL ? NULL : SvPV(*svp, PL_na);
}
/*
* Obtain tuple descriptor for a function returning tuple
*
* NB: copy the result if needed for any great length of time
*/
static TupleDesc
get_function_tupdesc(Oid result_type, ReturnSetInfo *rsinfo)
{
if (result_type == RECORDOID)
{
/* We must get the information from call context */
if (!rsinfo || !IsA(rsinfo, ReturnSetInfo) ||
rsinfo->expectedDesc == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("could not determine row description for function returning record")));
return rsinfo->expectedDesc;
}
else /* ordinary composite type */
return lookup_rowtype_tupdesc(result_type, -1);
}
/**********************************************************************
* set up the new tuple returned from a trigger
**********************************************************************/
......@@ -630,16 +578,10 @@ plperl_call_handler(PG_FUNCTION_ARGS)
PG_TRY();
{
/************************************************************
* Connect to SPI manager
************************************************************/
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI manager");
/************************************************************
/*
* Determine if called as function or trigger and
* call appropriate subhandler
************************************************************/
*/
if (CALLED_AS_TRIGGER(fcinfo))
retval = PointerGetDatum(plperl_trigger_handler(fcinfo));
else
......@@ -910,6 +852,10 @@ plperl_func_handler(PG_FUNCTION_ARGS)
SV *perlret;
Datum retval;
/* Connect to SPI manager */
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI manager");
/* Find or compile the function */
prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, false);
......@@ -920,19 +866,14 @@ plperl_func_handler(PG_FUNCTION_ARGS)
************************************************************/
if (!prodesc->fn_retisset)
perlret = plperl_call_perl_func(prodesc, fcinfo);
else if (SRF_IS_FIRSTCALL())
perlret = plperl_call_perl_func(prodesc, fcinfo);
else
{
if (SRF_IS_FIRSTCALL()) /* call function only once */
srf_perlret = plperl_call_perl_func(prodesc, fcinfo);
perlret = srf_perlret;
}
/* Get back the SV stashed on initial call */
FuncCallContext *funcctx = (FuncCallContext *) fcinfo->flinfo->fn_extra;
if (prodesc->fn_retisset && SRF_IS_FIRSTCALL())
{
if (prodesc->fn_retistuple)
g_column_keys = newAV();
if (SvTYPE(perlret) != SVt_RV)
elog(ERROR, "plperl: set-returning function must return reference");
perlret = (SV *) funcctx->user_fctx;
}
/************************************************************
......@@ -947,147 +888,78 @@ plperl_func_handler(PG_FUNCTION_ARGS)
if (!(perlret && SvOK(perlret) && SvTYPE(perlret) != SVt_NULL))
{
/* return NULL if Perl code returned undef */
fcinfo->isnull = true;
ReturnSetInfo *rsi = (ReturnSetInfo *) fcinfo->resultinfo;
if (perlret)
SvREFCNT_dec(perlret);
if (rsi && IsA(rsi, ReturnSetInfo))
rsi->isDone = ExprEndResult;
PG_RETURN_NULL();
}
if (prodesc->fn_retisset && !(perlret && SvTYPE(SvRV(perlret)) == SVt_PVAV))
if (prodesc->fn_retisset &&
(SvTYPE(perlret) != SVt_RV || SvTYPE(SvRV(perlret)) != SVt_PVAV))
elog(ERROR, "plperl: set-returning function must return reference to array");
if (prodesc->fn_retistuple && perlret && SvTYPE(perlret) != SVt_RV)
if (prodesc->fn_retistuple && SvTYPE(perlret) != SVt_RV)
elog(ERROR, "plperl: composite-returning function must return a reference");
if (prodesc->fn_retisset && !fcinfo->resultinfo)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
if (prodesc->fn_retistuple && fcinfo->resultinfo) /* set of tuples */
if (prodesc->fn_retisset && prodesc->fn_retistuple)
{
/*
* This branch will be taken when the function call
* appears in a context that can return a set of tuples,
* even if it only actually returns a single tuple
* (e.g. select a from foo() where foo returns a singleton
* of some composite type with member a). In this case, the
* return value will be a hashref. If a rowset is returned
* it will be an arrayref whose members will be hashrefs.
*
* Care is taken in the code only to refer to the appropriate
* one of ret_hv and ret_av, only one of which is therefore
* valid for any given call.
*
* XXX This code is in dire need of cleanup.
*/
/* SRF support */
HV *ret_hv = NULL;
AV *ret_av = NULL;
/* set of tuples */
AV *ret_av = (AV *) SvRV(perlret);
FuncCallContext *funcctx;
int call_cntr;
int max_calls;
TupleDesc tupdesc;
AttInMetadata *attinmeta;
bool isset;
char **values = NULL;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
isset = plperl_is_set(perlret);
if (SvTYPE(SvRV(perlret)) == SVt_PVHV)
ret_hv = (HV *) SvRV(perlret);
else
ret_av = (AV *) SvRV(perlret);
if (SRF_IS_FIRSTCALL())
{
MemoryContext oldcontext;
int i;
funcctx = SRF_FIRSTCALL_INIT();
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
if (SvTYPE(SvRV(perlret)) == SVt_PVHV)
{
if (isset)
funcctx->max_calls = hv_iterinit(ret_hv);
else
funcctx->max_calls = 1;
}
else
{
if (isset)
funcctx->max_calls = av_len(ret_av) + 1;
else
funcctx->max_calls = 1;
}
tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
g_attr_num = tupdesc->natts;
funcctx->user_fctx = (void *) perlret;
for (i = 0; i < tupdesc->natts; i++)
av_store(g_column_keys, i + 1,
newSVpv(SPI_fname(tupdesc, i+1), 0));
funcctx->max_calls = av_len(ret_av) + 1;
attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
/* Cache a copy of the result's tupdesc and attinmeta */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
tupdesc = get_function_tupdesc(prodesc->result_oid,
(ReturnSetInfo *) fcinfo->resultinfo);
tupdesc = CreateTupleDescCopy(tupdesc);
funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
call_cntr = funcctx->call_cntr;
max_calls = funcctx->max_calls;
attinmeta = funcctx->attinmeta;
tupdesc = attinmeta->tupdesc;
if (call_cntr < max_calls)
if (funcctx->call_cntr < funcctx->max_calls)
{
SV **svp;
HV *row_hv;
char **values;
HeapTuple tuple;
Datum result;
int i;
char *column_key;
char *elem;
if (isset)
{
HV *row_hv;
SV **svp;
svp = av_fetch(ret_av, call_cntr, FALSE);
row_hv = (HV *) SvRV(*svp);
svp = av_fetch(ret_av, funcctx->call_cntr, FALSE);
values = (char **) palloc(g_attr_num * sizeof(char *));
if (SvTYPE(*svp) != SVt_RV)
elog(ERROR, "plperl: check your return value structure");
row_hv = (HV *) SvRV(*svp);
for (i = 0; i < g_attr_num; i++)
{
column_key = plperl_get_key(g_column_keys, i + 1);
elem = plperl_get_elem(row_hv, column_key);
if (elem)
values[i] = elem;
else
values[i] = NULL;
}
}
else
values = (char **) palloc(tupdesc->natts * sizeof(char *));
for (i = 0; i < tupdesc->natts; i++)
{
int i;
char *column_key;
values = (char **) palloc(g_attr_num * sizeof(char *));
for (i = 0; i < g_attr_num; i++)
{
column_key = SPI_fname(tupdesc, i + 1);
elem = plperl_get_elem(ret_hv, column_key);
if (elem)
values[i] = elem;
else
values[i] = NULL;
}
column_key = SPI_fname(tupdesc, i + 1);
values[i] = plperl_get_elem(row_hv, column_key);
}
tuple = BuildTupleFromCStrings(attinmeta, values);
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
retval = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, retval);
}
else
{
......@@ -1095,95 +967,91 @@ plperl_func_handler(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
}
else if (prodesc->fn_retisset) /* set of non-tuples */
else if (prodesc->fn_retisset)
{
/* set of non-tuples */
AV *ret_av = (AV *) SvRV(perlret);
FuncCallContext *funcctx;
if (SRF_IS_FIRSTCALL())
{
MemoryContext oldcontext;
funcctx = SRF_FIRSTCALL_INIT();
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
funcctx->max_calls = av_len((AV *) SvRV(perlret)) + 1;
funcctx->user_fctx = (void *) perlret;
funcctx->max_calls = av_len(ret_av) + 1;
}
funcctx = SRF_PERCALL_SETUP();
if (funcctx->call_cntr < funcctx->max_calls)
{
Datum result;
AV *array;
SV **svp;
array = (AV *) SvRV(perlret);
svp = av_fetch(array, funcctx->call_cntr, FALSE);
svp = av_fetch(ret_av, funcctx->call_cntr, FALSE);
if (SvTYPE(*svp) != SVt_NULL)
{
char *val = SvPV(*svp, PL_na);
fcinfo->isnull = false;
result = FunctionCall3(&prodesc->result_in_func,
PointerGetDatum(SvPV(*svp, PL_na)),
retval = FunctionCall3(&prodesc->result_in_func,
PointerGetDatum(val),
ObjectIdGetDatum(prodesc->result_typioparam),
Int32GetDatum(-1));
}
else
{
fcinfo->isnull = true;
result = (Datum) 0;
retval = (Datum) 0;
}
SRF_RETURN_NEXT(funcctx, result);
SRF_RETURN_NEXT(funcctx, retval);
}
else
{
if (perlret)
SvREFCNT_dec(perlret);
SvREFCNT_dec(perlret);
SRF_RETURN_DONE(funcctx);
}
}
else if (!fcinfo->isnull) /* non-null singleton */
else if (prodesc->fn_retistuple)
{
if (prodesc->fn_retistuple) /* singleton perl hash to Datum */
/* singleton perl hash to Datum */
HV *perlhash = (HV *) SvRV(perlret);
TupleDesc td;
int i;
char **values;
AttInMetadata *attinmeta;
HeapTuple tup;
/*
* XXX should cache the attinmetadata instead of recomputing
*/
td = get_function_tupdesc(prodesc->result_oid,
(ReturnSetInfo *) fcinfo->resultinfo);
/* td = CreateTupleDescCopy(td); */
attinmeta = TupleDescGetAttInMetadata(td);
values = (char **) palloc(td->natts * sizeof(char *));
for (i = 0; i < td->natts; i++)
{
TupleDesc td = lookup_rowtype_tupdesc(prodesc->ret_oid, (int32) -1);
HV *perlhash = (HV *) SvRV(perlret);
int i;
char **values;
char *key,
*val;
AttInMetadata *attinmeta;
HeapTuple tup;
if (!td)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("no TupleDesc info available")));
values = (char **) palloc(td->natts * sizeof(char *));
for (i = 0; i < td->natts; i++)
{
char *key;
key = SPI_fname(td, i + 1);
val = plperl_get_elem(perlhash, key);
if (val)
values[i] = val;
else
values[i] = NULL;
}
attinmeta = TupleDescGetAttInMetadata(td);
tup = BuildTupleFromCStrings(attinmeta, values);
retval = HeapTupleGetDatum(tup);
key = SPI_fname(td, i + 1);
values[i] = plperl_get_elem(perlhash, key);
}
else
/* perl string to Datum */
retval = FunctionCall3(&prodesc->result_in_func,
PointerGetDatum(SvPV(perlret, PL_na)),
ObjectIdGetDatum(prodesc->result_typioparam),
Int32GetDatum(-1));
tup = BuildTupleFromCStrings(attinmeta, values);
retval = HeapTupleGetDatum(tup);
}
else
{
/* perl string to Datum */
char *val = SvPV(perlret, PL_na);
retval = FunctionCall3(&prodesc->result_in_func,
CStringGetDatum(val),
ObjectIdGetDatum(prodesc->result_typioparam),
Int32GetDatum(-1));
}
else /* null singleton */
retval = (Datum) 0;
SvREFCNT_dec(perlret);
return retval;
......@@ -1202,6 +1070,10 @@ plperl_trigger_handler(PG_FUNCTION_ARGS)
SV *svTD;
HV *hvTD;
/* Connect to SPI manager */
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI manager");
/* Find or compile the function */
prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, true);
......@@ -1248,7 +1120,6 @@ plperl_trigger_handler(PG_FUNCTION_ARGS)
{
if (!fcinfo->isnull)
{
HeapTuple trv;
if (strcasecmp(tmp, "SKIP") == 0)
......@@ -1441,17 +1312,10 @@ compile_plperl_function(Oid fn_oid, bool is_trigger)
}
}
prodesc->fn_retisset = procStruct->proretset; /* true, if function
* returns set */
if (typeStruct->typtype == 'c' || procStruct->prorettype == RECORDOID)
{
prodesc->fn_retistuple = true;
prodesc->ret_oid =
procStruct->prorettype == RECORDOID ?
typeStruct->typrelid :
procStruct->prorettype;
}
prodesc->result_oid = procStruct->prorettype;
prodesc->fn_retisset = procStruct->proretset;
prodesc->fn_retistuple = (typeStruct->typtype == 'c' ||
procStruct->prorettype == RECORDOID);
perm_fmgr_info(typeStruct->typinput, &(prodesc->result_in_func));
prodesc->result_typioparam = getTypeIOParam(typeTup);
......@@ -1509,7 +1373,6 @@ compile_plperl_function(Oid fn_oid, bool is_trigger)
* create the text of the anonymous subroutine.
* we do not use a named subroutine so that we can call directly
* through the reference.
*
************************************************************/
prosrcdatum = SysCacheGetAttr(PROCOID, procTup,
Anum_pg_proc_prosrc, &isnull);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment