Commit dcb2bda9 authored by Tom Lane's avatar Tom Lane

Improve plpgsql's ability to cope with rowtypes containing dropped columns,

by supporting conversions in places that used to demand exact rowtype match.

Since this issue is certain to come up elsewhere (in fact, already has,
in ExecEvalConvertRowtype), factor out the support code into new core
functions for tuple conversion.  I chose to put these in a new source
file since heaptuple.c is already overly long.

Heavily revised version of a patch by Pavel Stehule.
parent 40001705
......@@ -4,7 +4,7 @@
# Makefile for access/common
#
# IDENTIFICATION
# $PostgreSQL: pgsql/src/backend/access/common/Makefile,v 1.25 2008/02/19 11:49:12 petere Exp $
# $PostgreSQL: pgsql/src/backend/access/common/Makefile,v 1.26 2009/08/06 20:44:31 tgl Exp $
#
#-------------------------------------------------------------------------
......@@ -12,6 +12,7 @@ subdir = src/backend/access/common
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = heaptuple.o indextuple.o printtup.o reloptions.o scankey.o tupdesc.o
OBJS = heaptuple.o indextuple.o printtup.o reloptions.o scankey.o \
tupconvert.o tupdesc.o
include $(top_srcdir)/src/backend/common.mk
This diff is collapsed.
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/executor/execQual.c,v 1.250 2009/06/11 17:25:38 tgl Exp $
* $PostgreSQL: pgsql/src/backend/executor/execQual.c,v 1.251 2009/08/06 20:44:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -37,6 +37,7 @@
#include "postgres.h"
#include "access/nbtree.h"
#include "access/tupconvert.h"
#include "catalog/pg_type.h"
#include "commands/typecmds.h"
#include "executor/execdebug.h"
......@@ -2548,13 +2549,6 @@ ExecEvalConvertRowtype(ConvertRowtypeExprState *cstate,
Datum tupDatum;
HeapTupleHeader tuple;
HeapTupleData tmptup;
AttrNumber *attrMap;
Datum *invalues;
bool *inisnull;
Datum *outvalues;
bool *outisnull;
int i;
int outnatts;
tupDatum = ExecEvalExpr(cstate->arg, econtext, isNull, isDone);
......@@ -2566,110 +2560,51 @@ ExecEvalConvertRowtype(ConvertRowtypeExprState *cstate,
/* Lookup tupdescs if first time through or after rescan */
if (cstate->indesc == NULL)
{
get_cached_rowtype(exprType((Node *) convert->arg), -1,
&cstate->indesc, econtext);
cstate->initialized = false;
}
if (cstate->outdesc == NULL)
{
get_cached_rowtype(convert->resulttype, -1,
&cstate->outdesc, econtext);
cstate->initialized = false;
}
Assert(HeapTupleHeaderGetTypeId(tuple) == cstate->indesc->tdtypeid);
Assert(HeapTupleHeaderGetTypMod(tuple) == cstate->indesc->tdtypmod);
/* if first time through, initialize */
if (cstate->attrMap == NULL)
/* if first time through, initialize conversion map */
if (!cstate->initialized)
{
MemoryContext old_cxt;
int n;
/* allocate state in long-lived memory context */
/* allocate map in long-lived memory context */
old_cxt = MemoryContextSwitchTo(econtext->ecxt_per_query_memory);
/* prepare map from old to new attribute numbers */
n = cstate->outdesc->natts;
cstate->attrMap = (AttrNumber *) palloc0(n * sizeof(AttrNumber));
for (i = 0; i < n; i++)
{
Form_pg_attribute att = cstate->outdesc->attrs[i];
char *attname;
Oid atttypid;
int32 atttypmod;
int j;
if (att->attisdropped)
continue; /* attrMap[i] is already 0 */
attname = NameStr(att->attname);
atttypid = att->atttypid;
atttypmod = att->atttypmod;
for (j = 0; j < cstate->indesc->natts; j++)
{
att = cstate->indesc->attrs[j];
if (att->attisdropped)
continue;
if (strcmp(attname, NameStr(att->attname)) == 0)
{
/* Found it, check type */
if (atttypid != att->atttypid || atttypmod != att->atttypmod)
elog(ERROR, "attribute \"%s\" of type %s does not match corresponding attribute of type %s",
attname,
format_type_be(cstate->indesc->tdtypeid),
format_type_be(cstate->outdesc->tdtypeid));
cstate->attrMap[i] = (AttrNumber) (j + 1);
break;
}
}
if (cstate->attrMap[i] == 0)
elog(ERROR, "attribute \"%s\" of type %s does not exist",
attname,
format_type_be(cstate->indesc->tdtypeid));
}
/* preallocate workspace for Datum arrays */
n = cstate->indesc->natts + 1; /* +1 for NULL */
cstate->invalues = (Datum *) palloc(n * sizeof(Datum));
cstate->inisnull = (bool *) palloc(n * sizeof(bool));
n = cstate->outdesc->natts;
cstate->outvalues = (Datum *) palloc(n * sizeof(Datum));
cstate->outisnull = (bool *) palloc(n * sizeof(bool));
cstate->map = convert_tuples_by_name(cstate->indesc,
cstate->outdesc,
gettext_noop("could not convert row type"));
cstate->initialized = true;
MemoryContextSwitchTo(old_cxt);
}
attrMap = cstate->attrMap;
invalues = cstate->invalues;
inisnull = cstate->inisnull;
outvalues = cstate->outvalues;
outisnull = cstate->outisnull;
outnatts = cstate->outdesc->natts;
/*
* heap_deform_tuple needs a HeapTuple not a bare HeapTupleHeader.
* No-op if no conversion needed (not clear this can happen here).
*/
tmptup.t_len = HeapTupleHeaderGetDatumLength(tuple);
tmptup.t_data = tuple;
/*
* Extract all the values of the old tuple, offsetting the arrays so that
* invalues[0] is NULL and invalues[1] is the first source attribute; this
* exactly matches the numbering convention in attrMap.
*/
heap_deform_tuple(&tmptup, cstate->indesc, invalues + 1, inisnull + 1);
invalues[0] = (Datum) 0;
inisnull[0] = true;
if (cstate->map == NULL)
return tupDatum;
/*
* Transpose into proper fields of the new tuple.
* do_convert_tuple needs a HeapTuple not a bare HeapTupleHeader.
*/
for (i = 0; i < outnatts; i++)
{
int j = attrMap[i];
outvalues[i] = invalues[j];
outisnull[i] = inisnull[j];
}
tmptup.t_len = HeapTupleHeaderGetDatumLength(tuple);
tmptup.t_data = tuple;
/*
* Now form the new tuple.
*/
result = heap_form_tuple(cstate->outdesc, outvalues, outisnull);
result = do_convert_tuple(&tmptup, cstate->map);
return HeapTupleGetDatum(result);
}
......
/*-------------------------------------------------------------------------
*
* tupconvert.h
* Tuple conversion support.
*
*
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/tupconvert.h,v 1.1 2009/08/06 20:44:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef TUPCONVERT_H
#define TUPCONVERT_H
#include "access/htup.h"
typedef struct TupleConversionMap
{
TupleDesc indesc; /* tupdesc for source rowtype */
TupleDesc outdesc; /* tupdesc for result rowtype */
AttrNumber *attrMap; /* indexes of input fields, or 0 for null */
Datum *invalues; /* workspace for deconstructing source */
bool *inisnull;
Datum *outvalues; /* workspace for constructing result */
bool *outisnull;
} TupleConversionMap;
extern TupleConversionMap *convert_tuples_by_position(TupleDesc indesc,
TupleDesc outdesc,
const char *msg);
extern TupleConversionMap *convert_tuples_by_name(TupleDesc indesc,
TupleDesc outdesc,
const char *msg);
extern HeapTuple do_convert_tuple(HeapTuple tuple, TupleConversionMap *map);
extern void free_conversion_map(TupleConversionMap *map);
#endif /* TUPCONVERT_H */
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.205 2009/06/11 14:49:11 momjian Exp $
* $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.206 2009/08/06 20:44:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -750,11 +750,9 @@ typedef struct ConvertRowtypeExprState
ExprState *arg; /* input tuple value */
TupleDesc indesc; /* tupdesc for source rowtype */
TupleDesc outdesc; /* tupdesc for result rowtype */
AttrNumber *attrMap; /* indexes of input fields, or 0 for null */
Datum *invalues; /* workspace for deconstructing source */
bool *inisnull;
Datum *outvalues; /* workspace for constructing result */
bool *outisnull;
/* use "struct" so we needn't include tupconvert.h here */
struct TupleConversionMap *map;
bool initialized;
} ConvertRowtypeExprState;
/* ----------------
......
# $PostgreSQL: pgsql/src/pl/plpgsql/src/nls.mk,v 1.9 2009/06/26 19:33:52 petere Exp $
# $PostgreSQL: pgsql/src/pl/plpgsql/src/nls.mk,v 1.10 2009/08/06 20:44:31 tgl Exp $
CATALOG_NAME := plpgsql
AVAIL_LANGUAGES := de es fr ja ro
GETTEXT_FILES := pl_comp.c pl_exec.c pl_gram.c pl_funcs.c pl_handler.c pl_scan.c
GETTEXT_TRIGGERS:= _ errmsg errmsg_plural:1,2 errdetail errdetail_log errdetail_plural:1,2 errhint errcontext validate_tupdesc_compat:3 yyerror plpgsql_yyerror
GETTEXT_TRIGGERS:= _ errmsg errmsg_plural:1,2 errdetail errdetail_log errdetail_plural:1,2 errhint errcontext yyerror plpgsql_yyerror
.PHONY: gettext-files
gettext-files: distprep
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/pl/plpgsql/src/pl_exec.c,v 1.247 2009/08/04 21:22:46 alvherre Exp $
* $PostgreSQL: pgsql/src/pl/plpgsql/src/pl_exec.c,v 1.248 2009/08/06 20:44:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -18,6 +18,7 @@
#include <ctype.h>
#include "access/transam.h"
#include "access/tupconvert.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "executor/spi_priv.h"
......@@ -191,8 +192,6 @@ static Datum exec_simple_cast_value(Datum value, Oid valtype,
Oid reqtype, int32 reqtypmod,
bool isnull);
static void exec_init_tuple_store(PLpgSQL_execstate *estate);
static void validate_tupdesc_compat(TupleDesc expected, TupleDesc returned,
const char *msg);
static void exec_set_found(PLpgSQL_execstate *estate, bool state);
static void plpgsql_create_econtext(PLpgSQL_execstate *estate);
static void plpgsql_destroy_econtext(PLpgSQL_execstate *estate);
......@@ -383,14 +382,21 @@ plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo)
* expected result type. XXX would be better to cache the tupdesc
* instead of repeating get_call_result_type()
*/
HeapTuple rettup = (HeapTuple) DatumGetPointer(estate.retval);
TupleDesc tupdesc;
TupleConversionMap *tupmap;
switch (get_call_result_type(fcinfo, NULL, &tupdesc))
{
case TYPEFUNC_COMPOSITE:
/* got the expected result rowtype, now check it */
validate_tupdesc_compat(tupdesc, estate.rettupdesc,
"returned record type does not match expected record type");
tupmap = convert_tuples_by_position(estate.rettupdesc,
tupdesc,
gettext_noop("returned record type does not match expected record type"));
/* it might need conversion */
if (tupmap)
rettup = do_convert_tuple(rettup, tupmap);
/* no need to free map, we're about to return anyway */
break;
case TYPEFUNC_RECORD:
......@@ -415,9 +421,7 @@ plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo)
* Copy tuple to upper executor memory, as a tuple Datum. Make
* sure it is labeled with the caller-supplied tuple type.
*/
estate.retval =
PointerGetDatum(SPI_returntuple((HeapTuple) DatumGetPointer(estate.retval),
tupdesc));
estate.retval = PointerGetDatum(SPI_returntuple(rettup, tupdesc));
}
else
{
......@@ -706,11 +710,20 @@ plpgsql_exec_trigger(PLpgSQL_function *func,
rettup = NULL;
else
{
validate_tupdesc_compat(trigdata->tg_relation->rd_att,
estate.rettupdesc,
"returned row structure does not match the structure of the triggering table");
TupleConversionMap *tupmap;
rettup = (HeapTuple) DatumGetPointer(estate.retval);
/* check rowtype compatibility */
tupmap = convert_tuples_by_position(estate.rettupdesc,
trigdata->tg_relation->rd_att,
gettext_noop("returned row structure does not match the structure of the triggering table"));
/* it might need conversion */
if (tupmap)
rettup = do_convert_tuple(rettup, tupmap);
/* no need to free map, we're about to return anyway */
/* Copy tuple to upper executor memory */
rettup = SPI_copytuple((HeapTuple) DatumGetPointer(estate.retval));
rettup = SPI_copytuple(rettup);
}
/*
......@@ -2192,6 +2205,7 @@ exec_stmt_return_next(PLpgSQL_execstate *estate,
case PLPGSQL_DTYPE_REC:
{
PLpgSQL_rec *rec = (PLpgSQL_rec *) retvar;
TupleConversionMap *tupmap;
if (!HeapTupleIsValid(rec->tup))
ereport(ERROR,
......@@ -2200,9 +2214,16 @@ exec_stmt_return_next(PLpgSQL_execstate *estate,
rec->refname),
errdetail("The tuple structure of a not-yet-assigned"
" record is indeterminate.")));
validate_tupdesc_compat(tupdesc, rec->tupdesc,
"wrong record type supplied in RETURN NEXT");
tupmap = convert_tuples_by_position(rec->tupdesc,
tupdesc,
gettext_noop("wrong record type supplied in RETURN NEXT"));
tuple = rec->tup;
/* it might need conversion */
if (tupmap)
{
tuple = do_convert_tuple(tuple, tupmap);
free_conversion_map(tupmap);
}
}
break;
......@@ -2286,6 +2307,7 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
{
Portal portal;
uint32 processed = 0;
TupleConversionMap *tupmap;
if (!estate->retisset)
ereport(ERROR,
......@@ -2308,8 +2330,9 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
stmt->params);
}
validate_tupdesc_compat(estate->rettupdesc, portal->tupDesc,
"structure of query does not match function result type");
tupmap = convert_tuples_by_position(portal->tupDesc,
estate->rettupdesc,
gettext_noop("structure of query does not match function result type"));
while (true)
{
......@@ -2325,7 +2348,11 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
{
HeapTuple tuple = SPI_tuptable->vals[i];
if (tupmap)
tuple = do_convert_tuple(tuple, tupmap);
tuplestore_puttuple(estate->tuple_store, tuple);
if (tupmap)
heap_freetuple(tuple);
processed++;
}
MemoryContextSwitchTo(old_cxt);
......@@ -2333,6 +2360,9 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
SPI_freetuptable(SPI_tuptable);
}
if (tupmap)
free_conversion_map(tupmap);
SPI_freetuptable(SPI_tuptable);
SPI_cursor_close(portal);
......@@ -5121,45 +5151,6 @@ exec_simple_check_plan(PLpgSQL_expr *expr)
expr->expr_simple_type = exprType((Node *) tle->expr);
}
/*
* Validates compatibility of supplied TupleDesc pair by checking number and type
* of attributes.
*/
static void
validate_tupdesc_compat(TupleDesc expected, TupleDesc returned, const char *msg)
{
int i;
const char *dropped_column_type = gettext_noop("N/A (dropped column)");
if (!expected || !returned)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("%s", _(msg))));
if (expected->natts != returned->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("%s", _(msg)),
errdetail("Number of returned columns (%d) does not match "
"expected column count (%d).",
returned->natts, expected->natts)));
for (i = 0; i < expected->natts; i++)
if (expected->attrs[i]->atttypid != returned->attrs[i]->atttypid)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("%s", _(msg)),
errdetail("Returned type %s does not match expected type "
"%s in column \"%s\".",
OidIsValid(returned->attrs[i]->atttypid) ?
format_type_be(returned->attrs[i]->atttypid) :
_(dropped_column_type),
OidIsValid(expected->attrs[i]->atttypid) ?
format_type_be(expected->attrs[i]->atttypid) :
_(dropped_column_type),
NameStr(expected->attrs[i]->attname))));
}
/* ----------
* exec_set_found Set the global found variable
* to true/false
......
......@@ -3287,6 +3287,57 @@ select * from return_dquery();
(4 rows)
drop function return_dquery();
-- test RETURN QUERY with dropped columns
create table tabwithcols(a int, b int, c int, d int);
insert into tabwithcols values(10,20,30,40),(50,60,70,80);
create or replace function returnqueryf()
returns setof tabwithcols as $$
begin
return query select * from tabwithcols;
return query execute 'select * from tabwithcols';
end;
$$ language plpgsql;
select * from returnqueryf();
a | b | c | d
----+----+----+----
10 | 20 | 30 | 40
50 | 60 | 70 | 80
10 | 20 | 30 | 40
50 | 60 | 70 | 80
(4 rows)
alter table tabwithcols drop column b;
select * from returnqueryf();
a | c | d
----+----+----
10 | 30 | 40
50 | 70 | 80
10 | 30 | 40
50 | 70 | 80
(4 rows)
alter table tabwithcols drop column d;
select * from returnqueryf();
a | c
----+----
10 | 30
50 | 70
10 | 30
50 | 70
(4 rows)
alter table tabwithcols add column d int;
select * from returnqueryf();
a | c | d
----+----+---
10 | 30 |
50 | 70 |
10 | 30 |
50 | 70 |
(4 rows)
drop function returnqueryf();
drop table tabwithcols;
-- Tests for 8.4's new RAISE features
create or replace function raise_test() returns void as $$
begin
......
......@@ -2684,6 +2684,36 @@ select * from return_dquery();
drop function return_dquery();
-- test RETURN QUERY with dropped columns
create table tabwithcols(a int, b int, c int, d int);
insert into tabwithcols values(10,20,30,40),(50,60,70,80);
create or replace function returnqueryf()
returns setof tabwithcols as $$
begin
return query select * from tabwithcols;
return query execute 'select * from tabwithcols';
end;
$$ language plpgsql;
select * from returnqueryf();
alter table tabwithcols drop column b;
select * from returnqueryf();
alter table tabwithcols drop column d;
select * from returnqueryf();
alter table tabwithcols add column d int;
select * from returnqueryf();
drop function returnqueryf();
drop table tabwithcols;
-- Tests for 8.4's new RAISE features
create or replace function raise_test() returns void as $$
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment