command.c 31.4 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * command.c
4
 *	  random postgres portal and utility support code
5
 *
Bruce Momjian's avatar
Add:  
Bruce Momjian committed
6 7
 * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
 * Portions Copyright (c) 1994, Regents of the University of California
8 9 10
 *
 *
 * IDENTIFICATION
11
 *	  $Header: /cvsroot/pgsql/src/backend/commands/Attic/command.c,v 1.78 2000/06/09 15:50:43 momjian Exp $
12 13
 *
 * NOTES
14 15 16 17 18 19 20 21
 *	  The PortalExecutorHeapMemory crap needs to be eliminated
 *	  by designing a better executor / portal processing memory
 *	  interface.
 *
 *	  The PerformAddAttribute() code, like most of the relation
 *	  manipulating code in the commands/ directory, should go
 *	  someplace closer to the lib/catalog code.
 *
22 23
 *-------------------------------------------------------------------------
 */
Bruce Momjian's avatar
Bruce Momjian committed
24 25 26 27
#include "postgres.h"

#include "catalog/catalog.h"
#include "catalog/catname.h"
Bruce Momjian's avatar
Bruce Momjian committed
28
#include "catalog/indexing.h"
29
#include "catalog/pg_attrdef.h"
Bruce Momjian's avatar
Bruce Momjian committed
30
#include "commands/command.h"
31
#include "executor/spi.h"
32
#include "catalog/heap.h"
33
#include "catalog/pg_shadow.h"
Bruce Momjian's avatar
Bruce Momjian committed
34
#include "miscadmin.h"
Bruce Momjian's avatar
Bruce Momjian committed
35 36
#include "optimizer/prep.h"
#include "utils/acl.h"
37
#include "utils/fmgroids.h"
38
#include "commands/trigger.h"
39 40
#ifdef	_DROP_COLUMN_HACK__
#include "catalog/pg_index.h"
41 42 43 44 45
#include "catalog/pg_relcheck.h"
#include "access/genam.h"
#include "commands/comment.h"
#include "commands/defrem.h"
#include "optimizer/clauses.h"
46
#include "parser/parse.h"
47
#endif	 /* _DROP_COLUMN_HACK__ */
48

49
/* ----------------
50
 *		PortalExecutorHeapMemory stuff
51
 *
52
 *		This is where the XXXSuperDuperHacky code was. -cim 3/15/90
53 54
 * ----------------
 */
55
/*
 * Set to the portal's heap memory context around the ExecutorEnd call in
 * PortalCleanup (which resets it to NULL afterwards); PerformPortalFetch
 * also sets it just before calling ExecutorRun.
 */
MemoryContext PortalExecutorHeapMemory = NULL;
56 57

/* --------------------------------
58
 *		PortalCleanup
59 60 61 62 63
 * --------------------------------
 */
void
PortalCleanup(Portal portal)
{
	MemoryContext savedContext;

	/*
	 * The portal must be valid and must have this routine registered as
	 * its cleanup hook.
	 */
	AssertArg(PortalIsValid(portal));
	AssertArg(portal->cleanup == PortalCleanup);

	/*
	 * Run the executor shutdown inside the portal's own heap memory
	 * context, and publish that context through the global as well.
	 */
	savedContext = MemoryContextSwitchTo((MemoryContext) PortalGetHeapMemory(portal));
	PortalExecutorHeapMemory = (MemoryContext) PortalGetHeapMemory(portal);

	/* ask the executor to shut the query down */
	ExecutorEnd(PortalGetQueryDesc(portal), PortalGetState(portal));

	/* restore the caller's context and clear the global again */
	MemoryContextSwitchTo(savedContext);
	PortalExecutorHeapMemory = (MemoryContext) NULL;
}

/* --------------------------------
95
 *		PerformPortalFetch
96 97 98 99
 * --------------------------------
 */
void
PerformPortalFetch(char *name,
100 101 102 103
				   bool forward,
				   int count,
				   char *tag,
				   CommandDest dest)
104
{
105 106 107 108
	Portal		portal;
	int			feature;
	QueryDesc  *queryDesc;
	MemoryContext context;
109
	Const		limcount;
110 111 112 113 114 115 116 117 118 119 120

	/* ----------------
	 *	sanity checks
	 * ----------------
	 */
	if (name == NULL)
	{
		elog(NOTICE, "PerformPortalFetch: blank portal unsupported");
		return;
	}

121
	/* ----------------
Bruce Momjian's avatar
Bruce Momjian committed
122
	 *	Create a const node from the given count value
123 124 125
	 * ----------------
	 */
	memset(&limcount, 0, sizeof(limcount));
Bruce Momjian's avatar
Bruce Momjian committed
126 127 128 129 130
	limcount.type = T_Const;
	limcount.consttype = INT4OID;
	limcount.constlen = sizeof(int4);
	limcount.constvalue = (Datum) count;
	limcount.constisnull = FALSE;
131 132
	limcount.constbyval = TRUE;
	limcount.constisset = FALSE;
Bruce Momjian's avatar
Bruce Momjian committed
133
	limcount.constiscast = FALSE;
134 135


136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
	/* ----------------
	 *	get the portal from the portal name
	 * ----------------
	 */
	portal = GetPortalByName(name);
	if (!PortalIsValid(portal))
	{
		elog(NOTICE, "PerformPortalFetch: portal \"%s\" not found",
			 name);
		return;
	}

	/* ----------------
	 *	switch into the portal context
	 * ----------------
	 */
	context = MemoryContextSwitchTo((MemoryContext) PortalGetHeapMemory(portal));

154
	AssertState(context == (MemoryContext) PortalGetHeapMemory(GetPortalByName(NULL)));
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169

	/* ----------------
	 *	setup "feature" to tell the executor what direction and
	 *	how many tuples to fetch.
	 * ----------------
	 */
	if (forward)
		feature = EXEC_FOR;
	else
		feature = EXEC_BACK;

	/* ----------------
	 *	tell the destination to prepare to recieve some tuples
	 * ----------------
	 */
170
	queryDesc = PortalGetQueryDesc(portal);
171 172

	if (dest == None)			/* MOVE */
173
	{
174 175 176
		QueryDesc  *qdesc = (QueryDesc *) palloc(sizeof(QueryDesc));

		memcpy(qdesc, queryDesc, sizeof(QueryDesc));
177 178 179
		qdesc->dest = dest;
		queryDesc = qdesc;
	}
180

181
	BeginCommand(name,
182
				 queryDesc->operation,
183
				 portal->attinfo,		/* QueryDescGetTypeInfo(queryDesc),
184
										 * */
185 186 187 188 189 190 191 192 193 194 195
				 false,			/* portal fetches don't end up in
								 * relations */
				 false,			/* this is a portal fetch, not a "retrieve
								 * portal" */
				 tag,
				 dest);

	/* ----------------
	 *	execute the portal fetch operation
	 * ----------------
	 */
Bruce Momjian's avatar
Bruce Momjian committed
196
	PortalExecutorHeapMemory = (MemoryContext) PortalGetHeapMemory(portal);
197

Bruce Momjian's avatar
Bruce Momjian committed
198 199
	ExecutorRun(queryDesc, PortalGetState(portal), feature,
				(Node *) NULL, (Node *) &limcount);
200

201 202 203
	if (dest == None)			/* MOVE */
		pfree(queryDesc);

204 205 206 207 208 209 210 211 212 213 214 215
	/* ----------------
	 * Note: the "end-of-command" tag is returned by higher-level
	 *		 utility code
	 *
	 * Return blank portal for now.
	 * Otherwise, this named portal will be cleaned.
	 * Note: portals will only be supported within a BEGIN...END
	 * block in the near future.  Later, someone will fix it to
	 * do what is possible across transaction boundries.
	 * ----------------
	 */
	MemoryContextSwitchTo(
Bruce Momjian's avatar
Bruce Momjian committed
216
			 (MemoryContext) PortalGetHeapMemory(GetPortalByName(NULL)));
217 218 219
}

/* --------------------------------
220
 *		PerformPortalClose
221 222 223 224 225
 * --------------------------------
 */
void
PerformPortalClose(char *name, CommandDest dest)
{
	Portal		target;

	/* a portal name is required; without one we only emit a notice */
	if (name == NULL)
	{
		elog(NOTICE, "PerformPortalClose: blank portal unsupported");
		return;
	}

	/* names reserved for internal use are rejected with an error */
	if (PortalNameIsSpecial(name))
		elog(ERROR,
			 "The portal name \"%s\" is reserved for internal use",
			 name);

	/* resolve the name; an unknown portal is only a notice */
	target = GetPortalByName(name);
	if (!PortalIsValid(target))
	{
		elog(NOTICE, "PerformPortalClose: portal \"%s\" not found",
			 name);
		return;
	}

	/* Note: PortalCleanup is called as a side-effect of the drop */
	PortalDrop(&target);
}

/* ----------------
263
 *		AlterTableAddColumn
264
 *		(formerly known as PerformAddAttribute)
265
 *
266
 *		adds an additional attribute to a relation
267
 *
268 269 270 271 272
 *		Adds attribute field(s) to a relation.	Each new attribute
 *		is given attnums in sequential order and is added to the
 *		ATTRIBUTE relation.  If the AMI fails, defunct tuples will
 *		remain in the ATTRIBUTE relation for later vacuuming.
 *		Later, there may be some reserved attribute names???
273
 *
274
 *		(If needed, can instead use elog to handle exceptions.)
275
 *
276 277 278 279 280
 *		Note:
 *				Initial idea of ordering the tuple attributes so that all
 *		the variable length domains occurred last was scratched.  Doing
 *		so would not speed access too much (in general) and would create
 *		many complications in formtuple, amgetattr, and addattribute.
281
 *
282 283 284 285 286 287 288 289 290
 *		scan attribute catalog for name conflict (within rel)
 *		scan type catalog for absence of data type (if not arg)
 *		create attnum magically???
 *		create attribute tuple
 *		insert attribute in attribute catalog
 *		modify reldesc
 *		create new relation tuple
 *		insert new relation in relation catalog
 *		delete original relation from relation catalog
291 292 293
 * ----------------
 */
void
294
AlterTableAddColumn(const char *relationName,
295
					bool inherits,
296
					ColumnDef *colDef)
297
{
298
	Relation	rel,
299
				attrdesc;
300
	Oid			myrelid;
301 302
	HeapTuple	reltup;
	HeapTuple	attributeTuple;
303
	Form_pg_attribute attribute;
304
	FormData_pg_attribute attributeD;
305 306 307 308 309 310 311
	int			i;
	int			minattnum,
				maxatts;
	HeapTuple	tup;
	Relation	idescs[Num_pg_attr_indices];
	Relation	ridescs[Num_pg_class_indices];
	bool		hasindex;
312 313 314 315 316 317 318

	/*
	 * permissions checking.  this would normally be done in utility.c,
	 * but this particular routine is recursive.
	 *
	 * normally, only the owner of a class can change its schema.
	 */
319
	if (!allowSystemTableMods && IsSystemRelationName(relationName))
320
		elog(ERROR, "ALTER TABLE: relation \"%s\" is a system catalog",
321
			 relationName);
322
#ifndef NO_SECURITY
323 324
	if (!pg_ownercheck(UserName, relationName, RELNAME))
		elog(ERROR, "ALTER TABLE: permission denied");
325
#endif
326

327
	/*
328 329
	 * Grab an exclusive lock on the target table, which we will NOT
	 * release until end of transaction.
330
	 */
331
	rel = heap_openr(relationName, AccessExclusiveLock);
332 333 334
	myrelid = RelationGetRelid(rel);
	heap_close(rel, NoLock);	/* close rel but keep lock! */

335 336 337 338
	/*
	 * we can't add a not null attribute
	 */
	if (colDef->is_not_null)
339
		elog(ERROR, "Can't add a NOT NULL attribute to an existing relation");
340

341
	if (colDef->raw_default || colDef->cooked_default)
342
		elog(ERROR, "Adding columns with defaults is not implemented.");
343

344 345 346 347 348 349 350 351 352 353 354 355 356 357

	/*
	 * if the first element in the 'schema' list is a "*" then we are
	 * supposed to add this attribute to all classes that inherit from
	 * 'relationName' (as well as to 'relationName').
	 *
	 * any permissions or problems with duplicate attributes will cause the
	 * whole transaction to abort, which is what we want -- all or
	 * nothing.
	 */
	if (colDef != NULL)
	{
		if (inherits)
		{
358 359
			List	   *child,
					   *children;
360 361

			/* this routine is actually in the planner */
362
			children = find_all_inheritors(myrelid);
363 364 365 366 367 368 369 370

			/*
			 * find_all_inheritors does the recursive search of the
			 * inheritance hierarchy, so all we have to do is process all
			 * of the relids in the list that it returns.
			 */
			foreach(child, children)
			{
371
				Oid			childrelid = lfirsti(child);
372

373 374
				if (childrelid == myrelid)
					continue;
375
				rel = heap_open(childrelid, AccessExclusiveLock);
376 377
				AlterTableAddColumn(RelationGetRelationName(rel),
									false, colDef);
378
				heap_close(rel, AccessExclusiveLock);
379
			}
380
		}
381 382
	}

383
	rel = heap_openr(RelationRelationName, RowExclusiveLock);
384

385 386 387 388 389
	reltup = SearchSysCacheTupleCopy(RELNAME,
									 PointerGetDatum(relationName),
									 0, 0, 0);

	if (!HeapTupleIsValid(reltup))
390
		elog(ERROR, "ALTER TABLE: relation \"%s\" not found",
391 392
			 relationName);

393
	/*
394
	 * XXX is the following check sufficient?
395
	 */
396
	if (((Form_pg_class) GETSTRUCT(reltup))->relkind != RELKIND_RELATION)
397
	{
398
		elog(ERROR, "ALTER TABLE: relation \"%s\" is not a table",
399 400 401 402 403 404
			 relationName);
	}

	minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts;
	maxatts = minattnum + 1;
	if (maxatts > MaxHeapAttributeNumber)
405
		elog(ERROR, "ALTER TABLE: relations limited to %d columns",
406 407
			 MaxHeapAttributeNumber);

408
	attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock);
409

410
	/*
411
	 * Open all (if any) pg_attribute indices
412
	 */
413
	hasindex = RelationGetForm(attrdesc)->relhasindex;
414 415 416
	if (hasindex)
		CatalogOpenIndices(Num_pg_attr_indices, Name_pg_attr_indices, idescs);

417
	attributeD.attrelid = reltup->t_data->t_oid;
418 419 420 421 422

	attributeTuple = heap_addheader(Natts_pg_attribute,
									sizeof attributeD,
									(char *) &attributeD);

423
	attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple);
424 425 426 427

	i = 1 + minattnum;

	{
428
		HeapTuple	typeTuple;
429
		Form_pg_type tform;
430
		char	   *typename;
431
		int			attnelems;
432

433
		tup = SearchSysCacheTuple(ATTNAME,
434
								  ObjectIdGetDatum(reltup->t_data->t_oid),
435 436
								  PointerGetDatum(colDef->colname),
								  0, 0);
437 438

		if (HeapTupleIsValid(tup))
439
			elog(ERROR, "ALTER TABLE: column name \"%s\" already exists in table \"%s\"",
440
				 colDef->colname, relationName);
441 442 443 444 445

		/*
		 * check to see if it is an array attribute.
		 */

446
		typename = colDef->typename->name;
447 448 449 450

		if (colDef->typename->arrayBounds)
		{
			attnelems = length(colDef->typename->arrayBounds);
451
			typename = makeArrayTypeName(colDef->typename->name);
452 453 454 455
		}
		else
			attnelems = 0;

456
		typeTuple = SearchSysCacheTuple(TYPENAME,
457
										PointerGetDatum(typename),
458
										0, 0, 0);
459
		tform = (Form_pg_type) GETSTRUCT(typeTuple);
460 461

		if (!HeapTupleIsValid(typeTuple))
462
			elog(ERROR, "ALTER TABLE: type \"%s\" does not exist", typename);
463
		namestrcpy(&(attribute->attname), colDef->colname);
464
		attribute->atttypid = typeTuple->t_data->t_oid;
465
		attribute->attlen = tform->typlen;
466
		attribute->attdisbursion = 0;
Bruce Momjian's avatar
Bruce Momjian committed
467
		attribute->attcacheoff = -1;
468
		attribute->atttypmod = colDef->typename->typmod;
469
		attribute->attnum = i;
470
		attribute->attbyval = tform->typbyval;
471
		attribute->attnelems = attnelems;
472
		attribute->attisset = (bool) (tform->typtype == 'c');
473
		attribute->attstorage = 'p';
474
		attribute->attalign = tform->typalign;
475
		attribute->attnotnull = false;
476 477
		attribute->atthasdef = (colDef->raw_default != NULL ||
								colDef->cooked_default != NULL);
478 479 480 481 482 483 484

		heap_insert(attrdesc, attributeTuple);
		if (hasindex)
			CatalogIndexInsert(idescs,
							   Num_pg_attr_indices,
							   attrdesc,
							   attributeTuple);
485
	}
486

487
	if (hasindex)
488
		CatalogCloseIndices(Num_pg_attr_indices, idescs);
489 490

	heap_close(attrdesc, RowExclusiveLock);
491 492

	((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
493
	heap_update(rel, &reltup->t_self, reltup, NULL);
494 495 496

	/* keep catalog indices current */
	CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, ridescs);
497
	CatalogIndexInsert(ridescs, Num_pg_class_indices, rel, reltup);
498 499
	CatalogCloseIndices(Num_pg_class_indices, ridescs);

500
	heap_freetuple(reltup);
501 502 503 504 505 506 507 508 509 510 511 512 513 514

	heap_close(rel, NoLock);
}



/* forward declaration; defined below and called from AlterTableAlterColumn */
static void drop_default(Oid relid, int16 attnum);


/*
 * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
 */
void
AlterTableAlterColumn(const char *relationName,
515 516
					  bool inh, const char *colName,
					  Node *newDefault)
517
{
518 519 520 521
	Relation	rel;
	HeapTuple	tuple;
	int16		attnum;
	Oid			myrelid;
522 523 524 525 526 527 528 529 530

	if (!allowSystemTableMods && IsSystemRelationName(relationName))
		elog(ERROR, "ALTER TABLE: relation \"%s\" is a system catalog",
			 relationName);
#ifndef NO_SECURITY
	if (!pg_ownercheck(UserName, relationName, RELNAME))
		elog(ERROR, "ALTER TABLE: permission denied");
#endif

531 532 533
	rel = heap_openr(relationName, AccessExclusiveLock);
	myrelid = RelationGetRelid(rel);
	heap_close(rel, NoLock);
534

535 536 537
	/*
	 * Propagate to children if desired
	 */
538
	if (inh)
539 540 541 542 543 544 545 546 547 548 549 550 551
	{
		List	   *child,
				   *children;

		/* this routine is actually in the planner */
		children = find_all_inheritors(myrelid);

		/*
		 * find_all_inheritors does the recursive search of the
		 * inheritance hierarchy, so all we have to do is process all of
		 * the relids in the list that it returns.
		 */
		foreach(child, children)
552
		{
553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592
			Oid			childrelid = lfirsti(child);

			if (childrelid == myrelid)
				continue;
			rel = heap_open(childrelid, AccessExclusiveLock);
			AlterTableAlterColumn(RelationGetRelationName(rel),
								  false, colName, newDefault);
			heap_close(rel, AccessExclusiveLock);
		}
	}

	/* -= now do the thing on this relation =- */

	/* reopen the business */
	rel = heap_openr((char *) relationName, AccessExclusiveLock);

	/*
	 * get the number of the attribute
	 */
	tuple = SearchSysCacheTuple(ATTNAME,
								ObjectIdGetDatum(myrelid),
								NameGetDatum(namein((char *) colName)),
								0, 0);

	if (!HeapTupleIsValid(tuple))
	{
		heap_close(rel, AccessExclusiveLock);
		elog(ERROR, "ALTER TABLE: relation \"%s\" has no column \"%s\"",
			 relationName, colName);
	}

	attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;

	if (newDefault)				/* SET DEFAULT */
	{
		List	   *rawDefaults = NIL;
		RawColumnDefault *rawEnt;

		/* Get rid of the old one first */
		drop_default(myrelid, attnum);
593 594 595

		rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
		rawEnt->attnum = attnum;
596
		rawEnt->raw_default = newDefault;
597 598
		rawDefaults = lappend(rawDefaults, rawEnt);

599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614
		/*
		 * This function is intended for CREATE TABLE, so it processes a
		 * _list_ of defaults, but we just do one.
		 */
		AddRelationRawConstraints(rel, rawDefaults, NIL);
	}

	else
/* DROP DEFAULT */
	{
		Relation	attr_rel;
		ScanKeyData scankeys[3];
		HeapScanDesc scan;
		HeapTuple	tuple;

		attr_rel = heap_openr(AttributeRelationName, AccessExclusiveLock);
615 616
		ScanKeyEntryInitialize(&scankeys[0], 0x0,
							   Anum_pg_attribute_attrelid, F_OIDEQ,
617
							   ObjectIdGetDatum(myrelid));
618 619
		ScanKeyEntryInitialize(&scankeys[1], 0x0,
							   Anum_pg_attribute_attnum, F_INT2EQ,
620
							   Int16GetDatum(attnum));
621 622 623
		ScanKeyEntryInitialize(&scankeys[2], 0x0,
							   Anum_pg_attribute_atthasdef, F_BOOLEQ,
							   Int32GetDatum(TRUE));
624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649

		scan = heap_beginscan(attr_rel, false, SnapshotNow, 3, scankeys);
		AssertState(scan != NULL);

		if (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
		{
			HeapTuple	newtuple;
			Relation	irelations[Num_pg_attr_indices];

			/* update to false */
			newtuple = heap_copytuple(tuple);
			((Form_pg_attribute) GETSTRUCT(newtuple))->atthasdef = FALSE;
			heap_update(attr_rel, &tuple->t_self, newtuple, NULL);

			/* keep the system catalog indices current */
			CatalogOpenIndices(Num_pg_attr_indices, Name_pg_attr_indices, irelations);
			CatalogIndexInsert(irelations, Num_pg_attr_indices, attr_rel, newtuple);
			CatalogCloseIndices(Num_pg_attr_indices, irelations);

			/* get rid of actual default definition */
			drop_default(myrelid, attnum);
		}

		heap_endscan(scan);
		heap_close(attr_rel, NoLock);
	}
650 651 652 653 654 655 656 657 658

	heap_close(rel, NoLock);
}



/*
 * Delete the first pg_attrdef row (if any) whose adrelid/adnum match the
 * given relation OID and attribute number.
 */
static void
drop_default(Oid relid, int16 attnum)
{
	Relation	defaultsRel;
	ScanKeyData keys[2];
	HeapScanDesc defScan;
	HeapTuple	defTuple;

	defaultsRel = heap_openr(AttrDefaultRelationName, AccessExclusiveLock);

	/* key 0: owning relation; key 1: attribute number */
	ScanKeyEntryInitialize(&keys[0], 0x0,
						   Anum_pg_attrdef_adrelid, F_OIDEQ,
						   ObjectIdGetDatum(relid));
	ScanKeyEntryInitialize(&keys[1], 0x0,
						   Anum_pg_attrdef_adnum, F_INT2EQ,
						   Int16GetDatum(attnum));

	defScan = heap_beginscan(defaultsRel, false, SnapshotNow, 2, keys);
	AssertState(defScan != NULL);

	/* only the first match is removed */
	defTuple = heap_getnext(defScan, 0);
	if (HeapTupleIsValid(defTuple))
		heap_delete(defaultsRel, &defTuple->t_self, NULL);

	heap_endscan(defScan);

	/* close the catalog without releasing the lock */
	heap_close(defaultsRel, NoLock);
}


684 685 686
#ifdef	_DROP_COLUMN_HACK__
/*
 *	ALTER TABLE DROP COLUMN trial implementation
687
 *
688 689 690 691 692 693 694 695 696
 */

/*
 *	system table scan(index scan/sequential scan)
 */
typedef struct SysScanDescData
{
	Relation	heap_rel;
	Relation	irel;
697 698 699
	HeapScanDesc scan;
	IndexScanDesc iscan;
	HeapTupleData tuple;
700
	Buffer		buffer;
701 702
}			SysScanDescData, *SysScanDesc;

703 704 705
/*
 * Start a scan of a system table.  The named index is used when the
 * relation is marked as having indexes and system indexes are not being
 * ignored; otherwise a plain heap scan with the same keys is started.
 * Returns an opaque, palloc'd scan handle.
 */
static void *
systable_beginscan(Relation rel, const char *indexRelname, int nkeys, ScanKey entry)
{
	SysScanDesc desc = (SysScanDesc) palloc(sizeof(SysScanDescData));
	bool		useIndex;

	useIndex = (rel->rd_rel->relhasindex && !IsIgnoringSystemIndexes());

	desc->heap_rel = rel;
	desc->irel = (Relation) NULL;
	desc->tuple.t_datamcxt = NULL;
	desc->tuple.t_data = NULL;
	desc->buffer = InvalidBuffer;

	if (!useIndex)
		desc->scan = heap_beginscan(rel, false, SnapshotNow, nkeys, entry);
	else
	{
		desc->irel = index_openr((char *) indexRelname);
		desc->iscan = index_beginscan(desc->irel, false, nkeys, entry);
	}

	return (void *) desc;
}
static void
systable_endscan(void *scan)
{
727
	SysScanDesc sysscan = (SysScanDesc) scan;
728 729 730 731 732 733 734 735 736 737 738 739 740 741 742

	if (sysscan->irel)
	{
		if (BufferIsValid(sysscan->buffer))
			ReleaseBuffer(sysscan->buffer);
		index_endscan(sysscan->iscan);
		index_close(sysscan->irel);
	}
	else
		heap_endscan(sysscan->scan);
	pfree(scan);
}
static HeapTuple
systable_getnext(void *scan)
{
743
	SysScanDesc sysscan = (SysScanDesc) scan;
744
	HeapTuple	htup = (HeapTuple) NULL;
745
	RetrieveIndexResult indexRes;
746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780

	if (sysscan->irel)
	{
		if (BufferIsValid(sysscan->buffer))
		{
			ReleaseBuffer(sysscan->buffer);
			sysscan->buffer = InvalidBuffer;
		}
		while (indexRes = index_getnext(sysscan->iscan, ForwardScanDirection), indexRes != NULL)
		{
			sysscan->tuple.t_self = indexRes->heap_iptr;
			heap_fetch(sysscan->heap_rel, SnapshotNow, &sysscan->tuple, &(sysscan->buffer));
			pfree(indexRes);
			if (sysscan->tuple.t_data != NULL)
			{
				htup = &sysscan->tuple;
				break;
			}
		}
	}
	else
		htup = heap_getnext(sysscan->scan, 0);
	return htup;
}

/*
 *	find a specified attribute in a node entry
 */
/*
 * Tree-walker callback: returns true when 'node' is a Var with
 * varlevelsup 0, varno 1 and varattno equal to attnum; otherwise
 * continues the walk through expression_tree_walker.
 */
static bool
find_attribute_walker(Node *node, int attnum)
{
	if (node == NULL)
		return false;

	if (IsA(node, Var))
	{
		Var		   *candidate = (Var *) node;
		bool		matches;

		matches = (candidate->varlevelsup == 0 &&
				   candidate->varno == 1 &&
				   candidate->varattno == attnum);
		if (matches)
			return true;
	}

	/* not a match here; let the generic walker continue from this node */
	return expression_tree_walker(node, find_attribute_walker, (void *) attnum);
}
/*
 * Report whether the expression tree rooted at 'node' references
 * attribute 'attnum' (as judged by find_attribute_walker).
 *
 * The walker is invoked directly on the root so that the root node itself
 * is examined.  Handing the root straight to expression_tree_walker would
 * bypass find_attribute_walker's own Var test for that node, so a tree
 * consisting of just the matching Var would not be detected.
 */
static bool
find_attribute_in_node(Node *node, int attnum)
{
	return find_attribute_walker(node, attnum);
}
794

795 796 797 798 799 800
/*
 *	Remove/check references for the column
 */
/*
 * Check for, or remove, catalog objects that reference a column.
 *
 *	reloid		OID of the relation owning the column
 *	attnum		attribute number of the column
 *	checkonly	if true, raise an error on the first reference found;
 *				if false, delete referencing check constraints and
 *				remove referencing indexes
 *	reltup		pg_class tuple of the relation; its relchecks count is
 *				decremented for every constraint deleted.  Only used
 *				when checkonly is false.
 *
 * Returns the value of checkok, which starts out true and is only
 * cleared immediately before an elog(ERROR).
 */
static bool
RemoveColumnReferences(Oid reloid, int attnum, bool checkonly, HeapTuple reltup)
{
	Relation	indexRelation,
				rcrel;
	ScanKeyData entry;
	HeapScanDesc scan;
	void	   *sysscan;
	HeapTuple	htup,
				indexTuple;
	Form_pg_index index;
	Form_pg_relcheck relcheck;
	Form_pg_class pgcform = (Form_pg_class) NULL;
	int			i;
	bool		checkok = true;

	if (!checkonly)
		pgcform = (Form_pg_class) GETSTRUCT(reltup);

	/*
	 * Remove/check constraints here
	 */
	ScanKeyEntryInitialize(&entry, (bits16) 0x0, Anum_pg_relcheck_rcrelid,
					   (RegProcedure) F_OIDEQ, ObjectIdGetDatum(reloid));
	rcrel = heap_openr(RelCheckRelationName, RowExclusiveLock);
	sysscan = systable_beginscan(rcrel, RelCheckIndex, 1, &entry);

	while (HeapTupleIsValid(htup = systable_getnext(sysscan)))
	{
		char	   *ccbin;
		Node	   *node;

		/* rebuild the constraint's expression tree from its stored text */
		relcheck = (Form_pg_relcheck) GETSTRUCT(htup);
		ccbin = textout(&relcheck->rcbin);
		if (!ccbin)
			continue;
		node = stringToNode(ccbin);
		pfree(ccbin);
		if (find_attribute_in_node(node, attnum))
		{
			if (checkonly)
			{
				checkok = false;
				elog(ERROR, "target column is used in a constraint");
			}
			else
			{
				heap_delete(rcrel, &htup->t_self, NULL);
				pgcform->relchecks--;
			}
		}
	}
	systable_endscan(sysscan);
	heap_close(rcrel, NoLock);

	/*
	 * What to do with triggers/rules/views/procedures ?
	 */

	/*
	 * Remove/check indexes
	 */
	indexRelation = heap_openr(IndexRelationName, RowExclusiveLock);
	ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, F_OIDEQ,
						   ObjectIdGetDatum(reloid));
	scan = heap_beginscan(indexRelation, false, SnapshotNow, 1, &entry);
	while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0)))
	{
		index = (Form_pg_index) GETSTRUCT(indexTuple);
		for (i = 0; i < INDEX_MAX_KEYS; i++)
		{
			/* an invalid attribute number ends the list of key columns */
			if (index->indkey[i] == InvalidAttrNumber)
				break;
			else if (index->indkey[i] == attnum)
			{
				if (checkonly)
				{
					checkok = false;
					elog(ERROR, "target column is used in an index");
				}
				else
				{
					htup = SearchSysCacheTuple(RELOID,
									 ObjectIdGetDatum(index->indexrelid),
											   0, 0, 0);

					/*
					 * Do not look inside the tuple unless the lookup
					 * succeeded.
					 */
					if (!HeapTupleIsValid(htup))
						elog(ERROR, "ALTER TABLE: cache lookup failed for index %u",
							 index->indexrelid);
					RemoveIndex(NameStr(((Form_pg_class) GETSTRUCT(htup))->relname));
				}
				break;
			}
		}
	}
	heap_endscan(scan);
	heap_close(indexRelation, NoLock);

	return checkok;
}
895 896

#endif	 /* _DROP_COLUMN_HACK__ */
897 898 899 900 901 902

/*
 * ALTER TABLE DROP COLUMN
 */
void
AlterTableDropColumn(const char *relationName,
903 904
					 bool inh, const char *colName,
					 int behavior)
905
{
906
#ifdef	_DROP_COLUMN_HACK__
907 908 909 910 911
	Relation	rel,
				attrdesc,
				adrel;
	Oid			myrelid,
				attoid;
912
	HeapTuple	reltup;
913
	HeapTupleData classtuple;
914 915 916 917
	Buffer		buffer;
	Form_pg_attribute attribute;
	HeapTuple	tup;
	Relation	idescs[Num_pg_attr_indices];
918
	int			attnum;
919 920
	bool		hasindex;
	char		dropColname[32];
921 922
	void	   *sysscan;
	ScanKeyData scankeys[2];
923

924
	if (inh)
925
		elog(ERROR, "ALTER TABLE / DROP COLUMN with inherit option is not supported yet");
926

927 928 929 930 931 932 933 934 935 936 937 938 939 940 941
	/*
	 * permissions checking.  this would normally be done in utility.c,
	 * but this particular routine is recursive.
	 *
	 * normally, only the owner of a class can change its schema.
	 */
	if (!allowSystemTableMods && IsSystemRelationName(relationName))
		elog(ERROR, "ALTER TABLE: relation \"%s\" is a system catalog",
			 relationName);
#ifndef NO_SECURITY
	if (!pg_ownercheck(UserName, relationName, RELNAME))
		elog(ERROR, "ALTER TABLE: permission denied");
#endif

	/*
942 943
	 * Grab an exclusive lock on the target table, which we will NOT
	 * release until end of transaction.
944 945 946 947 948 949
	 */
	rel = heap_openr(relationName, AccessExclusiveLock);
	myrelid = RelationGetRelid(rel);
	heap_close(rel, NoLock);	/* close rel but keep lock! */

	/*
950
	 * What to do when rel has inheritors ?
951 952 953 954 955 956
	 */
	if (length(find_all_inheritors(myrelid)) > 1)
		elog(ERROR, "ALTER TABLE: cannot drop a column on table that is inherited from");


	/*
957
	 * lock the pg_class tuple for update
958 959
	 */
	reltup = SearchSysCacheTuple(RELNAME, PointerGetDatum(relationName),
960
								 0, 0, 0);
961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992

	if (!HeapTupleIsValid(reltup))
		elog(ERROR, "ALTER TABLE: relation \"%s\" not found",
			 relationName);
	rel = heap_openr(RelationRelationName, RowExclusiveLock);
	classtuple.t_self = reltup->t_self;
	switch (heap_mark4update(rel, &classtuple, &buffer))
	{
		case HeapTupleSelfUpdated:
		case HeapTupleMayBeUpdated:
			break;
		default:
			elog(ERROR, "couldn't lock pg_class tuple");
	}
	reltup = heap_copytuple(&classtuple);
	ReleaseBuffer(buffer);

	/*
	 * XXX is the following check sufficient?
	 */
	if (((Form_pg_class) GETSTRUCT(reltup))->relkind != RELKIND_RELATION)
	{
		elog(ERROR, "ALTER TABLE: relation \"%s\" is not a table",
			 relationName);
	}

	attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock);

	/*
	 * Get the target pg_attribute tuple
	 */
	tup = SearchSysCacheTupleCopy(ATTNAME,
993 994
								  ObjectIdGetDatum(reltup->t_data->t_oid),
								  PointerGetDatum(colName), 0, 0);
995 996
	if (!HeapTupleIsValid(tup))
		elog(ERROR, "ALTER TABLE: column name \"%s\" doesn't exist in table \"%s\"",
997
			 colName, relationName);
998 999 1000 1001 1002 1003

	attribute = (Form_pg_attribute) GETSTRUCT(tup);
	if (attribute->attnum <= 0)
		elog(ERROR, "ALTER TABLE: column name \"%s\" was already dropped", colName);
	attnum = attribute->attnum;
	attoid = tup->t_data->t_oid;
1004

1005
	/*
1006
	 * Check constraints/indices etc here
1007 1008 1009 1010 1011 1012 1013 1014
	 */
	if (behavior != CASCADE)
	{
		if (!RemoveColumnReferences(myrelid, attnum, true, NULL))
			elog(ERROR, "the column is referenced");
	}

	/*
1015
	 * change the target pg_attribute tuple
1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026
	 */
	sprintf(dropColname, "*already Dropped*%d", attnum);
	namestrcpy(&(attribute->attname), dropColname);
	ATTRIBUTE_DROP_COLUMN(attribute);

	heap_update(attrdesc, &tup->t_self, tup, NULL);
	hasindex = (!IsIgnoringSystemIndexes() && RelationGetForm(attrdesc)->relhasindex);
	if (hasindex)
	{
		CatalogOpenIndices(Num_pg_attr_indices, Name_pg_attr_indices, idescs);
		CatalogIndexInsert(idescs, Num_pg_attr_indices,
1027
						   attrdesc, tup);
1028 1029 1030 1031 1032 1033 1034 1035 1036 1037
		CatalogCloseIndices(Num_pg_attr_indices, idescs);
	}
	heap_close(attrdesc, NoLock);
	heap_freetuple(tup);

	/* delete comments */
	DeleteComments(attoid);
	/* delete attrdef */
	adrel = heap_openr(AttrDefaultRelationName, RowExclusiveLock);
	ScanKeyEntryInitialize(&scankeys[0], 0x0, Anum_pg_attrdef_adrelid,
1038 1039 1040 1041 1042 1043 1044 1045
						   F_OIDEQ, ObjectIdGetDatum(myrelid));

	/*
	 * Oops pg_attrdef doesn't have (adrelid,adnum) index
	 * ScanKeyEntryInitialize(&scankeys[1], 0x0, Anum_pg_attrdef_adnum,
	 * F_INT2EQ, Int16GetDatum(attnum)); sysscan =
	 * systable_beginscan(adrel, AttrDefaultIndex, 2, scankeys);
	 */
1046 1047
	sysscan = systable_beginscan(adrel, AttrDefaultIndex, 1, scankeys);
	while (HeapTupleIsValid(tup = systable_getnext(sysscan)))
1048
	{
1049 1050 1051 1052 1053 1054 1055 1056
		if (((Form_pg_attrdef) GETSTRUCT(tup))->adnum == attnum)
		{
			heap_delete(adrel, &tup->t_self, NULL);
			break;
		}
	}
	systable_endscan(sysscan);
	heap_close(adrel, NoLock);
1057

1058
	/*
1059
	 * Remove objects which reference this column
1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075
	 */
	if (behavior == CASCADE)
	{
		Relation	ridescs[Num_pg_class_indices];

		RemoveColumnReferences(myrelid, attnum, false, reltup);
		/* update pg_class tuple */
		heap_update(rel, &reltup->t_self, reltup, NULL);
		CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, ridescs);
		CatalogIndexInsert(ridescs, Num_pg_class_indices, rel, reltup);
		CatalogCloseIndices(Num_pg_class_indices, ridescs);
	}

	heap_freetuple(reltup);
	heap_close(rel, NoLock);
#else
1076 1077
				elog(ERROR, "ALTER TABLE / DROP COLUMN is not implemented");
#endif	 /* _DROP_COLUMN_HACK__ */
1078
}
1079

1080 1081


1082 1083 1084
/*
 * ALTER TABLE ADD CONSTRAINT
 */
1085 1086
void
AlterTableAddConstraint(const char *relationName,
1087
						bool inh, Node *newConstraint)
1088
{
1089
	if (newConstraint == NULL)
1090 1091
		elog(ERROR, "ALTER TABLE / ADD CONSTRAINT passed invalid constraint.");

1092
	switch (nodeTag(newConstraint))
1093 1094 1095 1096 1097
	{
		case T_Constraint:
			elog(ERROR, "ALTER TABLE / ADD CONSTRAINT is not implemented");
		case T_FkConstraint:
			{
1098 1099
				FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
				Relation	rel;
1100
				HeapScanDesc scan;
1101 1102 1103 1104
				HeapTuple	tuple;
				Trigger		trig;
				List	   *list;
				int			count;
1105

1106
				/*
1107
				 * Grab an exclusive lock on the pk table, so that someone
1108
				 * doesn't delete rows out from under us.
1109 1110 1111 1112 1113
				 */

				rel = heap_openr(fkconstraint->pktable_name, AccessExclusiveLock);
				heap_close(rel, NoLock);

1114 1115 1116 1117 1118 1119
				/*
				 * Grab an exclusive lock on the fk table, and then scan
				 * through each tuple, calling the RI_FKey_Match_Ins
				 * (insert trigger) as if that tuple had just been
				 * inserted.  If any of those fail, it should elog(ERROR)
				 * and that's that.
1120 1121 1122 1123
				 */
				rel = heap_openr(relationName, AccessExclusiveLock);
				trig.tgoid = 0;
				trig.tgname = "<unknown>";
1124
				trig.tgfoid = 0;
1125 1126 1127 1128 1129 1130
				trig.tgtype = 0;
				trig.tgenabled = TRUE;
				trig.tgisconstraint = TRUE;
				trig.tginitdeferred = FALSE;
				trig.tgdeferrable = FALSE;

1131 1132 1133 1134
				trig.tgargs = (char **) palloc(
					 sizeof(char *) * (4 + length(fkconstraint->fk_attrs)
									   + length(fkconstraint->pk_attrs)));

1135
				trig.tgargs[0] = "<unnamed>";
1136
				trig.tgargs[1] = (char *) relationName;
1137 1138 1139
				trig.tgargs[2] = fkconstraint->pktable_name;
				trig.tgargs[3] = fkconstraint->match_type;
				count = 4;
1140
				foreach(list, fkconstraint->fk_attrs)
1141
				{
1142 1143
					Ident	   *fk_at = lfirst(list);

1144 1145
					trig.tgargs[count++] = fk_at->name;
				}
1146
				foreach(list, fkconstraint->pk_attrs)
1147
				{
1148 1149
					Ident	   *pk_at = lfirst(list);

1150 1151 1152 1153 1154
					trig.tgargs[count++] = pk_at->name;
				}
				trig.tgnargs = count;

				scan = heap_beginscan(rel, false, SnapshotNow, 0, NULL);
1155
				AssertState(scan != NULL);
1156 1157 1158

				while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
				{
1159 1160 1161 1162
					/* Make a call to the check function */
					/* No parameters are passed, but we do set a context */
					FunctionCallInfoData	fcinfo;
					TriggerData				trigdata;
1163

1164 1165
					MemSet(&fcinfo, 0, sizeof(fcinfo));
					/* We assume RI_FKey_check_ins won't look at flinfo... */
1166

1167 1168 1169 1170 1171 1172
					trigdata.type = T_TriggerData;
					trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
					trigdata.tg_relation = rel;
					trigdata.tg_trigtuple = tuple;
					trigdata.tg_newtuple = NULL;
					trigdata.tg_trigger = &trig;
1173

1174
					fcinfo.context = (Node *) &trigdata;
1175

1176
					RI_FKey_check_ins(&fcinfo);
1177 1178
				}
				heap_endscan(scan);
1179 1180
				heap_close(rel, NoLock);		/* close rel but keep
												 * lock! */
1181 1182 1183 1184 1185 1186 1187

				pfree(trig.tgargs);
			}
			break;
		default:
			elog(ERROR, "ALTER TABLE / ADD CONSTRAINT unable to determine type of constraint passed");
	}
1188 1189 1190 1191
}



1192 1193 1194 1195 1196
/*
 * ALTER TABLE DROP CONSTRAINT
 */
void
AlterTableDropConstraint(const char *relationName,
1197 1198
						 bool inh, const char *constrName,
						 int behavior)
1199
{
1200
	elog(ERROR, "ALTER TABLE / DROP CONSTRAINT is not implemented");
1201 1202 1203 1204 1205 1206 1207 1208 1209
}



/*
 *
 * LOCK TABLE
 *
 */
1210
void
1211
LockTableCommand(LockStmt *lockstmt)
1212 1213 1214
{
	Relation	rel;
	int			aclresult;
1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229
	HeapTuple	tup;

	
	/* ----------
	 * Check pg_shadow for global lock setting 
	 * ----------
	 */
	tup = SearchSysCacheTuple(SHADOWNAME, PointerGetDatum(GetPgUserName()), 0, 0, 0);

	if (!HeapTupleIsValid(tup))
 		elog(ERROR, "LOCK TABLE: look at pg_shadow failed"); 

 	if (!((Form_pg_shadow) GETSTRUCT(tup))->uselocktable)
 		elog(ERROR, "LOCK TABLE: permission denied");	

1230

1231
	rel = heap_openr(lockstmt->relname, NoLock);
1232
	if (!RelationIsValid(rel))
1233
		elog(ERROR, "Relation '%s' does not exist", lockstmt->relname);
1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244

	if (lockstmt->mode == AccessShareLock)
		aclresult = pg_aclcheck(lockstmt->relname, GetPgUserName(), ACL_RD);
	else
		aclresult = pg_aclcheck(lockstmt->relname, GetPgUserName(), ACL_WR);

	if (aclresult != ACLCHECK_OK)
		elog(ERROR, "LOCK TABLE: permission denied");

	LockRelation(rel, lockstmt->mode);

1245
	heap_close(rel, NoLock);	/* close rel, keep lock */
1246
}