Commit c0b00760 authored by Tom Lane's avatar Tom Lane

Rearrange snapshot handling to make rule expansion more consistent.

With this patch, portals, SQL functions, and SPI all agree that there
should be only a CommandCounterIncrement between the queries that are
generated from a single SQL command by rule expansion.  Fetching a whole
new snapshot now happens only between original queries.  This is equivalent
to the existing behavior of EXPLAIN ANALYZE, and it was judged to be the
best choice since it eliminates one source of concurrency hazards for
rules.  The patch should also make things marginally faster by reducing the
number of snapshot push/pop operations.

The patch removes pg_parse_and_rewrite(), which is no longer used anywhere.
There was considerable discussion about more aggressive refactoring of the
query-processing functions exported by postgres.c, but for the moment
nothing more has been done there.

I also took the opportunity to refactor snapmgr.c's API slightly: the
former PushUpdatedSnapshot() has been split into two functions.

Marko Tiikkaja, reviewed by Steve Singer and Tom Lane
parent 57e9bda5
...@@ -764,7 +764,9 @@ fmgr_sql_validator(PG_FUNCTION_ARGS) ...@@ -764,7 +764,9 @@ fmgr_sql_validator(PG_FUNCTION_ARGS)
Oid funcoid = PG_GETARG_OID(0); Oid funcoid = PG_GETARG_OID(0);
HeapTuple tuple; HeapTuple tuple;
Form_pg_proc proc; Form_pg_proc proc;
List *raw_parsetree_list;
List *querytree_list; List *querytree_list;
ListCell *lc;
bool isnull; bool isnull;
Datum tmp; Datum tmp;
char *prosrc; char *prosrc;
...@@ -835,17 +837,32 @@ fmgr_sql_validator(PG_FUNCTION_ARGS) ...@@ -835,17 +837,32 @@ fmgr_sql_validator(PG_FUNCTION_ARGS)
* We can run the text through the raw parser though; this will at * We can run the text through the raw parser though; this will at
* least catch silly syntactic errors. * least catch silly syntactic errors.
*/ */
raw_parsetree_list = pg_parse_query(prosrc);
if (!haspolyarg) if (!haspolyarg)
{ {
querytree_list = pg_parse_and_rewrite(prosrc, /*
proc->proargtypes.values, * OK to do full precheck: analyze and rewrite the queries,
proc->pronargs); * then verify the result type.
*/
querytree_list = NIL;
foreach(lc, raw_parsetree_list)
{
Node *parsetree = (Node *) lfirst(lc);
List *querytree_sublist;
querytree_sublist = pg_analyze_and_rewrite(parsetree,
prosrc,
proc->proargtypes.values,
proc->pronargs);
querytree_list = list_concat(querytree_list,
querytree_sublist);
}
(void) check_sql_fn_retval(funcoid, proc->prorettype, (void) check_sql_fn_retval(funcoid, proc->prorettype,
querytree_list, querytree_list,
NULL, NULL); NULL, NULL);
} }
else
querytree_list = pg_parse_query(prosrc);
error_context_stack = sqlerrcontext.previous; error_context_stack = sqlerrcontext.previous;
} }
......
...@@ -1216,7 +1216,8 @@ BeginCopy(bool is_from, ...@@ -1216,7 +1216,8 @@ BeginCopy(bool is_from,
* Use a snapshot with an updated command ID to ensure this query sees * Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. * results of any previously executed queries.
*/ */
PushUpdatedSnapshot(GetActiveSnapshot()); PushCopiedSnapshot(GetActiveSnapshot());
UpdateActiveSnapshotCommandId();
/* Create dest receiver for COPY OUT */ /* Create dest receiver for COPY OUT */
dest = CreateDestReceiver(DestCopyOut); dest = CreateDestReceiver(DestCopyOut);
......
...@@ -366,7 +366,8 @@ ExplainOnePlan(PlannedStmt *plannedstmt, ExplainState *es, ...@@ -366,7 +366,8 @@ ExplainOnePlan(PlannedStmt *plannedstmt, ExplainState *es,
* Use a snapshot with an updated command ID to ensure this query sees * Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. * results of any previously executed queries.
*/ */
PushUpdatedSnapshot(GetActiveSnapshot()); PushCopiedSnapshot(GetActiveSnapshot());
UpdateActiveSnapshotCommandId();
/* Create a QueryDesc requesting no output */ /* Create a QueryDesc requesting no output */
queryDesc = CreateQueryDesc(plannedstmt, queryString, queryDesc = CreateQueryDesc(plannedstmt, queryString,
......
...@@ -47,6 +47,10 @@ typedef struct ...@@ -47,6 +47,10 @@ typedef struct
* We have an execution_state record for each query in a function. Each * We have an execution_state record for each query in a function. Each
* record contains a plantree for its query. If the query is currently in * record contains a plantree for its query. If the query is currently in
* F_EXEC_RUN state then there's a QueryDesc too. * F_EXEC_RUN state then there's a QueryDesc too.
*
* The "next" fields chain together all the execution_state records generated
* from a single original parsetree. (There will only be more than one in
* case of rule expansion of the original parsetree.)
*/ */
typedef enum typedef enum
{ {
...@@ -93,15 +97,20 @@ typedef struct ...@@ -93,15 +97,20 @@ typedef struct
JunkFilter *junkFilter; /* will be NULL if function returns VOID */ JunkFilter *junkFilter; /* will be NULL if function returns VOID */
/* head of linked list of execution_state records */ /*
execution_state *func_state; * func_state is a List of execution_state records, each of which is the
* first for its original parsetree, with any additional records chained
* to it via the "next" fields. This sublist structure is needed to keep
* track of where the original query boundaries are.
*/
List *func_state;
} SQLFunctionCache; } SQLFunctionCache;
typedef SQLFunctionCache *SQLFunctionCachePtr; typedef SQLFunctionCache *SQLFunctionCachePtr;
/* non-export function prototypes */ /* non-export function prototypes */
static execution_state *init_execution_state(List *queryTree_list, static List *init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache, SQLFunctionCachePtr fcache,
bool lazyEvalOK); bool lazyEvalOK);
static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK); static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
...@@ -122,62 +131,78 @@ static void sqlfunction_shutdown(DestReceiver *self); ...@@ -122,62 +131,78 @@ static void sqlfunction_shutdown(DestReceiver *self);
static void sqlfunction_destroy(DestReceiver *self); static void sqlfunction_destroy(DestReceiver *self);
/* Set up the list of per-query execution_state records for a SQL function */ /*
static execution_state * * Set up the per-query execution_state records for a SQL function.
*
* The input is a List of Lists of parsed and rewritten, but not planned,
* querytrees. The sublist structure denotes the original query boundaries.
*/
static List *
init_execution_state(List *queryTree_list, init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache, SQLFunctionCachePtr fcache,
bool lazyEvalOK) bool lazyEvalOK)
{ {
execution_state *firstes = NULL; List *eslist = NIL;
execution_state *preves = NULL;
execution_state *lasttages = NULL; execution_state *lasttages = NULL;
ListCell *qtl_item; ListCell *lc1;
foreach(qtl_item, queryTree_list) foreach(lc1, queryTree_list)
{ {
Query *queryTree = (Query *) lfirst(qtl_item); List *qtlist = (List *) lfirst(lc1);
Node *stmt; execution_state *firstes = NULL;
execution_state *newes; execution_state *preves = NULL;
ListCell *lc2;
Assert(IsA(queryTree, Query)); foreach(lc2, qtlist)
{
Query *queryTree = (Query *) lfirst(lc2);
Node *stmt;
execution_state *newes;
if (queryTree->commandType == CMD_UTILITY) Assert(IsA(queryTree, Query));
stmt = queryTree->utilityStmt;
else
stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
/* Precheck all commands for validity in a function */ /* Plan the query if needed */
if (IsA(stmt, TransactionStmt)) if (queryTree->commandType == CMD_UTILITY)
ereport(ERROR, stmt = queryTree->utilityStmt;
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), else
/* translator: %s is a SQL statement name */ stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
errmsg("%s is not allowed in a SQL function",
CreateCommandTag(stmt))));
if (fcache->readonly_func && !CommandIsReadOnly(stmt)) /* Precheck all commands for validity in a function */
ereport(ERROR, if (IsA(stmt, TransactionStmt))
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR,
/* translator: %s is a SQL statement name */ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("%s is not allowed in a non-volatile function", /* translator: %s is a SQL statement name */
CreateCommandTag(stmt)))); errmsg("%s is not allowed in a SQL function",
CreateCommandTag(stmt))));
newes = (execution_state *) palloc(sizeof(execution_state)); if (fcache->readonly_func && !CommandIsReadOnly(stmt))
if (preves) ereport(ERROR,
preves->next = newes; (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
else /* translator: %s is a SQL statement name */
firstes = newes; errmsg("%s is not allowed in a non-volatile function",
CreateCommandTag(stmt))));
/* OK, build the execution_state for this query */
newes = (execution_state *) palloc(sizeof(execution_state));
if (preves)
preves->next = newes;
else
firstes = newes;
newes->next = NULL; newes->next = NULL;
newes->status = F_EXEC_START; newes->status = F_EXEC_START;
newes->setsResult = false; /* might change below */ newes->setsResult = false; /* might change below */
newes->lazyEval = false; /* might change below */ newes->lazyEval = false; /* might change below */
newes->stmt = stmt; newes->stmt = stmt;
newes->qd = NULL; newes->qd = NULL;
if (queryTree->canSetTag) if (queryTree->canSetTag)
lasttages = newes; lasttages = newes;
preves = newes; preves = newes;
}
eslist = lappend(eslist, firstes);
} }
/* /*
...@@ -211,7 +236,7 @@ init_execution_state(List *queryTree_list, ...@@ -211,7 +236,7 @@ init_execution_state(List *queryTree_list,
} }
} }
return firstes; return eslist;
} }
/* Initialize the SQLFunctionCache for a SQL function */ /* Initialize the SQLFunctionCache for a SQL function */
...@@ -225,7 +250,10 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) ...@@ -225,7 +250,10 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
SQLFunctionCachePtr fcache; SQLFunctionCachePtr fcache;
Oid *argOidVect; Oid *argOidVect;
int nargs; int nargs;
List *raw_parsetree_list;
List *queryTree_list; List *queryTree_list;
List *flat_query_list;
ListCell *lc;
Datum tmp; Datum tmp;
bool isNull; bool isNull;
...@@ -318,9 +346,32 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) ...@@ -318,9 +346,32 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
fcache->src = TextDatumGetCString(tmp); fcache->src = TextDatumGetCString(tmp);
/* /*
* Parse and rewrite the queries in the function text. * Parse and rewrite the queries in the function text. Use sublists to
* keep track of the original query boundaries. But we also build a
* "flat" list of the rewritten queries to pass to check_sql_fn_retval.
* This is because the last canSetTag query determines the result type
* independently of query boundaries --- and it might not be in the last
* sublist, for example if the last query rewrites to DO INSTEAD NOTHING.
* (It might not be unreasonable to throw an error in such a case, but
* this is the historical behavior and it doesn't seem worth changing.)
*/ */
queryTree_list = pg_parse_and_rewrite(fcache->src, argOidVect, nargs); raw_parsetree_list = pg_parse_query(fcache->src);
queryTree_list = NIL;
flat_query_list = NIL;
foreach(lc, raw_parsetree_list)
{
Node *parsetree = (Node *) lfirst(lc);
List *queryTree_sublist;
queryTree_sublist = pg_analyze_and_rewrite(parsetree,
fcache->src,
argOidVect,
nargs);
queryTree_list = lappend(queryTree_list, queryTree_sublist);
flat_query_list = list_concat(flat_query_list,
list_copy(queryTree_sublist));
}
/* /*
* Check that the function returns the type it claims to. Although in * Check that the function returns the type it claims to. Although in
...@@ -343,7 +394,7 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) ...@@ -343,7 +394,7 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
*/ */
fcache->returnsTuple = check_sql_fn_retval(foid, fcache->returnsTuple = check_sql_fn_retval(foid,
rettype, rettype,
queryTree_list, flat_query_list,
NULL, NULL,
&fcache->junkFilter); &fcache->junkFilter);
...@@ -375,24 +426,12 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) ...@@ -375,24 +426,12 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
static void static void
postquel_start(execution_state *es, SQLFunctionCachePtr fcache) postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
{ {
Snapshot snapshot;
DestReceiver *dest; DestReceiver *dest;
Assert(es->qd == NULL); Assert(es->qd == NULL);
/* /* Caller should have ensured a suitable snapshot is active */
* In a read-only function, use the surrounding query's snapshot; Assert(ActiveSnapshotSet());
* otherwise take a new snapshot for each query. The snapshot should
* include a fresh command ID so that all work to date in this transaction
* is visible.
*/
if (fcache->readonly_func)
snapshot = GetActiveSnapshot();
else
{
CommandCounterIncrement();
snapshot = GetTransactionSnapshot();
}
/* /*
* If this query produces the function result, send its output to the * If this query produces the function result, send its output to the
...@@ -416,18 +455,17 @@ postquel_start(execution_state *es, SQLFunctionCachePtr fcache) ...@@ -416,18 +455,17 @@ postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
if (IsA(es->stmt, PlannedStmt)) if (IsA(es->stmt, PlannedStmt))
es->qd = CreateQueryDesc((PlannedStmt *) es->stmt, es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
fcache->src, fcache->src,
snapshot, InvalidSnapshot, GetActiveSnapshot(),
InvalidSnapshot,
dest, dest,
fcache->paramLI, 0); fcache->paramLI, 0);
else else
es->qd = CreateUtilityQueryDesc(es->stmt, es->qd = CreateUtilityQueryDesc(es->stmt,
fcache->src, fcache->src,
snapshot, GetActiveSnapshot(),
dest, dest,
fcache->paramLI); fcache->paramLI);
/* We assume we don't need to set up ActiveSnapshot for ExecutorStart */
/* Utility commands don't need Executor. */ /* Utility commands don't need Executor. */
if (es->qd->utilitystmt == NULL) if (es->qd->utilitystmt == NULL)
{ {
...@@ -457,9 +495,6 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache) ...@@ -457,9 +495,6 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
{ {
bool result; bool result;
/* Make our snapshot the active one for any called functions */
PushActiveSnapshot(es->qd->snapshot);
if (es->qd->utilitystmt) if (es->qd->utilitystmt)
{ {
/* ProcessUtility needs the PlannedStmt for DECLARE CURSOR */ /* ProcessUtility needs the PlannedStmt for DECLARE CURSOR */
...@@ -487,8 +522,6 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache) ...@@ -487,8 +522,6 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
result = (count == 0L || es->qd->estate->es_processed == 0); result = (count == 0L || es->qd->estate->es_processed == 0);
} }
PopActiveSnapshot();
return result; return result;
} }
...@@ -502,13 +535,8 @@ postquel_end(execution_state *es) ...@@ -502,13 +535,8 @@ postquel_end(execution_state *es)
/* Utility commands don't need Executor. */ /* Utility commands don't need Executor. */
if (es->qd->utilitystmt == NULL) if (es->qd->utilitystmt == NULL)
{ {
/* Make our snapshot the active one for any called functions */
PushActiveSnapshot(es->qd->snapshot);
ExecutorFinish(es->qd); ExecutorFinish(es->qd);
ExecutorEnd(es->qd); ExecutorEnd(es->qd);
PopActiveSnapshot();
} }
(*es->qd->dest->rDestroy) (es->qd->dest); (*es->qd->dest->rDestroy) (es->qd->dest);
...@@ -619,9 +647,13 @@ fmgr_sql(PG_FUNCTION_ARGS) ...@@ -619,9 +647,13 @@ fmgr_sql(PG_FUNCTION_ARGS)
ErrorContextCallback sqlerrcontext; ErrorContextCallback sqlerrcontext;
bool randomAccess; bool randomAccess;
bool lazyEvalOK; bool lazyEvalOK;
bool is_first;
bool pushed_snapshot;
execution_state *es; execution_state *es;
TupleTableSlot *slot; TupleTableSlot *slot;
Datum result; Datum result;
List *eslist;
ListCell *eslc;
/* /*
* Switch to context in which the fcache lives. This ensures that * Switch to context in which the fcache lives. This ensures that
...@@ -673,13 +705,33 @@ fmgr_sql(PG_FUNCTION_ARGS) ...@@ -673,13 +705,33 @@ fmgr_sql(PG_FUNCTION_ARGS)
init_sql_fcache(fcinfo->flinfo, lazyEvalOK); init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra; fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
} }
es = fcache->func_state; eslist = fcache->func_state;
/*
* Find first unfinished query in function, and note whether it's the
* first query.
*/
es = NULL;
is_first = true;
foreach(eslc, eslist)
{
es = (execution_state *) lfirst(eslc);
while (es && es->status == F_EXEC_DONE)
{
is_first = false;
es = es->next;
}
if (es)
break;
}
/* /*
* Convert params to appropriate format if starting a fresh execution. (If * Convert params to appropriate format if starting a fresh execution. (If
* continuing execution, we can re-use prior params.) * continuing execution, we can re-use prior params.)
*/ */
if (es && es->status == F_EXEC_START) if (is_first && es && es->status == F_EXEC_START)
postquel_sub_params(fcache, fcinfo); postquel_sub_params(fcache, fcinfo);
/* /*
...@@ -689,22 +741,57 @@ fmgr_sql(PG_FUNCTION_ARGS) ...@@ -689,22 +741,57 @@ fmgr_sql(PG_FUNCTION_ARGS)
if (!fcache->tstore) if (!fcache->tstore)
fcache->tstore = tuplestore_begin_heap(randomAccess, false, work_mem); fcache->tstore = tuplestore_begin_heap(randomAccess, false, work_mem);
/*
* Find first unfinished query in function.
*/
while (es && es->status == F_EXEC_DONE)
es = es->next;
/* /*
* Execute each command in the function one after another until we either * Execute each command in the function one after another until we either
* run out of commands or get a result row from a lazily-evaluated SELECT. * run out of commands or get a result row from a lazily-evaluated SELECT.
*
* Notes about snapshot management:
*
* In a read-only function, we just use the surrounding query's snapshot.
*
* In a non-read-only function, we rely on the fact that we'll never
* suspend execution between queries of the function: the only reason to
* suspend execution before completion is if we are returning a row from
* a lazily-evaluated SELECT. So, when first entering this loop, we'll
* either start a new query (and push a fresh snapshot) or re-establish
* the active snapshot from the existing query descriptor. If we need to
* start a new query in a subsequent execution of the loop, either we need
* a fresh snapshot (and pushed_snapshot is false) or the existing
* snapshot is on the active stack and we can just bump its command ID.
*/ */
pushed_snapshot = false;
while (es) while (es)
{ {
bool completed; bool completed;
if (es->status == F_EXEC_START) if (es->status == F_EXEC_START)
{
/*
* If not read-only, be sure to advance the command counter for
* each command, so that all work to date in this transaction is
* visible. Take a new snapshot if we don't have one yet,
* otherwise just bump the command ID in the existing snapshot.
*/
if (!fcache->readonly_func)
{
CommandCounterIncrement();
if (!pushed_snapshot)
{
PushActiveSnapshot(GetTransactionSnapshot());
pushed_snapshot = true;
}
else
UpdateActiveSnapshotCommandId();
}
postquel_start(es, fcache); postquel_start(es, fcache);
}
else if (!fcache->readonly_func && !pushed_snapshot)
{
/* Re-establish active snapshot when re-entering function */
PushActiveSnapshot(es->qd->snapshot);
pushed_snapshot = true;
}
completed = postquel_getnext(es, fcache); completed = postquel_getnext(es, fcache);
...@@ -730,7 +817,31 @@ fmgr_sql(PG_FUNCTION_ARGS) ...@@ -730,7 +817,31 @@ fmgr_sql(PG_FUNCTION_ARGS)
*/ */
if (es->status != F_EXEC_DONE) if (es->status != F_EXEC_DONE)
break; break;
/*
* Advance to next execution_state, which might be in the next list.
*/
es = es->next; es = es->next;
while (!es)
{
eslc = lnext(eslc);
if (!eslc)
break; /* end of function */
es = (execution_state *) lfirst(eslc);
/*
* Flush the current snapshot so that we will take a new one
* for the new query list. This ensures that new snaps are
* taken at original-query boundaries, matching the behavior
* of interactive execution.
*/
if (pushed_snapshot)
{
PopActiveSnapshot();
pushed_snapshot = false;
}
}
} }
/* /*
...@@ -857,17 +968,24 @@ fmgr_sql(PG_FUNCTION_ARGS) ...@@ -857,17 +968,24 @@ fmgr_sql(PG_FUNCTION_ARGS)
tuplestore_clear(fcache->tstore); tuplestore_clear(fcache->tstore);
} }
/* Pop snapshot if we have pushed one */
if (pushed_snapshot)
PopActiveSnapshot();
/* /*
* If we've gone through every command in the function, we are done. Reset * If we've gone through every command in the function, we are done. Reset
* the execution states to start over again on next call. * the execution states to start over again on next call.
*/ */
if (es == NULL) if (es == NULL)
{ {
es = fcache->func_state; foreach(eslc, fcache->func_state)
while (es)
{ {
es->status = F_EXEC_START; es = (execution_state *) lfirst(eslc);
es = es->next; while (es)
{
es->status = F_EXEC_START;
es = es->next;
}
} }
} }
...@@ -918,18 +1036,25 @@ sql_exec_error_callback(void *arg) ...@@ -918,18 +1036,25 @@ sql_exec_error_callback(void *arg)
{ {
execution_state *es; execution_state *es;
int query_num; int query_num;
ListCell *lc;
es = fcache->func_state; es = NULL;
query_num = 1; query_num = 1;
while (es) foreach(lc, fcache->func_state)
{ {
if (es->qd) es = (execution_state *) lfirst(lc);
while (es)
{ {
errcontext("SQL function \"%s\" statement %d", if (es->qd)
fcache->fname, query_num); {
break; errcontext("SQL function \"%s\" statement %d",
fcache->fname, query_num);
break;
}
es = es->next;
} }
es = es->next; if (es)
break;
query_num++; query_num++;
} }
if (es == NULL) if (es == NULL)
...@@ -961,16 +1086,31 @@ static void ...@@ -961,16 +1086,31 @@ static void
ShutdownSQLFunction(Datum arg) ShutdownSQLFunction(Datum arg)
{ {
SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg); SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
execution_state *es = fcache->func_state; execution_state *es;
ListCell *lc;
while (es != NULL) foreach(lc, fcache->func_state)
{ {
/* Shut down anything still running */ es = (execution_state *) lfirst(lc);
if (es->status == F_EXEC_RUN) while (es)
postquel_end(es); {
/* Reset states to START in case we're called again */ /* Shut down anything still running */
es->status = F_EXEC_START; if (es->status == F_EXEC_RUN)
es = es->next; {
/* Re-establish active snapshot for any called functions */
if (!fcache->readonly_func)
PushActiveSnapshot(es->qd->snapshot);
postquel_end(es);
if (!fcache->readonly_func)
PopActiveSnapshot();
}
/* Reset states to START in case we're called again */
es->status = F_EXEC_START;
es = es->next;
}
} }
/* Release tuplestore if we have one */ /* Release tuplestore if we have one */
......
...@@ -1768,7 +1768,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -1768,7 +1768,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Oid my_lastoid = InvalidOid; Oid my_lastoid = InvalidOid;
SPITupleTable *my_tuptable = NULL; SPITupleTable *my_tuptable = NULL;
int res = 0; int res = 0;
bool have_active_snap = ActiveSnapshotSet(); bool pushed_active_snap = false;
ErrorContextCallback spierrcontext; ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL; CachedPlan *cplan = NULL;
ListCell *lc1; ListCell *lc1;
...@@ -1781,6 +1781,40 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -1781,6 +1781,40 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
spierrcontext.previous = error_context_stack; spierrcontext.previous = error_context_stack;
error_context_stack = &spierrcontext; error_context_stack = &spierrcontext;
/*
* We support four distinct snapshot management behaviors:
*
* snapshot != InvalidSnapshot, read_only = true: use exactly the given
* snapshot.
*
* snapshot != InvalidSnapshot, read_only = false: use the given
* snapshot, modified by advancing its command ID before each querytree.
*
* snapshot == InvalidSnapshot, read_only = true: use the entry-time
* ActiveSnapshot, if any (if there isn't one, we run with no snapshot).
*
* snapshot == InvalidSnapshot, read_only = false: take a full new
* snapshot for each user command, and advance its command ID before each
* querytree within the command.
*
* In the first two cases, we can just push the snap onto the stack
* once for the whole plan list.
*/
if (snapshot != InvalidSnapshot)
{
if (read_only)
{
PushActiveSnapshot(snapshot);
pushed_active_snap = true;
}
else
{
/* Make sure we have a private copy of the snapshot to modify */
PushCopiedSnapshot(snapshot);
pushed_active_snap = true;
}
}
foreach(lc1, plan->plancache_list) foreach(lc1, plan->plancache_list)
{ {
CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc1); CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc1);
...@@ -1802,12 +1836,23 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -1802,12 +1836,23 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
stmt_list = plansource->plan->stmt_list; stmt_list = plansource->plan->stmt_list;
} }
/*
* In the default non-read-only case, get a new snapshot, replacing
* any that we pushed in a previous cycle.
*/
if (snapshot == InvalidSnapshot && !read_only)
{
if (pushed_active_snap)
PopActiveSnapshot();
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
}
foreach(lc2, stmt_list) foreach(lc2, stmt_list)
{ {
Node *stmt = (Node *) lfirst(lc2); Node *stmt = (Node *) lfirst(lc2);
bool canSetTag; bool canSetTag;
DestReceiver *dest; DestReceiver *dest;
bool pushed_active_snap = false;
_SPI_current->processed = 0; _SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid; _SPI_current->lastoid = InvalidOid;
...@@ -1848,48 +1893,16 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -1848,48 +1893,16 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
/* /*
* If not read-only mode, advance the command counter before each * If not read-only mode, advance the command counter before each
* command. * command and update the snapshot.
*/ */
if (!read_only) if (!read_only)
{
CommandCounterIncrement(); CommandCounterIncrement();
UpdateActiveSnapshotCommandId();
}
dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone); dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
if (snapshot == InvalidSnapshot)
{
/*
* Default read_only behavior is to use the entry-time
* ActiveSnapshot, if any; if read-write, grab a full new
* snap.
*/
if (read_only)
{
if (have_active_snap)
{
PushActiveSnapshot(GetActiveSnapshot());
pushed_active_snap = true;
}
}
else
{
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
}
}
else
{
/*
* We interpret read_only with a specified snapshot to be
* exactly that snapshot, but read-write means use the snap
* with advancing of command ID.
*/
if (read_only)
PushActiveSnapshot(snapshot);
else
PushUpdatedSnapshot(snapshot);
pushed_active_snap = true;
}
if (IsA(stmt, PlannedStmt) && if (IsA(stmt, PlannedStmt) &&
((PlannedStmt *) stmt)->utilityStmt == NULL) ((PlannedStmt *) stmt)->utilityStmt == NULL)
{ {
...@@ -1925,9 +1938,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -1925,9 +1938,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
res = SPI_OK_UTILITY; res = SPI_OK_UTILITY;
} }
if (pushed_active_snap)
PopActiveSnapshot();
/* /*
* The last canSetTag query sets the status values returned to the * The last canSetTag query sets the status values returned to the
* caller. Be careful to free any tuptables not returned, to * caller. Be careful to free any tuptables not returned, to
...@@ -1970,6 +1980,10 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ...@@ -1970,6 +1980,10 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
fail: fail:
/* Pop the snapshot off the stack if we pushed one */
if (pushed_active_snap)
PopActiveSnapshot();
/* We no longer need the cached plan refcount, if any */ /* We no longer need the cached plan refcount, if any */
if (cplan) if (cplan)
ReleaseCachedPlan(cplan, true); ReleaseCachedPlan(cplan, true);
......
...@@ -513,47 +513,6 @@ client_read_ended(void) ...@@ -513,47 +513,6 @@ client_read_ended(void)
} }
/*
* Parse a query string and pass it through the rewriter.
*
* A list of Query nodes is returned, since the string might contain
* multiple queries and/or the rewriter might expand one query to several.
*
* NOTE: this routine is no longer used for processing interactive queries,
* but it is still needed for parsing of SQL function bodies.
*/
List *
pg_parse_and_rewrite(const char *query_string, /* string to execute */
Oid *paramTypes, /* parameter types */
int numParams) /* number of parameters */
{
List *raw_parsetree_list;
List *querytree_list;
ListCell *list_item;
/*
* (1) parse the request string into a list of raw parse trees.
*/
raw_parsetree_list = pg_parse_query(query_string);
/*
* (2) Do parse analysis and rule rewrite.
*/
querytree_list = NIL;
foreach(list_item, raw_parsetree_list)
{
Node *parsetree = (Node *) lfirst(list_item);
querytree_list = list_concat(querytree_list,
pg_analyze_and_rewrite(parsetree,
query_string,
paramTypes,
numParams));
}
return querytree_list;
}
/* /*
* Do raw parsing (only). * Do raw parsing (only).
* *
......
...@@ -169,11 +169,6 @@ ProcessQuery(PlannedStmt *plan, ...@@ -169,11 +169,6 @@ ProcessQuery(PlannedStmt *plan,
elog(DEBUG3, "ProcessQuery"); elog(DEBUG3, "ProcessQuery");
/*
* Must always set a snapshot for plannable queries.
*/
PushActiveSnapshot(GetTransactionSnapshot());
/* /*
* Create the QueryDesc object * Create the QueryDesc object
*/ */
...@@ -233,8 +228,6 @@ ProcessQuery(PlannedStmt *plan, ...@@ -233,8 +228,6 @@ ProcessQuery(PlannedStmt *plan,
ExecutorEnd(queryDesc); ExecutorEnd(queryDesc);
FreeQueryDesc(queryDesc); FreeQueryDesc(queryDesc);
PopActiveSnapshot();
} }
/* /*
...@@ -1164,8 +1157,8 @@ PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel, ...@@ -1164,8 +1157,8 @@ PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
* seems to be to enumerate those that do not need one; this is a short * seems to be to enumerate those that do not need one; this is a short
* list. Transaction control, LOCK, and SET must *not* set a snapshot * list. Transaction control, LOCK, and SET must *not* set a snapshot
* since they need to be executable at the start of a transaction-snapshot * since they need to be executable at the start of a transaction-snapshot
* mode transaction without freezing a snapshot. By extension we allow SHOW * mode transaction without freezing a snapshot. By extension we allow
* not to set a snapshot. The other stmts listed are just efficiency * SHOW not to set a snapshot. The other stmts listed are just efficiency
* hacks. Beware of listing anything that can modify the database --- if, * hacks. Beware of listing anything that can modify the database --- if,
* say, it has to update an index with expressions that invoke * say, it has to update an index with expressions that invoke
* user-defined functions, then it had better have a snapshot. * user-defined functions, then it had better have a snapshot.
...@@ -1219,6 +1212,7 @@ PortalRunMulti(Portal portal, bool isTopLevel, ...@@ -1219,6 +1212,7 @@ PortalRunMulti(Portal portal, bool isTopLevel,
DestReceiver *dest, DestReceiver *altdest, DestReceiver *dest, DestReceiver *altdest,
char *completionTag) char *completionTag)
{ {
bool active_snapshot_set = false;
ListCell *stmtlist_item; ListCell *stmtlist_item;
/* /*
...@@ -1262,6 +1256,20 @@ PortalRunMulti(Portal portal, bool isTopLevel, ...@@ -1262,6 +1256,20 @@ PortalRunMulti(Portal portal, bool isTopLevel,
if (log_executor_stats) if (log_executor_stats)
ResetUsage(); ResetUsage();
/*
* Must always have a snapshot for plannable queries. First time
* through, take a new snapshot; for subsequent queries in the
* same portal, just update the snapshot's copy of the command
* counter.
*/
if (!active_snapshot_set)
{
PushActiveSnapshot(GetTransactionSnapshot());
active_snapshot_set = true;
}
else
UpdateActiveSnapshotCommandId();
if (pstmt->canSetTag) if (pstmt->canSetTag)
{ {
/* statement can set tag string */ /* statement can set tag string */
...@@ -1291,11 +1299,29 @@ PortalRunMulti(Portal portal, bool isTopLevel, ...@@ -1291,11 +1299,29 @@ PortalRunMulti(Portal portal, bool isTopLevel,
* *
* These are assumed canSetTag if they're the only stmt in the * These are assumed canSetTag if they're the only stmt in the
* portal. * portal.
*
* We must not set a snapshot here for utility commands (if one is
* needed, PortalRunUtility will do it). If a utility command is
* alone in a portal then everything's fine. The only case where
* a utility command can be part of a longer list is that rules
* are allowed to include NotifyStmt. NotifyStmt doesn't care
* whether it has a snapshot or not, so we just leave the current
* snapshot alone if we have one.
*/ */
if (list_length(portal->stmts) == 1) if (list_length(portal->stmts) == 1)
PortalRunUtility(portal, stmt, isTopLevel, dest, completionTag); {
Assert(!active_snapshot_set);
/* statement can set tag string */
PortalRunUtility(portal, stmt, isTopLevel,
dest, completionTag);
}
else else
PortalRunUtility(portal, stmt, isTopLevel, altdest, NULL); {
Assert(IsA(stmt, NotifyStmt));
/* stmt added by rewrite cannot set tag */
PortalRunUtility(portal, stmt, isTopLevel,
altdest, NULL);
}
} }
/* /*
...@@ -1313,6 +1339,10 @@ PortalRunMulti(Portal portal, bool isTopLevel, ...@@ -1313,6 +1339,10 @@ PortalRunMulti(Portal portal, bool isTopLevel,
MemoryContextDeleteChildren(PortalGetHeapMemory(portal)); MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
} }
/* Pop the snapshot if we pushed one. */
if (active_snapshot_set)
PopActiveSnapshot();
/* /*
* If a command completion tag was supplied, use it. Otherwise use the * If a command completion tag was supplied, use it. Otherwise use the
* portal's commandTag as the default completion tag. * portal's commandTag as the default completion tag.
......
...@@ -299,22 +299,33 @@ PushActiveSnapshot(Snapshot snap) ...@@ -299,22 +299,33 @@ PushActiveSnapshot(Snapshot snap)
} }
/* /*
* PushUpdatedSnapshot * PushCopiedSnapshot
* As above, except we set the snapshot's CID to the current CID. * As above, except forcibly copy the presented snapshot.
*
* This should be used when the ActiveSnapshot has to be modifiable, for
* example if the caller intends to call UpdateActiveSnapshotCommandId.
* The new snapshot will be released when popped from the stack.
*/ */
void void
PushUpdatedSnapshot(Snapshot snapshot) PushCopiedSnapshot(Snapshot snapshot)
{ {
Snapshot newsnap; PushActiveSnapshot(CopySnapshot(snapshot));
}
/* /*
* We cannot risk modifying a snapshot that's possibly already used * UpdateActiveSnapshotCommandId
* elsewhere, so make a new copy to scribble on. *
*/ * Update the current CID of the active snapshot. This can only be applied
newsnap = CopySnapshot(snapshot); * to a snapshot that is not referenced elsewhere.
newsnap->curcid = GetCurrentCommandId(false); */
void
UpdateActiveSnapshotCommandId(void)
{
Assert(ActiveSnapshot != NULL);
Assert(ActiveSnapshot->as_snap->active_count == 1);
Assert(ActiveSnapshot->as_snap->regd_count == 0);
PushActiveSnapshot(newsnap); ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false);
} }
/* /*
......
...@@ -45,8 +45,6 @@ typedef enum ...@@ -45,8 +45,6 @@ typedef enum
extern int log_statement; extern int log_statement;
extern List *pg_parse_and_rewrite(const char *query_string,
Oid *paramTypes, int numParams);
extern List *pg_parse_query(const char *query_string); extern List *pg_parse_query(const char *query_string);
extern List *pg_analyze_and_rewrite(Node *parsetree, const char *query_string, extern List *pg_analyze_and_rewrite(Node *parsetree, const char *query_string,
Oid *paramTypes, int numParams); Oid *paramTypes, int numParams);
......
...@@ -28,7 +28,8 @@ extern Snapshot GetLatestSnapshot(void); ...@@ -28,7 +28,8 @@ extern Snapshot GetLatestSnapshot(void);
extern void SnapshotSetCommandId(CommandId curcid); extern void SnapshotSetCommandId(CommandId curcid);
extern void PushActiveSnapshot(Snapshot snapshot); extern void PushActiveSnapshot(Snapshot snapshot);
extern void PushUpdatedSnapshot(Snapshot snapshot); extern void PushCopiedSnapshot(Snapshot snapshot);
extern void UpdateActiveSnapshotCommandId(void);
extern void PopActiveSnapshot(void); extern void PopActiveSnapshot(void);
extern Snapshot GetActiveSnapshot(void); extern Snapshot GetActiveSnapshot(void);
extern bool ActiveSnapshotSet(void); extern bool ActiveSnapshotSet(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment