Commit 16762b51 authored by Tom Lane's avatar Tom Lane

Speed up array element assignment in plpgsql by caching type information.

Cache assorted data in the PLpgSQL_arrayelem struct to avoid repetitive
catalog lookups over multiple executions of the same statement.

Pavel Stehule
parent 821fd903
...@@ -998,6 +998,8 @@ assign_var : T_DATUM ...@@ -998,6 +998,8 @@ assign_var : T_DATUM
new->dtype = PLPGSQL_DTYPE_ARRAYELEM; new->dtype = PLPGSQL_DTYPE_ARRAYELEM;
new->subscript = $3; new->subscript = $3;
new->arrayparentno = $1; new->arrayparentno = $1;
/* initialize cached type data to "not valid" */
new->parenttypoid = InvalidOid;
plpgsql_adddatum((PLpgSQL_datum *) new); plpgsql_adddatum((PLpgSQL_datum *) new);
......
...@@ -874,7 +874,8 @@ copy_plpgsql_datum(PLpgSQL_datum *datum) ...@@ -874,7 +874,8 @@ copy_plpgsql_datum(PLpgSQL_datum *datum)
/* /*
* These datum records are read-only at runtime, so no need to * These datum records are read-only at runtime, so no need to
* copy them * copy them (well, ARRAYELEM contains some cached type data,
* but we'd just as soon centralize the caching anyway)
*/ */
result = datum; result = datum;
break; break;
...@@ -3986,20 +3987,16 @@ exec_assign_value(PLpgSQL_execstate *estate, ...@@ -3986,20 +3987,16 @@ exec_assign_value(PLpgSQL_execstate *estate,
/* /*
* Target is an element of an array * Target is an element of an array
*/ */
PLpgSQL_arrayelem *arrayelem;
int nsubscripts; int nsubscripts;
int i; int i;
PLpgSQL_expr *subscripts[MAXDIM]; PLpgSQL_expr *subscripts[MAXDIM];
int subscriptvals[MAXDIM]; int subscriptvals[MAXDIM];
bool oldarrayisnull;
Oid arraytypeid,
arrayelemtypeid;
int32 arraytypmod;
int16 arraytyplen,
elemtyplen;
bool elemtypbyval;
char elemtypalign;
Datum oldarraydatum, Datum oldarraydatum,
coerced_value; coerced_value;
bool oldarrayisnull;
Oid parenttypoid;
int32 parenttypmod;
ArrayType *oldarrayval; ArrayType *oldarrayval;
ArrayType *newarrayval; ArrayType *newarrayval;
SPITupleTable *save_eval_tuptable; SPITupleTable *save_eval_tuptable;
...@@ -4020,13 +4017,14 @@ exec_assign_value(PLpgSQL_execstate *estate, ...@@ -4020,13 +4017,14 @@ exec_assign_value(PLpgSQL_execstate *estate,
* back to find the base array datum, and save the subscript * back to find the base array datum, and save the subscript
* expressions as we go. (We are scanning right to left here, * expressions as we go. (We are scanning right to left here,
* but want to evaluate the subscripts left-to-right to * but want to evaluate the subscripts left-to-right to
* minimize surprises.) * minimize surprises.) Note that arrayelem is left pointing
* to the leftmost arrayelem datum, where we will cache the
* array element type data.
*/ */
nsubscripts = 0; nsubscripts = 0;
do do
{ {
PLpgSQL_arrayelem *arrayelem = (PLpgSQL_arrayelem *) target; arrayelem = (PLpgSQL_arrayelem *) target;
if (nsubscripts >= MAXDIM) if (nsubscripts >= MAXDIM)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
...@@ -4038,24 +4036,51 @@ exec_assign_value(PLpgSQL_execstate *estate, ...@@ -4038,24 +4036,51 @@ exec_assign_value(PLpgSQL_execstate *estate,
/* Fetch current value of array datum */ /* Fetch current value of array datum */
exec_eval_datum(estate, target, exec_eval_datum(estate, target,
&arraytypeid, &arraytypmod, &parenttypoid, &parenttypmod,
&oldarraydatum, &oldarrayisnull); &oldarraydatum, &oldarrayisnull);
/* Update cached type data if necessary */
if (arrayelem->parenttypoid != parenttypoid ||
arrayelem->parenttypmod != parenttypmod)
{
Oid arraytypoid;
int32 arraytypmod = parenttypmod;
int16 arraytyplen;
Oid elemtypoid;
int16 elemtyplen;
bool elemtypbyval;
char elemtypalign;
/* If target is domain over array, reduce to base type */ /* If target is domain over array, reduce to base type */
arraytypeid = getBaseTypeAndTypmod(arraytypeid, &arraytypmod); arraytypoid = getBaseTypeAndTypmod(parenttypoid,
&arraytypmod);
/* ... and identify the element type */ /* ... and identify the element type */
arrayelemtypeid = get_element_type(arraytypeid); elemtypoid = get_element_type(arraytypoid);
if (!OidIsValid(arrayelemtypeid)) if (!OidIsValid(elemtypoid))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH), (errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("subscripted object is not an array"))); errmsg("subscripted object is not an array")));
get_typlenbyvalalign(arrayelemtypeid, /* Collect needed data about the types */
arraytyplen = get_typlen(arraytypoid);
get_typlenbyvalalign(elemtypoid,
&elemtyplen, &elemtyplen,
&elemtypbyval, &elemtypbyval,
&elemtypalign); &elemtypalign);
arraytyplen = get_typlen(arraytypeid);
/* Now safe to update the cached data */
arrayelem->parenttypoid = parenttypoid;
arrayelem->parenttypmod = parenttypmod;
arrayelem->arraytypoid = arraytypoid;
arrayelem->arraytypmod = arraytypmod;
arrayelem->arraytyplen = arraytyplen;
arrayelem->elemtypoid = elemtypoid;
arrayelem->elemtyplen = elemtyplen;
arrayelem->elemtypbyval = elemtypbyval;
arrayelem->elemtypalign = elemtypalign;
}
/* /*
* Evaluate the subscripts, switch into left-to-right order. * Evaluate the subscripts, switch into left-to-right order.
...@@ -4093,8 +4118,8 @@ exec_assign_value(PLpgSQL_execstate *estate, ...@@ -4093,8 +4118,8 @@ exec_assign_value(PLpgSQL_execstate *estate,
/* Coerce source value to match array element type. */ /* Coerce source value to match array element type. */
coerced_value = exec_simple_cast_value(value, coerced_value = exec_simple_cast_value(value,
valtype, valtype,
arrayelemtypeid, arrayelem->elemtypoid,
arraytypmod, arrayelem->arraytypmod,
*isNull); *isNull);
/* /*
...@@ -4107,12 +4132,12 @@ exec_assign_value(PLpgSQL_execstate *estate, ...@@ -4107,12 +4132,12 @@ exec_assign_value(PLpgSQL_execstate *estate,
* array, either, so that's a no-op too. This is all ugly but * array, either, so that's a no-op too. This is all ugly but
* corresponds to the current behavior of ExecEvalArrayRef(). * corresponds to the current behavior of ExecEvalArrayRef().
*/ */
if (arraytyplen > 0 && /* fixed-length array? */ if (arrayelem->arraytyplen > 0 && /* fixed-length array? */
(oldarrayisnull || *isNull)) (oldarrayisnull || *isNull))
return; return;
if (oldarrayisnull) if (oldarrayisnull)
oldarrayval = construct_empty_array(arrayelemtypeid); oldarrayval = construct_empty_array(arrayelem->elemtypoid);
else else
oldarrayval = (ArrayType *) DatumGetPointer(oldarraydatum); oldarrayval = (ArrayType *) DatumGetPointer(oldarraydatum);
...@@ -4124,16 +4149,17 @@ exec_assign_value(PLpgSQL_execstate *estate, ...@@ -4124,16 +4149,17 @@ exec_assign_value(PLpgSQL_execstate *estate,
subscriptvals, subscriptvals,
coerced_value, coerced_value,
*isNull, *isNull,
arraytyplen, arrayelem->arraytyplen,
elemtyplen, arrayelem->elemtyplen,
elemtypbyval, arrayelem->elemtypbyval,
elemtypalign); arrayelem->elemtypalign);
/* /*
* Avoid leaking the result of exec_simple_cast_value, if it * Avoid leaking the result of exec_simple_cast_value, if it
* performed a conversion to a pass-by-ref type. * performed a conversion to a pass-by-ref type.
*/ */
if (!*isNull && coerced_value != value && !elemtypbyval) if (!*isNull && coerced_value != value &&
!arrayelem->elemtypbyval)
pfree(DatumGetPointer(coerced_value)); pfree(DatumGetPointer(coerced_value));
/* /*
...@@ -4145,7 +4171,7 @@ exec_assign_value(PLpgSQL_execstate *estate, ...@@ -4145,7 +4171,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
*isNull = false; *isNull = false;
exec_assign_value(estate, target, exec_assign_value(estate, target,
PointerGetDatum(newarrayval), PointerGetDatum(newarrayval),
arraytypeid, isNull); arrayelem->arraytypoid, isNull);
/* /*
* Avoid leaking the modified array value, too. * Avoid leaking the modified array value, too.
......
...@@ -299,6 +299,16 @@ typedef struct ...@@ -299,6 +299,16 @@ typedef struct
int dno; int dno;
PLpgSQL_expr *subscript; PLpgSQL_expr *subscript;
int arrayparentno; /* dno of parent array variable */ int arrayparentno; /* dno of parent array variable */
/* Remaining fields are cached info about the array variable's type */
Oid parenttypoid; /* type of array variable; 0 if not yet set */
int32 parenttypmod; /* typmod of array variable */
Oid arraytypoid; /* OID of actual array type */
int32 arraytypmod; /* typmod of array (and its elements too) */
int16 arraytyplen; /* typlen of array type */
Oid elemtypoid; /* OID of array element type */
int16 elemtyplen; /* typlen of element type */
bool elemtypbyval; /* element type is pass-by-value? */
char elemtypalign; /* typalign of element type */
} PLpgSQL_arrayelem; } PLpgSQL_arrayelem;
......
...@@ -4509,3 +4509,65 @@ NOTICE: {"(35,78)","(88,76)"} ...@@ -4509,3 +4509,65 @@ NOTICE: {"(35,78)","(88,76)"}
drop function foreach_test(anyarray); drop function foreach_test(anyarray);
drop type xy_tuple; drop type xy_tuple;
--
-- Assorted tests for array subscript assignment
--
create temp table rtype (id int, ar text[]);
create function arrayassign1() returns text[] language plpgsql as $$
declare
r record;
begin
r := row(12, '{foo,bar,baz}')::rtype;
r.ar[2] := 'replace';
return r.ar;
end$$;
select arrayassign1();
arrayassign1
-------------------
{foo,replace,baz}
(1 row)
select arrayassign1(); -- try again to exercise internal caching
arrayassign1
-------------------
{foo,replace,baz}
(1 row)
create domain orderedarray as int[2]
constraint sorted check (value[1] < value[2]);
select '{1,2}'::orderedarray;
orderedarray
--------------
{1,2}
(1 row)
select '{2,1}'::orderedarray; -- fail
ERROR: value for domain orderedarray violates check constraint "sorted"
create function testoa(x1 int, x2 int, x3 int) returns orderedarray
language plpgsql as $$
declare res orderedarray;
begin
res := array[x1, x2];
res[2] := x3;
return res;
end$$;
select testoa(1,2,3);
testoa
--------
{1,3}
(1 row)
select testoa(1,2,3); -- try again to exercise internal caching
testoa
--------
{1,3}
(1 row)
select testoa(2,1,3); -- fail at initial assign
ERROR: value for domain orderedarray violates check constraint "sorted"
CONTEXT: PL/pgSQL function "testoa" line 4 at assignment
select testoa(1,2,1); -- fail at update
ERROR: value for domain orderedarray violates check constraint "sorted"
CONTEXT: PL/pgSQL function "testoa" line 5 at assignment
drop function arrayassign1();
drop function testoa(x1 int, x2 int, x3 int);
...@@ -3559,3 +3559,44 @@ select foreach_test(ARRAY[[(10,20),(40,69)],[(35,78),(88,76)]]::xy_tuple[]); ...@@ -3559,3 +3559,44 @@ select foreach_test(ARRAY[[(10,20),(40,69)],[(35,78),(88,76)]]::xy_tuple[]);
drop function foreach_test(anyarray); drop function foreach_test(anyarray);
drop type xy_tuple; drop type xy_tuple;
--
-- Assorted tests for array subscript assignment
--
create temp table rtype (id int, ar text[]);
create function arrayassign1() returns text[] language plpgsql as $$
declare
r record;
begin
r := row(12, '{foo,bar,baz}')::rtype;
r.ar[2] := 'replace';
return r.ar;
end$$;
select arrayassign1();
select arrayassign1(); -- try again to exercise internal caching
create domain orderedarray as int[2]
constraint sorted check (value[1] < value[2]);
select '{1,2}'::orderedarray;
select '{2,1}'::orderedarray; -- fail
create function testoa(x1 int, x2 int, x3 int) returns orderedarray
language plpgsql as $$
declare res orderedarray;
begin
res := array[x1, x2];
res[2] := x3;
return res;
end$$;
select testoa(1,2,3);
select testoa(1,2,3); -- try again to exercise internal caching
select testoa(2,1,3); -- fail at initial assign
select testoa(1,2,1); -- fail at update
drop function arrayassign1();
drop function testoa(x1 int, x2 int, x3 int);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment