Commit 2848dc5f authored by Tom Lane's avatar Tom Lane

Make the world safe (more or less) for dropped columns in plpgsql rowtypes.

parent a039148c
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/execQual.c,v 1.145 2003/09/25 06:57:59 petere Exp $ * $Header: /cvsroot/pgsql/src/backend/executor/execQual.c,v 1.146 2003/09/25 23:02:11 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -315,23 +315,6 @@ ExecEvalAggref(AggrefExprState *aggref, ExprContext *econtext, bool *isNull) ...@@ -315,23 +315,6 @@ ExecEvalAggref(AggrefExprState *aggref, ExprContext *econtext, bool *isNull)
* *
* Returns a Datum whose value is the value of a range * Returns a Datum whose value is the value of a range
* variable with respect to given expression context. * variable with respect to given expression context.
*
*
* As an entry condition, we expect that the datatype the
* plan expects to get (as told by our "variable" argument) is in
* fact the datatype of the attribute the plan says to fetch (as
* seen in the current context, identified by our "econtext"
* argument).
*
* If we fetch a Type A attribute and Caller treats it as if it
* were Type B, there will be undefined results (e.g. crash).
* One way these might mismatch now is that we're accessing a
* catalog class and the type information in the pg_attribute
* class does not match the hardcoded pg_attribute information
* (in pg_attribute.h) for the class in question.
*
* We have an Assert to make sure this entry condition is met.
*
* ---------------------------------------------------------------- */ * ---------------------------------------------------------------- */
static Datum static Datum
ExecEvalVar(Var *variable, ExprContext *econtext, bool *isNull) ExecEvalVar(Var *variable, ExprContext *econtext, bool *isNull)
...@@ -369,11 +352,40 @@ ExecEvalVar(Var *variable, ExprContext *econtext, bool *isNull) ...@@ -369,11 +352,40 @@ ExecEvalVar(Var *variable, ExprContext *econtext, bool *isNull)
attnum = variable->varattno; attnum = variable->varattno;
/* (See prolog for explanation of this Assert) */ /*
Assert(attnum <= 0 || * Some checks that are only applied for user attribute numbers
(attnum - 1 <= tuple_type->natts - 1 && * (bogus system attnums will be caught inside heap_getattr).
tuple_type->attrs[attnum - 1] != NULL && */
variable->vartype == tuple_type->attrs[attnum - 1]->atttypid)); if (attnum > 0)
{
/*
* This assert checks that the attnum is valid.
*/
Assert(attnum <= tuple_type->natts &&
tuple_type->attrs[attnum - 1] != NULL);
/*
* If the attribute's column has been dropped, we force a NULL result.
* This case should not happen in normal use, but it could happen if
* we are executing a plan cached before the column was dropped.
*/
if (tuple_type->attrs[attnum - 1]->attisdropped)
{
*isNull = true;
return (Datum) 0;
}
/*
* This assert checks that the datatype the plan expects to get (as
* told by our "variable" argument) is in fact the datatype of the
* attribute being fetched (as seen in the current context, identified
* by our "econtext" argument). Otherwise crashes are likely.
*
* Note that we can't check dropped columns, since their atttypid
* has been zeroed.
*/
Assert(variable->vartype == tuple_type->attrs[attnum - 1]->atttypid);
}
/* /*
* If the attribute number is invalid, then we are supposed to return * If the attribute number is invalid, then we are supposed to return
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeFunctionscan.c,v 1.21 2003/09/25 06:57:59 petere Exp $ * $Header: /cvsroot/pgsql/src/backend/executor/nodeFunctionscan.c,v 1.22 2003/09/25 23:02:12 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
static TupleTableSlot *FunctionNext(FunctionScanState *node); static TupleTableSlot *FunctionNext(FunctionScanState *node);
static bool tupledesc_mismatch(TupleDesc tupdesc1, TupleDesc tupdesc2); static bool tupledesc_match(TupleDesc dst_tupdesc, TupleDesc src_tupdesc);
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* Scan Support * Scan Support
...@@ -86,8 +86,7 @@ FunctionNext(FunctionScanState *node) ...@@ -86,8 +86,7 @@ FunctionNext(FunctionScanState *node)
* need to do this for functions returning RECORD, but might as * need to do this for functions returning RECORD, but might as
* well do it always. * well do it always.
*/ */
if (funcTupdesc && if (funcTupdesc && !tupledesc_match(node->tupdesc, funcTupdesc))
tupledesc_mismatch(node->tupdesc, funcTupdesc))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH), (errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("query-specified return row and actual function return row do not match"))); errmsg("query-specified return row and actual function return row do not match")));
...@@ -364,26 +363,36 @@ ExecFunctionReScan(FunctionScanState *node, ExprContext *exprCtxt) ...@@ -364,26 +363,36 @@ ExecFunctionReScan(FunctionScanState *node, ExprContext *exprCtxt)
tuplestore_rescan(node->tuplestorestate); tuplestore_rescan(node->tuplestorestate);
} }
/*
* Check that function result tuple type (src_tupdesc) matches or can be
* considered to match what the query expects (dst_tupdesc).
*
* We really only care about number of attributes and data type.
* Also, we can ignore type mismatch on columns that are dropped in the
* destination type, so long as the physical storage matches. This is
* helpful in some cases involving out-of-date cached plans.
*/
static bool static bool
tupledesc_mismatch(TupleDesc tupdesc1, TupleDesc tupdesc2) tupledesc_match(TupleDesc dst_tupdesc, TupleDesc src_tupdesc)
{ {
int i; int i;
if (tupdesc1->natts != tupdesc2->natts) if (dst_tupdesc->natts != src_tupdesc->natts)
return true; return false;
for (i = 0; i < tupdesc1->natts; i++) for (i = 0; i < dst_tupdesc->natts; i++)
{ {
Form_pg_attribute attr1 = tupdesc1->attrs[i]; Form_pg_attribute dattr = dst_tupdesc->attrs[i];
Form_pg_attribute attr2 = tupdesc2->attrs[i]; Form_pg_attribute sattr = src_tupdesc->attrs[i];
/* if (dattr->atttypid == sattr->atttypid)
* We really only care about number of attributes and data type continue; /* no worries */
*/ if (!dattr->attisdropped)
if (attr1->atttypid != attr2->atttypid) return false;
return true; if (dattr->attlen != sattr->attlen ||
dattr->attalign != sattr->attalign)
return false;
} }
return false; return true;
} }
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
* procedural language * procedural language
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/pl/plpgsql/src/gram.y,v 1.46 2003/07/27 21:49:54 tgl Exp $ * $Header: /cvsroot/pgsql/src/pl/plpgsql/src/gram.y,v 1.47 2003/09/25 23:02:12 tgl Exp $
* *
* This software is copyrighted by Jan Wieck - Hamburg. * This software is copyrighted by Jan Wieck - Hamburg.
* *
...@@ -47,7 +47,6 @@ static PLpgSQL_expr *read_sql_stmt(const char *sqlstart); ...@@ -47,7 +47,6 @@ static PLpgSQL_expr *read_sql_stmt(const char *sqlstart);
static PLpgSQL_type *read_datatype(int tok); static PLpgSQL_type *read_datatype(int tok);
static PLpgSQL_stmt *make_select_stmt(void); static PLpgSQL_stmt *make_select_stmt(void);
static PLpgSQL_stmt *make_fetch_stmt(void); static PLpgSQL_stmt *make_fetch_stmt(void);
static PLpgSQL_expr *make_tupret_expr(PLpgSQL_row *row);
static void check_assignable(PLpgSQL_datum *datum); static void check_assignable(PLpgSQL_datum *datum);
%} %}
...@@ -493,7 +492,7 @@ decl_cursor_arglist : decl_cursor_arg ...@@ -493,7 +492,7 @@ decl_cursor_arglist : decl_cursor_arg
new->dtype = PLPGSQL_DTYPE_ROW; new->dtype = PLPGSQL_DTYPE_ROW;
new->refname = strdup("*internal*"); new->refname = strdup("*internal*");
new->lineno = plpgsql_scanner_lineno(); new->lineno = plpgsql_scanner_lineno();
new->rowtypeclass = InvalidOid; new->rowtupdesc = NULL;
/* /*
* We make temporary fieldnames/varnos arrays that * We make temporary fieldnames/varnos arrays that
* are much bigger than necessary. We will resize * are much bigger than necessary. We will resize
...@@ -1176,24 +1175,24 @@ stmt_return : K_RETURN lno ...@@ -1176,24 +1175,24 @@ stmt_return : K_RETURN lno
new = malloc(sizeof(PLpgSQL_stmt_return)); new = malloc(sizeof(PLpgSQL_stmt_return));
memset(new, 0, sizeof(PLpgSQL_stmt_return)); memset(new, 0, sizeof(PLpgSQL_stmt_return));
new->expr = NULL;
new->retrecno = -1;
new->retrowno = -1;
if (plpgsql_curr_compile->fn_retistuple && if (plpgsql_curr_compile->fn_retistuple &&
!plpgsql_curr_compile->fn_retset) !plpgsql_curr_compile->fn_retset)
{ {
new->retrecno = -1;
switch (yylex()) switch (yylex())
{ {
case K_NULL: case K_NULL:
new->expr = NULL;
break; break;
case T_ROW: case T_ROW:
new->expr = make_tupret_expr(yylval.row); new->retrowno = yylval.row->rowno;
break; break;
case T_RECORD: case T_RECORD:
new->retrecno = yylval.rec->recno; new->retrecno = yylval.rec->recno;
new->expr = NULL;
break; break;
default: default:
...@@ -1874,7 +1873,7 @@ make_select_stmt(void) ...@@ -1874,7 +1873,7 @@ make_select_stmt(void)
row->dtype = PLPGSQL_DTYPE_ROW; row->dtype = PLPGSQL_DTYPE_ROW;
row->refname = strdup("*internal*"); row->refname = strdup("*internal*");
row->lineno = plpgsql_scanner_lineno(); row->lineno = plpgsql_scanner_lineno();
row->rowtypeclass = InvalidOid; row->rowtupdesc = NULL;
row->nfields = nfields; row->nfields = nfields;
row->fieldnames = malloc(sizeof(char *) * nfields); row->fieldnames = malloc(sizeof(char *) * nfields);
row->varnos = malloc(sizeof(int) * nfields); row->varnos = malloc(sizeof(int) * nfields);
...@@ -2007,7 +2006,7 @@ make_fetch_stmt(void) ...@@ -2007,7 +2006,7 @@ make_fetch_stmt(void)
row->dtype = PLPGSQL_DTYPE_ROW; row->dtype = PLPGSQL_DTYPE_ROW;
row->refname = strdup("*internal*"); row->refname = strdup("*internal*");
row->lineno = plpgsql_scanner_lineno(); row->lineno = plpgsql_scanner_lineno();
row->rowtypeclass = InvalidOid; row->rowtupdesc = NULL;
row->nfields = nfields; row->nfields = nfields;
row->fieldnames = malloc(sizeof(char *) * nfields); row->fieldnames = malloc(sizeof(char *) * nfields);
row->varnos = malloc(sizeof(int) * nfields); row->varnos = malloc(sizeof(int) * nfields);
...@@ -2041,36 +2040,6 @@ make_fetch_stmt(void) ...@@ -2041,36 +2040,6 @@ make_fetch_stmt(void)
} }
static PLpgSQL_expr *
make_tupret_expr(PLpgSQL_row *row)
{
PLpgSQL_dstring ds;
PLpgSQL_expr *expr;
int i;
char buf[16];
expr = malloc(sizeof(PLpgSQL_expr) + sizeof(int) * (row->nfields - 1));
expr->dtype = PLPGSQL_DTYPE_EXPR;
plpgsql_dstring_init(&ds);
plpgsql_dstring_append(&ds, "SELECT ");
for (i = 0; i < row->nfields; i++)
{
sprintf(buf, "%s$%d", (i > 0) ? "," : "", i + 1);
plpgsql_dstring_append(&ds, buf);
expr->params[i] = row->varnos[i];
}
expr->query = strdup(plpgsql_dstring_get(&ds));
expr->plan = NULL;
expr->plan_argtypes = NULL;
expr->nparams = row->nfields;
plpgsql_dstring_free(&ds);
return expr;
}
static void static void
check_assignable(PLpgSQL_datum *datum) check_assignable(PLpgSQL_datum *datum)
{ {
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* procedural language * procedural language
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_comp.c,v 1.67 2003/08/18 19:16:02 tgl Exp $ * $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_comp.c,v 1.68 2003/09/25 23:02:12 tgl Exp $
* *
* This software is copyrighted by Jan Wieck - Hamburg. * This software is copyrighted by Jan Wieck - Hamburg.
* *
...@@ -899,7 +899,8 @@ plpgsql_parse_dblword(char *word) ...@@ -899,7 +899,8 @@ plpgsql_parse_dblword(char *word)
row = (PLpgSQL_row *) (plpgsql_Datums[ns->itemno]); row = (PLpgSQL_row *) (plpgsql_Datums[ns->itemno]);
for (i = 0; i < row->nfields; i++) for (i = 0; i < row->nfields; i++)
{ {
if (strcmp(row->fieldnames[i], cp[1]) == 0) if (row->fieldnames[i] &&
strcmp(row->fieldnames[i], cp[1]) == 0)
{ {
plpgsql_yylval.var = (PLpgSQL_var *) (plpgsql_Datums[row->varnos[i]]); plpgsql_yylval.var = (PLpgSQL_var *) (plpgsql_Datums[row->varnos[i]]);
pfree(cp[0]); pfree(cp[0]);
...@@ -1005,7 +1006,8 @@ plpgsql_parse_tripword(char *word) ...@@ -1005,7 +1006,8 @@ plpgsql_parse_tripword(char *word)
row = (PLpgSQL_row *) (plpgsql_Datums[ns->itemno]); row = (PLpgSQL_row *) (plpgsql_Datums[ns->itemno]);
for (i = 0; i < row->nfields; i++) for (i = 0; i < row->nfields; i++)
{ {
if (strcmp(row->fieldnames[i], cp[2]) == 0) if (row->fieldnames[i] &&
strcmp(row->fieldnames[i], cp[2]) == 0)
{ {
plpgsql_yylval.var = (PLpgSQL_var *) (plpgsql_Datums[row->varnos[i]]); plpgsql_yylval.var = (PLpgSQL_var *) (plpgsql_Datums[row->varnos[i]]);
pfree(cp[0]); pfree(cp[0]);
...@@ -1396,6 +1398,8 @@ plpgsql_parse_wordrowtype(char *word) ...@@ -1396,6 +1398,8 @@ plpgsql_parse_wordrowtype(char *word)
*/ */
plpgsql_yylval.row = plpgsql_build_rowtype(classOid); plpgsql_yylval.row = plpgsql_build_rowtype(classOid);
plpgsql_adddatum((PLpgSQL_datum *) plpgsql_yylval.row);
pfree(cp[0]); pfree(cp[0]);
pfree(cp[1]); pfree(cp[1]);
...@@ -1439,6 +1443,8 @@ plpgsql_parse_dblwordrowtype(char *word) ...@@ -1439,6 +1443,8 @@ plpgsql_parse_dblwordrowtype(char *word)
*/ */
plpgsql_yylval.row = plpgsql_build_rowtype(classOid); plpgsql_yylval.row = plpgsql_build_rowtype(classOid);
plpgsql_adddatum((PLpgSQL_datum *) plpgsql_yylval.row);
pfree(cp); pfree(cp);
return T_ROW; return T_ROW;
...@@ -1451,23 +1457,20 @@ PLpgSQL_row * ...@@ -1451,23 +1457,20 @@ PLpgSQL_row *
plpgsql_build_rowtype(Oid classOid) plpgsql_build_rowtype(Oid classOid)
{ {
PLpgSQL_row *row; PLpgSQL_row *row;
HeapTuple classtup; Relation rel;
Form_pg_class classStruct; Form_pg_class classStruct;
const char *relname; const char *relname;
int i; int i;
MemoryContext oldcxt;
/* /*
* Fetch the pg_class tuple. * Open the relation to get info.
*/ */
classtup = SearchSysCache(RELOID, rel = heap_open(classOid, AccessShareLock);
ObjectIdGetDatum(classOid), classStruct = RelationGetForm(rel);
0, 0, 0); relname = RelationGetRelationName(rel);
if (!HeapTupleIsValid(classtup))
elog(ERROR, "cache lookup failed for relation %u", classOid);
classStruct = (Form_pg_class) GETSTRUCT(classtup);
relname = NameStr(classStruct->relname);
/* accept relation, sequence, view, or type pg_class entries */ /* accept relation, sequence, view, or composite type entries */
if (classStruct->relkind != RELKIND_RELATION && if (classStruct->relkind != RELKIND_RELATION &&
classStruct->relkind != RELKIND_SEQUENCE && classStruct->relkind != RELKIND_SEQUENCE &&
classStruct->relkind != RELKIND_VIEW && classStruct->relkind != RELKIND_VIEW &&
...@@ -1484,30 +1487,34 @@ plpgsql_build_rowtype(Oid classOid) ...@@ -1484,30 +1487,34 @@ plpgsql_build_rowtype(Oid classOid)
memset(row, 0, sizeof(PLpgSQL_row)); memset(row, 0, sizeof(PLpgSQL_row));
row->dtype = PLPGSQL_DTYPE_ROW; row->dtype = PLPGSQL_DTYPE_ROW;
/*
* This is a bit ugly --- need a permanent copy of the rel's tupdesc.
* Someday all these mallocs should go away in favor of a per-function
* memory context ...
*/
oldcxt = MemoryContextSwitchTo(TopMemoryContext);
row->rowtupdesc = CreateTupleDescCopy(RelationGetDescr(rel));
MemoryContextSwitchTo(oldcxt);
row->nfields = classStruct->relnatts; row->nfields = classStruct->relnatts;
row->rowtypeclass = classStruct->reltype;
row->fieldnames = malloc(sizeof(char *) * row->nfields); row->fieldnames = malloc(sizeof(char *) * row->nfields);
row->varnos = malloc(sizeof(int) * row->nfields); row->varnos = malloc(sizeof(int) * row->nfields);
for (i = 0; i < row->nfields; i++) for (i = 0; i < row->nfields; i++)
{ {
HeapTuple attrtup;
Form_pg_attribute attrStruct; Form_pg_attribute attrStruct;
HeapTuple typetup;
const char *attname;
PLpgSQL_var *var;
/* /*
* Get the attribute and it's type * Get the attribute and check for dropped column
*/ */
attrtup = SearchSysCache(ATTNUM, attrStruct = RelationGetDescr(rel)->attrs[i];
ObjectIdGetDatum(classOid),
Int16GetDatum(i + 1), if (!attrStruct->attisdropped)
0, 0); {
if (!HeapTupleIsValid(attrtup)) const char *attname;
elog(ERROR, "cache lookup failed for attribute %d of relation %u", HeapTuple typetup;
i + 1, classOid); PLpgSQL_var *var;
attrStruct = (Form_pg_attribute) GETSTRUCT(attrtup);
attname = NameStr(attrStruct->attname); attname = NameStr(attrStruct->attname);
...@@ -1519,17 +1526,17 @@ plpgsql_build_rowtype(Oid classOid) ...@@ -1519,17 +1526,17 @@ plpgsql_build_rowtype(Oid classOid)
attrStruct->atttypid); attrStruct->atttypid);
/* /*
* Create the internal variable * Create the internal variable for the field
* *
* We know if the table definitions contain a default value or if the * We know if the table definitions contain a default value or if
* field is declared in the table as NOT NULL. But it's possible * the field is declared in the table as NOT NULL. But it's
* to create a table field as NOT NULL without a default value and * possible to create a table field as NOT NULL without a default
* that would lead to problems later when initializing the * value and that would lead to problems later when initializing
* variables due to entering a block at execution time. Thus we * the variables due to entering a block at execution time. Thus
* ignore this information for now. * we ignore this information for now.
*/ */
var = malloc(sizeof(PLpgSQL_var)); var = malloc(sizeof(PLpgSQL_var));
memset(var, 0, sizeof(PLpgSQL_var)); MemSet(var, 0, sizeof(PLpgSQL_var));
var->dtype = PLPGSQL_DTYPE_VAR; var->dtype = PLPGSQL_DTYPE_VAR;
var->refname = malloc(strlen(relname) + strlen(attname) + 2); var->refname = malloc(strlen(relname) + strlen(attname) + 2);
strcpy(var->refname, relname); strcpy(var->refname, relname);
...@@ -1552,10 +1559,16 @@ plpgsql_build_rowtype(Oid classOid) ...@@ -1552,10 +1559,16 @@ plpgsql_build_rowtype(Oid classOid)
row->varnos[i] = var->varno; row->varnos[i] = var->varno;
ReleaseSysCache(typetup); ReleaseSysCache(typetup);
ReleaseSysCache(attrtup); }
else
{
/* Leave a hole in the row structure for the dropped col */
row->fieldnames[i] = NULL;
row->varnos[i] = -1;
}
} }
ReleaseSysCache(classtup); heap_close(rel, AccessShareLock);
return row; return row;
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* procedural language * procedural language
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_exec.c,v 1.90 2003/08/04 00:43:33 momjian Exp $ * $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_exec.c,v 1.91 2003/09/25 23:02:12 tgl Exp $
* *
* This software is copyrighted by Jan Wieck - Hamburg. * This software is copyrighted by Jan Wieck - Hamburg.
* *
...@@ -148,6 +148,9 @@ static void exec_move_row(PLpgSQL_execstate * estate, ...@@ -148,6 +148,9 @@ static void exec_move_row(PLpgSQL_execstate * estate,
PLpgSQL_rec * rec, PLpgSQL_rec * rec,
PLpgSQL_row * row, PLpgSQL_row * row,
HeapTuple tup, TupleDesc tupdesc); HeapTuple tup, TupleDesc tupdesc);
static HeapTuple make_tuple_from_row(PLpgSQL_execstate * estate,
PLpgSQL_row * row,
TupleDesc tupdesc);
static Datum exec_cast_value(Datum value, Oid valtype, static Datum exec_cast_value(Datum value, Oid valtype,
Oid reqtype, Oid reqtype,
FmgrInfo *reqinput, FmgrInfo *reqinput,
...@@ -1574,6 +1577,22 @@ exec_stmt_return(PLpgSQL_execstate * estate, PLpgSQL_stmt_return * stmt) ...@@ -1574,6 +1577,22 @@ exec_stmt_return(PLpgSQL_execstate * estate, PLpgSQL_stmt_return * stmt)
return PLPGSQL_RC_RETURN; return PLPGSQL_RC_RETURN;
} }
if (stmt->retrowno >= 0)
{
PLpgSQL_row *row = (PLpgSQL_row *) (estate->datums[stmt->retrowno]);
if (row->rowtupdesc) /* should always be true here */
{
estate->retval = (Datum) make_tuple_from_row(estate, row,
row->rowtupdesc);
if (estate->retval == (Datum) NULL) /* should not happen */
elog(ERROR, "row not compatible with its own tupdesc");
estate->rettupdesc = row->rowtupdesc;
estate->retisnull = false;
}
return PLPGSQL_RC_RETURN;
}
if (stmt->expr != NULL) if (stmt->expr != NULL)
{ {
exec_run_select(estate, stmt->expr, 1, NULL); exec_run_select(estate, stmt->expr, 1, NULL);
...@@ -1650,37 +1669,11 @@ exec_stmt_return_next(PLpgSQL_execstate * estate, ...@@ -1650,37 +1669,11 @@ exec_stmt_return_next(PLpgSQL_execstate * estate,
} }
else if (stmt->row) else if (stmt->row)
{ {
Datum *dvalues; tuple = make_tuple_from_row(estate, stmt->row, tupdesc);
char *nulls; if (tuple == NULL)
int i;
if (natts != stmt->row->nfields)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH), (errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("wrong record type supplied in RETURN NEXT"))); errmsg("wrong record type supplied in RETURN NEXT")));
dvalues = (Datum *) palloc0(natts * sizeof(Datum));
nulls = (char *) palloc(natts * sizeof(char));
MemSet(nulls, 'n', natts);
for (i = 0; i < natts; i++)
{
PLpgSQL_var *var;
var = (PLpgSQL_var *) (estate->datums[stmt->row->varnos[i]]);
if (var->datatype->typoid != tupdesc->attrs[i]->atttypid)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("wrong record type supplied in RETURN NEXT")));
dvalues[i] = var->value;
if (!var->isnull)
nulls[i] = ' ';
}
tuple = heap_formtuple(tupdesc, dvalues, nulls);
pfree(dvalues);
pfree(nulls);
free_tuple = true; free_tuple = true;
} }
else if (stmt->expr) else if (stmt->expr)
...@@ -3412,7 +3405,8 @@ exec_move_row(PLpgSQL_execstate * estate, ...@@ -3412,7 +3405,8 @@ exec_move_row(PLpgSQL_execstate * estate,
* expected if it's from an inheritance-child table of the current * expected if it's from an inheritance-child table of the current
* table, or it might have fewer if the table has had columns added by * table, or it might have fewer if the table has had columns added by
* ALTER TABLE. Ignore extra columns and assume NULL for missing * ALTER TABLE. Ignore extra columns and assume NULL for missing
* columns, the same as heap_getattr would do. * columns, the same as heap_getattr would do. We also have to skip
* over dropped columns in either the source or destination.
* *
* If we have no tuple data at all, we'll assign NULL to all columns of * If we have no tuple data at all, we'll assign NULL to all columns of
* the row variable. * the row variable.
...@@ -3420,25 +3414,35 @@ exec_move_row(PLpgSQL_execstate * estate, ...@@ -3420,25 +3414,35 @@ exec_move_row(PLpgSQL_execstate * estate,
if (row != NULL) if (row != NULL)
{ {
int t_natts; int t_natts;
int i; int fnum;
int anum;
if (HeapTupleIsValid(tup)) if (HeapTupleIsValid(tup))
t_natts = tup->t_data->t_natts; t_natts = tup->t_data->t_natts;
else else
t_natts = 0; t_natts = 0;
for (i = 0; i < row->nfields; i++) anum = 0;
for (fnum = 0; fnum < row->nfields; fnum++)
{ {
PLpgSQL_var *var; PLpgSQL_var *var;
Datum value; Datum value;
bool isnull; bool isnull;
Oid valtype; Oid valtype;
var = (PLpgSQL_var *) (estate->datums[row->varnos[i]]); if (row->varnos[fnum] < 0)
if (i < t_natts) continue; /* skip dropped column in row struct */
var = (PLpgSQL_var *) (estate->datums[row->varnos[fnum]]);
while (anum < t_natts && tupdesc->attrs[anum]->attisdropped)
anum++; /* skip dropped column in tuple */
if (anum < t_natts)
{ {
value = SPI_getbinval(tup, tupdesc, i + 1, &isnull); value = SPI_getbinval(tup, tupdesc, anum + 1, &isnull);
valtype = SPI_gettypeid(tupdesc, i + 1); valtype = SPI_gettypeid(tupdesc, anum + 1);
anum++;
} }
else else
{ {
...@@ -3447,7 +3451,7 @@ exec_move_row(PLpgSQL_execstate * estate, ...@@ -3447,7 +3451,7 @@ exec_move_row(PLpgSQL_execstate * estate,
valtype = InvalidOid; valtype = InvalidOid;
} }
exec_assign_value(estate, estate->datums[row->varnos[i]], exec_assign_value(estate, (PLpgSQL_datum *) var,
value, valtype, &isnull); value, valtype, &isnull);
} }
...@@ -3457,6 +3461,54 @@ exec_move_row(PLpgSQL_execstate * estate, ...@@ -3457,6 +3461,54 @@ exec_move_row(PLpgSQL_execstate * estate,
elog(ERROR, "unsupported target"); elog(ERROR, "unsupported target");
} }
/* ----------
* make_tuple_from_row Make a tuple from the values of a row object
*
* A NULL return indicates rowtype mismatch; caller must raise suitable error
* ----------
*/
static HeapTuple
make_tuple_from_row(PLpgSQL_execstate * estate,
PLpgSQL_row * row,
TupleDesc tupdesc)
{
int natts = tupdesc->natts;
HeapTuple tuple;
Datum *dvalues;
char *nulls;
int i;
if (natts != row->nfields)
return NULL;
dvalues = (Datum *) palloc0(natts * sizeof(Datum));
nulls = (char *) palloc(natts * sizeof(char));
MemSet(nulls, 'n', natts);
for (i = 0; i < natts; i++)
{
PLpgSQL_var *var;
if (tupdesc->attrs[i]->attisdropped)
continue; /* leave the column as null */
if (row->varnos[i] < 0) /* should not happen */
elog(ERROR, "dropped rowtype entry for non-dropped column");
var = (PLpgSQL_var *) (estate->datums[row->varnos[i]]);
if (var->datatype->typoid != tupdesc->attrs[i]->atttypid)
return NULL;
dvalues[i] = var->value;
if (!var->isnull)
nulls[i] = ' ';
}
tuple = heap_formtuple(tupdesc, dvalues, nulls);
pfree(dvalues);
pfree(nulls);
return tuple;
}
/* ---------- /* ----------
* exec_cast_value Cast a value if required * exec_cast_value Cast a value if required
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* procedural language * procedural language
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_funcs.c,v 1.29 2003/08/04 00:43:33 momjian Exp $ * $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_funcs.c,v 1.30 2003/09/25 23:02:12 tgl Exp $
* *
* This software is copyrighted by Jan Wieck - Hamburg. * This software is copyrighted by Jan Wieck - Hamburg.
* *
...@@ -848,15 +848,14 @@ dump_return(PLpgSQL_stmt_return * stmt) ...@@ -848,15 +848,14 @@ dump_return(PLpgSQL_stmt_return * stmt)
{ {
dump_ind(); dump_ind();
printf("RETURN "); printf("RETURN ");
if (stmt->retrecno > 0) if (stmt->retrecno >= 0)
printf("record %d", stmt->retrecno); printf("record %d", stmt->retrecno);
else else if (stmt->retrowno >= 0)
{ printf("row %d", stmt->retrowno);
if (stmt->expr == NULL) else if (stmt->expr == NULL)
printf("NULL"); printf("NULL");
else else
dump_expr(stmt->expr); dump_expr(stmt->expr);
}
printf("\n"); printf("\n");
} }
...@@ -1031,6 +1030,7 @@ plpgsql_dumptree(PLpgSQL_function * func) ...@@ -1031,6 +1030,7 @@ plpgsql_dumptree(PLpgSQL_function * func)
printf("ROW %-16s fields", row->refname); printf("ROW %-16s fields", row->refname);
for (i = 0; i < row->nfields; i++) for (i = 0; i < row->nfields; i++)
{ {
if (row->fieldnames[i])
printf(" %s=var %d", row->fieldnames[i], printf(" %s=var %d", row->fieldnames[i],
row->varnos[i]); row->varnos[i]);
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* procedural language * procedural language
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/pl/plpgsql/src/plpgsql.h,v 1.40 2003/08/18 19:16:02 tgl Exp $ * $Header: /cvsroot/pgsql/src/pl/plpgsql/src/plpgsql.h,v 1.41 2003/09/25 23:02:12 tgl Exp $
* *
* This software is copyrighted by Jan Wieck - Hamburg. * This software is copyrighted by Jan Wieck - Hamburg.
* *
...@@ -207,8 +207,15 @@ typedef struct ...@@ -207,8 +207,15 @@ typedef struct
int rowno; int rowno;
char *refname; char *refname;
int lineno; int lineno;
Oid rowtypeclass; TupleDesc rowtupdesc;
/*
* Note: TupleDesc is only set up for named rowtypes, else it is NULL.
*
* Note: if the underlying rowtype contains a dropped column, the
* corresponding fieldnames[] entry will be NULL, and there is no
* corresponding var (varnos[] will be -1).
*/
int nfields; int nfields;
char **fieldnames; char **fieldnames;
int *varnos; int *varnos;
...@@ -449,6 +456,7 @@ typedef struct ...@@ -449,6 +456,7 @@ typedef struct
int lineno; int lineno;
PLpgSQL_expr *expr; PLpgSQL_expr *expr;
int retrecno; int retrecno;
int retrowno;
} PLpgSQL_stmt_return; } PLpgSQL_stmt_return;
typedef struct typedef struct
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment