Commit a3519a2f authored by Hiroshi Inoue's avatar Hiroshi Inoue

Allow

  CREATE VIEW as SELECT CTID, ....
  SELECT currtid( a view, ..).
parent 79420840
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.201 2002/05/21 22:05:53 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.202 2002/05/22 07:46:58 inoue Exp $
* *
* *
* INTERFACE ROUTINES * INTERFACE ROUTINES
...@@ -353,7 +353,7 @@ heap_storage_create(Relation rel) ...@@ -353,7 +353,7 @@ heap_storage_create(Relation rel)
* -------------------------------- * --------------------------------
*/ */
static void static void
CheckAttributeNames(TupleDesc tupdesc, bool relhasoids) CheckAttributeNames(TupleDesc tupdesc, bool relhasoids, int relkind)
{ {
int i; int i;
int j; int j;
...@@ -365,17 +365,18 @@ CheckAttributeNames(TupleDesc tupdesc, bool relhasoids) ...@@ -365,17 +365,18 @@ CheckAttributeNames(TupleDesc tupdesc, bool relhasoids)
* also, warn user if attribute to be created has an unknown typid * also, warn user if attribute to be created has an unknown typid
* (usually as a result of a 'retrieve into' - jolly * (usually as a result of a 'retrieve into' - jolly
*/ */
for (i = 0; i < natts; i++) if (relkind != RELKIND_VIEW)
{ for (i = 0; i < natts; i++)
if (SystemAttributeByName(NameStr(tupdesc->attrs[i]->attname), {
if (SystemAttributeByName(NameStr(tupdesc->attrs[i]->attname),
relhasoids) != NULL) relhasoids) != NULL)
elog(ERROR, "name of column \"%s\" conflicts with an existing system column", elog(ERROR, "name of column \"%s\" conflicts with an existing system column",
NameStr(tupdesc->attrs[i]->attname)); NameStr(tupdesc->attrs[i]->attname));
if (tupdesc->attrs[i]->atttypid == UNKNOWNOID) if (tupdesc->attrs[i]->atttypid == UNKNOWNOID)
elog(WARNING, "Attribute '%s' has an unknown type" elog(WARNING, "Attribute '%s' has an unknown type"
"\n\tProceeding with relation creation anyway", "\n\tProceeding with relation creation anyway",
NameStr(tupdesc->attrs[i]->attname)); NameStr(tupdesc->attrs[i]->attname));
} }
/* /*
* next check for repeated attribute names * next check for repeated attribute names
...@@ -402,7 +403,8 @@ CheckAttributeNames(TupleDesc tupdesc, bool relhasoids) ...@@ -402,7 +403,8 @@ CheckAttributeNames(TupleDesc tupdesc, bool relhasoids)
static void static void
AddNewAttributeTuples(Oid new_rel_oid, AddNewAttributeTuples(Oid new_rel_oid,
TupleDesc tupdesc, TupleDesc tupdesc,
bool relhasoids) bool relhasoids,
int relkind)
{ {
Form_pg_attribute *dpp; Form_pg_attribute *dpp;
int i; int i;
...@@ -453,36 +455,36 @@ AddNewAttributeTuples(Oid new_rel_oid, ...@@ -453,36 +455,36 @@ AddNewAttributeTuples(Oid new_rel_oid,
* next we add the system attributes. Skip OID if rel has no OIDs. * next we add the system attributes. Skip OID if rel has no OIDs.
*/ */
dpp = SysAtt; dpp = SysAtt;
for (i = 0; i < -1 - FirstLowInvalidHeapAttributeNumber; i++) if (relkind != RELKIND_VIEW)
{ for (i = 0; i < -1 - FirstLowInvalidHeapAttributeNumber; i++)
if (relhasoids || (*dpp)->attnum != ObjectIdAttributeNumber)
{ {
Form_pg_attribute attStruct; if (relhasoids || (*dpp)->attnum != ObjectIdAttributeNumber)
{
Form_pg_attribute attStruct;
tup = heap_addheader(Natts_pg_attribute, tup = heap_addheader(Natts_pg_attribute,
ATTRIBUTE_TUPLE_SIZE, ATTRIBUTE_TUPLE_SIZE,
(void *) *dpp); (void *) *dpp);
/* Fill in the correct relation OID in the copied tuple */ /* Fill in the correct relation OID in the copied tuple */
attStruct = (Form_pg_attribute) GETSTRUCT(tup); attStruct = (Form_pg_attribute) GETSTRUCT(tup);
attStruct->attrelid = new_rel_oid; attStruct->attrelid = new_rel_oid;
/* /*
* Unneeded since they should be OK in the constant data * Unneeded since they should be OK in the constant data
* anyway * anyway
*/ */
/* attStruct->attstattarget = 0; */ /* attStruct->attstattarget = 0; */
/* attStruct->attcacheoff = -1; */ /* attStruct->attcacheoff = -1; */
simple_heap_insert(rel, tup); simple_heap_insert(rel, tup);
if (hasindex) if (hasindex)
CatalogIndexInsert(idescs, Num_pg_attr_indices, rel, tup); CatalogIndexInsert(idescs, Num_pg_attr_indices, rel, tup);
heap_freetuple(tup); heap_freetuple(tup);
}
} }
dpp++;
}
/* /*
* close pg_attribute indices * close pg_attribute indices
...@@ -664,7 +666,7 @@ heap_create_with_catalog(const char *relname, ...@@ -664,7 +666,7 @@ heap_create_with_catalog(const char *relname,
elog(ERROR, "Number of columns is out of range (1 to %d)", elog(ERROR, "Number of columns is out of range (1 to %d)",
MaxHeapAttributeNumber); MaxHeapAttributeNumber);
CheckAttributeNames(tupdesc, relhasoids); CheckAttributeNames(tupdesc, relhasoids, relkind);
if (get_relname_relid(relname, relnamespace)) if (get_relname_relid(relname, relnamespace))
elog(ERROR, "Relation '%s' already exists", relname); elog(ERROR, "Relation '%s' already exists", relname);
...@@ -717,7 +719,7 @@ heap_create_with_catalog(const char *relname, ...@@ -717,7 +719,7 @@ heap_create_with_catalog(const char *relname,
* now add tuples to pg_attribute for the attributes in our new * now add tuples to pg_attribute for the attributes in our new
* relation. * relation.
*/ */
AddNewAttributeTuples(new_rel_oid, new_rel_desc->rd_att, relhasoids); AddNewAttributeTuples(new_rel_oid, new_rel_desc->rd_att, relhasoids, relkind);
/* /*
* store constraints and defaults passed in the tupdesc, if any. * store constraints and defaults passed in the tupdesc, if any.
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/adt/tid.c,v 1.29 2002/03/30 01:02:41 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/utils/adt/tid.c,v 1.30 2002/05/22 07:46:58 inoue Exp $
* *
* NOTES * NOTES
* input routine largely stolen from boxin(). * input routine largely stolen from boxin().
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "access/heapam.h" #include "access/heapam.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "catalog/pg_type.h"
#define DatumGetItemPointer(X) ((ItemPointer) DatumGetPointer(X)) #define DatumGetItemPointer(X) ((ItemPointer) DatumGetPointer(X))
#define ItemPointerGetDatum(X) PointerGetDatum(X) #define ItemPointerGetDatum(X) PointerGetDatum(X)
...@@ -133,6 +134,65 @@ setLastTid(const ItemPointer tid) ...@@ -133,6 +134,65 @@ setLastTid(const ItemPointer tid)
Current_last_tid = *tid; Current_last_tid = *tid;
} }
/*
* Handle CTIDs of views.
* CTID should be defined in the view and it must
* correspond to the CTID of a base relation.
*/
static Datum
currtid_for_view(Relation viewrel, ItemPointer tid)
{
TupleDesc att = RelationGetDescr(viewrel);
RuleLock *rulelock;
RewriteRule *rewrite;
int i, natts = att->natts, tididx = -1;
for (i = 0; i < natts ; i++)
{
if (strcasecmp(NameStr(att->attrs[i]->attname), "ctid") == 0)
{
if (att->attrs[i]->atttypid != TIDOID)
elog(ERROR, "ctid isn't of type TID");
tididx = i;
}
}
if (tididx < 0)
elog(ERROR, "currtid can't handle views with no CTID");
if (rulelock = viewrel->rd_rules, !rulelock)
elog(ERROR, "the view has no rules");
for (i = 0; i < rulelock->numLocks; i++)
{
rewrite = rulelock->rules[i];
if (rewrite->event == CMD_SELECT)
{
Query *query;
TargetEntry *tle;
if (length(rewrite->actions) != 1)
elog(ERROR, "only one select rule is allowed in views");
query = (Query *) lfirst(rewrite->actions);
tle = (TargetEntry *) nth(tididx, query->targetList);
if (tle && tle->expr && nodeTag(tle->expr) == T_Var)
{
Var *var = (Var *) tle->expr;
RangeTblEntry *rte;
if (var->varno > 0 && var->varno < INNER && var->varattno == SelfItemPointerAttributeNumber)
{
rte = (RangeTblEntry *) nth(var->varno - 1, query->rtable);
if (rte)
{
heap_close(viewrel, AccessShareLock);
return DirectFunctionCall2(currtid_byreloid, ObjectIdGetDatum(rte->relid), PointerGetDatum(tid));
}
}
}
break;
}
}
elog(ERROR, "currtid can't handle this view");
return (Datum) 0;
}
Datum Datum
currtid_byreloid(PG_FUNCTION_ARGS) currtid_byreloid(PG_FUNCTION_ARGS)
{ {
...@@ -149,6 +209,8 @@ currtid_byreloid(PG_FUNCTION_ARGS) ...@@ -149,6 +209,8 @@ currtid_byreloid(PG_FUNCTION_ARGS)
} }
rel = heap_open(reloid, AccessShareLock); rel = heap_open(reloid, AccessShareLock);
if (rel->rd_rel->relkind == RELKIND_VIEW)
return currtid_for_view(rel, tid);
ItemPointerCopy(tid, result); ItemPointerCopy(tid, result);
heap_get_latest_tid(rel, SnapshotNow, result); heap_get_latest_tid(rel, SnapshotNow, result);
...@@ -170,6 +232,8 @@ currtid_byrelname(PG_FUNCTION_ARGS) ...@@ -170,6 +232,8 @@ currtid_byrelname(PG_FUNCTION_ARGS)
relrv = makeRangeVarFromNameList(textToQualifiedNameList(relname, relrv = makeRangeVarFromNameList(textToQualifiedNameList(relname,
"currtid_byrelname")); "currtid_byrelname"));
rel = heap_openrv(relrv, AccessShareLock); rel = heap_openrv(relrv, AccessShareLock);
if (rel->rd_rel->relkind == RELKIND_VIEW)
return currtid_for_view(rel, tid);
result = (ItemPointer) palloc(sizeof(ItemPointerData)); result = (ItemPointer) palloc(sizeof(ItemPointerData));
ItemPointerCopy(tid, result); ItemPointerCopy(tid, result);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment