Commit 84a42560 authored by Tom Lane's avatar Tom Lane

Add array_remove() and array_replace() functions.

These functions support removing or replacing array element value(s)
matching a given search value.  Although intended mainly to support a
future array-foreign-key feature, they seem useful in their own right.

Marco Nenciarini and Gabriele Bartolini, reviewed by Alex Hunsaker
parent f9951252
......@@ -10316,6 +10316,12 @@ SELECT NULLIF(value, '(none)') ...
<indexterm>
<primary>array_prepend</primary>
</indexterm>
<indexterm>
<primary>array_remove</primary>
</indexterm>
<indexterm>
<primary>array_replace</primary>
</indexterm>
<indexterm>
<primary>array_to_string</primary>
</indexterm>
......@@ -10432,6 +10438,29 @@ SELECT NULLIF(value, '(none)') ...
<entry><literal>array_prepend(1, ARRAY[2,3])</literal></entry>
<entry><literal>{1,2,3}</literal></entry>
</row>
<row>
<entry>
<literal>
<function>array_remove</function>(<type>anyarray</type>, <type>anyelement</type>)
</literal>
</entry>
<entry><type>anyarray</type></entry>
<entry>remove all elements equal to the given value from the array
(array must be one-dimensional)</entry>
<entry><literal>array_remove(ARRAY[1,2,3,2], 2)</literal></entry>
<entry><literal>{1,3}</literal></entry>
</row>
<row>
<entry>
<literal>
<function>array_replace</function>(<type>anyarray</type>, <type>anyelement</type>, <type>anyelement</type>)
</literal>
</entry>
<entry><type>anyarray</type></entry>
<entry>replace each array element equal to the given value with a new value</entry>
<entry><literal>array_replace(ARRAY[1,2,5,4], 5, 3)</literal></entry>
<entry><literal>{1,2,3,4}</literal></entry>
</row>
<row>
<entry>
<literal>
......
......@@ -124,6 +124,11 @@ static ArrayType *create_array_envelope(int ndims, int *dimv, int *lbv, int nbyt
static ArrayType *array_fill_internal(ArrayType *dims, ArrayType *lbs,
Datum value, bool isnull, Oid elmtype,
FunctionCallInfo fcinfo);
static ArrayType *array_replace_internal(ArrayType *array,
Datum search, bool search_isnull,
Datum replace, bool replace_isnull,
bool remove, Oid collation,
FunctionCallInfo fcinfo);
/*
......@@ -5174,3 +5179,304 @@ array_unnest(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
}
/*
* array_replace/array_remove support
*
* Find all array entries matching (not distinct from) search/search_isnull,
* and delete them if remove is true, else replace them with
* replace/replace_isnull. Comparisons are done using the specified
* collation. fcinfo is passed only for caching purposes.
*/
static ArrayType *
array_replace_internal(ArrayType *array,
Datum search, bool search_isnull,
Datum replace, bool replace_isnull,
bool remove, Oid collation,
FunctionCallInfo fcinfo)
{
ArrayType *result;
Oid element_type;
Datum *values;
bool *nulls;
int *dim;
int ndim;
int nitems,
nresult;
int i;
int32 nbytes = 0;
int32 dataoffset;
bool hasnulls;
int typlen;
bool typbyval;
char typalign;
char *arraydataptr;
bits8 *bitmap;
int bitmask;
bool changed = false;
TypeCacheEntry *typentry;
FunctionCallInfoData locfcinfo;
element_type = ARR_ELEMTYPE(array);
ndim = ARR_NDIM(array);
dim = ARR_DIMS(array);
nitems = ArrayGetNItems(ndim, dim);
/* Return input array unmodified if it is empty */
if (nitems <= 0)
return array;
/*
* We can't remove elements from multi-dimensional arrays, since the
* result might not be rectangular.
*/
if (remove && ndim > 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("removing elements from multidimensional arrays is not supported")));
/*
* We arrange to look up the equality function only once per series of
* calls, assuming the element type doesn't change underneath us.
*/
typentry = (TypeCacheEntry *) fcinfo->flinfo->fn_extra;
if (typentry == NULL ||
typentry->type_id != element_type)
{
typentry = lookup_type_cache(element_type,
TYPECACHE_EQ_OPR_FINFO);
if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("could not identify an equality operator for type %s",
format_type_be(element_type))));
fcinfo->flinfo->fn_extra = (void *) typentry;
}
typlen = typentry->typlen;
typbyval = typentry->typbyval;
typalign = typentry->typalign;
/*
* Detoast values if they are toasted. The replacement value must be
* detoasted for insertion into the result array, while detoasting the
* search value only once saves cycles.
*/
if (typlen == -1)
{
if (!search_isnull)
search = PointerGetDatum(PG_DETOAST_DATUM(search));
if (!replace_isnull)
replace = PointerGetDatum(PG_DETOAST_DATUM(replace));
}
/* Prepare to apply the comparison operator */
InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
collation, NULL, NULL);
/* Allocate temporary arrays for new values */
values = (Datum *) palloc(nitems * sizeof(Datum));
nulls = (bool *) palloc(nitems * sizeof(bool));
/* Loop over source data */
arraydataptr = ARR_DATA_PTR(array);
bitmap = ARR_NULLBITMAP(array);
bitmask = 1;
hasnulls = false;
nresult = 0;
for (i = 0; i < nitems; i++)
{
Datum elt;
bool isNull;
bool oprresult;
bool skip = false;
/* Get source element, checking for NULL */
if (bitmap && (*bitmap & bitmask) == 0)
{
isNull = true;
/* If searching for NULL, we have a match */
if (search_isnull)
{
if (remove)
{
skip = true;
changed = true;
}
else if (!replace_isnull)
{
values[nresult] = replace;
isNull = false;
changed = true;
}
}
}
else
{
isNull = false;
elt = fetch_att(arraydataptr, typbyval, typlen);
arraydataptr = att_addlength_datum(arraydataptr, typlen, elt);
arraydataptr = (char *) att_align_nominal(arraydataptr, typalign);
if (search_isnull)
{
/* no match possible, keep element */
values[nresult] = elt;
}
else
{
/*
* Apply the operator to the element pair
*/
locfcinfo.arg[0] = elt;
locfcinfo.arg[1] = search;
locfcinfo.argnull[0] = false;
locfcinfo.argnull[1] = false;
locfcinfo.isnull = false;
oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo));
if (!oprresult)
{
/* no match, keep element */
values[nresult] = elt;
}
else
{
/* match, so replace or delete */
changed = true;
if (remove)
skip = true;
else
{
values[nresult] = replace;
isNull = replace_isnull;
}
}
}
}
if (!skip)
{
nulls[nresult] = isNull;
if (isNull)
hasnulls = true;
else
{
/* Update total result size */
nbytes = att_addlength_datum(nbytes, typlen, values[nresult]);
nbytes = att_align_nominal(nbytes, typalign);
/* check for overflow of total request */
if (!AllocSizeIsValid(nbytes))
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("array size exceeds the maximum allowed (%d)",
(int) MaxAllocSize)));
}
nresult++;
}
/* advance bitmap pointer if any */
if (bitmap)
{
bitmask <<= 1;
if (bitmask == 0x100)
{
bitmap++;
bitmask = 1;
}
}
}
/*
* If not changed just return the original array
*/
if (!changed)
{
pfree(values);
pfree(nulls);
return array;
}
/* Allocate and initialize the result array */
if (hasnulls)
{
dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nresult);
nbytes += dataoffset;
}
else
{
dataoffset = 0; /* marker for no null bitmap */
nbytes += ARR_OVERHEAD_NONULLS(ndim);
}
result = (ArrayType *) palloc0(nbytes);
SET_VARSIZE(result, nbytes);
result->ndim = ndim;
result->dataoffset = dataoffset;
result->elemtype = element_type;
memcpy(ARR_DIMS(result), ARR_DIMS(array), 2 * ndim * sizeof(int));
if (remove)
{
/* Adjust the result length */
ARR_DIMS(result)[0] = nresult;
}
/* Insert data into result array */
CopyArrayEls(result,
values, nulls, nresult,
typlen, typbyval, typalign,
false);
pfree(values);
pfree(nulls);
return result;
}
/*
* Remove any occurrences of an element from an array
*
* If used on a multi-dimensional array this will raise an error.
*/
Datum
array_remove(PG_FUNCTION_ARGS)
{
ArrayType *array;
Datum search = PG_GETARG_DATUM(1);
bool search_isnull = PG_ARGISNULL(1);
if (PG_ARGISNULL(0))
PG_RETURN_NULL();
array = PG_GETARG_ARRAYTYPE_P(0);
array = array_replace_internal(array,
search, search_isnull,
(Datum) 0, true,
true, PG_GET_COLLATION(),
fcinfo);
PG_RETURN_ARRAYTYPE_P(array);
}
/*
* Replace any occurrences of an element in an array
*/
Datum
array_replace(PG_FUNCTION_ARGS)
{
ArrayType *array;
Datum search = PG_GETARG_DATUM(1);
bool search_isnull = PG_ARGISNULL(1);
Datum replace = PG_GETARG_DATUM(2);
bool replace_isnull = PG_ARGISNULL(2);
if (PG_ARGISNULL(0))
PG_RETURN_NULL();
array = PG_GETARG_ARRAYTYPE_P(0);
array = array_replace_internal(array,
search, search_isnull,
replace, replace_isnull,
false, PG_GET_COLLATION(),
fcinfo);
PG_RETURN_ARRAYTYPE_P(array);
}
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201206171
#define CATALOG_VERSION_NO 201207111
#endif
......@@ -867,6 +867,10 @@ DATA(insert OID = 1286 ( array_fill PGNSP PGUID 12 1 0 0 0 f f f f f f i 3 0 22
DESCR("array constructor with value");
DATA(insert OID = 2331 ( unnest PGNSP PGUID 12 1 100 0 0 f f f f t t i 1 0 2283 "2277" _null_ _null_ _null_ _null_ array_unnest _null_ _null_ _null_ ));
DESCR("expand array to set of rows");
DATA(insert OID = 3167 ( array_remove PGNSP PGUID 12 1 0 0 0 f f f f f f i 2 0 2277 "2277 2283" _null_ _null_ _null_ _null_ array_remove _null_ _null_ _null_ ));
DESCR("remove any occurrences of an element from an array");
DATA(insert OID = 3168 ( array_replace PGNSP PGUID 12 1 0 0 0 f f f f f f i 3 0 2277 "2277 2283 2283" _null_ _null_ _null_ _null_ array_replace _null_ _null_ _null_ ));
DESCR("replace any occurrences of an element in an array");
DATA(insert OID = 2333 ( array_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f f i 2 0 2281 "2281 2283" _null_ _null_ _null_ _null_ array_agg_transfn _null_ _null_ _null_ ));
DESCR("aggregate transition function");
DATA(insert OID = 2334 ( array_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f f i 1 0 2277 "2281" _null_ _null_ _null_ _null_ array_agg_finalfn _null_ _null_ _null_ ));
......
......@@ -211,6 +211,8 @@ extern Datum generate_subscripts_nodir(PG_FUNCTION_ARGS);
extern Datum array_fill(PG_FUNCTION_ARGS);
extern Datum array_fill_with_lower_bounds(PG_FUNCTION_ARGS);
extern Datum array_unnest(PG_FUNCTION_ARGS);
extern Datum array_remove(PG_FUNCTION_ARGS);
extern Datum array_replace(PG_FUNCTION_ARGS);
extern Datum array_ref(ArrayType *array, int nSubscripts, int *indx,
int arraytyplen, int elmlen, bool elmbyval, char elmalign,
......
......@@ -1542,6 +1542,68 @@ select unnest(array[1,2,3,null,4,null,null,5,6]::text[]);
6
(9 rows)
select array_remove(array[1,2,2,3], 2);
array_remove
--------------
{1,3}
(1 row)
select array_remove(array[1,2,2,3], 5);
array_remove
--------------
{1,2,2,3}
(1 row)
select array_remove(array[1,NULL,NULL,3], NULL);
array_remove
--------------
{1,3}
(1 row)
select array_remove(array['A','CC','D','C','RR'], 'RR');
array_remove
--------------
{A,CC,D,C}
(1 row)
select array_remove('{{1,2,2},{1,4,3}}', 2); -- not allowed
ERROR: removing elements from multidimensional arrays is not supported
select array_replace(array[1,2,5,4],5,3);
array_replace
---------------
{1,2,3,4}
(1 row)
select array_replace(array[1,2,5,4],5,NULL);
array_replace
---------------
{1,2,NULL,4}
(1 row)
select array_replace(array[1,2,NULL,4,NULL],NULL,5);
array_replace
---------------
{1,2,5,4,5}
(1 row)
select array_replace(array['A','B','DD','B'],'B','CC');
array_replace
---------------
{A,CC,DD,CC}
(1 row)
select array_replace(array[1,NULL,3],NULL,NULL);
array_replace
---------------
{1,NULL,3}
(1 row)
select array_replace(array['AB',NULL,'CDE'],NULL,'12');
array_replace
---------------
{AB,12,CDE}
(1 row)
-- Insert/update on a column that is array of composite
create temp table t1 (f1 int8_tbl[]);
insert into t1 (f1[5].q1) values(42);
......
......@@ -432,6 +432,17 @@ select unnest(array[1,2,3,4.5]::float8[]);
select unnest(array[1,2,3,4.5]::numeric[]);
select unnest(array[1,2,3,null,4,null,null,5,6]);
select unnest(array[1,2,3,null,4,null,null,5,6]::text[]);
select array_remove(array[1,2,2,3], 2);
select array_remove(array[1,2,2,3], 5);
select array_remove(array[1,NULL,NULL,3], NULL);
select array_remove(array['A','CC','D','C','RR'], 'RR');
select array_remove('{{1,2,2},{1,4,3}}', 2); -- not allowed
select array_replace(array[1,2,5,4],5,3);
select array_replace(array[1,2,5,4],5,NULL);
select array_replace(array[1,2,NULL,4,NULL],NULL,5);
select array_replace(array['A','B','DD','B'],'B','CC');
select array_replace(array[1,NULL,3],NULL,NULL);
select array_replace(array['AB',NULL,'CDE'],NULL,'12');
-- Insert/update on a column that is array of composite
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment