Commit e5d7be09 authored by Meet Narendra's avatar Meet Narendra 💬

YOLO

parent 07afc8d8
# CS631-Project # CS631-Project: Automatic Index creation in postgres.
***Authors** @meet.doshi @gaurangathavale @jiteshg*
---
#### For checking patch updates
```
Just look at the following files
execMain.c
nodeSeqScan.c
worker_spi/worker_spi.c
Makefile
Look for following tags
/* CS631 start */
/* CS631 end */
```
###File Structure:
```
<22m0742_22m0752>
├── execMain.c ~ User queries without where clause dont reach node sequential scan level. So they have to be stored at execMain level. Currently we are only saving insert and delete queries.
├── nodeSeqScan.c ~ Operates at table level. Here we are keeping track of all select queries with and without where clause.
├── worker_spi/ ~ This module should be registered in contrib folder and requires updates in the make file in same folder.
│ ├── worker_spi.c ~ This is the daemon process which runs every 20 seconds. It alternates with a toggle where first time it fetches info and decides which index to create. On the next round it executes the query and creates the index.
└── readme.md
```
###Tasks:
1. Storing logs of incoming user queries for later use on making a decision whether to create an index or not.
2. Creating a daemon process like a cron job for the database which runs regularly to check whether indexes need to be created or not.
3. Building up logic for the daemon process to decided whether indexes need to be created or deleted.
###Cautions:
1. Installing worker module correctly and updating makefiles.
2. Accessing uninitialized or modifying static memory.
3. Recursive loops inside code.
4. Codebase changes due to changes in postgres versions. So be sure to check if the data structures and libraries are matching.
5. Mutliple where clauses and subqueries.
6. Current postgres version 13.7 `Different from the git version`
\ No newline at end of file
/*-------------------------------------------------------------------------
*
* execMain.c
* top level executor interface routines
*
* INTERFACE ROUTINES
* ExecutorStart()
* ExecutorRun()
* ExecutorFinish()
* ExecutorEnd()
*
* These four procedures are the external interface to the executor.
* In each case, the query descriptor is required as an argument.
*
* ExecutorStart must be called at the beginning of execution of any
* query plan and ExecutorEnd must always be called at the end of
* execution of a plan (unless it is aborted due to error).
*
* ExecutorRun accepts direction and count arguments that specify whether
* the plan is to be executed forwards, backwards, and for how many tuples.
* In some cases ExecutorRun may be called multiple times to process all
* the tuples for a plan. It is also acceptable to stop short of executing
* the whole plan (but only if it is a SELECT).
*
* ExecutorFinish must be called after the final ExecutorRun call and
* before ExecutorEnd. This can be omitted only in case of EXPLAIN,
* which should also omit ExecutorRun.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/executor/execMain.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_publication.h"
#include "commands/matview.h"
#include "commands/trigger.h"
#include "executor/execdebug.h"
#include "executor/nodeSubplan.h"
#include "foreign/fdwapi.h"
#include "jit/jit.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "parser/parsetree.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/backend_status.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/partcache.h"
#include "utils/rls.h"
#include "utils/ruleutils.h"
#include "utils/snapmgr.h"
/* CS631 start */
#include "executor/spi.h"
/* CS631 end */
/* Hooks for plugins to get control in ExecutorStart/Run/Finish/End */
ExecutorStart_hook_type ExecutorStart_hook = NULL;
ExecutorRun_hook_type ExecutorRun_hook = NULL;
ExecutorFinish_hook_type ExecutorFinish_hook = NULL;
ExecutorEnd_hook_type ExecutorEnd_hook = NULL;
/* Hook for plugin to get control in ExecCheckRTPerms() */
ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook = NULL;
/* decls for local routines only used within this module */
static void InitPlan(QueryDesc *queryDesc, int eflags);
static void CheckValidRowMarkRel(Relation rel, RowMarkType markType);
static void ExecPostprocessPlan(EState *estate);
static void ExecEndPlan(PlanState *planstate, EState *estate);
static void ExecutePlan(EState *estate, PlanState *planstate,
bool use_parallel_mode,
CmdType operation,
bool sendTuples,
uint64 numberTuples,
ScanDirection direction,
DestReceiver *dest,
bool execute_once);
static bool ExecCheckRTEPerms(RangeTblEntry *rte);
static bool ExecCheckRTEPermsModified(Oid relOid, Oid userid,
Bitmapset *modifiedCols,
AclMode requiredPerms);
static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt);
static char *ExecBuildSlotValueDescription(Oid reloid,
TupleTableSlot *slot,
TupleDesc tupdesc,
Bitmapset *modifiedCols,
int maxfieldlen);
static void EvalPlanQualStart(EPQState *epqstate, Plan *planTree);
/* end of local decls */
/* ----------------------------------------------------------------
* ExecutorStart
*
* This routine must be called at the beginning of any execution of any
* query plan
*
* Takes a QueryDesc previously created by CreateQueryDesc (which is separate
* only because some places use QueryDescs for utility commands). The tupDesc
* field of the QueryDesc is filled in to describe the tuples that will be
* returned, and the internal fields (estate and planstate) are set up.
*
* eflags contains flag bits as described in executor.h.
*
* NB: the CurrentMemoryContext when this is called will become the parent
* of the per-query context used for this Executor invocation.
*
* We provide a function hook variable that lets loadable plugins
* get control when ExecutorStart is called. Such a plugin would
* normally call standard_ExecutorStart().
*
* ----------------------------------------------------------------
*/
void
ExecutorStart(QueryDesc *queryDesc, int eflags)
{
/*
* In some cases (e.g. an EXECUTE statement) a query execution will skip
* parse analysis, which means that the query_id won't be reported. Note
* that it's harmless to report the query_id multiple time, as the call
* will be ignored if the top level query_id has already been reported.
*/
pgstat_report_query_id(queryDesc->plannedstmt->queryId, false);
if (ExecutorStart_hook)
(*ExecutorStart_hook) (queryDesc, eflags);
else
standard_ExecutorStart(queryDesc, eflags);
}
void
standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
{
EState *estate;
MemoryContext oldcontext;
/* sanity checks: queryDesc must not be started already */
Assert(queryDesc != NULL);
Assert(queryDesc->estate == NULL);
/*
* If the transaction is read-only, we need to check if any writes are
* planned to non-temporary tables. EXPLAIN is considered read-only.
*
* Don't allow writes in parallel mode. Supporting UPDATE and DELETE
* would require (a) storing the combo CID hash in shared memory, rather
* than synchronizing it just once at the start of parallelism, and (b) an
* alternative to heap_update()'s reliance on xmax for mutual exclusion.
* INSERT may have no such troubles, but we forbid it to simplify the
* checks.
*
* We have lower-level defenses in CommandCounterIncrement and elsewhere
* against performing unsafe operations in parallel mode, but this gives a
* more user-friendly error message.
*/
if ((XactReadOnly || IsInParallelMode()) &&
!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
ExecCheckXactReadOnly(queryDesc->plannedstmt);
/*
* Build EState, switch into per-query memory context for startup.
*/
estate = CreateExecutorState();
queryDesc->estate = estate;
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
/*
* Fill in external parameters, if any, from queryDesc; and allocate
* workspace for internal parameters
*/
estate->es_param_list_info = queryDesc->params;
if (queryDesc->plannedstmt->paramExecTypes != NIL)
{
int nParamExec;
nParamExec = list_length(queryDesc->plannedstmt->paramExecTypes);
estate->es_param_exec_vals = (ParamExecData *)
palloc0(nParamExec * sizeof(ParamExecData));
}
/* We now require all callers to provide sourceText */
Assert(queryDesc->sourceText != NULL);
estate->es_sourceText = queryDesc->sourceText;
/*
* Fill in the query environment, if any, from queryDesc.
*/
estate->es_queryEnv = queryDesc->queryEnv;
/*
* If non-read-only query, set the command ID to mark output tuples with
*/
switch (queryDesc->operation)
{
case CMD_SELECT:
/*
* SELECT FOR [KEY] UPDATE/SHARE and modifying CTEs need to mark
* tuples
*/
if (queryDesc->plannedstmt->rowMarks != NIL ||
queryDesc->plannedstmt->hasModifyingCTE)
estate->es_output_cid = GetCurrentCommandId(true);
/*
* A SELECT without modifying CTEs can't possibly queue triggers,
* so force skip-triggers mode. This is just a marginal efficiency
* hack, since AfterTriggerBeginQuery/AfterTriggerEndQuery aren't
* all that expensive, but we might as well do it.
*/
if (!queryDesc->plannedstmt->hasModifyingCTE)
eflags |= EXEC_FLAG_SKIP_TRIGGERS;
break;
case CMD_INSERT:
case CMD_DELETE:
case CMD_UPDATE:
estate->es_output_cid = GetCurrentCommandId(true);
break;
default:
elog(ERROR, "unrecognized operation code: %d",
(int) queryDesc->operation);
break;
}
/*
* Copy other important information into the EState
*/
estate->es_snapshot = RegisterSnapshot(queryDesc->snapshot);
estate->es_crosscheck_snapshot = RegisterSnapshot(queryDesc->crosscheck_snapshot);
estate->es_top_eflags = eflags;
estate->es_instrument = queryDesc->instrument_options;
estate->es_jit_flags = queryDesc->plannedstmt->jitFlags;
/*
* Set up an AFTER-trigger statement context, unless told not to, or
* unless it's EXPLAIN-only mode (when ExecutorFinish won't be called).
*/
if (!(eflags & (EXEC_FLAG_SKIP_TRIGGERS | EXEC_FLAG_EXPLAIN_ONLY)))
AfterTriggerBeginQuery();
/*
* Initialize the plan state tree
*/
InitPlan(queryDesc, eflags);
/* CS631 start */
char *s;
char *f;
ListCell* temp;
Alias* eref_val;
List* col_names;
char* relName;
int ret;
char *type;
char attrs[100] = "";
char *add1 = "','";
char *add2 = "','";
char *add3 = "');";
char new_new_query[200] = "INSERT INTO AIDX_QUERIES VALUES('";
relName = estate->es_relations[0]->rd_rel->relname.data;
elog(LOG,"%s TABLE NAME FROM INSERT\n",relName);
if(CMD_INSERT == queryDesc->plannedstmt->commandType)
type = "insert";
/* This will only run when insert is not done on aidx_queries as it cannot be updated by user
* Hence the if condition will break the recursive calls
* And this flow needs to be run only for inserts, hence the other CMDType check
*/
if(strcmp("aidx_queries",relName) && CMD_INSERT == queryDesc->plannedstmt->commandType){
int i1;
attrs[0] = '*';
/* Creating the insert into aidx_queries query */
int j1 = strlen(new_new_query);
for (i1 = 0; type[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = type[i1];
}
j1 = strlen(new_new_query);
for (i1 = 0; add1[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = add1[i1];
}
j1 = strlen(new_new_query);
for (i1 = 0; relName[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = relName[i1];
}
j1 = strlen(new_new_query);
for (i1 = 0; add2[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = add2[i1];
}
j1 = strlen(new_new_query);
for (i1 = 0; attrs[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = attrs[i1];
}
j1 = strlen(new_new_query);
for (i1 = 0; add3[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = add3[i1];
}
new_new_query[i1 + j1] = '\0';
elog(LOG, "%s", new_new_query);
if( SPI_connect() == SPI_OK_CONNECT )
{
ret = SPI_execute(new_new_query,false,0);
ret = SPI_finish();
}
}
/* CS631 end */
MemoryContextSwitchTo(oldcontext);
}
/* ----------------------------------------------------------------
* ExecutorRun
*
* This is the main routine of the executor module. It accepts
* the query descriptor from the traffic cop and executes the
* query plan.
*
* ExecutorStart must have been called already.
*
* If direction is NoMovementScanDirection then nothing is done
* except to start up/shut down the destination. Otherwise,
* we retrieve up to 'count' tuples in the specified direction.
*
* Note: count = 0 is interpreted as no portal limit, i.e., run to
* completion. Also note that the count limit is only applied to
* retrieved tuples, not for instance to those inserted/updated/deleted
* by a ModifyTable plan node.
*
* There is no return value, but output tuples (if any) are sent to
* the destination receiver specified in the QueryDesc; and the number
* of tuples processed at the top level can be found in
* estate->es_processed.
*
* We provide a function hook variable that lets loadable plugins
* get control when ExecutorRun is called. Such a plugin would
* normally call standard_ExecutorRun().
*
* ----------------------------------------------------------------
*/
void
ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count,
bool execute_once)
{
if (ExecutorRun_hook)
(*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
else
standard_ExecutorRun(queryDesc, direction, count, execute_once);
}
void
standard_ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count, bool execute_once)
{
EState *estate;
CmdType operation;
DestReceiver *dest;
bool sendTuples;
MemoryContext oldcontext;
/* sanity checks */
Assert(queryDesc != NULL);
estate = queryDesc->estate;
Assert(estate != NULL);
Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
/*
* Switch into per-query memory context
*/
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
/* Allow instrumentation of Executor overall runtime */
if (queryDesc->totaltime)
InstrStartNode(queryDesc->totaltime);
/*
* extract information from the query descriptor and the query feature.
*/
operation = queryDesc->operation;
dest = queryDesc->dest;
/*
* startup tuple receiver, if we will be emitting tuples
*/
estate->es_processed = 0;
sendTuples = (operation == CMD_SELECT ||
queryDesc->plannedstmt->hasReturning);
if (sendTuples)
dest->rStartup(dest, operation, queryDesc->tupDesc);
/*
* run plan
*/
if (!ScanDirectionIsNoMovement(direction))
{
if (execute_once && queryDesc->already_executed)
elog(ERROR, "can't re-execute query flagged for single execution");
queryDesc->already_executed = true;
ExecutePlan(estate,
queryDesc->planstate,
queryDesc->plannedstmt->parallelModeNeeded,
operation,
sendTuples,
count,
direction,
dest,
execute_once);
}
/*
* shutdown tuple receiver, if we started it
*/
if (sendTuples)
dest->rShutdown(dest);
if (queryDesc->totaltime)
InstrStopNode(queryDesc->totaltime, estate->es_processed);
MemoryContextSwitchTo(oldcontext);
}
/* ----------------------------------------------------------------
* ExecutorFinish
*
* This routine must be called after the last ExecutorRun call.
* It performs cleanup such as firing AFTER triggers. It is
* separate from ExecutorEnd because EXPLAIN ANALYZE needs to
* include these actions in the total runtime.
*
* We provide a function hook variable that lets loadable plugins
* get control when ExecutorFinish is called. Such a plugin would
* normally call standard_ExecutorFinish().
*
* ----------------------------------------------------------------
*/
void
ExecutorFinish(QueryDesc *queryDesc)
{
if (ExecutorFinish_hook)
(*ExecutorFinish_hook) (queryDesc);
else
standard_ExecutorFinish(queryDesc);
}
void
standard_ExecutorFinish(QueryDesc *queryDesc)
{
EState *estate;
MemoryContext oldcontext;
/* sanity checks */
Assert(queryDesc != NULL);
estate = queryDesc->estate;
Assert(estate != NULL);
Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
/* This should be run once and only once per Executor instance */
Assert(!estate->es_finished);
/* Switch into per-query memory context */
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
/* Allow instrumentation of Executor overall runtime */
if (queryDesc->totaltime)
InstrStartNode(queryDesc->totaltime);
/* Run ModifyTable nodes to completion */
ExecPostprocessPlan(estate);
/* Execute queued AFTER triggers, unless told not to */
if (!(estate->es_top_eflags & EXEC_FLAG_SKIP_TRIGGERS))
AfterTriggerEndQuery(estate);
if (queryDesc->totaltime)
InstrStopNode(queryDesc->totaltime, 0);
MemoryContextSwitchTo(oldcontext);
estate->es_finished = true;
}
/* ----------------------------------------------------------------
* ExecutorEnd
*
* This routine must be called at the end of execution of any
* query plan
*
* We provide a function hook variable that lets loadable plugins
* get control when ExecutorEnd is called. Such a plugin would
* normally call standard_ExecutorEnd().
*
* ----------------------------------------------------------------
*/
void
ExecutorEnd(QueryDesc *queryDesc)
{
if (ExecutorEnd_hook)
(*ExecutorEnd_hook) (queryDesc);
else
standard_ExecutorEnd(queryDesc);
}
void
standard_ExecutorEnd(QueryDesc *queryDesc)
{
EState *estate;
MemoryContext oldcontext;
/* sanity checks */
Assert(queryDesc != NULL);
estate = queryDesc->estate;
Assert(estate != NULL);
/*
* Check that ExecutorFinish was called, unless in EXPLAIN-only mode. This
* Assert is needed because ExecutorFinish is new as of 9.1, and callers
* might forget to call it.
*/
Assert(estate->es_finished ||
(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
/*
* Switch into per-query memory context to run ExecEndPlan
*/
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
ExecEndPlan(queryDesc->planstate, estate);
/* do away with our snapshots */
UnregisterSnapshot(estate->es_snapshot);
UnregisterSnapshot(estate->es_crosscheck_snapshot);
/*
* Must switch out of context before destroying it
*/
MemoryContextSwitchTo(oldcontext);
/*
* Release EState and per-query memory context. This should release
* everything the executor has allocated.
*/
FreeExecutorState(estate);
/* Reset queryDesc fields that no longer point to anything */
queryDesc->tupDesc = NULL;
queryDesc->estate = NULL;
queryDesc->planstate = NULL;
queryDesc->totaltime = NULL;
}
/* ----------------------------------------------------------------
* ExecutorRewind
*
* This routine may be called on an open queryDesc to rewind it
* to the start.
* ----------------------------------------------------------------
*/
void
ExecutorRewind(QueryDesc *queryDesc)
{
EState *estate;
MemoryContext oldcontext;
/* sanity checks */
Assert(queryDesc != NULL);
estate = queryDesc->estate;
Assert(estate != NULL);
/* It's probably not sensible to rescan updating queries */
Assert(queryDesc->operation == CMD_SELECT);
/*
* Switch into per-query memory context
*/
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
/*
* rescan plan
*/
ExecReScan(queryDesc->planstate);
MemoryContextSwitchTo(oldcontext);
}
/*
* ExecCheckRTPerms
* Check access permissions for all relations listed in a range table.
*
* Returns true if permissions are adequate. Otherwise, throws an appropriate
* error if ereport_on_violation is true, or simply returns false otherwise.
*
* Note that this does NOT address row-level security policies (aka: RLS). If
* rows will be returned to the user as a result of this permission check
* passing, then RLS also needs to be consulted (and check_enable_rls()).
*
* See rewrite/rowsecurity.c.
*/
bool
ExecCheckRTPerms(List *rangeTable, bool ereport_on_violation)
{
ListCell *l;
bool result = true;
foreach(l, rangeTable)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
result = ExecCheckRTEPerms(rte);
if (!result)
{
Assert(rte->rtekind == RTE_RELATION);
if (ereport_on_violation)
aclcheck_error(ACLCHECK_NO_PRIV, get_relkind_objtype(get_rel_relkind(rte->relid)),
get_rel_name(rte->relid));
return false;
}
}
if (ExecutorCheckPerms_hook)
result = (*ExecutorCheckPerms_hook) (rangeTable,
ereport_on_violation);
return result;
}
/*
* ExecCheckRTEPerms
* Check access permissions for a single RTE.
*/
static bool
ExecCheckRTEPerms(RangeTblEntry *rte)
{
AclMode requiredPerms;
AclMode relPerms;
AclMode remainingPerms;
Oid relOid;
Oid userid;
/*
* Only plain-relation RTEs need to be checked here. Function RTEs are
* checked when the function is prepared for execution. Join, subquery,
* and special RTEs need no checks.
*/
if (rte->rtekind != RTE_RELATION)
return true;
/*
* No work if requiredPerms is empty.
*/
requiredPerms = rte->requiredPerms;
if (requiredPerms == 0)
return true;
relOid = rte->relid;
/*
* userid to check as: current user unless we have a setuid indication.
*
* Note: GetUserId() is presently fast enough that there's no harm in
* calling it separately for each RTE. If that stops being true, we could
* call it once in ExecCheckRTPerms and pass the userid down from there.
* But for now, no need for the extra clutter.
*/
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
/*
* We must have *all* the requiredPerms bits, but some of the bits can be
* satisfied from column-level rather than relation-level permissions.
* First, remove any bits that are satisfied by relation permissions.
*/
relPerms = pg_class_aclmask(relOid, userid, requiredPerms, ACLMASK_ALL);
remainingPerms = requiredPerms & ~relPerms;
if (remainingPerms != 0)
{
int col = -1;
/*
* If we lack any permissions that exist only as relation permissions,
* we can fail straight away.
*/
if (remainingPerms & ~(ACL_SELECT | ACL_INSERT | ACL_UPDATE))
return false;
/*
* Check to see if we have the needed privileges at column level.
*
* Note: failures just report a table-level error; it would be nicer
* to report a column-level error if we have some but not all of the
* column privileges.
*/
if (remainingPerms & ACL_SELECT)
{
/*
* When the query doesn't explicitly reference any columns (for
* example, SELECT COUNT(*) FROM table), allow the query if we
* have SELECT on any column of the rel, as per SQL spec.
*/
if (bms_is_empty(rte->selectedCols))
{
if (pg_attribute_aclcheck_all(relOid, userid, ACL_SELECT,
ACLMASK_ANY) != ACLCHECK_OK)
return false;
}
while ((col = bms_next_member(rte->selectedCols, col)) >= 0)
{
/* bit #s are offset by FirstLowInvalidHeapAttributeNumber */
AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
if (attno == InvalidAttrNumber)
{
/* Whole-row reference, must have priv on all cols */
if (pg_attribute_aclcheck_all(relOid, userid, ACL_SELECT,
ACLMASK_ALL) != ACLCHECK_OK)
return false;
}
else
{
if (pg_attribute_aclcheck(relOid, attno, userid,
ACL_SELECT) != ACLCHECK_OK)
return false;
}
}
}
/*
* Basically the same for the mod columns, for both INSERT and UPDATE
* privilege as specified by remainingPerms.
*/
if (remainingPerms & ACL_INSERT && !ExecCheckRTEPermsModified(relOid,
userid,
rte->insertedCols,
ACL_INSERT))
return false;
if (remainingPerms & ACL_UPDATE && !ExecCheckRTEPermsModified(relOid,
userid,
rte->updatedCols,
ACL_UPDATE))
return false;
}
return true;
}
/*
* ExecCheckRTEPermsModified
* Check INSERT or UPDATE access permissions for a single RTE (these
* are processed uniformly).
*/
static bool
ExecCheckRTEPermsModified(Oid relOid, Oid userid, Bitmapset *modifiedCols,
AclMode requiredPerms)
{
int col = -1;
/*
* When the query doesn't explicitly update any columns, allow the query
* if we have permission on any column of the rel. This is to handle
* SELECT FOR UPDATE as well as possible corner cases in UPDATE.
*/
if (bms_is_empty(modifiedCols))
{
if (pg_attribute_aclcheck_all(relOid, userid, requiredPerms,
ACLMASK_ANY) != ACLCHECK_OK)
return false;
}
while ((col = bms_next_member(modifiedCols, col)) >= 0)
{
/* bit #s are offset by FirstLowInvalidHeapAttributeNumber */
AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
if (attno == InvalidAttrNumber)
{
/* whole-row reference can't happen here */
elog(ERROR, "whole-row update is not implemented");
}
else
{
if (pg_attribute_aclcheck(relOid, attno, userid,
requiredPerms) != ACLCHECK_OK)
return false;
}
}
return true;
}
/*
* Check that the query does not imply any writes to non-temp tables;
* unless we're in parallel mode, in which case don't even allow writes
* to temp tables.
*
* Note: in a Hot Standby this would need to reject writes to temp
* tables just as we do in parallel mode; but an HS standby can't have created
* any temp tables in the first place, so no need to check that.
*/
static void
ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
{
ListCell *l;
/*
* Fail if write permissions are requested in parallel mode for table
* (temp or non-temp), otherwise fail for any non-temp table.
*/
foreach(l, plannedstmt->rtable)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
if (rte->rtekind != RTE_RELATION)
continue;
if ((rte->requiredPerms & (~ACL_SELECT)) == 0)
continue;
if (isTempNamespace(get_rel_namespace(rte->relid)))
continue;
PreventCommandIfReadOnly(CreateCommandName((Node *) plannedstmt));
}
if (plannedstmt->commandType != CMD_SELECT || plannedstmt->hasModifyingCTE)
PreventCommandIfParallelMode(CreateCommandName((Node *) plannedstmt));
}
/* ----------------------------------------------------------------
* InitPlan
*
* Initializes the query plan: open files, allocate storage
* and start up the rule manager
* ----------------------------------------------------------------
*/
static void
InitPlan(QueryDesc *queryDesc, int eflags)
{
CmdType operation = queryDesc->operation;
PlannedStmt *plannedstmt = queryDesc->plannedstmt;
Plan *plan = plannedstmt->planTree;
List *rangeTable = plannedstmt->rtable;
EState *estate = queryDesc->estate;
PlanState *planstate;
TupleDesc tupType;
ListCell *l;
int i;
/*
* Do permissions checks
*/
ExecCheckRTPerms(rangeTable, true);
/*
* initialize the node's execution state
*/
ExecInitRangeTable(estate, rangeTable);
estate->es_plannedstmt = plannedstmt;
/*
* Next, build the ExecRowMark array from the PlanRowMark(s), if any.
*/
if (plannedstmt->rowMarks)
{
estate->es_rowmarks = (ExecRowMark **)
palloc0(estate->es_range_table_size * sizeof(ExecRowMark *));
foreach(l, plannedstmt->rowMarks)
{
PlanRowMark *rc = (PlanRowMark *) lfirst(l);
Oid relid;
Relation relation;
ExecRowMark *erm;
/* ignore "parent" rowmarks; they are irrelevant at runtime */
if (rc->isParent)
continue;
/* get relation's OID (will produce InvalidOid if subquery) */
relid = exec_rt_fetch(rc->rti, estate)->relid;
/* open relation, if we need to access it for this mark type */
switch (rc->markType)
{
case ROW_MARK_EXCLUSIVE:
case ROW_MARK_NOKEYEXCLUSIVE:
case ROW_MARK_SHARE:
case ROW_MARK_KEYSHARE:
case ROW_MARK_REFERENCE:
relation = ExecGetRangeTableRelation(estate, rc->rti);
break;
case ROW_MARK_COPY:
/* no physical table access is required */
relation = NULL;
break;
default:
elog(ERROR, "unrecognized markType: %d", rc->markType);
relation = NULL; /* keep compiler quiet */
break;
}
/* Check that relation is a legal target for marking */
if (relation)
CheckValidRowMarkRel(relation, rc->markType);
erm = (ExecRowMark *) palloc(sizeof(ExecRowMark));
erm->relation = relation;
erm->relid = relid;
erm->rti = rc->rti;
erm->prti = rc->prti;
erm->rowmarkId = rc->rowmarkId;
erm->markType = rc->markType;
erm->strength = rc->strength;
erm->waitPolicy = rc->waitPolicy;
erm->ermActive = false;
ItemPointerSetInvalid(&(erm->curCtid));
erm->ermExtra = NULL;
Assert(erm->rti > 0 && erm->rti <= estate->es_range_table_size &&
estate->es_rowmarks[erm->rti - 1] == NULL);
estate->es_rowmarks[erm->rti - 1] = erm;
}
}
/*
* Initialize the executor's tuple table to empty.
*/
estate->es_tupleTable = NIL;
/* signal that this EState is not used for EPQ */
estate->es_epq_active = NULL;
/*
* Initialize private state information for each SubPlan. We must do this
* before running ExecInitNode on the main query tree, since
* ExecInitSubPlan expects to be able to find these entries.
*/
Assert(estate->es_subplanstates == NIL);
i = 1; /* subplan indices count from 1 */
foreach(l, plannedstmt->subplans)
{
Plan *subplan = (Plan *) lfirst(l);
PlanState *subplanstate;
int sp_eflags;
/*
* A subplan will never need to do BACKWARD scan nor MARK/RESTORE. If
* it is a parameterless subplan (not initplan), we suggest that it be
* prepared to handle REWIND efficiently; otherwise there is no need.
*/
sp_eflags = eflags
& (EXEC_FLAG_EXPLAIN_ONLY | EXEC_FLAG_WITH_NO_DATA);
if (bms_is_member(i, plannedstmt->rewindPlanIDs))
sp_eflags |= EXEC_FLAG_REWIND;
subplanstate = ExecInitNode(subplan, estate, sp_eflags);
estate->es_subplanstates = lappend(estate->es_subplanstates,
subplanstate);
i++;
}
/*
* Initialize the private state information for all the nodes in the query
* tree. This opens files, allocates storage and leaves us ready to start
* processing tuples.
*/
planstate = ExecInitNode(plan, estate, eflags);
/*
* Get the tuple descriptor describing the type of tuples to return.
*/
tupType = ExecGetResultType(planstate);
/*
* Initialize the junk filter if needed. SELECT queries need a filter if
* there are any junk attrs in the top-level tlist.
*/
if (operation == CMD_SELECT)
{
bool junk_filter_needed = false;
ListCell *tlist;
foreach(tlist, plan->targetlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(tlist);
if (tle->resjunk)
{
junk_filter_needed = true;
break;
}
}
if (junk_filter_needed)
{
JunkFilter *j;
TupleTableSlot *slot;
slot = ExecInitExtraTupleSlot(estate, NULL, &TTSOpsVirtual);
j = ExecInitJunkFilter(planstate->plan->targetlist,
slot);
estate->es_junkFilter = j;
/* Want to return the cleaned tuple type */
tupType = j->jf_cleanTupType;
}
}
queryDesc->tupDesc = tupType;
queryDesc->planstate = planstate;
}
/*
* Check that a proposed result relation is a legal target for the operation
*
* Generally the parser and/or planner should have noticed any such mistake
* already, but let's make sure.
*
* Note: when changing this function, you probably also need to look at
* CheckValidRowMarkRel.
*/
void
CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation)
{
Relation resultRel = resultRelInfo->ri_RelationDesc;
TriggerDesc *trigDesc = resultRel->trigdesc;
FdwRoutine *fdwroutine;
switch (resultRel->rd_rel->relkind)
{
case RELKIND_RELATION:
case RELKIND_PARTITIONED_TABLE:
CheckCmdReplicaIdentity(resultRel, operation);
break;
case RELKIND_SEQUENCE:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot change sequence \"%s\"",
RelationGetRelationName(resultRel))));
break;
case RELKIND_TOASTVALUE:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot change TOAST relation \"%s\"",
RelationGetRelationName(resultRel))));
break;
case RELKIND_VIEW:
/*
* Okay only if there's a suitable INSTEAD OF trigger. Messages
* here should match rewriteHandler.c's rewriteTargetView and
* RewriteQuery, except that we omit errdetail because we haven't
* got the information handy (and given that we really shouldn't
* get here anyway, it's not worth great exertion to get).
*/
switch (operation)
{
case CMD_INSERT:
if (!trigDesc || !trigDesc->trig_insert_instead_row)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot insert into view \"%s\"",
RelationGetRelationName(resultRel)),
errhint("To enable inserting into the view, provide an INSTEAD OF INSERT trigger or an unconditional ON INSERT DO INSTEAD rule.")));
break;
case CMD_UPDATE:
if (!trigDesc || !trigDesc->trig_update_instead_row)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot update view \"%s\"",
RelationGetRelationName(resultRel)),
errhint("To enable updating the view, provide an INSTEAD OF UPDATE trigger or an unconditional ON UPDATE DO INSTEAD rule.")));
break;
case CMD_DELETE:
if (!trigDesc || !trigDesc->trig_delete_instead_row)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot delete from view \"%s\"",
RelationGetRelationName(resultRel)),
errhint("To enable deleting from the view, provide an INSTEAD OF DELETE trigger or an unconditional ON DELETE DO INSTEAD rule.")));
break;
default:
elog(ERROR, "unrecognized CmdType: %d", (int) operation);
break;
}
break;
case RELKIND_MATVIEW:
if (!MatViewIncrementalMaintenanceIsEnabled())
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot change materialized view \"%s\"",
RelationGetRelationName(resultRel))));
break;
case RELKIND_FOREIGN_TABLE:
/* Okay only if the FDW supports it */
fdwroutine = resultRelInfo->ri_FdwRoutine;
switch (operation)
{
case CMD_INSERT:
if (fdwroutine->ExecForeignInsert == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot insert into foreign table \"%s\"",
RelationGetRelationName(resultRel))));
if (fdwroutine->IsForeignRelUpdatable != NULL &&
(fdwroutine->IsForeignRelUpdatable(resultRel) & (1 << CMD_INSERT)) == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("foreign table \"%s\" does not allow inserts",
RelationGetRelationName(resultRel))));
break;
case CMD_UPDATE:
if (fdwroutine->ExecForeignUpdate == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot update foreign table \"%s\"",
RelationGetRelationName(resultRel))));
if (fdwroutine->IsForeignRelUpdatable != NULL &&
(fdwroutine->IsForeignRelUpdatable(resultRel) & (1 << CMD_UPDATE)) == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("foreign table \"%s\" does not allow updates",
RelationGetRelationName(resultRel))));
break;
case CMD_DELETE:
if (fdwroutine->ExecForeignDelete == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot delete from foreign table \"%s\"",
RelationGetRelationName(resultRel))));
if (fdwroutine->IsForeignRelUpdatable != NULL &&
(fdwroutine->IsForeignRelUpdatable(resultRel) & (1 << CMD_DELETE)) == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("foreign table \"%s\" does not allow deletes",
RelationGetRelationName(resultRel))));
break;
default:
elog(ERROR, "unrecognized CmdType: %d", (int) operation);
break;
}
break;
default:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot change relation \"%s\"",
RelationGetRelationName(resultRel))));
break;
}
}
/*
* Check that a proposed rowmark target relation is a legal target
*
* In most cases parser and/or planner should have noticed this already, but
* they don't cover all cases.
*/
static void
CheckValidRowMarkRel(Relation rel, RowMarkType markType)
{
FdwRoutine *fdwroutine;
switch (rel->rd_rel->relkind)
{
case RELKIND_RELATION:
case RELKIND_PARTITIONED_TABLE:
/* OK */
break;
case RELKIND_SEQUENCE:
/* Must disallow this because we don't vacuum sequences */
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot lock rows in sequence \"%s\"",
RelationGetRelationName(rel))));
break;
case RELKIND_TOASTVALUE:
/* We could allow this, but there seems no good reason to */
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot lock rows in TOAST relation \"%s\"",
RelationGetRelationName(rel))));
break;
case RELKIND_VIEW:
/* Should not get here; planner should have expanded the view */
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot lock rows in view \"%s\"",
RelationGetRelationName(rel))));
break;
case RELKIND_MATVIEW:
/* Allow referencing a matview, but not actual locking clauses */
if (markType != ROW_MARK_REFERENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot lock rows in materialized view \"%s\"",
RelationGetRelationName(rel))));
break;
case RELKIND_FOREIGN_TABLE:
/* Okay only if the FDW supports it */
fdwroutine = GetFdwRoutineForRelation(rel, false);
if (fdwroutine->RefetchForeignRow == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot lock rows in foreign table \"%s\"",
RelationGetRelationName(rel))));
break;
default:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot lock rows in relation \"%s\"",
RelationGetRelationName(rel))));
break;
}
}
/*
* Initialize ResultRelInfo data for one result relation
*
* Caution: before Postgres 9.1, this function included the relkind checking
* that's now in CheckValidResultRel, and it also did ExecOpenIndices if
* appropriate. Be sure callers cover those needs.
*/
void
InitResultRelInfo(ResultRelInfo *resultRelInfo,
Relation resultRelationDesc,
Index resultRelationIndex,
ResultRelInfo *partition_root_rri,
int instrument_options)
{
MemSet(resultRelInfo, 0, sizeof(ResultRelInfo));
resultRelInfo->type = T_ResultRelInfo;
resultRelInfo->ri_RangeTableIndex = resultRelationIndex;
resultRelInfo->ri_RelationDesc = resultRelationDesc;
resultRelInfo->ri_NumIndices = 0;
resultRelInfo->ri_IndexRelationDescs = NULL;
resultRelInfo->ri_IndexRelationInfo = NULL;
/* make a copy so as not to depend on relcache info not changing... */
resultRelInfo->ri_TrigDesc = CopyTriggerDesc(resultRelationDesc->trigdesc);
if (resultRelInfo->ri_TrigDesc)
{
int n = resultRelInfo->ri_TrigDesc->numtriggers;
resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
palloc0(n * sizeof(FmgrInfo));
resultRelInfo->ri_TrigWhenExprs = (ExprState **)
palloc0(n * sizeof(ExprState *));
if (instrument_options)
resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options, false);
}
else
{
resultRelInfo->ri_TrigFunctions = NULL;
resultRelInfo->ri_TrigWhenExprs = NULL;
resultRelInfo->ri_TrigInstrument = NULL;
}
if (resultRelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
resultRelInfo->ri_FdwRoutine = GetFdwRoutineForRelation(resultRelationDesc, true);
else
resultRelInfo->ri_FdwRoutine = NULL;
/* The following fields are set later if needed */
resultRelInfo->ri_RowIdAttNo = 0;
resultRelInfo->ri_projectNew = NULL;
resultRelInfo->ri_newTupleSlot = NULL;
resultRelInfo->ri_oldTupleSlot = NULL;
resultRelInfo->ri_projectNewInfoValid = false;
resultRelInfo->ri_FdwState = NULL;
resultRelInfo->ri_usesFdwDirectModify = false;
resultRelInfo->ri_ConstraintExprs = NULL;
resultRelInfo->ri_GeneratedExprs = NULL;
resultRelInfo->ri_projectReturning = NULL;
resultRelInfo->ri_onConflictArbiterIndexes = NIL;
resultRelInfo->ri_onConflict = NULL;
resultRelInfo->ri_ReturningSlot = NULL;
resultRelInfo->ri_TrigOldSlot = NULL;
resultRelInfo->ri_TrigNewSlot = NULL;
/*
* Only ExecInitPartitionInfo() and ExecInitPartitionDispatchInfo() pass
* non-NULL partition_root_rri. For child relations that are part of the
* initial query rather than being dynamically added by tuple routing,
* this field is filled in ExecInitModifyTable().
*/
resultRelInfo->ri_RootResultRelInfo = partition_root_rri;
resultRelInfo->ri_RootToPartitionMap = NULL; /* set by
* ExecInitRoutingInfo */
resultRelInfo->ri_PartitionTupleSlot = NULL; /* ditto */
resultRelInfo->ri_ChildToRootMap = NULL;
resultRelInfo->ri_ChildToRootMapValid = false;
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
}
/*
* ExecGetTriggerResultRel
* Get a ResultRelInfo for a trigger target relation.
*
* Most of the time, triggers are fired on one of the result relations of the
* query, and so we can just return a member of the es_result_relations array,
* or the es_tuple_routing_result_relations list (if any). (Note: in self-join
* situations there might be multiple members with the same OID; if so it
* doesn't matter which one we pick.)
*
* However, it is sometimes necessary to fire triggers on other relations;
* this happens mainly when an RI update trigger queues additional triggers
* on other relations, which will be processed in the context of the outer
* query. For efficiency's sake, we want to have a ResultRelInfo for those
* triggers too; that can avoid repeated re-opening of the relation. (It
* also provides a way for EXPLAIN ANALYZE to report the runtimes of such
* triggers.) So we make additional ResultRelInfo's as needed, and save them
* in es_trig_target_relations.
*/
ResultRelInfo *
ExecGetTriggerResultRel(EState *estate, Oid relid)
{
ResultRelInfo *rInfo;
ListCell *l;
Relation rel;
MemoryContext oldcontext;
/* Search through the query result relations */
foreach(l, estate->es_opened_result_relations)
{
rInfo = lfirst(l);
if (RelationGetRelid(rInfo->ri_RelationDesc) == relid)
return rInfo;
}
/*
* Search through the result relations that were created during tuple
* routing, if any.
*/
foreach(l, estate->es_tuple_routing_result_relations)
{
rInfo = (ResultRelInfo *) lfirst(l);
if (RelationGetRelid(rInfo->ri_RelationDesc) == relid)
return rInfo;
}
/* Nope, but maybe we already made an extra ResultRelInfo for it */
foreach(l, estate->es_trig_target_relations)
{
rInfo = (ResultRelInfo *) lfirst(l);
if (RelationGetRelid(rInfo->ri_RelationDesc) == relid)
return rInfo;
}
/* Nope, so we need a new one */
/*
* Open the target relation's relcache entry. We assume that an
* appropriate lock is still held by the backend from whenever the trigger
* event got queued, so we need take no new lock here. Also, we need not
* recheck the relkind, so no need for CheckValidResultRel.
*/
rel = table_open(relid, NoLock);
/*
* Make the new entry in the right context.
*/
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
rInfo = makeNode(ResultRelInfo);
InitResultRelInfo(rInfo,
rel,
0, /* dummy rangetable index */
NULL,
estate->es_instrument);
estate->es_trig_target_relations =
lappend(estate->es_trig_target_relations, rInfo);
MemoryContextSwitchTo(oldcontext);
/*
* Currently, we don't need any index information in ResultRelInfos used
* only for triggers, so no need to call ExecOpenIndices.
*/
return rInfo;
}
/* ----------------------------------------------------------------
* ExecPostprocessPlan
*
* Give plan nodes a final chance to execute before shutdown
* ----------------------------------------------------------------
*/
static void
ExecPostprocessPlan(EState *estate)
{
ListCell *lc;
/*
* Make sure nodes run forward.
*/
estate->es_direction = ForwardScanDirection;
/*
* Run any secondary ModifyTable nodes to completion, in case the main
* query did not fetch all rows from them. (We do this to ensure that
* such nodes have predictable results.)
*/
foreach(lc, estate->es_auxmodifytables)
{
PlanState *ps = (PlanState *) lfirst(lc);
for (;;)
{
TupleTableSlot *slot;
/* Reset the per-output-tuple exprcontext each time */
ResetPerTupleExprContext(estate);
slot = ExecProcNode(ps);
if (TupIsNull(slot))
break;
}
}
}
/* ----------------------------------------------------------------
* ExecEndPlan
*
* Cleans up the query plan -- closes files and frees up storage
*
* NOTE: we are no longer very worried about freeing storage per se
* in this code; FreeExecutorState should be guaranteed to release all
* memory that needs to be released. What we are worried about doing
* is closing relations and dropping buffer pins. Thus, for example,
* tuple tables must be cleared or dropped to ensure pins are released.
* ----------------------------------------------------------------
*/
static void
ExecEndPlan(PlanState *planstate, EState *estate)
{
ListCell *l;
/*
* shut down the node-type-specific query processing
*/
ExecEndNode(planstate);
/*
* for subplans too
*/
foreach(l, estate->es_subplanstates)
{
PlanState *subplanstate = (PlanState *) lfirst(l);
ExecEndNode(subplanstate);
}
/*
* destroy the executor's tuple table. Actually we only care about
* releasing buffer pins and tupdesc refcounts; there's no need to pfree
* the TupleTableSlots, since the containing memory context is about to go
* away anyway.
*/
ExecResetTupleTable(estate->es_tupleTable, false);
/*
* Close any Relations that have been opened for range table entries or
* result relations.
*/
ExecCloseResultRelations(estate);
ExecCloseRangeTableRelations(estate);
}
/*
* Close any relations that have been opened for ResultRelInfos.
*/
void
ExecCloseResultRelations(EState *estate)
{
ListCell *l;
/*
* close indexes of result relation(s) if any. (Rels themselves are
* closed in ExecCloseRangeTableRelations())
*/
foreach(l, estate->es_opened_result_relations)
{
ResultRelInfo *resultRelInfo = lfirst(l);
ExecCloseIndices(resultRelInfo);
}
/* Close any relations that have been opened by ExecGetTriggerResultRel(). */
foreach(l, estate->es_trig_target_relations)
{
ResultRelInfo *resultRelInfo = (ResultRelInfo *) lfirst(l);
/*
* Assert this is a "dummy" ResultRelInfo, see above. Otherwise we
* might be issuing a duplicate close against a Relation opened by
* ExecGetRangeTableRelation.
*/
Assert(resultRelInfo->ri_RangeTableIndex == 0);
/*
* Since ExecGetTriggerResultRel doesn't call ExecOpenIndices for
* these rels, we needn't call ExecCloseIndices either.
*/
Assert(resultRelInfo->ri_NumIndices == 0);
table_close(resultRelInfo->ri_RelationDesc, NoLock);
}
}
/*
* Close all relations opened by ExecGetRangeTableRelation().
*
* We do not release any locks we might hold on those rels.
*/
void
ExecCloseRangeTableRelations(EState *estate)
{
int i;
for (i = 0; i < estate->es_range_table_size; i++)
{
if (estate->es_relations[i])
table_close(estate->es_relations[i], NoLock);
}
}
/* ----------------------------------------------------------------
* ExecutePlan
*
* Processes the query plan until we have retrieved 'numberTuples' tuples,
* moving in the specified direction.
*
* Runs to completion if numberTuples is 0
*
* Note: the ctid attribute is a 'junk' attribute that is removed before the
* user can see it
* ----------------------------------------------------------------
*/
static void
ExecutePlan(EState *estate,
PlanState *planstate,
bool use_parallel_mode,
CmdType operation,
bool sendTuples,
uint64 numberTuples,
ScanDirection direction,
DestReceiver *dest,
bool execute_once)
{
TupleTableSlot *slot;
uint64 current_tuple_count;
/*
* initialize local variables
*/
current_tuple_count = 0;
/*
* Set the direction.
*/
estate->es_direction = direction;
/*
* If the plan might potentially be executed multiple times, we must force
* it to run without parallelism, because we might exit early.
*/
if (!execute_once)
use_parallel_mode = false;
estate->es_use_parallel_mode = use_parallel_mode;
if (use_parallel_mode)
EnterParallelMode();
/*
* Loop until we've processed the proper number of tuples from the plan.
*/
for (;;)
{
/* Reset the per-output-tuple exprcontext */
ResetPerTupleExprContext(estate);
/*
* Execute the plan and obtain a tuple
*/
slot = ExecProcNode(planstate);
/*
* if the tuple is null, then we assume there is nothing more to
* process so we just end the loop...
*/
if (TupIsNull(slot))
break;
/*
* If we have a junk filter, then project a new tuple with the junk
* removed.
*
* Store this new "clean" tuple in the junkfilter's resultSlot.
* (Formerly, we stored it back over the "dirty" tuple, which is WRONG
* because that tuple slot has the wrong descriptor.)
*/
if (estate->es_junkFilter != NULL)
slot = ExecFilterJunk(estate->es_junkFilter, slot);
/*
* If we are supposed to send the tuple somewhere, do so. (In
* practice, this is probably always the case at this point.)
*/
if (sendTuples)
{
/*
* If we are not able to send the tuple, we assume the destination
* has closed and no more tuples can be sent. If that's the case,
* end the loop.
*/
if (!dest->receiveSlot(slot, dest))
break;
}
/*
* Count tuples processed, if this is a SELECT. (For other operation
* types, the ModifyTable plan node must count the appropriate
* events.)
*/
if (operation == CMD_SELECT)
(estate->es_processed)++;
/*
* check our tuple count.. if we've processed the proper number then
* quit, else loop again and process more tuples. Zero numberTuples
* means no limit.
*/
current_tuple_count++;
if (numberTuples && numberTuples == current_tuple_count)
break;
}
/*
* If we know we won't need to back up, we can release resources at this
* point.
*/
if (!(estate->es_top_eflags & EXEC_FLAG_BACKWARD))
(void) ExecShutdownNode(planstate);
if (use_parallel_mode)
ExitParallelMode();
}
/*
* ExecRelCheck --- check that tuple meets constraints for result relation
*
* Returns NULL if OK, else name of failed check constraint
*/
static const char *
ExecRelCheck(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate)
{
Relation rel = resultRelInfo->ri_RelationDesc;
int ncheck = rel->rd_att->constr->num_check;
ConstrCheck *check = rel->rd_att->constr->check;
ExprContext *econtext;
MemoryContext oldContext;
int i;
/*
* CheckConstraintFetch let this pass with only a warning, but now we
* should fail rather than possibly failing to enforce an important
* constraint.
*/
if (ncheck != rel->rd_rel->relchecks)
elog(ERROR, "%d pg_constraint record(s) missing for relation \"%s\"",
rel->rd_rel->relchecks - ncheck, RelationGetRelationName(rel));
/*
* If first time through for this result relation, build expression
* nodetrees for rel's constraint expressions. Keep them in the per-query
* memory context so they'll survive throughout the query.
*/
if (resultRelInfo->ri_ConstraintExprs == NULL)
{
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
resultRelInfo->ri_ConstraintExprs =
(ExprState **) palloc(ncheck * sizeof(ExprState *));
for (i = 0; i < ncheck; i++)
{
Expr *checkconstr;
checkconstr = stringToNode(check[i].ccbin);
resultRelInfo->ri_ConstraintExprs[i] =
ExecPrepareExpr(checkconstr, estate);
}
MemoryContextSwitchTo(oldContext);
}
/*
* We will use the EState's per-tuple context for evaluating constraint
* expressions (creating it if it's not already there).
*/
econtext = GetPerTupleExprContext(estate);
/* Arrange for econtext's scan tuple to be the tuple under test */
econtext->ecxt_scantuple = slot;
/* And evaluate the constraints */
for (i = 0; i < ncheck; i++)
{
ExprState *checkconstr = resultRelInfo->ri_ConstraintExprs[i];
/*
* NOTE: SQL specifies that a NULL result from a constraint expression
* is not to be treated as a failure. Therefore, use ExecCheck not
* ExecQual.
*/
if (!ExecCheck(checkconstr, econtext))
return check[i].ccname;
}
/* NULL result means no error */
return NULL;
}
/*
* ExecPartitionCheck --- check that tuple meets the partition constraint.
*
* Returns true if it meets the partition constraint. If the constraint
* fails and we're asked to emit an error, do so and don't return; otherwise
* return false.
*/
bool
ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
EState *estate, bool emitError)
{
ExprContext *econtext;
bool success;
/*
* If first time through, build expression state tree for the partition
* check expression. (In the corner case where the partition check
* expression is empty, ie there's a default partition and nothing else,
* we'll be fooled into executing this code each time through. But it's
* pretty darn cheap in that case, so we don't worry about it.)
*/
if (resultRelInfo->ri_PartitionCheckExpr == NULL)
{
/*
* Ensure that the qual tree and prepared expression are in the
* query-lifespan context.
*/
MemoryContext oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
List *qual = RelationGetPartitionQual(resultRelInfo->ri_RelationDesc);
resultRelInfo->ri_PartitionCheckExpr = ExecPrepareCheck(qual, estate);
MemoryContextSwitchTo(oldcxt);
}
/*
* We will use the EState's per-tuple context for evaluating constraint
* expressions (creating it if it's not already there).
*/
econtext = GetPerTupleExprContext(estate);
/* Arrange for econtext's scan tuple to be the tuple under test */
econtext->ecxt_scantuple = slot;
/*
* As in case of the catalogued constraints, we treat a NULL result as
* success here, not a failure.
*/
success = ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext);
/* if asked to emit error, don't actually return on failure */
if (!success && emitError)
ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
return success;
}
/*
* ExecPartitionCheckEmitError - Form and emit an error message after a failed
* partition constraint check.
*/
void
ExecPartitionCheckEmitError(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot,
EState *estate)
{
Oid root_relid;
TupleDesc tupdesc;
char *val_desc;
Bitmapset *modifiedCols;
/*
* If the tuple has been routed, it's been converted to the partition's
* rowtype, which might differ from the root table's. We must convert it
* back to the root table's rowtype so that val_desc in the error message
* matches the input tuple.
*/
if (resultRelInfo->ri_RootResultRelInfo)
{
ResultRelInfo *rootrel = resultRelInfo->ri_RootResultRelInfo;
TupleDesc old_tupdesc;
AttrMap *map;
root_relid = RelationGetRelid(rootrel->ri_RelationDesc);
tupdesc = RelationGetDescr(rootrel->ri_RelationDesc);
old_tupdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
/* a reverse map */
map = build_attrmap_by_name_if_req(old_tupdesc, tupdesc);
/*
* Partition-specific slot's tupdesc can't be changed, so allocate a
* new one.
*/
if (map != NULL)
slot = execute_attr_map_slot(map, slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
modifiedCols = bms_union(ExecGetInsertedCols(rootrel, estate),
ExecGetUpdatedCols(rootrel, estate));
}
else
{
root_relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
tupdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
modifiedCols = bms_union(ExecGetInsertedCols(resultRelInfo, estate),
ExecGetUpdatedCols(resultRelInfo, estate));
}
val_desc = ExecBuildSlotValueDescription(root_relid,
slot,
tupdesc,
modifiedCols,
64);
ereport(ERROR,
(errcode(ERRCODE_CHECK_VIOLATION),
errmsg("new row for relation \"%s\" violates partition constraint",
RelationGetRelationName(resultRelInfo->ri_RelationDesc)),
val_desc ? errdetail("Failing row contains %s.", val_desc) : 0,
errtable(resultRelInfo->ri_RelationDesc)));
}
/*
* ExecConstraints - check constraints of the tuple in 'slot'
*
* This checks the traditional NOT NULL and check constraints.
*
* The partition constraint is *NOT* checked.
*
* Note: 'slot' contains the tuple to check the constraints of, which may
* have been converted from the original input tuple after tuple routing.
* 'resultRelInfo' is the final result relation, after tuple routing.
*/
void
ExecConstraints(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate)
{
Relation rel = resultRelInfo->ri_RelationDesc;
TupleDesc tupdesc = RelationGetDescr(rel);
TupleConstr *constr = tupdesc->constr;
Bitmapset *modifiedCols;
Assert(constr); /* we should not be called otherwise */
if (constr->has_not_null)
{
int natts = tupdesc->natts;
int attrChk;
for (attrChk = 1; attrChk <= natts; attrChk++)
{
Form_pg_attribute att = TupleDescAttr(tupdesc, attrChk - 1);
if (att->attnotnull && slot_attisnull(slot, attrChk))
{
char *val_desc;
Relation orig_rel = rel;
TupleDesc orig_tupdesc = RelationGetDescr(rel);
/*
* If the tuple has been routed, it's been converted to the
* partition's rowtype, which might differ from the root
* table's. We must convert it back to the root table's
* rowtype so that val_desc shown error message matches the
* input tuple.
*/
if (resultRelInfo->ri_RootResultRelInfo)
{
ResultRelInfo *rootrel = resultRelInfo->ri_RootResultRelInfo;
AttrMap *map;
tupdesc = RelationGetDescr(rootrel->ri_RelationDesc);
/* a reverse map */
map = build_attrmap_by_name_if_req(orig_tupdesc,
tupdesc);
/*
* Partition-specific slot's tupdesc can't be changed, so
* allocate a new one.
*/
if (map != NULL)
slot = execute_attr_map_slot(map, slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
modifiedCols = bms_union(ExecGetInsertedCols(rootrel, estate),
ExecGetUpdatedCols(rootrel, estate));
rel = rootrel->ri_RelationDesc;
}
else
modifiedCols = bms_union(ExecGetInsertedCols(resultRelInfo, estate),
ExecGetUpdatedCols(resultRelInfo, estate));
val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
slot,
tupdesc,
modifiedCols,
64);
ereport(ERROR,
(errcode(ERRCODE_NOT_NULL_VIOLATION),
errmsg("null value in column \"%s\" of relation \"%s\" violates not-null constraint",
NameStr(att->attname),
RelationGetRelationName(orig_rel)),
val_desc ? errdetail("Failing row contains %s.", val_desc) : 0,
errtablecol(orig_rel, attrChk)));
}
}
}
if (rel->rd_rel->relchecks > 0)
{
const char *failed;
if ((failed = ExecRelCheck(resultRelInfo, slot, estate)) != NULL)
{
char *val_desc;
Relation orig_rel = rel;
/* See the comment above. */
if (resultRelInfo->ri_RootResultRelInfo)
{
ResultRelInfo *rootrel = resultRelInfo->ri_RootResultRelInfo;
TupleDesc old_tupdesc = RelationGetDescr(rel);
AttrMap *map;
tupdesc = RelationGetDescr(rootrel->ri_RelationDesc);
/* a reverse map */
map = build_attrmap_by_name_if_req(old_tupdesc,
tupdesc);
/*
* Partition-specific slot's tupdesc can't be changed, so
* allocate a new one.
*/
if (map != NULL)
slot = execute_attr_map_slot(map, slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
modifiedCols = bms_union(ExecGetInsertedCols(rootrel, estate),
ExecGetUpdatedCols(rootrel, estate));
rel = rootrel->ri_RelationDesc;
}
else
modifiedCols = bms_union(ExecGetInsertedCols(resultRelInfo, estate),
ExecGetUpdatedCols(resultRelInfo, estate));
val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
slot,
tupdesc,
modifiedCols,
64);
ereport(ERROR,
(errcode(ERRCODE_CHECK_VIOLATION),
errmsg("new row for relation \"%s\" violates check constraint \"%s\"",
RelationGetRelationName(orig_rel), failed),
val_desc ? errdetail("Failing row contains %s.", val_desc) : 0,
errtableconstraint(orig_rel, failed)));
}
}
}
/*
* ExecWithCheckOptions -- check that tuple satisfies any WITH CHECK OPTIONs
* of the specified kind.
*
* Note that this needs to be called multiple times to ensure that all kinds of
* WITH CHECK OPTIONs are handled (both those from views which have the WITH
* CHECK OPTION set and from row-level security policies). See ExecInsert()
* and ExecUpdate().
*/
void
ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate)
{
Relation rel = resultRelInfo->ri_RelationDesc;
TupleDesc tupdesc = RelationGetDescr(rel);
ExprContext *econtext;
ListCell *l1,
*l2;
/*
* We will use the EState's per-tuple context for evaluating constraint
* expressions (creating it if it's not already there).
*/
econtext = GetPerTupleExprContext(estate);
/* Arrange for econtext's scan tuple to be the tuple under test */
econtext->ecxt_scantuple = slot;
/* Check each of the constraints */
forboth(l1, resultRelInfo->ri_WithCheckOptions,
l2, resultRelInfo->ri_WithCheckOptionExprs)
{
WithCheckOption *wco = (WithCheckOption *) lfirst(l1);
ExprState *wcoExpr = (ExprState *) lfirst(l2);
/*
* Skip any WCOs which are not the kind we are looking for at this
* time.
*/
if (wco->kind != kind)
continue;
/*
* WITH CHECK OPTION checks are intended to ensure that the new tuple
* is visible (in the case of a view) or that it passes the
* 'with-check' policy (in the case of row security). If the qual
* evaluates to NULL or FALSE, then the new tuple won't be included in
* the view or doesn't pass the 'with-check' policy for the table.
*/
if (!ExecQual(wcoExpr, econtext))
{
char *val_desc;
Bitmapset *modifiedCols;
switch (wco->kind)
{
/*
* For WITH CHECK OPTIONs coming from views, we might be
* able to provide the details on the row, depending on
* the permissions on the relation (that is, if the user
* could view it directly anyway). For RLS violations, we
* don't include the data since we don't know if the user
* should be able to view the tuple as that depends on the
* USING policy.
*/
case WCO_VIEW_CHECK:
/* See the comment in ExecConstraints(). */
if (resultRelInfo->ri_RootResultRelInfo)
{
ResultRelInfo *rootrel = resultRelInfo->ri_RootResultRelInfo;
TupleDesc old_tupdesc = RelationGetDescr(rel);
AttrMap *map;
tupdesc = RelationGetDescr(rootrel->ri_RelationDesc);
/* a reverse map */
map = build_attrmap_by_name_if_req(old_tupdesc,
tupdesc);
/*
* Partition-specific slot's tupdesc can't be changed,
* so allocate a new one.
*/
if (map != NULL)
slot = execute_attr_map_slot(map, slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
modifiedCols = bms_union(ExecGetInsertedCols(rootrel, estate),
ExecGetUpdatedCols(rootrel, estate));
rel = rootrel->ri_RelationDesc;
}
else
modifiedCols = bms_union(ExecGetInsertedCols(resultRelInfo, estate),
ExecGetUpdatedCols(resultRelInfo, estate));
val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
slot,
tupdesc,
modifiedCols,
64);
ereport(ERROR,
(errcode(ERRCODE_WITH_CHECK_OPTION_VIOLATION),
errmsg("new row violates check option for view \"%s\"",
wco->relname),
val_desc ? errdetail("Failing row contains %s.",
val_desc) : 0));
break;
case WCO_RLS_INSERT_CHECK:
case WCO_RLS_UPDATE_CHECK:
if (wco->polname != NULL)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("new row violates row-level security policy \"%s\" for table \"%s\"",
wco->polname, wco->relname)));
else
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("new row violates row-level security policy for table \"%s\"",
wco->relname)));
break;
case WCO_RLS_CONFLICT_CHECK:
if (wco->polname != NULL)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("new row violates row-level security policy \"%s\" (USING expression) for table \"%s\"",
wco->polname, wco->relname)));
else
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("new row violates row-level security policy (USING expression) for table \"%s\"",
wco->relname)));
break;
default:
elog(ERROR, "unrecognized WCO kind: %u", wco->kind);
break;
}
}
}
}
/*
* ExecBuildSlotValueDescription -- construct a string representing a tuple
*
* This is intentionally very similar to BuildIndexValueDescription, but
* unlike that function, we truncate long field values (to at most maxfieldlen
* bytes). That seems necessary here since heap field values could be very
* long, whereas index entries typically aren't so wide.
*
* Also, unlike the case with index entries, we need to be prepared to ignore
* dropped columns. We used to use the slot's tuple descriptor to decode the
* data, but the slot's descriptor doesn't identify dropped columns, so we
* now need to be passed the relation's descriptor.
*
* Note that, like BuildIndexValueDescription, if the user does not have
* permission to view any of the columns involved, a NULL is returned. Unlike
* BuildIndexValueDescription, if the user has access to view a subset of the
* column involved, that subset will be returned with a key identifying which
* columns they are.
*/
static char *
ExecBuildSlotValueDescription(Oid reloid,
TupleTableSlot *slot,
TupleDesc tupdesc,
Bitmapset *modifiedCols,
int maxfieldlen)
{
StringInfoData buf;
StringInfoData collist;
bool write_comma = false;
bool write_comma_collist = false;
int i;
AclResult aclresult;
bool table_perm = false;
bool any_perm = false;
/*
* Check if RLS is enabled and should be active for the relation; if so,
* then don't return anything. Otherwise, go through normal permission
* checks.
*/
if (check_enable_rls(reloid, InvalidOid, true) == RLS_ENABLED)
return NULL;
initStringInfo(&buf);
appendStringInfoChar(&buf, '(');
/*
* Check if the user has permissions to see the row. Table-level SELECT
* allows access to all columns. If the user does not have table-level
* SELECT then we check each column and include those the user has SELECT
* rights on. Additionally, we always include columns the user provided
* data for.
*/
aclresult = pg_class_aclcheck(reloid, GetUserId(), ACL_SELECT);
if (aclresult != ACLCHECK_OK)
{
/* Set up the buffer for the column list */
initStringInfo(&collist);
appendStringInfoChar(&collist, '(');
}
else
table_perm = any_perm = true;
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
for (i = 0; i < tupdesc->natts; i++)
{
bool column_perm = false;
char *val;
int vallen;
Form_pg_attribute att = TupleDescAttr(tupdesc, i);
/* ignore dropped columns */
if (att->attisdropped)
continue;
if (!table_perm)
{
/*
* No table-level SELECT, so need to make sure they either have
* SELECT rights on the column or that they have provided the data
* for the column. If not, omit this column from the error
* message.
*/
aclresult = pg_attribute_aclcheck(reloid, att->attnum,
GetUserId(), ACL_SELECT);
if (bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
modifiedCols) || aclresult == ACLCHECK_OK)
{
column_perm = any_perm = true;
if (write_comma_collist)
appendStringInfoString(&collist, ", ");
else
write_comma_collist = true;
appendStringInfoString(&collist, NameStr(att->attname));
}
}
if (table_perm || column_perm)
{
if (slot->tts_isnull[i])
val = "null";
else
{
Oid foutoid;
bool typisvarlena;
getTypeOutputInfo(att->atttypid,
&foutoid, &typisvarlena);
val = OidOutputFunctionCall(foutoid, slot->tts_values[i]);
}
if (write_comma)
appendStringInfoString(&buf, ", ");
else
write_comma = true;
/* truncate if needed */
vallen = strlen(val);
if (vallen <= maxfieldlen)
appendBinaryStringInfo(&buf, val, vallen);
else
{
vallen = pg_mbcliplen(val, vallen, maxfieldlen);
appendBinaryStringInfo(&buf, val, vallen);
appendStringInfoString(&buf, "...");
}
}
}
/* If we end up with zero columns being returned, then return NULL. */
if (!any_perm)
return NULL;
appendStringInfoChar(&buf, ')');
if (!table_perm)
{
appendStringInfoString(&collist, ") = ");
appendBinaryStringInfo(&collist, buf.data, buf.len);
return collist.data;
}
return buf.data;
}
/*
* ExecUpdateLockMode -- find the appropriate UPDATE tuple lock mode for a
* given ResultRelInfo
*/
LockTupleMode
ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo)
{
Bitmapset *keyCols;
Bitmapset *updatedCols;
/*
* Compute lock mode to use. If columns that are part of the key have not
* been modified, then we can use a weaker lock, allowing for better
* concurrency.
*/
updatedCols = ExecGetAllUpdatedCols(relinfo, estate);
keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc,
INDEX_ATTR_BITMAP_KEY);
if (bms_overlap(keyCols, updatedCols))
return LockTupleExclusive;
return LockTupleNoKeyExclusive;
}
/*
* ExecFindRowMark -- find the ExecRowMark struct for given rangetable index
*
* If no such struct, either return NULL or throw error depending on missing_ok
*/
ExecRowMark *
ExecFindRowMark(EState *estate, Index rti, bool missing_ok)
{
if (rti > 0 && rti <= estate->es_range_table_size &&
estate->es_rowmarks != NULL)
{
ExecRowMark *erm = estate->es_rowmarks[rti - 1];
if (erm)
return erm;
}
if (!missing_ok)
elog(ERROR, "failed to find ExecRowMark for rangetable index %u", rti);
return NULL;
}
/*
* ExecBuildAuxRowMark -- create an ExecAuxRowMark struct
*
* Inputs are the underlying ExecRowMark struct and the targetlist of the
* input plan node (not planstate node!). We need the latter to find out
* the column numbers of the resjunk columns.
*/
ExecAuxRowMark *
ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist)
{
ExecAuxRowMark *aerm = (ExecAuxRowMark *) palloc0(sizeof(ExecAuxRowMark));
char resname[32];
aerm->rowmark = erm;
/* Look up the resjunk columns associated with this rowmark */
if (erm->markType != ROW_MARK_COPY)
{
/* need ctid for all methods other than COPY */
snprintf(resname, sizeof(resname), "ctid%u", erm->rowmarkId);
aerm->ctidAttNo = ExecFindJunkAttributeInTlist(targetlist,
resname);
if (!AttributeNumberIsValid(aerm->ctidAttNo))
elog(ERROR, "could not find junk %s column", resname);
}
else
{
/* need wholerow if COPY */
snprintf(resname, sizeof(resname), "wholerow%u", erm->rowmarkId);
aerm->wholeAttNo = ExecFindJunkAttributeInTlist(targetlist,
resname);
if (!AttributeNumberIsValid(aerm->wholeAttNo))
elog(ERROR, "could not find junk %s column", resname);
}
/* if child rel, need tableoid */
if (erm->rti != erm->prti)
{
snprintf(resname, sizeof(resname), "tableoid%u", erm->rowmarkId);
aerm->toidAttNo = ExecFindJunkAttributeInTlist(targetlist,
resname);
if (!AttributeNumberIsValid(aerm->toidAttNo))
elog(ERROR, "could not find junk %s column", resname);
}
return aerm;
}
/*
* EvalPlanQual logic --- recheck modified tuple(s) to see if we want to
* process the updated version under READ COMMITTED rules.
*
* See backend/executor/README for some info about how this works.
*/
/*
* Check the updated version of a tuple to see if we want to process it under
* READ COMMITTED rules.
*
* epqstate - state for EvalPlanQual rechecking
* relation - table containing tuple
* rti - rangetable index of table containing tuple
* inputslot - tuple for processing - this can be the slot from
* EvalPlanQualSlot(), for the increased efficiency.
*
* This tests whether the tuple in inputslot still matches the relevant
* quals. For that result to be useful, typically the input tuple has to be
* last row version (otherwise the result isn't particularly useful) and
* locked (otherwise the result might be out of date). That's typically
* achieved by using table_tuple_lock() with the
* TUPLE_LOCK_FLAG_FIND_LAST_VERSION flag.
*
* Returns a slot containing the new candidate update/delete tuple, or
* NULL if we determine we shouldn't process the row.
*/
TupleTableSlot *
EvalPlanQual(EPQState *epqstate, Relation relation,
Index rti, TupleTableSlot *inputslot)
{
TupleTableSlot *slot;
TupleTableSlot *testslot;
Assert(rti > 0);
/*
* Need to run a recheck subquery. Initialize or reinitialize EPQ state.
*/
EvalPlanQualBegin(epqstate);
/*
* Callers will often use the EvalPlanQualSlot to store the tuple to avoid
* an unnecessary copy.
*/
testslot = EvalPlanQualSlot(epqstate, relation, rti);
if (testslot != inputslot)
ExecCopySlot(testslot, inputslot);
/*
* Run the EPQ query. We assume it will return at most one tuple.
*/
slot = EvalPlanQualNext(epqstate);
/*
* If we got a tuple, force the slot to materialize the tuple so that it
* is not dependent on any local state in the EPQ query (in particular,
* it's highly likely that the slot contains references to any pass-by-ref
* datums that may be present in copyTuple). As with the next step, this
* is to guard against early re-use of the EPQ query.
*/
if (!TupIsNull(slot))
ExecMaterializeSlot(slot);
/*
* Clear out the test tuple. This is needed in case the EPQ query is
* re-used to test a tuple for a different relation. (Not clear that can
* really happen, but let's be safe.)
*/
ExecClearTuple(testslot);
return slot;
}
/*
* EvalPlanQualInit -- initialize during creation of a plan state node
* that might need to invoke EPQ processing.
*
* Note: subplan/auxrowmarks can be NULL/NIL if they will be set later
* with EvalPlanQualSetPlan.
*/
void
EvalPlanQualInit(EPQState *epqstate, EState *parentestate,
Plan *subplan, List *auxrowmarks, int epqParam)
{
Index rtsize = parentestate->es_range_table_size;
/* initialize data not changing over EPQState's lifetime */
epqstate->parentestate = parentestate;
epqstate->epqParam = epqParam;
/*
* Allocate space to reference a slot for each potential rti - do so now
* rather than in EvalPlanQualBegin(), as done for other dynamically
* allocated resources, so EvalPlanQualSlot() can be used to hold tuples
* that *may* need EPQ later, without forcing the overhead of
* EvalPlanQualBegin().
*/
epqstate->tuple_table = NIL;
epqstate->relsubs_slot = (TupleTableSlot **)
palloc0(rtsize * sizeof(TupleTableSlot *));
/* ... and remember data that EvalPlanQualBegin will need */
epqstate->plan = subplan;
epqstate->arowMarks = auxrowmarks;
/* ... and mark the EPQ state inactive */
epqstate->origslot = NULL;
epqstate->recheckestate = NULL;
epqstate->recheckplanstate = NULL;
epqstate->relsubs_rowmark = NULL;
epqstate->relsubs_done = NULL;
}
/*
* EvalPlanQualSetPlan -- set or change subplan of an EPQState.
*
* We used to need this so that ModifyTable could deal with multiple subplans.
* It could now be refactored out of existence.
*/
void
EvalPlanQualSetPlan(EPQState *epqstate, Plan *subplan, List *auxrowmarks)
{
/* If we have a live EPQ query, shut it down */
EvalPlanQualEnd(epqstate);
/* And set/change the plan pointer */
epqstate->plan = subplan;
/* The rowmarks depend on the plan, too */
epqstate->arowMarks = auxrowmarks;
}
/*
* Return, and create if necessary, a slot for an EPQ test tuple.
*
* Note this only requires EvalPlanQualInit() to have been called,
* EvalPlanQualBegin() is not necessary.
*/
TupleTableSlot *
EvalPlanQualSlot(EPQState *epqstate,
Relation relation, Index rti)
{
TupleTableSlot **slot;
Assert(relation);
Assert(rti > 0 && rti <= epqstate->parentestate->es_range_table_size);
slot = &epqstate->relsubs_slot[rti - 1];
if (*slot == NULL)
{
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(epqstate->parentestate->es_query_cxt);
*slot = table_slot_create(relation, &epqstate->tuple_table);
MemoryContextSwitchTo(oldcontext);
}
return *slot;
}
/*
* Fetch the current row value for a non-locked relation, identified by rti,
* that needs to be scanned by an EvalPlanQual operation. origslot must have
* been set to contain the current result row (top-level row) that we need to
* recheck. Returns true if a substitution tuple was found, false if not.
*/
bool
EvalPlanQualFetchRowMark(EPQState *epqstate, Index rti, TupleTableSlot *slot)
{
ExecAuxRowMark *earm = epqstate->relsubs_rowmark[rti - 1];
ExecRowMark *erm = earm->rowmark;
Datum datum;
bool isNull;
Assert(earm != NULL);
Assert(epqstate->origslot != NULL);
if (RowMarkRequiresRowShareLock(erm->markType))
elog(ERROR, "EvalPlanQual doesn't support locking rowmarks");
/* if child rel, must check whether it produced this row */
if (erm->rti != erm->prti)
{
Oid tableoid;
datum = ExecGetJunkAttribute(epqstate->origslot,
earm->toidAttNo,
&isNull);
/* non-locked rels could be on the inside of outer joins */
if (isNull)
return false;
tableoid = DatumGetObjectId(datum);
Assert(OidIsValid(erm->relid));
if (tableoid != erm->relid)
{
/* this child is inactive right now */
return false;
}
}
if (erm->markType == ROW_MARK_REFERENCE)
{
Assert(erm->relation != NULL);
/* fetch the tuple's ctid */
datum = ExecGetJunkAttribute(epqstate->origslot,
earm->ctidAttNo,
&isNull);
/* non-locked rels could be on the inside of outer joins */
if (isNull)
return false;
/* fetch requests on foreign tables must be passed to their FDW */
if (erm->relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
{
FdwRoutine *fdwroutine;
bool updated = false;
fdwroutine = GetFdwRoutineForRelation(erm->relation, false);
/* this should have been checked already, but let's be safe */
if (fdwroutine->RefetchForeignRow == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot lock rows in foreign table \"%s\"",
RelationGetRelationName(erm->relation))));
fdwroutine->RefetchForeignRow(epqstate->recheckestate,
erm,
datum,
slot,
&updated);
if (TupIsNull(slot))
elog(ERROR, "failed to fetch tuple for EvalPlanQual recheck");
/*
* Ideally we'd insist on updated == false here, but that assumes
* that FDWs can track that exactly, which they might not be able
* to. So just ignore the flag.
*/
return true;
}
else
{
/* ordinary table, fetch the tuple */
if (!table_tuple_fetch_row_version(erm->relation,
(ItemPointer) DatumGetPointer(datum),
SnapshotAny, slot))
elog(ERROR, "failed to fetch tuple for EvalPlanQual recheck");
return true;
}
}
else
{
Assert(erm->markType == ROW_MARK_COPY);
/* fetch the whole-row Var for the relation */
datum = ExecGetJunkAttribute(epqstate->origslot,
earm->wholeAttNo,
&isNull);
/* non-locked rels could be on the inside of outer joins */
if (isNull)
return false;
ExecStoreHeapTupleDatum(datum, slot);
return true;
}
}
/*
* Fetch the next row (if any) from EvalPlanQual testing
*
* (In practice, there should never be more than one row...)
*/
TupleTableSlot *
EvalPlanQualNext(EPQState *epqstate)
{
MemoryContext oldcontext;
TupleTableSlot *slot;
oldcontext = MemoryContextSwitchTo(epqstate->recheckestate->es_query_cxt);
slot = ExecProcNode(epqstate->recheckplanstate);
MemoryContextSwitchTo(oldcontext);
return slot;
}
/*
* Initialize or reset an EvalPlanQual state tree
*/
void
EvalPlanQualBegin(EPQState *epqstate)
{
EState *parentestate = epqstate->parentestate;
EState *recheckestate = epqstate->recheckestate;
if (recheckestate == NULL)
{
/* First time through, so create a child EState */
EvalPlanQualStart(epqstate, epqstate->plan);
}
else
{
/*
* We already have a suitable child EPQ tree, so just reset it.
*/
Index rtsize = parentestate->es_range_table_size;
PlanState *rcplanstate = epqstate->recheckplanstate;
MemSet(epqstate->relsubs_done, 0, rtsize * sizeof(bool));
/* Recopy current values of parent parameters */
if (parentestate->es_plannedstmt->paramExecTypes != NIL)
{
int i;
/*
* Force evaluation of any InitPlan outputs that could be needed
* by the subplan, just in case they got reset since
* EvalPlanQualStart (see comments therein).
*/
ExecSetParamPlanMulti(rcplanstate->plan->extParam,
GetPerTupleExprContext(parentestate));
i = list_length(parentestate->es_plannedstmt->paramExecTypes);
while (--i >= 0)
{
/* copy value if any, but not execPlan link */
recheckestate->es_param_exec_vals[i].value =
parentestate->es_param_exec_vals[i].value;
recheckestate->es_param_exec_vals[i].isnull =
parentestate->es_param_exec_vals[i].isnull;
}
}
/*
* Mark child plan tree as needing rescan at all scan nodes. The
* first ExecProcNode will take care of actually doing the rescan.
*/
rcplanstate->chgParam = bms_add_member(rcplanstate->chgParam,
epqstate->epqParam);
}
}
/*
* Start execution of an EvalPlanQual plan tree.
*
* This is a cut-down version of ExecutorStart(): we copy some state from
* the top-level estate rather than initializing it fresh.
*/
static void
EvalPlanQualStart(EPQState *epqstate, Plan *planTree)
{
EState *parentestate = epqstate->parentestate;
Index rtsize = parentestate->es_range_table_size;
EState *rcestate;
MemoryContext oldcontext;
ListCell *l;
epqstate->recheckestate = rcestate = CreateExecutorState();
oldcontext = MemoryContextSwitchTo(rcestate->es_query_cxt);
/* signal that this is an EState for executing EPQ */
rcestate->es_epq_active = epqstate;
/*
* Child EPQ EStates share the parent's copy of unchanging state such as
* the snapshot, rangetable, and external Param info. They need their own
* copies of local state, including a tuple table, es_param_exec_vals,
* result-rel info, etc.
*/
rcestate->es_direction = ForwardScanDirection;
rcestate->es_snapshot = parentestate->es_snapshot;
rcestate->es_crosscheck_snapshot = parentestate->es_crosscheck_snapshot;
rcestate->es_range_table = parentestate->es_range_table;
rcestate->es_range_table_size = parentestate->es_range_table_size;
rcestate->es_relations = parentestate->es_relations;
rcestate->es_queryEnv = parentestate->es_queryEnv;
rcestate->es_rowmarks = parentestate->es_rowmarks;
rcestate->es_plannedstmt = parentestate->es_plannedstmt;
rcestate->es_junkFilter = parentestate->es_junkFilter;
rcestate->es_output_cid = parentestate->es_output_cid;
/*
* ResultRelInfos needed by subplans are initialized from scratch when the
* subplans themselves are initialized.
*/
rcestate->es_result_relations = NULL;
/* es_trig_target_relations must NOT be copied */
rcestate->es_top_eflags = parentestate->es_top_eflags;
rcestate->es_instrument = parentestate->es_instrument;
/* es_auxmodifytables must NOT be copied */
/*
* The external param list is simply shared from parent. The internal
* param workspace has to be local state, but we copy the initial values
* from the parent, so as to have access to any param values that were
* already set from other parts of the parent's plan tree.
*/
rcestate->es_param_list_info = parentestate->es_param_list_info;
if (parentestate->es_plannedstmt->paramExecTypes != NIL)
{
int i;
/*
* Force evaluation of any InitPlan outputs that could be needed by
* the subplan. (With more complexity, maybe we could postpone this
* till the subplan actually demands them, but it doesn't seem worth
* the trouble; this is a corner case already, since usually the
* InitPlans would have been evaluated before reaching EvalPlanQual.)
*
* This will not touch output params of InitPlans that occur somewhere
* within the subplan tree, only those that are attached to the
* ModifyTable node or above it and are referenced within the subplan.
* That's OK though, because the planner would only attach such
* InitPlans to a lower-level SubqueryScan node, and EPQ execution
* will not descend into a SubqueryScan.
*
* The EState's per-output-tuple econtext is sufficiently short-lived
* for this, since it should get reset before there is any chance of
* doing EvalPlanQual again.
*/
ExecSetParamPlanMulti(planTree->extParam,
GetPerTupleExprContext(parentestate));
/* now make the internal param workspace ... */
i = list_length(parentestate->es_plannedstmt->paramExecTypes);
rcestate->es_param_exec_vals = (ParamExecData *)
palloc0(i * sizeof(ParamExecData));
/* ... and copy down all values, whether really needed or not */
while (--i >= 0)
{
/* copy value if any, but not execPlan link */
rcestate->es_param_exec_vals[i].value =
parentestate->es_param_exec_vals[i].value;
rcestate->es_param_exec_vals[i].isnull =
parentestate->es_param_exec_vals[i].isnull;
}
}
/*
* Initialize private state information for each SubPlan. We must do this
* before running ExecInitNode on the main query tree, since
* ExecInitSubPlan expects to be able to find these entries. Some of the
* SubPlans might not be used in the part of the plan tree we intend to
* run, but since it's not easy to tell which, we just initialize them
* all.
*/
Assert(rcestate->es_subplanstates == NIL);
foreach(l, parentestate->es_plannedstmt->subplans)
{
Plan *subplan = (Plan *) lfirst(l);
PlanState *subplanstate;
subplanstate = ExecInitNode(subplan, rcestate, 0);
rcestate->es_subplanstates = lappend(rcestate->es_subplanstates,
subplanstate);
}
/*
* Build an RTI indexed array of rowmarks, so that
* EvalPlanQualFetchRowMark() can efficiently access the to be fetched
* rowmark.
*/
epqstate->relsubs_rowmark = (ExecAuxRowMark **)
palloc0(rtsize * sizeof(ExecAuxRowMark *));
foreach(l, epqstate->arowMarks)
{
ExecAuxRowMark *earm = (ExecAuxRowMark *) lfirst(l);
epqstate->relsubs_rowmark[earm->rowmark->rti - 1] = earm;
}
/*
* Initialize per-relation EPQ tuple states to not-fetched.
*/
epqstate->relsubs_done = (bool *)
palloc0(rtsize * sizeof(bool));
/*
* Initialize the private state information for all the nodes in the part
* of the plan tree we need to run. This opens files, allocates storage
* and leaves us ready to start processing tuples.
*/
epqstate->recheckplanstate = ExecInitNode(planTree, rcestate, 0);
MemoryContextSwitchTo(oldcontext);
}
/*
* EvalPlanQualEnd -- shut down at termination of parent plan state node,
* or if we are done with the current EPQ child.
*
* This is a cut-down version of ExecutorEnd(); basically we want to do most
* of the normal cleanup, but *not* close result relations (which we are
* just sharing from the outer query). We do, however, have to close any
* result and trigger target relations that got opened, since those are not
* shared. (There probably shouldn't be any of the latter, but just in
* case...)
*/
void
EvalPlanQualEnd(EPQState *epqstate)
{
EState *estate = epqstate->recheckestate;
Index rtsize;
MemoryContext oldcontext;
ListCell *l;
rtsize = epqstate->parentestate->es_range_table_size;
/*
* We may have a tuple table, even if EPQ wasn't started, because we allow
* use of EvalPlanQualSlot() without calling EvalPlanQualBegin().
*/
if (epqstate->tuple_table != NIL)
{
memset(epqstate->relsubs_slot, 0,
rtsize * sizeof(TupleTableSlot *));
ExecResetTupleTable(epqstate->tuple_table, true);
epqstate->tuple_table = NIL;
}
/* EPQ wasn't started, nothing further to do */
if (estate == NULL)
return;
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
ExecEndNode(epqstate->recheckplanstate);
foreach(l, estate->es_subplanstates)
{
PlanState *subplanstate = (PlanState *) lfirst(l);
ExecEndNode(subplanstate);
}
/* throw away the per-estate tuple table, some node may have used it */
ExecResetTupleTable(estate->es_tupleTable, false);
/* Close any result and trigger target relations attached to this EState */
ExecCloseResultRelations(estate);
MemoryContextSwitchTo(oldcontext);
FreeExecutorState(estate);
/* Mark EPQState idle */
epqstate->origslot = NULL;
epqstate->recheckestate = NULL;
epqstate->recheckplanstate = NULL;
epqstate->relsubs_rowmark = NULL;
epqstate->relsubs_done = NULL;
}
/*-------------------------------------------------------------------------
*
* nodeSeqscan.c
* Support routines for sequential scans of relations.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/executor/nodeSeqscan.c
*
*-------------------------------------------------------------------------
*/
/*
* INTERFACE ROUTINES
* ExecSeqScan sequentially scans a relation.
* ExecSeqNext retrieve next tuple in sequential order.
* ExecInitSeqScan creates and initializes a seqscan node.
* ExecEndSeqScan releases any storage allocated.
* ExecReScanSeqScan rescans the relation
*
* ExecSeqScanEstimate estimates DSM space needed for parallel scan
* ExecSeqScanInitializeDSM initialize DSM for parallel scan
* ExecSeqScanReInitializeDSM reinitialize DSM for fresh parallel scan
* ExecSeqScanInitializeWorker attach to DSM info in parallel worker
*/
#include "postgres.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "executor/execdebug.h"
#include "executor/nodeSeqscan.h"
#include "nodes/nodes.h"
#include "utils/rel.h"
/* CS631 start */
/* Including SPI for executing the insert statements into AIDX_QUERIES table */
#include "executor/spi.h"
/* Including for CMDType struct use */
#include "executor/execExpr.h"
/* CS631 end */
static TupleTableSlot *SeqNext(SeqScanState *node);
/* ----------------------------------------------------------------
* Scan Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
* SeqNext
*
* This is a workhorse for ExecSeqScan
* ----------------------------------------------------------------
*/
static TupleTableSlot *
SeqNext(SeqScanState *node)
{
TableScanDesc scandesc;
EState *estate;
ScanDirection direction;
TupleTableSlot *slot;
/*
* get information from the estate and scan state
*/
scandesc = node->ss.ss_currentScanDesc;
estate = node->ss.ps.state;
direction = estate->es_direction;
slot = node->ss.ss_ScanTupleSlot;
if (scandesc == NULL)
{
/*
* We reach here if the scan is not parallel, or if we're serially
* executing a scan that was planned to be parallel.
*/
scandesc = table_beginscan(node->ss.ss_currentRelation,
estate->es_snapshot,
0, NULL);
node->ss.ss_currentScanDesc = scandesc;
}
/*
* get the next tuple from the table
*/
if (table_scan_getnextslot(scandesc, direction, slot))
return slot;
return NULL;
}
/*
* SeqRecheck -- access method routine to recheck a tuple in EvalPlanQual
*/
static bool
SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
{
/*
* Note that unlike IndexScan, SeqScan never use keys in heap_beginscan
* (and this is very bad) - so, here we do not check are keys ok or not.
*/
return true;
}
/* ----------------------------------------------------------------
* ExecSeqScan(node)
*
* Scans the relation sequentially and returns the next qualifying
* tuple.
* We call the ExecScan() routine and pass it the appropriate
* access method functions.
* ----------------------------------------------------------------
*/
static TupleTableSlot *
ExecSeqScan(PlanState *pstate)
{
SeqScanState *node = castNode(SeqScanState, pstate);
return ExecScan(&node->ss,
(ExecScanAccessMtd) SeqNext,
(ExecScanRecheckMtd) SeqRecheck);
}
/* ----------------------------------------------------------------
* ExecInitSeqScan
* ----------------------------------------------------------------
*/
SeqScanState *
ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
{
SeqScanState *scanstate;
/*
* Once upon a time it was possible to have an outerPlan of a SeqScan, but
* not any more.
*/
Assert(outerPlan(node) == NULL);
Assert(innerPlan(node) == NULL);
/*
* create state structure
*/
scanstate = makeNode(SeqScanState);
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
scanstate->ss.ps.ExecProcNode = ExecSeqScan;
/*
* Miscellaneous initialization
*
* create expression context for node
*/
ExecAssignExprContext(estate, &scanstate->ss.ps);
/*
* open the scan relation
*/
scanstate->ss.ss_currentRelation =
ExecOpenScanRelation(estate,
node->scanrelid,
eflags);
/* CS631 start */
/*
* Eg: SELECT * FROM PG_DATABASE
* Relation name => scanstate->ss.ss_currentRelation->rd_rel->relname.data === PG_DATABASE
*/
char* relName = "";
// Relation Name
relName = scanstate->ss.ss_currentRelation->rd_rel->relname.data;
elog(LOG, "%s", relName);
/* CS631 end */
/* and create slot with the appropriate rowtype */
ExecInitScanTupleSlot(estate, &scanstate->ss,
RelationGetDescr(scanstate->ss.ss_currentRelation),
table_slot_callbacks(scanstate->ss.ss_currentRelation));
/* CS631 start */
/*
* List of Attributes selected => scanstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor->attrs
*/
TupleDescData *all_selected_attributes = scanstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
elog(LOG, "%d", all_selected_attributes->natts);
FormData_pg_attribute *attrs = all_selected_attributes->attrs;
char attr_selected[1000] = "";
int j, l, final_string_length, k, ret;
char final_attrs[1000] = "";
char *comma = ",", *eos="\0";
for (int i=0; i<all_selected_attributes->natts; i++){
j = strlen(attr_selected);
l = 0;
final_string_length = strlen(final_attrs);
for(k=strlen(final_attrs); k<strlen(attrs[i].attname.data)+final_string_length; k++){
final_attrs[k] = attrs[i].attname.data[l++];
}
final_attrs[k] = comma[0];
}
final_attrs[k] = eos[0];
elog(LOG, "%s", final_attrs);
/* CS631 end */
/*
* Initialize result type and projection.
*/
ExecInitResultTypeTL(&scanstate->ss.ps);
ExecAssignScanProjectionInfo(&scanstate->ss);
/*
* initialize child expressions
*/
scanstate->ss.ps.qual =
ExecInitQual(node->plan.qual, (PlanState *) scanstate);
/* CS631 start */
/*
* Creating the INSERT query for inserting data into AIDX_QUERIES table
* AIDX_QUERIES(
* type varchar(100), -- Type of the query - select/delete/insert
* tablename varchar(100), -- Table name
* attrs varchar(100) -- attributes in the where clause (currently for insert we insert * in the table))
*
*
*/
char new_new_query[200] = "INSERT INTO AIDX_QUERIES VALUES('";
char *type="";
char *add1 = "','";
char *add2 = "','";
char *add3 = "');";
char qual_attr[100] = "";
if(scanstate->ss.ps.qual!= NULL){
int attnum = scanstate->ss.ps.qual->steps->d.var.attnum;
elog(LOG, "%d", attnum);
for(int i=0; i<strlen(attrs[attnum-1].attname.data); i++){
qual_attr[i] = attrs[attnum-1].attname.data[i];
}
elog(LOG, "%s", qual_attr);
/*Retrieving the Type of the query*/
if(estate->es_plannedstmt->commandType == CMD_SELECT){
type = "select";
elog(LOG,"CMDTYPE SELECT \n");
}
if(estate->es_plannedstmt->commandType == CMD_DELETE){
type = "delete";
elog(LOG,"CMDTYPE DELTE \n");
}
}
else if(scanstate->ss.ps.qual == NULL && estate->es_plannedstmt->commandType == CMD_DELETE){
qual_attr[0] = '*';
type="delete";
}
int i1;
int j1 = strlen(new_new_query);
for (i1 = 0; type[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = type[i1];
}
j1 = strlen(new_new_query);
for (i1 = 0; add1[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = add1[i1];
}
j1 = strlen(new_new_query);
for (i1 = 0; relName[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = relName[i1];
}
j1 = strlen(new_new_query);
for (i1 = 0; add2[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = add2[i1];
}
j1 = strlen(new_new_query);
for (i1 = 0; qual_attr[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = qual_attr[i1];
}
j1 = strlen(new_new_query);
for (i1 = 0; add3[i1] != '\0'; i1++) {
new_new_query[i1 + j1] = add3[i1];
}
new_new_query[i1 + j1] = '\0';
elog(LOG, "%s", new_new_query);
if(type == "select" || type == "delete")
if( SPI_connect() == SPI_OK_CONNECT )
{
ret = SPI_execute(new_new_query,false,0);
ret = SPI_finish();
}
/* CS631 end */
return scanstate;
}
/* ----------------------------------------------------------------
* ExecEndSeqScan
*
* frees any storage allocated through C routines.
* ----------------------------------------------------------------
*/
void
ExecEndSeqScan(SeqScanState *node)
{
TableScanDesc scanDesc;
/*
* get information from node
*/
scanDesc = node->ss.ss_currentScanDesc;
/*
* Free the exprcontext
*/
ExecFreeExprContext(&node->ss.ps);
/*
* clean out the tuple table
*/
if (node->ss.ps.ps_ResultTupleSlot)
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
ExecClearTuple(node->ss.ss_ScanTupleSlot);
/*
* close heap scan
*/
if (scanDesc != NULL)
table_endscan(scanDesc);
}
/* ----------------------------------------------------------------
* Join Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
* ExecReScanSeqScan
*
* Rescans the relation.
* ----------------------------------------------------------------
*/
void
ExecReScanSeqScan(SeqScanState *node)
{
TableScanDesc scan;
scan = node->ss.ss_currentScanDesc;
if (scan != NULL)
table_rescan(scan, /* scan desc */
NULL); /* new scan keys */
ExecScanReScan((ScanState *) node);
}
/* ----------------------------------------------------------------
* Parallel Scan Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
* ExecSeqScanEstimate
*
* Compute the amount of space we'll need in the parallel
* query DSM, and inform pcxt->estimator about our needs.
* ----------------------------------------------------------------
*/
void
ExecSeqScanEstimate(SeqScanState *node,
ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
node->pscan_len = table_parallelscan_estimate(node->ss.ss_currentRelation,
estate->es_snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/* ----------------------------------------------------------------
* ExecSeqScanInitializeDSM
*
* Set up a parallel heap scan descriptor.
* ----------------------------------------------------------------
*/
void
ExecSeqScanInitializeDSM(SeqScanState *node,
ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
ParallelTableScanDesc pscan;
pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
table_parallelscan_initialize(node->ss.ss_currentRelation,
pscan,
estate->es_snapshot);
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
node->ss.ss_currentScanDesc =
table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
}
/* ----------------------------------------------------------------
* ExecSeqScanReInitializeDSM
*
* Reset shared state before beginning a fresh scan.
* ----------------------------------------------------------------
*/
void
ExecSeqScanReInitializeDSM(SeqScanState *node,
ParallelContext *pcxt)
{
ParallelTableScanDesc pscan;
pscan = node->ss.ss_currentScanDesc->rs_parallel;
table_parallelscan_reinitialize(node->ss.ss_currentRelation, pscan);
}
/* ----------------------------------------------------------------
* ExecSeqScanInitializeWorker
*
* Copy relevant information from TOC into planstate.
* ----------------------------------------------------------------
*/
void
ExecSeqScanInitializeWorker(SeqScanState *node,
ParallelWorkerContext *pwcxt)
{
ParallelTableScanDesc pscan;
pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
node->ss.ss_currentScanDesc =
table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
}
# Generated subdirectories
/log/
/results/
/tmp_check/
# src/test/modules/worker_spi/Makefile
MODULES = worker_spi
EXTENSION = worker_spi
DATA = worker_spi--1.0.sql
PGFILEDESC = "worker_spi - background worker example"
REGRESS = worker_spi
# enable our module in shared_preload_libraries for dynamic bgworkers
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/worker_spi/dynamic.conf
# Disable installcheck to ensure we cover dynamic bgworkers.
NO_INSTALLCHECK = 1
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = contrib/worker_spi
top_builddir = ../../
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
shared_preload_libraries = worker_spi
worker_spi.database = contrib_regression
CREATE EXTENSION worker_spi;
SELECT worker_spi_launch(4) IS NOT NULL;
?column?
----------
t
(1 row)
-- wait until the worker completes its initialization
DO $$
DECLARE
visible bool;
loops int := 0;
BEGIN
LOOP
visible := table_name IS NOT NULL
FROM information_schema.tables
WHERE table_schema = 'schema4' AND table_name = 'counted';
IF visible OR loops > 120 * 10 THEN EXIT; END IF;
PERFORM pg_sleep(0.1);
loops := loops + 1;
END LOOP;
END
$$;
INSERT INTO schema4.counted VALUES ('total', 0), ('delta', 1);
SELECT pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)
-- wait until the worker has processed the tuple we just inserted
DO $$
DECLARE
count int;
loops int := 0;
BEGIN
LOOP
count := count(*) FROM schema4.counted WHERE type = 'delta';
IF count = 0 OR loops > 120 * 10 THEN EXIT; END IF;
PERFORM pg_sleep(0.1);
loops := loops + 1;
END LOOP;
END
$$;
SELECT * FROM schema4.counted;
type | value
-------+-------
total | 1
(1 row)
CREATE EXTENSION worker_spi;
SELECT worker_spi_launch(4) IS NOT NULL;
-- wait until the worker completes its initialization
DO $$
DECLARE
visible bool;
loops int := 0;
BEGIN
LOOP
visible := table_name IS NOT NULL
FROM information_schema.tables
WHERE table_schema = 'schema4' AND table_name = 'counted';
IF visible OR loops > 120 * 10 THEN EXIT; END IF;
PERFORM pg_sleep(0.1);
loops := loops + 1;
END LOOP;
END
$$;
INSERT INTO schema4.counted VALUES ('total', 0), ('delta', 1);
SELECT pg_reload_conf();
-- wait until the worker has processed the tuple we just inserted
DO $$
DECLARE
count int;
loops int := 0;
BEGIN
LOOP
count := count(*) FROM schema4.counted WHERE type = 'delta';
IF count = 0 OR loops > 120 * 10 THEN EXIT; END IF;
PERFORM pg_sleep(0.1);
loops := loops + 1;
END LOOP;
END
$$;
SELECT * FROM schema4.counted;
/* src/test/modules/worker_spi/worker_spi--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION worker_spi" to load this file. \quit
CREATE FUNCTION worker_spi_launch(pg_catalog.int4)
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME'
LANGUAGE C;
/* -------------------------------------------------------------------------
*
* worker_spi.c
* Sample background worker code that demonstrates various coding
* patterns: establishing a database connection; starting and committing
* transactions; using GUC variables, and heeding SIGHUP to reread
* the configuration file; reporting to pg_stat_activity; using the
* process latch to sleep and exit in case of postmaster death.
*
* This code connects to a database, creates a schema and table, and summarizes
* the numbers contained therein. To see it working, insert an initial value
* with "total" type and some initial value; then insert some other rows with
* "delta" type. Delta rows will be deleted by this worker and their values
* aggregated into the total.
*
* Copyright (c) 2013-2020, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/test/modules/worker_spi/worker_spi.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
/* These are always necessary for a bgworker */
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
/* these headers are used by this particular worker's code */
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "tcop/utility.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(worker_spi_launch);
void _PG_init(void);
void worker_spi_main(Datum) pg_attribute_noreturn();
/* flags set by signal handlers */
static volatile sig_atomic_t got_sighup = false;
static volatile sig_atomic_t got_sigterm = false;
/* GUC variables */
static int worker_spi_naptime = 0;
static int worker_spi_total_workers = 1;
static char *worker_spi_database = NULL;
typedef struct worktable
{
const char *schema;
const char *name;
} worktable;
/* CS631 start */
// Function to iterate through the output of the query executed by SPI
const char* outExecResult(int ret)
{
int64 rows = 0;
int i = 0;
int j = 0;
char * new_query;
if(ret <= 0)
return;
rows = SPI_processed; // number of rows returned
// SPI_tuptable is a structure that holds the result of the query
if (SPI_tuptable)
{
TupleDesc tupdesc = SPI_tuptable->tupdesc;
SPITupleTable *tuptable = SPI_tuptable;
char buf[8192] = {0};
// For loop to iterate through the rows
for (i = 1; i <= tupdesc->natts; i++)
{
char *value = NULL;
char *type = NULL;
char *inter = (i == tupdesc->natts) ? " " : " |";
char *cur_loc = buf + strlen (buf);
int remain_base = sizeof(buf) - strlen(buf);
value = SPI_fname(tupdesc, i);
type = SPI_gettype(tupdesc, i);
}
int assign_var = 0;
int assign_max_size = 8192;
// For loop to iterate over stored tuples and over columns
for (j = 0; j < rows; j++)
{
HeapTuple tuple = tuptable->vals[j];
for (int i = 0; i < 8192; i++){
buf[i] = 0;
}
char* rel,*att; // relation and attribute
for (i = 1; i <= tupdesc->natts; i++)
{
char *temp_value = NULL;
char *inter = (i == tupdesc->natts) ? " " : " |";
char *cur_loc = buf + strlen (buf);
int remain_base = sizeof(buf) - strlen(buf);
temp_value = SPI_getvalue(tuple, tupdesc, i);
if(i==1){
rel = temp_value; // relation
}
if(i==2){
att = temp_value; // attribute
}
}
if(j==1){
//Initialize the query
sprintf(new_query,"CREATE INDEX IF NOT EXISTS %s_%s ON %s(%s);\n",rel,att,rel,att);
}
else{
//Appending to the original query
sprintf(new_query,"CREATE INDEX IF NOT EXISTS %s_%s ON %s(%s);\n%s",rel,att,rel,att,new_query);
}
//printf("%s\n",new_query);
//fflush(stdout);
}
}
printf("Returning index query\n%s\n",new_query);
fflush(stdout);
return new_query;
}
// Function to execute and sql query string
static void
my_exec_spi(char *sql)
{
int ret = 0;
printf("%s = SQL STATEMENT", sql);
fflush(stdout);
ret = SPI_execute(sql, false, 0);
//outExecResult(ret);
}
/* CS631 end */
/*
* Signal handler for SIGTERM
* Set a flag to let the main loop to terminate, and set our latch to wake
* it up.
*/
static void
worker_spi_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
got_sigterm = true;
SetLatch(MyLatch);
errno = save_errno;
}
/*
* Signal handler for SIGHUP
* Set a flag to tell the main loop to reread the config file, and set
* our latch to wake it up.
*/
static void
worker_spi_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
got_sighup = true;
SetLatch(MyLatch);
errno = save_errno;
}
/*
* Initialize workspace for a worker process: create the schema if it doesn't
* already exist.
*/
static void
initialize_worker_spi(worktable *table)
{
int ret;
int ntup;
bool isnull;
StringInfoData buf;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
/* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
initStringInfo(&buf);
appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
table->schema);
//appendStringInfo(&buf,"select * from AIDX_QUERIES");
ret = SPI_execute(buf.data, true, 0);
if (ret != SPI_OK_SELECT)
elog(FATAL, "SPI_execute failed: error code %d", ret);
if (SPI_processed != 1)
elog(FATAL, "not a singleton result");
ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (isnull)
elog(FATAL, "null result");
if (ntup == 0)
{
resetStringInfo(&buf);
appendStringInfo(&buf,
"CREATE SCHEMA \"%s\" "
"CREATE TABLE \"%s\" ("
" type text CHECK (type IN ('total', 'delta')), "
" value integer)"
"CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
"WHERE type = 'total'",
table->schema, table->name, table->name, table->name);
//appendStringInfo(&buf,"SELECT * FROM AIDX_QUERIES");
/* set statement start time */
elog(LOG,"WORKER IS RUNNING");
printf("WORKER RUNNING");
SetCurrentStatementStartTimestamp();
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UTILITY)
elog(FATAL, "failed to create my schema");
}
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, NULL);
}
void
worker_spi_main(Datum main_arg)
{
int index = DatumGetInt32(main_arg);
worktable *table;
StringInfoData buf;
char name[20];
table = palloc(sizeof(worktable));
sprintf(name, "schema%d", index);
table->schema = pstrdup(name);
table->name = pstrdup("counted");
/* Establish signal handlers before unblocking signals. */
pqsignal(SIGHUP, worker_spi_sighup);
pqsignal(SIGTERM, worker_spi_sigterm);
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/* Connect to our database */
BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
elog(LOG, "%s initialized with %s.%s",
MyBgworkerEntry->bgw_name, table->schema, table->name);
initialize_worker_spi(table);
/*
* Quote identifiers passed to us. Note that this must be done after
* initialize_worker_spi, because that routine assumes the names are not
* quoted.
*
* Note some memory might be leaked here.
*/
table->schema = quote_identifier(table->schema);
table->name = quote_identifier(table->name);
initStringInfo(&buf);
// appendStringInfo(&buf,
// "WITH deleted AS (DELETE "
// "FROM %s.%s "
// "WHERE type = 'delta' RETURNING value), "
// "total AS (SELECT coalesce(sum(value), 0) as sum "
// "FROM deleted) "
// "UPDATE %s.%s "
// "SET value = %s.value + total.sum "
// "FROM total WHERE type = 'total' "
// "RETURNING %s.value;",
// table->schema, table->name,
// table->schema, table->name,
// table->name,
// table->name);
appendStringInfo(&buf,
"");
//appendStringInfo(&buf,"UPDATE AIDX_QUERIES set type = 'INSERT' where table_name='student'");
/*
* Main loop: do this until the SIGTERM handler tells us to terminate
*/
int toggle = 0;
char* index_creation_query;
int ret;
int ind_ret;
while (!got_sigterm)
{
/*
* Background workers mustn't call usleep() or any direct equivalent:
* instead, they may wait on their process latch, which sleeps as
* necessary, but is awakened if postmaster dies. That way the
* background process goes away immediately in an emergency.
*/
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
worker_spi_naptime * 1000L,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/*
* In case of a SIGHUP, just reload the configuration.
*/
if (got_sighup)
{
got_sighup = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* Start a transaction on which we can run queries. Note that each
* StartTransactionCommand() call should be preceded by a
* SetCurrentStatementStartTimestamp() call, which sets both the time
* for the statement we're about the run, and also the transaction
* start time. Also, each other query sent to SPI should probably be
* preceded by SetCurrentStatementStartTimestamp(), so that statement
* start time is always up to date.
*
* The SPI_connect() call lets us run queries through the SPI manager,
* and the PushActiveSnapshot() call creates an "active" snapshot
* which is necessary for queries to have MVCC data to work on.
*
* The pgstat_report_activity() call makes our activity visible
* through the pgstat views.
*/
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
//if(toggle==0){
PushActiveSnapshot(GetTransactionSnapshot());
//}
/* CS631 start */
/* We can now execute queries via SPI */
char *sql10 = "select table_name,attr,count(*) from aidx_queries group by table_name,attr;";
if(toggle==0){
pgstat_report_activity(STATE_RUNNING,sql10 );
ret = SPI_execute(sql10, false, 0);
index_creation_query=outExecResult(ret);
}
else{
pgstat_report_activity(STATE_RUNNING, index_creation_query);
//printf("%d Meet\n",ret);
//fflush(stdout);
printf("INDEX TO CREATE:\n%s\n",index_creation_query);
fflush(stdout);
ind_ret = SPI_execute(index_creation_query,false,0);
printf("INDEX DONE STATUS = %d\n",ind_ret);
fflush(stdout);
}
/* CS631 end */
if (ret != SPI_OK_SELECT)
elog(FATAL, "cannot select from table %s.%s: error code %d",
table->schema, table->name, ret);
if (SPI_processed > 0)
{
bool isnull;
int32 val;
val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
//elog(LOG,"%s",SPI_tuptable->vals[0]);
if (!isnull)
elog(LOG, "%s: count in %s.%s is now %d",
MyBgworkerEntry->bgw_name,
table->schema, table->name, val);
}
/*
* And finish our transaction.
*/
SPI_finish();
//if(toggle==0){
PopActiveSnapshot();
//}
CommitTransactionCommand();
pgstat_report_stat(false);
pgstat_report_activity(STATE_IDLE, NULL);
//#######################################################################
/* CS631 start */
toggle = 1-toggle;
/* CS631 end */
}
proc_exit(1);
}
/*
* Entrypoint of this module.
*
* We register more than one worker process here, to demonstrate how that can
* be done.
*/
void
_PG_init(void)
{
BackgroundWorker worker;
unsigned int i;
/* get the configuration */
DefineCustomIntVariable("worker_spi.naptime",
"Duration between each check (in seconds).",
NULL,
&worker_spi_naptime,
20,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (!process_shared_preload_libraries_in_progress)
return;
DefineCustomIntVariable("worker_spi.total_workers",
"Number of workers.",
NULL,
&worker_spi_total_workers,
1,
1,
100,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("worker_spi.database",
"Database to connect to.",
NULL,
&worker_spi_database,
"postgres",
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
/* set up common data for all our workers */
memset(&worker, 0, sizeof(worker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
sprintf(worker.bgw_library_name, "worker_spi");
sprintf(worker.bgw_function_name, "worker_spi_main");
worker.bgw_notify_pid = 0;
/*
* Now fill in worker-specific data, and do the actual registrations.
*/
for (i = 1; i <= worker_spi_total_workers; i++)
{
snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
worker.bgw_main_arg = Int32GetDatum(i);
RegisterBackgroundWorker(&worker);
}
}
/*
* Dynamically launch an SPI worker.
*/
Datum
worker_spi_launch(PG_FUNCTION_ARGS)
{
int32 i = PG_GETARG_INT32(0);
BackgroundWorker worker;
BackgroundWorkerHandle *handle;
BgwHandleStatus status;
pid_t pid;
memset(&worker, 0, sizeof(worker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
sprintf(worker.bgw_library_name, "worker_spi");
sprintf(worker.bgw_function_name, "worker_spi_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
worker.bgw_main_arg = Int32GetDatum(i);
/* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
PG_RETURN_NULL();
status = WaitForBackgroundWorkerStartup(handle, &pid);
if (status == BGWH_STOPPED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not start background process"),
errhint("More details may be available in the server log.")));
if (status == BGWH_POSTMASTER_DIED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("cannot start background processes without postmaster"),
errhint("Kill all remaining database processes and restart the database.")));
Assert(status == BGWH_STARTED);
PG_RETURN_INT32(pid);
}
# worker_spi extension
comment = 'Sample background worker'
default_version = '1.0'
module_pathname = '$libdir/worker_spi'
relocatable = true
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment