Commit 2efc9241 authored by Tom Lane's avatar Tom Lane

Detoast plpgsql variables if they might live across a transaction boundary.

Up to now, it's been safe for plpgsql to store TOAST pointers in its
variables because the ActiveSnapshot for whatever query called the plpgsql
function will surely protect such TOAST values from being vacuumed away,
even if the owning table rows are committed dead.  With the introduction of
procedures, that assumption is no longer good in "non atomic" executions
of plpgsql code.  We adopt the slightly brute-force solution of detoasting
all TOAST pointers at the time they are stored into variables, if we're in
a non-atomic context, just in case the owning row goes away.

Some care is needed to avoid long-term memory leaks, since plpgsql tends
to run with CurrentMemoryContext pointing to its call-lifespan context,
but we shouldn't assume that no memory is leaked by heap_tuple_fetch_attr.
In plpgsql proper, we can do the detoasting work in the "eval_mcontext".

Most of the code thrashing here is due to the need to add this capability
to expandedrecord.c as well as plpgsql proper.  In expandedrecord.c,
we can't assume that the caller's context is short-lived, so make use of
the short-term sub-context that was already invented for checking domain
constraints.  In view of this repurposing, it seems good to rename that
variable and associated code from "domain_check_cxt" to "short_term_cxt".

Peter Eisentraut and Tom Lane

Discussion: https://postgr.es/m/5AC06865.9050005@anastigmatix.net
parent a11b3bd3
This diff is collapsed.
......@@ -321,6 +321,8 @@ typedef struct
(VARATT_IS_EXTERNAL(PTR) && VARTAG_EXTERNAL(PTR) == VARTAG_EXPANDED_RW)
#define VARATT_IS_EXTERNAL_EXPANDED(PTR) \
(VARATT_IS_EXTERNAL(PTR) && VARTAG_IS_EXPANDED(VARTAG_EXTERNAL(PTR)))
#define VARATT_IS_EXTERNAL_NON_EXPANDED(PTR) \
(VARATT_IS_EXTERNAL(PTR) && !VARTAG_IS_EXPANDED(VARTAG_EXTERNAL(PTR)))
#define VARATT_IS_SHORT(PTR) VARATT_IS_1B(PTR)
#define VARATT_IS_EXTENDED(PTR) (!VARATT_IS_4B_U(PTR))
......
......@@ -127,8 +127,10 @@ typedef struct ExpandedRecordHeader
char *fstartptr; /* start of its data area */
char *fendptr; /* end+1 of its data area */
/* Some operations on the expanded record need a short-lived context */
MemoryContext er_short_term_cxt; /* short-term memory context */
/* Working state for domain checking, used if ER_FLAG_IS_DOMAIN is set */
MemoryContext er_domain_check_cxt; /* short-term memory context */
struct ExpandedRecordHeader *er_dummy_header; /* dummy record header */
void *er_domaininfo; /* cache space for domain_check() */
......@@ -171,7 +173,7 @@ extern ExpandedRecordHeader *make_expanded_record_from_tupdesc(TupleDesc tupdesc
extern ExpandedRecordHeader *make_expanded_record_from_exprecord(ExpandedRecordHeader *olderh,
MemoryContext parentcontext);
extern void expanded_record_set_tuple(ExpandedRecordHeader *erh,
HeapTuple tuple, bool copy);
HeapTuple tuple, bool copy, bool expand_external);
extern Datum make_expanded_record_from_datum(Datum recorddatum,
MemoryContext parentcontext);
extern TupleDesc expanded_record_fetch_tupdesc(ExpandedRecordHeader *erh);
......@@ -186,13 +188,15 @@ extern Datum expanded_record_fetch_field(ExpandedRecordHeader *erh, int fnumber,
extern void expanded_record_set_field_internal(ExpandedRecordHeader *erh,
int fnumber,
Datum newValue, bool isnull,
bool expand_external,
bool check_constraints);
extern void expanded_record_set_fields(ExpandedRecordHeader *erh,
const Datum *newValues, const bool *isnulls);
const Datum *newValues, const bool *isnulls,
bool expand_external);
/* outside code should never call expanded_record_set_field_internal as such */
#define expanded_record_set_field(erh, fnumber, newValue, isnull) \
expanded_record_set_field_internal(erh, fnumber, newValue, isnull, true)
#define expanded_record_set_field(erh, fnumber, newValue, isnull, expand_external) \
expanded_record_set_field_internal(erh, fnumber, newValue, isnull, expand_external, true)
/*
* Inline-able fast cases. The expanded_record_fetch_xxx functions above
......
......@@ -20,6 +20,7 @@
#include "access/htup_details.h"
#include "access/transam.h"
#include "access/tupconvert.h"
#include "access/tuptoaster.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
......@@ -912,16 +913,20 @@ plpgsql_exec_trigger(PLpgSQL_function *func,
}
else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
{
expanded_record_set_tuple(rec_new->erh, trigdata->tg_trigtuple, false);
expanded_record_set_tuple(rec_new->erh, trigdata->tg_trigtuple,
false, false);
}
else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
{
expanded_record_set_tuple(rec_new->erh, trigdata->tg_newtuple, false);
expanded_record_set_tuple(rec_old->erh, trigdata->tg_trigtuple, false);
expanded_record_set_tuple(rec_new->erh, trigdata->tg_newtuple,
false, false);
expanded_record_set_tuple(rec_old->erh, trigdata->tg_trigtuple,
false, false);
}
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
{
expanded_record_set_tuple(rec_old->erh, trigdata->tg_trigtuple, false);
expanded_record_set_tuple(rec_old->erh, trigdata->tg_trigtuple,
false, false);
}
else
elog(ERROR, "unrecognized trigger action: not INSERT, DELETE, or UPDATE");
......@@ -5061,7 +5066,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
/* And assign it. */
expanded_record_set_field(erh, recfield->finfo.fnumber,
value, isNull);
value, isNull, !estate->atomic);
break;
}
......@@ -5875,7 +5880,8 @@ exec_for_query(PLpgSQL_execstate *estate, PLpgSQL_stmt_forq *stmt,
tupdescs_match)
{
/* Only need to assign a new tuple value */
expanded_record_set_tuple(rec->erh, tuptab->vals[i], true);
expanded_record_set_tuple(rec->erh, tuptab->vals[i],
true, !estate->atomic);
}
else
{
......@@ -6647,7 +6653,7 @@ exec_move_row(PLpgSQL_execstate *estate,
*/
newerh = make_expanded_record_for_rec(estate, rec,
NULL, rec->erh);
expanded_record_set_tuple(newerh, NULL, false);
expanded_record_set_tuple(newerh, NULL, false, false);
assign_record_var(estate, rec, newerh);
}
else
......@@ -6689,7 +6695,7 @@ exec_move_row(PLpgSQL_execstate *estate,
else
{
/* No coercion is needed, so just assign the row value */
expanded_record_set_tuple(newerh, tup, true);
expanded_record_set_tuple(newerh, tup, true, !estate->atomic);
}
/* Complete the assignment */
......@@ -6927,7 +6933,7 @@ exec_move_row_from_fields(PLpgSQL_execstate *estate,
}
/* Insert the coerced field values into the new expanded record */
expanded_record_set_fields(newerh, values, nulls);
expanded_record_set_fields(newerh, values, nulls, !estate->atomic);
/* Complete the assignment */
assign_record_var(estate, rec, newerh);
......@@ -7194,7 +7200,8 @@ exec_move_row_from_datum(PLpgSQL_execstate *estate,
(erh->er_typmod == rec->erh->er_typmod &&
erh->er_typmod >= 0)))
{
expanded_record_set_tuple(rec->erh, erh->fvalue, true);
expanded_record_set_tuple(rec->erh, erh->fvalue,
true, !estate->atomic);
return;
}
......@@ -7216,7 +7223,8 @@ exec_move_row_from_datum(PLpgSQL_execstate *estate,
(rec->rectypeid == RECORDOID ||
rec->rectypeid == erh->er_typeid))
{
expanded_record_set_tuple(newerh, erh->fvalue, true);
expanded_record_set_tuple(newerh, erh->fvalue,
true, !estate->atomic);
assign_record_var(estate, rec, newerh);
return;
}
......@@ -7306,7 +7314,8 @@ exec_move_row_from_datum(PLpgSQL_execstate *estate,
(tupTypmod == rec->erh->er_typmod &&
tupTypmod >= 0)))
{
expanded_record_set_tuple(rec->erh, &tmptup, true);
expanded_record_set_tuple(rec->erh, &tmptup,
true, !estate->atomic);
return;
}
......@@ -7323,7 +7332,8 @@ exec_move_row_from_datum(PLpgSQL_execstate *estate,
newerh = make_expanded_record_from_typeid(tupType, tupTypmod,
mcontext);
expanded_record_set_tuple(newerh, &tmptup, true);
expanded_record_set_tuple(newerh, &tmptup,
true, !estate->atomic);
assign_record_var(estate, rec, newerh);
return;
}
......@@ -8051,7 +8061,8 @@ plpgsql_subxact_cb(SubXactEvent event, SubTransactionId mySubid,
* assign_simple_var --- assign a new value to any VAR datum.
*
* This should be the only mechanism for assignment to simple variables,
* lest we do the release of the old value incorrectly.
* lest we do the release of the old value incorrectly (not to mention
* the detoasting business).
*/
static void
assign_simple_var(PLpgSQL_execstate *estate, PLpgSQL_var *var,
......@@ -8059,6 +8070,41 @@ assign_simple_var(PLpgSQL_execstate *estate, PLpgSQL_var *var,
{
Assert(var->dtype == PLPGSQL_DTYPE_VAR ||
var->dtype == PLPGSQL_DTYPE_PROMISE);
/*
* In non-atomic contexts, we do not want to store TOAST pointers in
* variables, because such pointers might become stale after a commit.
* Forcibly detoast in such cases. We don't want to detoast (flatten)
* expanded objects, however; those should be OK across a transaction
* boundary since they're just memory-resident objects. (Elsewhere in
* this module, operations on expanded records likewise need to request
* detoasting of record fields when !estate->atomic. Expanded arrays are
* not a problem since all array entries are always detoasted.)
*/
if (!estate->atomic && !isnull && var->datatype->typlen == -1 &&
VARATT_IS_EXTERNAL_NON_EXPANDED(DatumGetPointer(newvalue)))
{
MemoryContext oldcxt;
Datum detoasted;
/*
* Do the detoasting in the eval_mcontext to avoid long-term leakage
* of whatever memory toast fetching might leak. Then we have to copy
* the detoasted datum to the function's main context, which is a
* pain, but there's little choice.
*/
oldcxt = MemoryContextSwitchTo(get_eval_mcontext(estate));
detoasted = PointerGetDatum(heap_tuple_fetch_attr((struct varlena *) DatumGetPointer(newvalue)));
MemoryContextSwitchTo(oldcxt);
/* Now's a good time to not leak the input value if it's freeable */
if (freeable)
pfree(DatumGetPointer(newvalue));
/* Once we copy the value, it's definitely freeable */
newvalue = datumCopy(detoasted, false, -1);
freeable = true;
/* Can't clean up eval_mcontext here, but it'll happen before long */
}
/* Free the old value if needed */
if (var->freeval)
{
......
Parsed test spec with 2 sessions
starting permutation: lock assign1 vacuum unlock
pg_advisory_unlock_all
pg_advisory_unlock_all
step lock:
SELECT pg_advisory_lock(1);
pg_advisory_lock
step assign1:
do $$
declare
x text;
begin
select test1.b into x from test1;
delete from test1;
commit;
perform pg_advisory_lock(1);
raise notice 'x = %', x;
end;
$$;
<waiting ...>
step vacuum:
VACUUM test1;
step unlock:
SELECT pg_advisory_unlock(1);
pg_advisory_unlock
t
step assign1: <... completed>
starting permutation: lock assign2 vacuum unlock
pg_advisory_unlock_all
pg_advisory_unlock_all
step lock:
SELECT pg_advisory_lock(1);
pg_advisory_lock
step assign2:
do $$
declare
x text;
begin
x := (select test1.b from test1);
delete from test1;
commit;
perform pg_advisory_lock(1);
raise notice 'x = %', x;
end;
$$;
<waiting ...>
step vacuum:
VACUUM test1;
step unlock:
SELECT pg_advisory_unlock(1);
pg_advisory_unlock
t
step assign2: <... completed>
starting permutation: lock assign3 vacuum unlock
pg_advisory_unlock_all
pg_advisory_unlock_all
step lock:
SELECT pg_advisory_lock(1);
pg_advisory_lock
step assign3:
do $$
declare
r record;
begin
select * into r from test1;
r.b := (select test1.b from test1);
delete from test1;
commit;
perform pg_advisory_lock(1);
raise notice 'r = %', r;
end;
$$;
<waiting ...>
step vacuum:
VACUUM test1;
step unlock:
SELECT pg_advisory_unlock(1);
pg_advisory_unlock
t
step assign3: <... completed>
starting permutation: lock assign4 vacuum unlock
pg_advisory_unlock_all
pg_advisory_unlock_all
step lock:
SELECT pg_advisory_lock(1);
pg_advisory_lock
step assign4:
do $$
declare
r test2;
begin
select * into r from test1;
delete from test1;
commit;
perform pg_advisory_lock(1);
raise notice 'r = %', r;
end;
$$;
<waiting ...>
step vacuum:
VACUUM test1;
step unlock:
SELECT pg_advisory_unlock(1);
pg_advisory_unlock
t
step assign4: <... completed>
starting permutation: lock assign5 vacuum unlock
pg_advisory_unlock_all
pg_advisory_unlock_all
step lock:
SELECT pg_advisory_lock(1);
pg_advisory_lock
step assign5:
do $$
declare
r record;
begin
for r in select test1.b from test1 loop
null;
end loop;
delete from test1;
commit;
perform pg_advisory_lock(1);
raise notice 'r = %', r;
end;
$$;
<waiting ...>
step vacuum:
VACUUM test1;
step unlock:
SELECT pg_advisory_unlock(1);
pg_advisory_unlock
t
step assign5: <... completed>
......@@ -74,3 +74,4 @@ test: predicate-gin-nomatch
test: partition-key-update-1
test: partition-key-update-2
test: partition-key-update-3
test: plpgsql-toast
# Test TOAST behavior in PL/pgSQL procedures with transaction control.
#
# We need to ensure that values stored in PL/pgSQL variables are free
# of external TOAST references, because those could disappear after a
# transaction is committed (leading to errors "missing chunk number
# ... for toast value ..."). The tests here do this by running VACUUM
# in a second session. Advisory locks are used to have the VACUUM
# kick in at the right time. The different "assign" steps test
# different code paths for variable assignments in PL/pgSQL.
setup
{
CREATE TABLE test1 (a int, b text);
ALTER TABLE test1 ALTER COLUMN b SET STORAGE EXTERNAL;
INSERT INTO test1 VALUES (1, repeat('foo', 2000));
CREATE TYPE test2 AS (a bigint, b text);
}
teardown
{
DROP TABLE test1;
DROP TYPE test2;
}
session "s1"
setup
{
SELECT pg_advisory_unlock_all();
}
# assign_simple_var()
step "assign1"
{
do $$
declare
x text;
begin
select test1.b into x from test1;
delete from test1;
commit;
perform pg_advisory_lock(1);
raise notice 'x = %', x;
end;
$$;
}
# assign_simple_var()
step "assign2"
{
do $$
declare
x text;
begin
x := (select test1.b from test1);
delete from test1;
commit;
perform pg_advisory_lock(1);
raise notice 'x = %', x;
end;
$$;
}
# expanded_record_set_field()
step "assign3"
{
do $$
declare
r record;
begin
select * into r from test1;
r.b := (select test1.b from test1);
delete from test1;
commit;
perform pg_advisory_lock(1);
raise notice 'r = %', r;
end;
$$;
}
# expanded_record_set_fields()
step "assign4"
{
do $$
declare
r test2;
begin
select * into r from test1;
delete from test1;
commit;
perform pg_advisory_lock(1);
raise notice 'r = %', r;
end;
$$;
}
# expanded_record_set_tuple()
step "assign5"
{
do $$
declare
r record;
begin
for r in select test1.b from test1 loop
null;
end loop;
delete from test1;
commit;
perform pg_advisory_lock(1);
raise notice 'r = %', r;
end;
$$;
}
session "s2"
setup
{
SELECT pg_advisory_unlock_all();
}
step "lock"
{
SELECT pg_advisory_lock(1);
}
step "vacuum"
{
VACUUM test1;
}
step "unlock"
{
SELECT pg_advisory_unlock(1);
}
permutation "lock" "assign1" "vacuum" "unlock"
permutation "lock" "assign2" "vacuum" "unlock"
permutation "lock" "assign3" "vacuum" "unlock"
permutation "lock" "assign4" "vacuum" "unlock"
permutation "lock" "assign5" "vacuum" "unlock"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment