Commit 84f5c290 authored by Tom Lane's avatar Tom Lane

Restore the portal-level snapshot after procedure COMMIT/ROLLBACK.

COMMIT/ROLLBACK necessarily destroys all snapshots within the session.
The original implementation of intra-procedure transactions just
cavalierly did that, ignoring the fact that this left us executing in
a rather different environment than normal.  In particular, it turns
out that handling of toasted datums depends rather critically on there
being an outer ActiveSnapshot: otherwise, when SPI or the core
executor pop whatever snapshot they used and return, it's unsafe to
dereference any toasted datums that may appear in the query result.
It's possible to demonstrate "no known snapshots" and "missing chunk
number N for toast value" errors as a result of this oversight.

Historically this outer snapshot has been held by the Portal code,
and that seems like a good plan to preserve.  So add infrastructure
to pquery.c to allow re-establishing the Portal-owned snapshot if it's
not there anymore, and add enough bookkeeping support that we can tell
whether it is or not.

We can't, however, just re-establish the Portal snapshot as part of
COMMIT/ROLLBACK.  As in normal transaction start, acquiring the first
snapshot should wait until after SET and LOCK commands.  Hence, teach
spi.c about doing this at the right time.  (Note that this patch
doesn't fix the problem for any PLs that try to run intra-procedure
transactions without using SPI to execute SQL commands.)

This makes SPI's no_snapshots parameter rather a misnomer, so in HEAD,
rename that to allow_nonatomic.

replication/logical/worker.c also needs some fixes, because it wasn't
careful to hold a snapshot open around AFTER trigger execution.
That code doesn't use a Portal, which I suspect someday we're gonna
have to fix.  But for now, just rearrange the order of operations.
This includes back-patching the recent addition of finish_estate()
to centralize the cleanup logic there.

This also back-patches commit 2ecfeda3 into v13, to improve the
test coverage for worker.c (it was that test that exposed that
worker.c's snapshot management is wrong).

Per bug #15990 from Andreas Wicht.  Back-patch to v11 where
intra-procedure COMMIT was added.

Discussion: https://postgr.es/m/15990-eee2ac466b11293d@postgresql.org
parent 124966c1
...@@ -730,11 +730,11 @@ int SPI_execute_extended(const char *<parameter>command</parameter>, ...@@ -730,11 +730,11 @@ int SPI_execute_extended(const char *<parameter>command</parameter>,
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term><literal>bool <parameter>no_snapshots</parameter></literal></term> <term><literal>bool <parameter>allow_nonatomic</parameter></literal></term>
<listitem> <listitem>
<para> <para>
<literal>true</literal> prevents SPI from managing snapshots for <literal>true</literal> allows non-atomic execution of CALL and DO
execution of the query; use with extreme caution statements
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -1860,11 +1860,11 @@ int SPI_execute_plan_extended(SPIPlanPtr <parameter>plan</parameter>, ...@@ -1860,11 +1860,11 @@ int SPI_execute_plan_extended(SPIPlanPtr <parameter>plan</parameter>,
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term><literal>bool <parameter>no_snapshots</parameter></literal></term> <term><literal>bool <parameter>allow_nonatomic</parameter></literal></term>
<listitem> <listitem>
<para> <para>
<literal>true</literal> prevents SPI from managing snapshots for <literal>true</literal> allows non-atomic execution of CALL and DO
execution of the query; use with extreme caution statements
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
...@@ -64,6 +64,7 @@ ...@@ -64,6 +64,7 @@
#include "parser/parse_func.h" #include "parser/parse_func.h"
#include "parser/parse_type.h" #include "parser/parse_type.h"
#include "pgstat.h" #include "pgstat.h"
#include "tcop/pquery.h"
#include "tcop/utility.h" #include "tcop/utility.h"
#include "utils/acl.h" #include "utils/acl.h"
#include "utils/builtins.h" #include "utils/builtins.h"
...@@ -2319,6 +2320,20 @@ ExecuteCallStmt(CallStmt *stmt, ParamListInfo params, bool atomic, DestReceiver ...@@ -2319,6 +2320,20 @@ ExecuteCallStmt(CallStmt *stmt, ParamListInfo params, bool atomic, DestReceiver
if (fcinfo->isnull) if (fcinfo->isnull)
elog(ERROR, "procedure returned null record"); elog(ERROR, "procedure returned null record");
/*
* Ensure there's an active snapshot whilst we execute whatever's
* involved here. Note that this is *not* sufficient to make the
* world safe for TOAST pointers to be included in the returned data:
* the referenced data could have gone away while we didn't hold a
* snapshot. Hence, it's incumbent on PLs that can do COMMIT/ROLLBACK
* to not return TOAST pointers, unless those pointers were fetched
* after the last COMMIT/ROLLBACK in the procedure.
*
* XXX that is a really nasty, hard-to-test requirement. Is there a
* way to remove it?
*/
EnsurePortalSnapshotExists();
td = DatumGetHeapTupleHeader(retval); td = DatumGetHeapTupleHeader(retval);
tupType = HeapTupleHeaderGetTypeId(td); tupType = HeapTupleHeaderGetTypeId(td);
tupTypmod = HeapTupleHeaderGetTypMod(td); tupTypmod = HeapTupleHeaderGetTypMod(td);
......
...@@ -66,7 +66,7 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan); ...@@ -66,7 +66,7 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot, Snapshot snapshot, Snapshot crosscheck_snapshot,
bool read_only, bool no_snapshots, bool read_only, bool allow_nonatomic,
bool fire_triggers, uint64 tcount, bool fire_triggers, uint64 tcount,
DestReceiver *caller_dest, DestReceiver *caller_dest,
ResourceOwner plan_owner); ResourceOwner plan_owner);
...@@ -260,12 +260,8 @@ _SPI_commit(bool chain) ...@@ -260,12 +260,8 @@ _SPI_commit(bool chain)
/* Start the actual commit */ /* Start the actual commit */
_SPI_current->internal_xact = true; _SPI_current->internal_xact = true;
/* /* Release snapshots associated with portals */
* Before committing, pop all active snapshots to avoid error about ForgetPortalSnapshots();
* "snapshot %p still active".
*/
while (ActiveSnapshotSet())
PopActiveSnapshot();
if (chain) if (chain)
SaveTransactionCharacteristics(); SaveTransactionCharacteristics();
...@@ -322,6 +318,9 @@ _SPI_rollback(bool chain) ...@@ -322,6 +318,9 @@ _SPI_rollback(bool chain)
/* Start the actual rollback */ /* Start the actual rollback */
_SPI_current->internal_xact = true; _SPI_current->internal_xact = true;
/* Release snapshots associated with portals */
ForgetPortalSnapshots();
if (chain) if (chain)
SaveTransactionCharacteristics(); SaveTransactionCharacteristics();
...@@ -567,7 +566,7 @@ SPI_execute_extended(const char *src, ...@@ -567,7 +566,7 @@ SPI_execute_extended(const char *src,
res = _SPI_execute_plan(&plan, options->params, res = _SPI_execute_plan(&plan, options->params,
InvalidSnapshot, InvalidSnapshot, InvalidSnapshot, InvalidSnapshot,
options->read_only, options->no_snapshots, options->read_only, options->allow_nonatomic,
true, options->tcount, true, options->tcount,
options->dest, options->owner); options->dest, options->owner);
...@@ -627,7 +626,7 @@ SPI_execute_plan_extended(SPIPlanPtr plan, ...@@ -627,7 +626,7 @@ SPI_execute_plan_extended(SPIPlanPtr plan,
res = _SPI_execute_plan(plan, options->params, res = _SPI_execute_plan(plan, options->params,
InvalidSnapshot, InvalidSnapshot, InvalidSnapshot, InvalidSnapshot,
options->read_only, options->no_snapshots, options->read_only, options->allow_nonatomic,
true, options->tcount, true, options->tcount,
options->dest, options->owner); options->dest, options->owner);
...@@ -2264,7 +2263,7 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan) ...@@ -2264,7 +2263,7 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
* behavior of taking a new snapshot for each query. * behavior of taking a new snapshot for each query.
* crosscheck_snapshot: for RI use, all others pass InvalidSnapshot * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
* read_only: true for read-only execution (no CommandCounterIncrement) * read_only: true for read-only execution (no CommandCounterIncrement)
* no_snapshots: true to skip snapshot management * allow_nonatomic: true to allow nonatomic CALL/DO execution
* fire_triggers: true to fire AFTER triggers at end of query (normal case); * fire_triggers: true to fire AFTER triggers at end of query (normal case);
* false means any AFTER triggers are postponed to end of outer query * false means any AFTER triggers are postponed to end of outer query
* tcount: execution tuple-count limit, or 0 for none * tcount: execution tuple-count limit, or 0 for none
...@@ -2275,7 +2274,7 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan) ...@@ -2275,7 +2274,7 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
static int static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot, Snapshot snapshot, Snapshot crosscheck_snapshot,
bool read_only, bool no_snapshots, bool read_only, bool allow_nonatomic,
bool fire_triggers, uint64 tcount, bool fire_triggers, uint64 tcount,
DestReceiver *caller_dest, ResourceOwner plan_owner) DestReceiver *caller_dest, ResourceOwner plan_owner)
{ {
...@@ -2318,11 +2317,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -2318,11 +2317,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
* In the first two cases, we can just push the snap onto the stack once * In the first two cases, we can just push the snap onto the stack once
* for the whole plan list. * for the whole plan list.
* *
* But if no_snapshots is true, then don't manage snapshots at all here. * Note that snapshot != InvalidSnapshot implies an atomic execution
* The caller must then take care of that. * context.
*/ */
if (snapshot != InvalidSnapshot && !no_snapshots) if (snapshot != InvalidSnapshot)
{ {
Assert(!allow_nonatomic);
if (read_only) if (read_only)
{ {
PushActiveSnapshot(snapshot); PushActiveSnapshot(snapshot);
...@@ -2408,15 +2408,39 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -2408,15 +2408,39 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
stmt_list = cplan->stmt_list; stmt_list = cplan->stmt_list;
/* /*
* In the default non-read-only case, get a new snapshot, replacing * If we weren't given a specific snapshot to use, and the statement
* any that we pushed in a previous cycle. * list requires a snapshot, set that up.
*/ */
if (snapshot == InvalidSnapshot && !read_only && !no_snapshots) if (snapshot == InvalidSnapshot &&
(list_length(stmt_list) > 1 ||
(list_length(stmt_list) == 1 &&
PlannedStmtRequiresSnapshot(linitial_node(PlannedStmt,
stmt_list)))))
{ {
if (pushed_active_snap) /*
PopActiveSnapshot(); * First, ensure there's a Portal-level snapshot. This back-fills
PushActiveSnapshot(GetTransactionSnapshot()); * the snapshot stack in case the previous operation was a COMMIT
pushed_active_snap = true; * or ROLLBACK inside a procedure or DO block. (We can't put back
* the Portal snapshot any sooner, or we'd break cases like doing
* SET or LOCK just after COMMIT.) It's enough to check once per
* statement list, since COMMIT/ROLLBACK/CALL/DO can't appear
* within a multi-statement list.
*/
EnsurePortalSnapshotExists();
/*
* In the default non-read-only case, get a new per-statement-list
* snapshot, replacing any that we pushed in a previous cycle.
* Skip it when doing non-atomic execution, though (we rely
* entirely on the Portal snapshot in that case).
*/
if (!read_only && !allow_nonatomic)
{
if (pushed_active_snap)
PopActiveSnapshot();
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
}
} }
foreach(lc2, stmt_list) foreach(lc2, stmt_list)
...@@ -2434,6 +2458,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -2434,6 +2458,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
_SPI_current->processed = 0; _SPI_current->processed = 0;
_SPI_current->tuptable = NULL; _SPI_current->tuptable = NULL;
/* Check for unsupported cases. */
if (stmt->utilityStmt) if (stmt->utilityStmt)
{ {
if (IsA(stmt->utilityStmt, CopyStmt)) if (IsA(stmt->utilityStmt, CopyStmt))
...@@ -2462,9 +2487,10 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -2462,9 +2487,10 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
/* /*
* If not read-only mode, advance the command counter before each * If not read-only mode, advance the command counter before each
* command and update the snapshot. * command and update the snapshot. (But skip it if the snapshot
* isn't under our control.)
*/ */
if (!read_only && !no_snapshots) if (!read_only && pushed_active_snap)
{ {
CommandCounterIncrement(); CommandCounterIncrement();
UpdateActiveSnapshotCommandId(); UpdateActiveSnapshotCommandId();
...@@ -2507,13 +2533,11 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -2507,13 +2533,11 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
QueryCompletion qc; QueryCompletion qc;
/* /*
* If the SPI context is atomic, or we are asked to manage * If the SPI context is atomic, or we were not told to allow
* snapshots, then we are in an atomic execution context. * nonatomic operations, tell ProcessUtility this is an atomic
* Conversely, to propagate a nonatomic execution context, the * execution context.
* caller must be in a nonatomic SPI context and manage
* snapshots itself.
*/ */
if (_SPI_current->atomic || !no_snapshots) if (_SPI_current->atomic || !allow_nonatomic)
context = PROCESS_UTILITY_QUERY; context = PROCESS_UTILITY_QUERY;
else else
context = PROCESS_UTILITY_QUERY_NONATOMIC; context = PROCESS_UTILITY_QUERY_NONATOMIC;
......
...@@ -349,6 +349,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel, ...@@ -349,6 +349,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel,
EState *estate; EState *estate;
RangeTblEntry *rte; RangeTblEntry *rte;
/*
* Input functions may need an active snapshot, as may AFTER triggers
* invoked during finish_estate. For safety, ensure an active snapshot
* exists throughout all our usage of the executor.
*/
PushActiveSnapshot(GetTransactionSnapshot());
estate = CreateExecutorState(); estate = CreateExecutorState();
rte = makeNode(RangeTblEntry); rte = makeNode(RangeTblEntry);
...@@ -400,6 +407,7 @@ finish_estate(EState *estate) ...@@ -400,6 +407,7 @@ finish_estate(EState *estate)
/* Cleanup. */ /* Cleanup. */
ExecResetTupleTable(estate->es_tupleTable, false); ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate); FreeExecutorState(estate);
PopActiveSnapshot();
} }
/* /*
...@@ -1212,9 +1220,6 @@ apply_handle_insert(StringInfo s) ...@@ -1212,9 +1220,6 @@ apply_handle_insert(StringInfo s)
RelationGetDescr(rel->localrel), RelationGetDescr(rel->localrel),
&TTSOpsVirtual); &TTSOpsVirtual);
/* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot());
/* Process and store remote tuple in the slot */ /* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_data(remoteslot, rel, &newtup); slot_store_data(remoteslot, rel, &newtup);
...@@ -1229,8 +1234,6 @@ apply_handle_insert(StringInfo s) ...@@ -1229,8 +1234,6 @@ apply_handle_insert(StringInfo s)
apply_handle_insert_internal(resultRelInfo, estate, apply_handle_insert_internal(resultRelInfo, estate,
remoteslot); remoteslot);
PopActiveSnapshot();
finish_estate(estate); finish_estate(estate);
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
...@@ -1358,8 +1361,6 @@ apply_handle_update(StringInfo s) ...@@ -1358,8 +1361,6 @@ apply_handle_update(StringInfo s)
/* Also populate extraUpdatedCols, in case we have generated columns */ /* Also populate extraUpdatedCols, in case we have generated columns */
fill_extraUpdatedCols(target_rte, rel->localrel); fill_extraUpdatedCols(target_rte, rel->localrel);
PushActiveSnapshot(GetTransactionSnapshot());
/* Build the search tuple. */ /* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_data(remoteslot, rel, slot_store_data(remoteslot, rel,
...@@ -1374,8 +1375,6 @@ apply_handle_update(StringInfo s) ...@@ -1374,8 +1375,6 @@ apply_handle_update(StringInfo s)
apply_handle_update_internal(resultRelInfo, estate, apply_handle_update_internal(resultRelInfo, estate,
remoteslot, &newtup, rel); remoteslot, &newtup, rel);
PopActiveSnapshot();
finish_estate(estate); finish_estate(estate);
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
...@@ -1482,8 +1481,6 @@ apply_handle_delete(StringInfo s) ...@@ -1482,8 +1481,6 @@ apply_handle_delete(StringInfo s)
RelationGetDescr(rel->localrel), RelationGetDescr(rel->localrel),
&TTSOpsVirtual); &TTSOpsVirtual);
PushActiveSnapshot(GetTransactionSnapshot());
/* Build the search tuple. */ /* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_data(remoteslot, rel, &oldtup); slot_store_data(remoteslot, rel, &oldtup);
...@@ -1497,8 +1494,6 @@ apply_handle_delete(StringInfo s) ...@@ -1497,8 +1494,6 @@ apply_handle_delete(StringInfo s)
apply_handle_delete_internal(resultRelInfo, estate, apply_handle_delete_internal(resultRelInfo, estate,
remoteslot, &rel->remoterel); remoteslot, &rel->remoterel);
PopActiveSnapshot();
finish_estate(estate); finish_estate(estate);
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
...@@ -1818,7 +1813,7 @@ apply_handle_truncate(StringInfo s) ...@@ -1818,7 +1813,7 @@ apply_handle_truncate(StringInfo s)
List *relids = NIL; List *relids = NIL;
List *relids_logged = NIL; List *relids_logged = NIL;
ListCell *lc; ListCell *lc;
LOCKMODE lockmode = AccessExclusiveLock; LOCKMODE lockmode = AccessExclusiveLock;
if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
return; return;
......
...@@ -476,6 +476,13 @@ PortalStart(Portal portal, ParamListInfo params, ...@@ -476,6 +476,13 @@ PortalStart(Portal portal, ParamListInfo params,
else else
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
/*
* We could remember the snapshot in portal->portalSnapshot,
* but presently there seems no need to, as this code path
* cannot be used for non-atomic execution. Hence there can't
* be any commit/abort that might destroy the snapshot.
*/
/* /*
* Create QueryDesc in portal's context; for the moment, set * Create QueryDesc in portal's context; for the moment, set
* the destination to DestNone. * the destination to DestNone.
...@@ -1116,45 +1123,26 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt, ...@@ -1116,45 +1123,26 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
bool isTopLevel, bool setHoldSnapshot, bool isTopLevel, bool setHoldSnapshot,
DestReceiver *dest, QueryCompletion *qc) DestReceiver *dest, QueryCompletion *qc)
{ {
Node *utilityStmt = pstmt->utilityStmt;
Snapshot snapshot;
/* /*
* Set snapshot if utility stmt needs one. Most reliable way to do this * Set snapshot if utility stmt needs one.
* seems to be to enumerate those that do not need one; this is a short
* list. Transaction control, LOCK, and SET must *not* set a snapshot
* since they need to be executable at the start of a transaction-snapshot
* mode transaction without freezing a snapshot. By extension we allow
* SHOW not to set a snapshot. The other stmts listed are just efficiency
* hacks. Beware of listing anything that can modify the database --- if,
* say, it has to update an index with expressions that invoke
* user-defined functions, then it had better have a snapshot.
*/ */
if (!(IsA(utilityStmt, TransactionStmt) || if (PlannedStmtRequiresSnapshot(pstmt))
IsA(utilityStmt, LockStmt) ||
IsA(utilityStmt, VariableSetStmt) ||
IsA(utilityStmt, VariableShowStmt) ||
IsA(utilityStmt, ConstraintsSetStmt) ||
/* efficiency hacks from here down */
IsA(utilityStmt, FetchStmt) ||
IsA(utilityStmt, ListenStmt) ||
IsA(utilityStmt, NotifyStmt) ||
IsA(utilityStmt, UnlistenStmt) ||
IsA(utilityStmt, CheckPointStmt)))
{ {
snapshot = GetTransactionSnapshot(); Snapshot snapshot = GetTransactionSnapshot();
/* If told to, register the snapshot we're using and save in portal */ /* If told to, register the snapshot we're using and save in portal */
if (setHoldSnapshot) if (setHoldSnapshot)
{ {
snapshot = RegisterSnapshot(snapshot); snapshot = RegisterSnapshot(snapshot);
portal->holdSnapshot = snapshot; portal->holdSnapshot = snapshot;
} }
/* In any case, make the snapshot active and remember it in portal */
PushActiveSnapshot(snapshot); PushActiveSnapshot(snapshot);
/* PushActiveSnapshot might have copied the snapshot */ /* PushActiveSnapshot might have copied the snapshot */
snapshot = GetActiveSnapshot(); portal->portalSnapshot = GetActiveSnapshot();
} }
else else
snapshot = NULL; portal->portalSnapshot = NULL;
ProcessUtility(pstmt, ProcessUtility(pstmt,
portal->sourceText, portal->sourceText,
...@@ -1168,13 +1156,17 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt, ...@@ -1168,13 +1156,17 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
MemoryContextSwitchTo(portal->portalContext); MemoryContextSwitchTo(portal->portalContext);
/* /*
* Some utility commands may pop the ActiveSnapshot stack from under us, * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
* so be careful to only pop the stack if our snapshot is still at the * under us, so don't complain if it's now empty. Otherwise, our snapshot
* top. * should be the top one; pop it. Note that this could be a different
* snapshot from the one we made above; see EnsurePortalSnapshotExists.
*/ */
if (snapshot != NULL && ActiveSnapshotSet() && if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
snapshot == GetActiveSnapshot()) {
Assert(portal->portalSnapshot == GetActiveSnapshot());
PopActiveSnapshot(); PopActiveSnapshot();
}
portal->portalSnapshot = NULL;
} }
/* /*
...@@ -1256,6 +1248,12 @@ PortalRunMulti(Portal portal, ...@@ -1256,6 +1248,12 @@ PortalRunMulti(Portal portal,
* from what holdSnapshot has.) * from what holdSnapshot has.)
*/ */
PushCopiedSnapshot(snapshot); PushCopiedSnapshot(snapshot);
/*
* As for PORTAL_ONE_SELECT portals, it does not seem
* necessary to maintain portal->portalSnapshot here.
*/
active_snapshot_set = true; active_snapshot_set = true;
} }
else else
...@@ -1692,3 +1690,78 @@ DoPortalRewind(Portal portal) ...@@ -1692,3 +1690,78 @@ DoPortalRewind(Portal portal)
portal->atEnd = false; portal->atEnd = false;
portal->portalPos = 0; portal->portalPos = 0;
} }
/*
* PlannedStmtRequiresSnapshot - what it says on the tin
*/
bool
PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
{
Node *utilityStmt = pstmt->utilityStmt;
/* If it's not a utility statement, it definitely needs a snapshot */
if (utilityStmt == NULL)
return true;
/*
* Most utility statements need a snapshot, and the default presumption
* about new ones should be that they do too. Hence, enumerate those that
* do not need one.
*
* Transaction control, LOCK, and SET must *not* set a snapshot, since
* they need to be executable at the start of a transaction-snapshot-mode
* transaction without freezing a snapshot. By extension we allow SHOW
* not to set a snapshot. The other stmts listed are just efficiency
* hacks. Beware of listing anything that can modify the database --- if,
* say, it has to update an index with expressions that invoke
* user-defined functions, then it had better have a snapshot.
*/
if (IsA(utilityStmt, TransactionStmt) ||
IsA(utilityStmt, LockStmt) ||
IsA(utilityStmt, VariableSetStmt) ||
IsA(utilityStmt, VariableShowStmt) ||
IsA(utilityStmt, ConstraintsSetStmt) ||
/* efficiency hacks from here down */
IsA(utilityStmt, FetchStmt) ||
IsA(utilityStmt, ListenStmt) ||
IsA(utilityStmt, NotifyStmt) ||
IsA(utilityStmt, UnlistenStmt) ||
IsA(utilityStmt, CheckPointStmt))
return false;
return true;
}
/*
* EnsurePortalSnapshotExists - recreate Portal-level snapshot, if needed
*
* Generally, we will have an active snapshot whenever we are executing
* inside a Portal, unless the Portal's query is one of the utility
* statements exempted from that rule (see PlannedStmtRequiresSnapshot).
* However, procedures and DO blocks can commit or abort the transaction,
* and thereby destroy all snapshots. This function can be called to
* re-establish the Portal-level snapshot when none exists.
*/
void
EnsurePortalSnapshotExists(void)
{
Portal portal;
/*
* Nothing to do if a snapshot is set. (We take it on faith that the
* outermost active snapshot belongs to some Portal; or if there is no
* Portal, it's somebody else's responsibility to manage things.)
*/
if (ActiveSnapshotSet())
return;
/* Otherwise, we'd better have an active Portal */
portal = ActivePortal;
Assert(portal != NULL);
Assert(portal->portalSnapshot == NULL);
/* Create a new snapshot and make it active */
PushActiveSnapshot(GetTransactionSnapshot());
/* PushActiveSnapshot might have copied the snapshot */
portal->portalSnapshot = GetActiveSnapshot();
}
...@@ -502,6 +502,9 @@ PortalDrop(Portal portal, bool isTopCommit) ...@@ -502,6 +502,9 @@ PortalDrop(Portal portal, bool isTopCommit)
portal->cleanup = NULL; portal->cleanup = NULL;
} }
/* There shouldn't be an active snapshot anymore, except after error */
Assert(portal->portalSnapshot == NULL || !isTopCommit);
/* /*
* Remove portal from hash table. Because we do this here, we will not * Remove portal from hash table. Because we do this here, we will not
* come back to try to remove the portal again if there's any error in the * come back to try to remove the portal again if there's any error in the
...@@ -709,6 +712,8 @@ PreCommit_Portals(bool isPrepare) ...@@ -709,6 +712,8 @@ PreCommit_Portals(bool isPrepare)
portal->holdSnapshot = NULL; portal->holdSnapshot = NULL;
} }
portal->resowner = NULL; portal->resowner = NULL;
/* Clear portalSnapshot too, for cleanliness */
portal->portalSnapshot = NULL;
continue; continue;
} }
...@@ -1278,3 +1283,54 @@ HoldPinnedPortals(void) ...@@ -1278,3 +1283,54 @@ HoldPinnedPortals(void)
} }
} }
} }
/*
* Drop the outer active snapshots for all portals, so that no snapshots
* remain active.
*
* Like HoldPinnedPortals, this must be called when initiating a COMMIT or
* ROLLBACK inside a procedure. This has to be separate from that since it
* should not be run until we're done with steps that are likely to fail.
*
* It's tempting to fold this into PreCommit_Portals, but to do so, we'd
* need to clean up snapshot management in VACUUM and perhaps other places.
*/
void
ForgetPortalSnapshots(void)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
int numPortalSnaps = 0;
int numActiveSnaps = 0;
/* First, scan PortalHashTable and clear portalSnapshot fields */
hash_seq_init(&status, PortalHashTable);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
Portal portal = hentry->portal;
if (portal->portalSnapshot != NULL)
{
portal->portalSnapshot = NULL;
numPortalSnaps++;
}
/* portal->holdSnapshot will be cleaned up in PreCommit_Portals */
}
/*
* Now, pop all the active snapshots, which should be just those that were
* portal snapshots. Ideally we'd drive this directly off the portal
* scan, but there's no good way to visit the portals in the correct
* order. So just cross-check after the fact.
*/
while (ActiveSnapshotSet())
{
PopActiveSnapshot();
numActiveSnaps++;
}
if (numPortalSnaps != numActiveSnaps)
elog(ERROR, "portal snapshots (%d) did not account for all active snapshots (%d)",
numPortalSnaps, numActiveSnaps);
}
...@@ -47,7 +47,7 @@ typedef struct SPIExecuteOptions ...@@ -47,7 +47,7 @@ typedef struct SPIExecuteOptions
{ {
ParamListInfo params; ParamListInfo params;
bool read_only; bool read_only;
bool no_snapshots; bool allow_nonatomic;
uint64 tcount; uint64 tcount;
DestReceiver *dest; DestReceiver *dest;
ResourceOwner owner; ResourceOwner owner;
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "utils/portal.h" #include "utils/portal.h"
struct PlannedStmt; /* avoid including plannodes.h here */
extern PGDLLIMPORT Portal ActivePortal; extern PGDLLIMPORT Portal ActivePortal;
...@@ -42,4 +44,8 @@ extern uint64 PortalRunFetch(Portal portal, ...@@ -42,4 +44,8 @@ extern uint64 PortalRunFetch(Portal portal,
long count, long count,
DestReceiver *dest); DestReceiver *dest);
extern bool PlannedStmtRequiresSnapshot(struct PlannedStmt *pstmt);
extern void EnsurePortalSnapshotExists(void);
#endif /* PQUERY_H */ #endif /* PQUERY_H */
...@@ -160,6 +160,14 @@ typedef struct PortalData ...@@ -160,6 +160,14 @@ typedef struct PortalData
/* and these are the format codes to use for the columns: */ /* and these are the format codes to use for the columns: */
int16 *formats; /* a format code for each column */ int16 *formats; /* a format code for each column */
/*
* Outermost ActiveSnapshot for execution of the portal's queries. For
* all but a few utility commands, we require such a snapshot to exist.
* This ensures that TOAST references in query results can be detoasted,
* and helps to reduce thrashing of the process's exposed xmin.
*/
Snapshot portalSnapshot; /* active snapshot, or NULL if none */
/* /*
* Where we store tuples for a held cursor or a PORTAL_ONE_RETURNING or * Where we store tuples for a held cursor or a PORTAL_ONE_RETURNING or
* PORTAL_UTIL_SELECT query. (A cursor held past the end of its * PORTAL_UTIL_SELECT query. (A cursor held past the end of its
...@@ -237,5 +245,6 @@ extern void PortalCreateHoldStore(Portal portal); ...@@ -237,5 +245,6 @@ extern void PortalCreateHoldStore(Portal portal);
extern void PortalHashTableDeleteAll(void); extern void PortalHashTableDeleteAll(void);
extern bool ThereAreNoReadyPortals(void); extern bool ThereAreNoReadyPortals(void);
extern void HoldPinnedPortals(void); extern void HoldPinnedPortals(void);
extern void ForgetPortalSnapshots(void);
#endif /* PORTAL_H */ #endif /* PORTAL_H */
...@@ -2159,7 +2159,6 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt) ...@@ -2159,7 +2159,6 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
PLpgSQL_expr *expr = stmt->expr; PLpgSQL_expr *expr = stmt->expr;
LocalTransactionId before_lxid; LocalTransactionId before_lxid;
LocalTransactionId after_lxid; LocalTransactionId after_lxid;
bool pushed_active_snap = false;
ParamListInfo paramLI; ParamListInfo paramLI;
SPIExecuteOptions options; SPIExecuteOptions options;
int rc; int rc;
...@@ -2189,27 +2188,17 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt) ...@@ -2189,27 +2188,17 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
before_lxid = MyProc->lxid; before_lxid = MyProc->lxid;
/*
* The procedure call could end transactions, which would upset the
* snapshot management in SPI_execute*, so handle snapshots here instead.
* Set a snapshot only for non-read-only procedures, similar to SPI
* behavior.
*/
if (!estate->readonly_func)
{
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
}
/* /*
* If we have a procedure-lifespan resowner, use that to hold the refcount * If we have a procedure-lifespan resowner, use that to hold the refcount
* for the plan. This avoids refcount leakage complaints if the called * for the plan. This avoids refcount leakage complaints if the called
* procedure ends the current transaction. * procedure ends the current transaction.
*
* Also, tell SPI to allow non-atomic execution.
*/ */
memset(&options, 0, sizeof(options)); memset(&options, 0, sizeof(options));
options.params = paramLI; options.params = paramLI;
options.read_only = estate->readonly_func; options.read_only = estate->readonly_func;
options.no_snapshots = true; /* disable SPI's snapshot management */ options.allow_nonatomic = true;
options.owner = estate->procedure_resowner; options.owner = estate->procedure_resowner;
rc = SPI_execute_plan_extended(expr->plan, &options); rc = SPI_execute_plan_extended(expr->plan, &options);
...@@ -2220,17 +2209,7 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt) ...@@ -2220,17 +2209,7 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
after_lxid = MyProc->lxid; after_lxid = MyProc->lxid;
if (before_lxid == after_lxid) if (before_lxid != after_lxid)
{
/*
* If we are still in the same transaction after the call, pop the
* snapshot that we might have pushed. (If it's a new transaction,
* then all the snapshots are gone already.)
*/
if (pushed_active_snap)
PopActiveSnapshot();
}
else
{ {
/* /*
* If we are in a new transaction after the call, we need to build new * If we are in a new transaction after the call, we need to build new
...@@ -4954,6 +4933,7 @@ exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt) ...@@ -4954,6 +4933,7 @@ exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt)
* *
* We just parse and execute the statement normally, but we have to do it * We just parse and execute the statement normally, but we have to do it
* without setting a snapshot, for things like SET TRANSACTION. * without setting a snapshot, for things like SET TRANSACTION.
* XXX spi.c now handles this correctly, so we no longer need a special case.
*/ */
static int static int
exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt) exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt)
...@@ -4967,7 +4947,6 @@ exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt) ...@@ -4967,7 +4947,6 @@ exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt)
memset(&options, 0, sizeof(options)); memset(&options, 0, sizeof(options));
options.read_only = estate->readonly_func; options.read_only = estate->readonly_func;
options.no_snapshots = true; /* disable SPI's snapshot management */
rc = SPI_execute_plan_extended(expr->plan, &options); rc = SPI_execute_plan_extended(expr->plan, &options);
......
...@@ -235,3 +235,30 @@ s1: NOTICE: length(r) = 6002 ...@@ -235,3 +235,30 @@ s1: NOTICE: length(r) = 6002
s1: NOTICE: length(r) = 9002 s1: NOTICE: length(r) = 9002
s1: NOTICE: length(r) = 12002 s1: NOTICE: length(r) = 12002
step assign6: <... completed> step assign6: <... completed>
starting permutation: fetch-after-commit
pg_advisory_unlock_all
pg_advisory_unlock_all
s1: NOTICE: length(t) = 6000
s1: NOTICE: length(t) = 9000
s1: NOTICE: length(t) = 12000
step fetch-after-commit:
do $$
declare
r record;
t text;
begin
insert into test1 values (2, repeat('bar', 3000));
insert into test1 values (3, repeat('baz', 4000));
for r in select test1.a from test1 loop
commit;
select b into t from test1 where a = r.a;
raise notice 'length(t) = %', length(t);
end loop;
end;
$$;
...@@ -131,6 +131,26 @@ do $$ ...@@ -131,6 +131,26 @@ do $$
$$; $$;
} }
# Check that the results of a query can be detoasted just after committing
# (there's no interaction with VACUUM here)
step "fetch-after-commit"
{
do $$
declare
r record;
t text;
begin
insert into test1 values (2, repeat('bar', 3000));
insert into test1 values (3, repeat('baz', 4000));
for r in select test1.a from test1 loop
commit;
select b into t from test1 where a = r.a;
raise notice 'length(t) = %', length(t);
end loop;
end;
$$;
}
session "s2" session "s2"
setup setup
{ {
...@@ -155,3 +175,4 @@ permutation "lock" "assign3" "vacuum" "unlock" ...@@ -155,3 +175,4 @@ permutation "lock" "assign3" "vacuum" "unlock"
permutation "lock" "assign4" "vacuum" "unlock" permutation "lock" "assign4" "vacuum" "unlock"
permutation "lock" "assign5" "vacuum" "unlock" permutation "lock" "assign5" "vacuum" "unlock"
permutation "lock" "assign6" "vacuum" "unlock" permutation "lock" "assign6" "vacuum" "unlock"
permutation "fetch-after-commit"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment