/*-------------------------------------------------------------------------
 *
 * pg_operator.c
 *	  routines to support manipulation of the pg_operator relation
 *
 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/catalog/pg_operator.c,v 1.75 2002/08/05 03:29:16 tgl Exp $
 *
 * NOTES
 *	  these routines moved here from commands/define.c and somewhat cleaned up.
 *
 *-------------------------------------------------------------------------
 */
Bruce Momjian's avatar
Bruce Momjian committed
18 19 20 21
#include "postgres.h"

#include "access/heapam.h"
#include "catalog/catname.h"
22
#include "catalog/dependency.h"
23
#include "catalog/indexing.h"
24
#include "catalog/namespace.h"
Bruce Momjian's avatar
Bruce Momjian committed
25 26 27
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "miscadmin.h"
Bruce Momjian's avatar
Bruce Momjian committed
28
#include "parser/parse_func.h"
29
#include "parser/parse_oper.h"
30
#include "utils/acl.h"
Bruce Momjian's avatar
Bruce Momjian committed
31
#include "utils/builtins.h"
32
#include "utils/lsyscache.h"
Bruce Momjian's avatar
Bruce Momjian committed
33 34
#include "utils/syscache.h"

35

36
static Oid OperatorGet(const char *operatorName,
37
					   Oid operatorNamespace,
38 39 40
					   Oid leftObjectId,
					   Oid rightObjectId,
					   bool *defined);
41

42 43 44 45 46
static Oid OperatorLookup(List *operatorName,
						  Oid leftObjectId,
						  Oid rightObjectId,
						  bool *defined);

47
static Oid OperatorShellMake(const char *operatorName,
48
							 Oid operatorNamespace,
49 50
							 Oid leftTypeId,
							 Oid rightTypeId);
51

52
static void OperatorUpd(Oid baseId, Oid commId, Oid negId);
53

54 55 56 57 58 59
static Oid get_other_operator(List *otherOp,
							  Oid otherLeftTypeId, Oid otherRightTypeId,
							  const char *operatorName, Oid operatorNamespace,
							  Oid leftTypeId, Oid rightTypeId,
							  bool isCommutator);

60 61
static void makeOperatorDependencies(HeapTuple tuple, Oid pg_operator_relid);

62 63 64 65 66 67 68 69 70 71 72 73 74

/*
 * Check whether a proposed operator name is legal
 *
 * This had better match the behavior of parser/scan.l!
 *
 * We need this because the parser is not smart enough to check that
 * the arguments of CREATE OPERATOR's COMMUTATOR, NEGATOR, etc clauses
 * are operator names rather than some other lexical entity.
 */
static bool
validOperatorName(const char *name)
{
75
	size_t		len = strlen(name);
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90

	/* Can't be empty or too long */
	if (len == 0 || len >= NAMEDATALEN)
		return false;

	/* Can't contain any invalid characters */
	/* Test string here should match op_chars in scan.l */
	if (strspn(name, "~!@#^&|`?$+-*/%<>=") != len)
		return false;

	/* Can't contain slash-star or dash-dash (comment starts) */
	if (strstr(name, "/*") || strstr(name, "--"))
		return false;

	/*
91 92 93 94 95
	 * For SQL92 compatibility, '+' and '-' cannot be the last char of a
	 * multi-char operator unless the operator contains chars that are not
	 * in SQL92 operators. The idea is to lex '=-' as two operators, but
	 * not to forbid operator names like '?-' that could not be sequences
	 * of SQL92 operators.
96 97
	 */
	if (len > 1 &&
98 99
		(name[len - 1] == '+' ||
		 name[len - 1] == '-'))
100
	{
101
		int			ic;
102

103
		for (ic = len - 2; ic >= 0; ic--)
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
		{
			if (strchr("~!@#^&|`?$%", name[ic]))
				break;
		}
		if (ic < 0)
			return false;		/* nope, not valid */
	}

	/* != isn't valid either, because parser will convert it to <> */
	if (strcmp(name, "!=") == 0)
		return false;

	return true;
}


120 121
/*
 * OperatorGet
122
 *
123 124
 *		finds an operator given an exact specification (name, namespace,
 *		left and right type IDs).
125
 *
126
 *		*defined is set TRUE if defined (not a shell)
127
 */
128
static Oid
129
OperatorGet(const char *operatorName,
130
			Oid operatorNamespace,
131 132 133
			Oid leftObjectId,
			Oid rightObjectId,
			bool *defined)
134
{
135
	HeapTuple	tup;
136 137
	Oid			operatorObjectId;

138 139 140 141 142
	tup = SearchSysCache(OPERNAMENSP,
						 PointerGetDatum(operatorName),
						 ObjectIdGetDatum(leftObjectId),
						 ObjectIdGetDatum(rightObjectId),
						 ObjectIdGetDatum(operatorNamespace));
143 144
	if (HeapTupleIsValid(tup))
	{
145
		RegProcedure oprcode = ((Form_pg_operator) GETSTRUCT(tup))->oprcode;
Bruce Momjian's avatar
Bruce Momjian committed
146

147
		operatorObjectId = HeapTupleGetOid(tup);
148
		*defined = RegProcedureIsValid(oprcode);
149
		ReleaseSysCache(tup);
150 151 152 153 154 155
	}
	else
	{
		operatorObjectId = InvalidOid;
		*defined = false;
	}
156

157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
	return operatorObjectId;
}

/*
 * OperatorLookup
 *
 *		looks up an operator given a possibly-qualified name and
 *		left and right type IDs.
 *
 *		*defined is set TRUE if defined (not a shell)
 */
static Oid
OperatorLookup(List *operatorName,
			   Oid leftObjectId,
			   Oid rightObjectId,
			   bool *defined)
{
	Oid			operatorObjectId;
175
	RegProcedure oprcode;
176 177 178 179 180 181 182 183 184 185 186

	operatorObjectId = LookupOperName(operatorName, leftObjectId,
									  rightObjectId);
	if (!OidIsValid(operatorObjectId))
	{
		*defined = false;
		return InvalidOid;
	}

	oprcode = get_opcode(operatorObjectId);
	*defined = RegProcedureIsValid(oprcode);
187

188
	return operatorObjectId;
189 190
}

191

192 193 194
/*
 * OperatorShellMake
 *		Make a "shell" entry for a not-yet-existing operator.
195
 */
196
static Oid
197
OperatorShellMake(const char *operatorName,
198
				  Oid operatorNamespace,
199 200
				  Oid leftTypeId,
				  Oid rightTypeId)
201
{
202 203
	Relation	pg_operator_desc;
	Oid			operatorObjectId;
204
	int			i;
205 206 207
	HeapTuple	tup;
	Datum		values[Natts_pg_operator];
	char		nulls[Natts_pg_operator];
208
	NameData	oname;
209
	TupleDesc	tupDesc;
210

211 212 213 214 215 216
	/*
	 * validate operator name
	 */
	if (!validOperatorName(operatorName))
		elog(ERROR, "\"%s\" is not a valid operator name", operatorName);

217 218
	/*
	 * initialize our *nulls and *values arrays
219 220 221 222 223 224 225
	 */
	for (i = 0; i < Natts_pg_operator; ++i)
	{
		nulls[i] = ' ';
		values[i] = (Datum) NULL;		/* redundant, but safe */
	}

226
	/*
227
	 * initialize values[] with the operator name and input data types.
228
	 * Note that oprcode is set to InvalidOid, indicating it's a shell.
229 230
	 */
	i = 0;
231
	namestrcpy(&oname, operatorName);
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
	values[i++] = NameGetDatum(&oname);				/* oprname */
	values[i++] = ObjectIdGetDatum(operatorNamespace);	/* oprnamespace */
	values[i++] = Int32GetDatum(GetUserId());		/* oprowner */
	values[i++] = CharGetDatum(leftTypeId ? (rightTypeId ? 'b' : 'r') : 'l');	/* oprkind */
	values[i++] = BoolGetDatum(false);				/* oprcanhash */
	values[i++] = ObjectIdGetDatum(leftTypeId);		/* oprleft */
	values[i++] = ObjectIdGetDatum(rightTypeId);	/* oprright */
	values[i++] = ObjectIdGetDatum(InvalidOid);		/* oprresult */
	values[i++] = ObjectIdGetDatum(InvalidOid);		/* oprcom */
	values[i++] = ObjectIdGetDatum(InvalidOid);		/* oprnegate */
	values[i++] = ObjectIdGetDatum(InvalidOid);		/* oprlsortop */
	values[i++] = ObjectIdGetDatum(InvalidOid);		/* oprrsortop */
	values[i++] = ObjectIdGetDatum(InvalidOid);		/* oprltcmpop */
	values[i++] = ObjectIdGetDatum(InvalidOid);		/* oprgtcmpop */
	values[i++] = ObjectIdGetDatum(InvalidOid);		/* oprcode */
	values[i++] = ObjectIdGetDatum(InvalidOid);		/* oprrest */
	values[i++] = ObjectIdGetDatum(InvalidOid);		/* oprjoin */
249

250
	/*
251
	 * open pg_operator
252
	 */
253
	pg_operator_desc = heap_openr(OperatorRelationName, RowExclusiveLock);
254 255
	tupDesc = pg_operator_desc->rd_att;

256 257 258
	/*
	 * create a new operator tuple
	 */
259
	tup = heap_formtuple(tupDesc, values, nulls);
260

261
	/*
262
	 * insert our "shell" operator tuple
263
	 */
264
	operatorObjectId = simple_heap_insert(pg_operator_desc, tup);
265

266
	CatalogUpdateIndexes(pg_operator_desc, tup);
267

268 269 270
	/* Add dependencies for the entry */
	makeOperatorDependencies(tup, RelationGetRelid(pg_operator_desc));

271
	heap_freetuple(tup);
272

273 274
	/*
	 * close the operator relation and return the oid.
275
	 */
276
	heap_close(pg_operator_desc, RowExclusiveLock);
277

278
	return operatorObjectId;
279 280
}

281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
/*
 * OperatorCreate
 *
 * "X" indicates an optional argument (i.e. one that can be NULL or 0)
 *		operatorName			name for new operator
 *		operatorNamespace		namespace for new operator
 *		leftTypeId				X left type ID
 *		rightTypeId				X right type ID
 *		procedureName			procedure for operator
 *		commutatorName			X commutator operator
 *		negatorName				X negator operator
 *		restrictionName			X restriction sel. procedure
 *		joinName				X join sel. procedure
 *		canHash					hash join can be used with this operator
 *		leftSortName			X left sort operator (for merge join)
 *		rightSortName			X right sort operator (for merge join)
 *		ltCompareName			X L<R compare operator (for merge join)
 *		gtCompareName			X L>R compare operator (for merge join)
299 300 301 302 303
 *
 * This routine gets complicated because it allows the user to
 * specify operators that do not exist.  For example, if operator
 * "op" is being defined, the negator operator "negop" and the
 * commutator "commop" can also be defined without specifying
304
 * any information other than their names.	Since in order to
305 306 307 308 309 310 311 312 313 314
 * add "op" to the PG_OPERATOR catalog, all the Oid's for these
 * operators must be placed in the fields of "op", a forward
 * declaration is done on the commutator and negator operators.
 * This is called creating a shell, and its main effect is to
 * create a tuple in the PG_OPERATOR catalog with minimal
 * information about the operator (just its name and types).
 * Forward declaration is used only for this purpose, it is
 * not available to the user as it is for type definition.
 *
 * Algorithm:
315 316
 *
 * check if operator already defined
317 318
 *	  if so, but oprcode is null, save the Oid -- we are filling in a shell
 *	  otherwise error
319 320
 * get the attribute types from relation descriptor for pg_operator
 * assign values to the fields of the operator:
321 322 323 324 325 326 327 328 329
 *	 operatorName
 *	 owner id (simply the user id of the caller)
 *	 operator "kind" either "b" for binary or "l" for left unary
 *	 canHash boolean
 *	 leftTypeObjectId -- type must already be defined
 *	 rightTypeObjectId -- this is optional, enter ObjectId=0 if none specified
 *	 resultType -- defer this, since it must be determined from
 *				   the pg_procedure catalog
 *	 commutatorObjectId -- if this is NULL, enter ObjectId=0
330
 *					  else if this already exists, enter its ObjectId
331 332 333 334
 *					  else if this does not yet exist, and is not
 *						the same as the main operatorName, then create
 *						a shell and enter the new ObjectId
 *					  else if this does not exist but IS the same
335 336 337
 *						name & types as the main operator, set the ObjectId=0.
 *						(We are creating a self-commutating operator.)
 *						The link will be fixed later by OperatorUpd.
338 339 340 341 342
 *	 negatorObjectId   -- same as for commutatorObjectId
 *	 leftSortObjectId  -- same as for commutatorObjectId
 *	 rightSortObjectId -- same as for commutatorObjectId
 *	 operatorProcedure -- must access the pg_procedure catalog to get the
 *				   ObjectId of the procedure that actually does the operator
343
 *				   actions this is required.  Do a lookup to find out the
344 345 346 347
 *				   return type of the procedure
 *	 restrictionProcedure -- must access the pg_procedure catalog to get
 *				   the ObjectId but this is optional
 *	 joinProcedure -- same as restrictionProcedure
348 349
 * now either insert or replace the operator into the pg_operator catalog
 * if the operator shell is being filled in
350 351
 *	 access the catalog in order to get a valid buffer
 *	 create a tuple using ModifyHeapTuple
352
 *	 get the t_self from the modified tuple and call RelationReplaceHeapTuple
353
 * else if a new operator is being created
354
 *	 create a tuple using heap_formtuple
355
 *	 call simple_heap_insert
356
 */
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
void
OperatorCreate(const char *operatorName,
			   Oid operatorNamespace,
			   Oid leftTypeId,
			   Oid rightTypeId,
			   List *procedureName,
			   List *commutatorName,
			   List *negatorName,
			   List *restrictionName,
			   List *joinName,
			   bool canHash,
			   List *leftSortName,
			   List *rightSortName,
			   List *ltCompareName,
			   List *gtCompareName)
372
{
373 374 375 376 377 378
	Relation	pg_operator_desc;
	HeapTuple	tup;
	char		nulls[Natts_pg_operator];
	char		replaces[Natts_pg_operator];
	Datum		values[Natts_pg_operator];
	Oid			operatorObjectId;
379
	bool		operatorAlreadyDefined;
380 381 382 383 384 385 386 387 388 389
	Oid			procOid;
	Oid			operResultType;
	Oid			commutatorId,
				negatorId,
				leftSortId,
				rightSortId,
				ltCompareId,
				gtCompareId,
				restOid,
				joinOid;
390
	bool		selfCommutator = false;
391
	Oid			typeId[FUNC_MAX_ARGS];
392
	int			nargs;
393
	NameData	oname;
394
	TupleDesc	tupDesc;
395
	int			i;
396 397

	/*
398
	 * Sanity checks
399 400 401 402
	 */
	if (!validOperatorName(operatorName))
		elog(ERROR, "\"%s\" is not a valid operator name", operatorName);

403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
	if (!OidIsValid(leftTypeId) && !OidIsValid(rightTypeId))
		elog(ERROR, "at least one of leftarg or rightarg must be specified");

	if (!(OidIsValid(leftTypeId) && OidIsValid(rightTypeId)))
	{
		/* If it's not a binary op, these things mustn't be set: */
		if (commutatorName)
			elog(ERROR, "only binary operators can have commutators");
		if (joinName)
			elog(ERROR, "only binary operators can have join selectivity");
		if (canHash)
			elog(ERROR, "only binary operators can hash");
		if (leftSortName || rightSortName || ltCompareName || gtCompareName)
			elog(ERROR, "only binary operators can mergejoin");
	}
418 419

	operatorObjectId = OperatorGet(operatorName,
420
								   operatorNamespace,
421 422
								   leftTypeId,
								   rightTypeId,
423
								   &operatorAlreadyDefined);
424

425
	if (operatorAlreadyDefined)
Bruce Momjian's avatar
Bruce Momjian committed
426
		elog(ERROR, "OperatorDef: operator \"%s\" already defined",
427 428
			 operatorName);

Bruce Momjian's avatar
Bruce Momjian committed
429 430 431
	/*
	 * At this point, if operatorObjectId is not InvalidOid then we are
	 * filling in a previously-created shell.
432 433
	 */

434 435 436 437
	/*
	 * Look up registered procedures -- find the return type of
	 * procedureName to place in "result" field. Do this before shells are
	 * created so we don't have to worry about deleting them later.
438
	 */
439
	MemSet(typeId, 0, FUNC_MAX_ARGS * sizeof(Oid));
440
	if (!OidIsValid(leftTypeId))
441 442 443 444
	{
		typeId[0] = rightTypeId;
		nargs = 1;
	}
445
	else if (!OidIsValid(rightTypeId))
446 447 448 449 450 451 452 453 454 455
	{
		typeId[0] = leftTypeId;
		nargs = 1;
	}
	else
	{
		typeId[0] = leftTypeId;
		typeId[1] = rightTypeId;
		nargs = 2;
	}
456 457
	procOid = LookupFuncName(procedureName, nargs, typeId);
	if (!OidIsValid(procOid))
458
		func_error("OperatorDef", procedureName, nargs, typeId, NULL);
459
	operResultType = get_func_rettype(procOid);
460

461
	/*
462
	 * find restriction estimator
463 464
	 */
	if (restrictionName)
465
	{
466
		MemSet(typeId, 0, FUNC_MAX_ARGS * sizeof(Oid));
467 468 469 470
		typeId[0] = 0;			/* Query (opaque type) */
		typeId[1] = OIDOID;		/* operator OID */
		typeId[2] = 0;			/* args list (opaque type) */
		typeId[3] = INT4OID;	/* varRelid */
471

472
		restOid = LookupFuncName(restrictionName, 4, typeId);
473
		if (!OidIsValid(restOid))
474
			func_error("OperatorDef", restrictionName, 4, typeId, NULL);
475 476
	}
	else
477
		restOid = InvalidOid;
478

479
	/*
480
	 * find join estimator
481 482
	 */
	if (joinName)
483
	{
484
		MemSet(typeId, 0, FUNC_MAX_ARGS * sizeof(Oid));
485 486 487
		typeId[0] = 0;			/* Query (opaque type) */
		typeId[1] = OIDOID;		/* operator OID */
		typeId[2] = 0;			/* args list (opaque type) */
488

489
		joinOid = LookupFuncName(joinName, 3, typeId);
490
		if (!OidIsValid(joinOid))
491
			func_error("OperatorDef", joinName, 3, typeId, NULL);
492 493
	}
	else
494
		joinOid = InvalidOid;
495

496
	/*
497 498
	 * set up values in the operator tuple
	 */
499 500 501 502 503 504 505 506

	for (i = 0; i < Natts_pg_operator; ++i)
	{
		values[i] = (Datum) NULL;
		replaces[i] = 'r';
		nulls[i] = ' ';
	}

507
	i = 0;
508
	namestrcpy(&oname, operatorName);
509 510 511 512 513 514 515 516
	values[i++] = NameGetDatum(&oname);			/* oprname */
	values[i++] = ObjectIdGetDatum(operatorNamespace);	/* oprnamespace */
	values[i++] = Int32GetDatum(GetUserId());		/* oprowner */
	values[i++] = CharGetDatum(leftTypeId ? (rightTypeId ? 'b' : 'r') : 'l');	/* oprkind */
	values[i++] = BoolGetDatum(canHash);			/* oprcanhash */
	values[i++] = ObjectIdGetDatum(leftTypeId);		/* oprleft */
	values[i++] = ObjectIdGetDatum(rightTypeId);	/* oprright */
	values[i++] = ObjectIdGetDatum(operResultType);	/* oprresult */
517 518

	/*
519 520
	 * Set up the other operators.	If they do not currently exist, create
	 * shells in order to get ObjectId's.
521 522
	 */

523
	if (commutatorName)
524
	{
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540
		/* commutator has reversed arg types */
		commutatorId = get_other_operator(commutatorName,
										  rightTypeId, leftTypeId,
										  operatorName, operatorNamespace,
										  leftTypeId, rightTypeId,
										  true);
		/*
		 * self-linkage to this operator; will fix below. Note
		 * that only self-linkage for commutation makes sense.
		 */
		if (!OidIsValid(commutatorId))
			selfCommutator = true;
	}
	else
		commutatorId = InvalidOid;
	values[i++] = ObjectIdGetDatum(commutatorId);	/* oprcom */
541

542 543 544 545 546 547 548 549 550 551 552 553
	if (negatorName)
	{
		/* negator has same arg types */
		negatorId = get_other_operator(negatorName,
									   leftTypeId, rightTypeId,
									   operatorName, operatorNamespace,
									   leftTypeId, rightTypeId,
									   false);
	}
	else
		negatorId = InvalidOid;
	values[i++] = ObjectIdGetDatum(negatorId);		/* oprnegate */
554

555 556 557 558 559 560 561 562
	if (leftSortName)
	{
		/* left sort op takes left-side data type */
		leftSortId = get_other_operator(leftSortName,
									   leftTypeId, leftTypeId,
									   operatorName, operatorNamespace,
									   leftTypeId, rightTypeId,
									   false);
563
	}
564 565 566
	else
		leftSortId = InvalidOid;
	values[i++] = ObjectIdGetDatum(leftSortId);		/* oprlsortop */
567

568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
	if (rightSortName)
	{
		/* right sort op takes right-side data type */
		rightSortId = get_other_operator(rightSortName,
										 rightTypeId, rightTypeId,
										 operatorName, operatorNamespace,
										 leftTypeId, rightTypeId,
										 false);
	}
	else
		rightSortId = InvalidOid;
	values[i++] = ObjectIdGetDatum(rightSortId);	/* oprrsortop */

	if (ltCompareName)
	{
		/* comparator has same arg types */
		ltCompareId = get_other_operator(ltCompareName,
										 leftTypeId, rightTypeId,
										 operatorName, operatorNamespace,
										 leftTypeId, rightTypeId,
										 false);
	}
	else
		ltCompareId = InvalidOid;
	values[i++] = ObjectIdGetDatum(ltCompareId);	/* oprltcmpop */

	if (gtCompareName)
	{
		/* comparator has same arg types */
		gtCompareId = get_other_operator(gtCompareName,
										 leftTypeId, rightTypeId,
										 operatorName, operatorNamespace,
										 leftTypeId, rightTypeId,
										 false);
	}
	else
		gtCompareId = InvalidOid;
	values[i++] = ObjectIdGetDatum(gtCompareId);	/* oprgtcmpop */

	values[i++] = ObjectIdGetDatum(procOid);		/* oprcode */
	values[i++] = ObjectIdGetDatum(restOid);		/* oprrest */
	values[i++] = ObjectIdGetDatum(joinOid);		/* oprjoin */
610

611 612
	pg_operator_desc = heap_openr(OperatorRelationName, RowExclusiveLock);

613
	/*
614
	 * If we are adding to an operator shell, update; else insert
615 616 617
	 */
	if (operatorObjectId)
	{
618 619 620 621 622 623
		tup = SearchSysCacheCopy(OPEROID,
								 ObjectIdGetDatum(operatorObjectId),
								 0, 0, 0);
		if (!HeapTupleIsValid(tup))
			elog(ERROR, "OperatorDef: operator %u not found",
				 operatorObjectId);
624

625 626 627 628 629
		tup = heap_modifytuple(tup,
							   pg_operator_desc,
							   values,
							   nulls,
							   replaces);
630

631
		simple_heap_update(pg_operator_desc, &tup->t_self, tup);
632 633 634 635 636 637
	}
	else
	{
		tupDesc = pg_operator_desc->rd_att;
		tup = heap_formtuple(tupDesc, values, nulls);

638
		operatorObjectId = simple_heap_insert(pg_operator_desc, tup);
639 640
	}

641
	/* Must update the indexes in either case */
642
	CatalogUpdateIndexes(pg_operator_desc, tup);
643

644 645 646
	/* Add dependencies for the entry */
	makeOperatorDependencies(tup, RelationGetRelid(pg_operator_desc));

647
	heap_close(pg_operator_desc, RowExclusiveLock);
648 649

	/*
650
	 * If a commutator and/or negator link is provided, update the other
Bruce Momjian's avatar
Bruce Momjian committed
651 652 653 654 655 656 657 658
	 * operator(s) to point at this one, if they don't already have a
	 * link. This supports an alternate style of operator definition
	 * wherein the user first defines one operator without giving negator
	 * or commutator, then defines the other operator of the pair with the
	 * proper commutator or negator attribute.	That style doesn't require
	 * creation of a shell, and it's the only style that worked right
	 * before Postgres version 6.5. This code also takes care of the
	 * situation where the new operator is its own commutator.
659
	 */
660 661 662 663
	if (selfCommutator)
		commutatorId = operatorObjectId;

	if (OidIsValid(commutatorId) || OidIsValid(negatorId))
664
		OperatorUpd(operatorObjectId, commutatorId, negatorId);
665 666
}

667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683
/*
 * Try to lookup another operator (commutator, etc)
 *
 * If not found, check to see if it is exactly the operator we are trying
 * to define; if so, return InvalidOid.  (Note that this case is only
 * sensible for a commutator, so we error out otherwise.)  If it is not
 * the same operator, create a shell operator.
 */
static Oid
get_other_operator(List *otherOp, Oid otherLeftTypeId, Oid otherRightTypeId,
				   const char *operatorName, Oid operatorNamespace,
				   Oid leftTypeId, Oid rightTypeId, bool isCommutator)
{
	Oid			other_oid;
	bool		otherDefined;
	char	   *otherName;
	Oid			otherNamespace;
684
	AclResult	aclresult;
685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714

	other_oid = OperatorLookup(otherOp,
							   otherLeftTypeId,
							   otherRightTypeId,
							   &otherDefined);

	if (OidIsValid(other_oid))
	{
		/* other op already in catalogs */
		return other_oid;
	}

	otherNamespace = QualifiedNameGetCreationNamespace(otherOp,
													   &otherName);

	if (strcmp(otherName, operatorName) == 0 &&
		otherNamespace == operatorNamespace &&
		otherLeftTypeId == leftTypeId &&
		otherRightTypeId == rightTypeId)
	{
		/*
		 * self-linkage to this operator; caller will fix later. Note
		 * that only self-linkage for commutation makes sense.
		 */
		if (!isCommutator)
			elog(ERROR, "operator cannot be its own negator or sort operator");
		return InvalidOid;
	}

	/* not in catalogs, different from operator, so make shell */
715 716 717 718 719 720

	aclresult = pg_namespace_aclcheck(otherNamespace, GetUserId(),
									  ACL_CREATE);
	if (aclresult != ACLCHECK_OK)
		aclcheck_error(aclresult, get_namespace_name(otherNamespace));

721 722 723 724 725 726 727 728 729 730 731 732
	other_oid = OperatorShellMake(otherName,
								  otherNamespace,
								  otherLeftTypeId,
								  otherRightTypeId);
	if (!OidIsValid(other_oid))
		elog(ERROR,
			 "OperatorDef: can't create operator shell \"%s\"",
			 NameListToString(otherOp));
	return other_oid;
}

/*
733 734
 * OperatorUpd
 *
735
 *	For a given operator, look up its negator and commutator operators.
736 737
 *	If they are defined, but their negator and commutator fields
 *	(respectively) are empty, then use the new operator for neg or comm.
738 739
 *	This solves a problem for users who need to insert two new operators
 *	which are the negator or commutator of each other.
740 741 742 743
 */
static void
OperatorUpd(Oid baseId, Oid commId, Oid negId)
{
744
	int			i;
745 746 747 748 749
	Relation	pg_operator_desc;
	HeapTuple	tup;
	char		nulls[Natts_pg_operator];
	char		replaces[Natts_pg_operator];
	Datum		values[Natts_pg_operator];
750 751 752

	for (i = 0; i < Natts_pg_operator; ++i)
	{
753
		values[i] = (Datum) 0;
754 755 756 757
		replaces[i] = ' ';
		nulls[i] = ' ';
	}

758 759 760 761 762 763 764
	/*
	 * check and update the commutator & negator, if necessary
	 *
	 * First make sure we can see them...
	 */
	CommandCounterIncrement();

765
	pg_operator_desc = heap_openr(OperatorRelationName, RowExclusiveLock);
766

767 768 769
	tup = SearchSysCacheCopy(OPEROID,
							 ObjectIdGetDatum(commId),
							 0, 0, 0);
770

Bruce Momjian's avatar
Bruce Momjian committed
771 772
	/*
	 * if the commutator and negator are the same operator, do one update.
773 774 775
	 * XXX this is probably useless code --- I doubt it ever makes sense
	 * for commutator and negator to be the same thing...
	 */
776 777 778 779
	if (commId == negId)
	{
		if (HeapTupleIsValid(tup))
		{
780
			Form_pg_operator t = (Form_pg_operator) GETSTRUCT(tup);
781

782
			if (!OidIsValid(t->oprcom) || !OidIsValid(t->oprnegate))
783 784 785
			{
				if (!OidIsValid(t->oprnegate))
				{
786
					values[Anum_pg_operator_oprnegate - 1] = ObjectIdGetDatum(baseId);
787 788 789 790 791
					replaces[Anum_pg_operator_oprnegate - 1] = 'r';
				}

				if (!OidIsValid(t->oprcom))
				{
792
					values[Anum_pg_operator_oprcom - 1] = ObjectIdGetDatum(baseId);
793 794 795 796 797 798 799 800 801
					replaces[Anum_pg_operator_oprcom - 1] = 'r';
				}

				tup = heap_modifytuple(tup,
									   pg_operator_desc,
									   values,
									   nulls,
									   replaces);

802
				simple_heap_update(pg_operator_desc, &tup->t_self, tup);
803

804
				CatalogUpdateIndexes(pg_operator_desc, tup);
805
			}
806
		}
807

808
		heap_close(pg_operator_desc, RowExclusiveLock);
809 810 811 812 813

		return;
	}

	/* if commutator and negator are different, do two updates */
814

815
	if (HeapTupleIsValid(tup) &&
816
		!(OidIsValid(((Form_pg_operator) GETSTRUCT(tup))->oprcom)))
817 818 819
	{
		values[Anum_pg_operator_oprcom - 1] = ObjectIdGetDatum(baseId);
		replaces[Anum_pg_operator_oprcom - 1] = 'r';
820

821
		tup = heap_modifytuple(tup,
822 823 824 825 826
							   pg_operator_desc,
							   values,
							   nulls,
							   replaces);

827
		simple_heap_update(pg_operator_desc, &tup->t_self, tup);
828

829
		CatalogUpdateIndexes(pg_operator_desc, tup);
830

831 832 833 834 835 836
		values[Anum_pg_operator_oprcom - 1] = (Datum) NULL;
		replaces[Anum_pg_operator_oprcom - 1] = ' ';
	}

	/* check and update the negator, if necessary */

837 838 839
	tup = SearchSysCacheCopy(OPEROID,
							 ObjectIdGetDatum(negId),
							 0, 0, 0);
840 841

	if (HeapTupleIsValid(tup) &&
842
		!(OidIsValid(((Form_pg_operator) GETSTRUCT(tup))->oprnegate)))
843 844 845
	{
		values[Anum_pg_operator_oprnegate - 1] = ObjectIdGetDatum(baseId);
		replaces[Anum_pg_operator_oprnegate - 1] = 'r';
846

847 848 849 850 851 852
		tup = heap_modifytuple(tup,
							   pg_operator_desc,
							   values,
							   nulls,
							   replaces);

853
		simple_heap_update(pg_operator_desc, &tup->t_self, tup);
854

855
		CatalogUpdateIndexes(pg_operator_desc, tup);
856 857
	}

858
	heap_close(pg_operator_desc, RowExclusiveLock);
859
}
860 861 862 863 864

/*
 * Create dependencies for a new operator (either a freshly inserted
 * complete operator, a new shell operator, or a just-updated shell).
 *
865
 * NB: the OidIsValid tests in this routine are necessary, in case
866 867 868 869 870 871 872 873 874 875
 * the given operator is a shell.
 */
static void
makeOperatorDependencies(HeapTuple tuple, Oid pg_operator_relid)
{
	Form_pg_operator	oper = (Form_pg_operator) GETSTRUCT(tuple);
	ObjectAddress	myself,
					referenced;

	myself.classId = pg_operator_relid;
876
	myself.objectId = HeapTupleGetOid(tuple);
877 878 879 880 881
	myself.objectSubId = 0;

	/* In case we are updating a shell, delete any existing entries */
	deleteDependencyRecordsFor(myself.classId, myself.objectId);

882 883 884 885 886 887 888 889 890
	/* Dependency on namespace */
	if (OidIsValid(oper->oprnamespace))
	{
		referenced.classId = get_system_catalog_relid(NamespaceRelationName);
		referenced.objectId = oper->oprnamespace;
		referenced.objectSubId = 0;
		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
	}

891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954
	/* Dependency on left type */
	if (OidIsValid(oper->oprleft))
	{
		referenced.classId = RelOid_pg_type;
		referenced.objectId = oper->oprleft;
		referenced.objectSubId = 0;
		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
	}

	/* Dependency on right type */
	if (OidIsValid(oper->oprright))
	{
		referenced.classId = RelOid_pg_type;
		referenced.objectId = oper->oprright;
		referenced.objectSubId = 0;
		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
	}

	/* Dependency on result type */
	if (OidIsValid(oper->oprresult))
	{
		referenced.classId = RelOid_pg_type;
		referenced.objectId = oper->oprresult;
		referenced.objectSubId = 0;
		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
	}

	/*
	 * NOTE: we do not consider the operator to depend on the associated
	 * operators oprcom, oprnegate, oprlsortop, oprrsortop, oprltcmpop,
	 * oprgtcmpop.  We would not want to delete this operator if those
	 * go away, but only reset the link fields; which is not a function
	 * that the dependency code can presently handle.  (Something could
	 * perhaps be done with objectSubId though.)  For now, it's okay to
	 * let those links dangle if a referenced operator is removed.
	 */

	/* Dependency on implementation function */
	if (OidIsValid(oper->oprcode))
	{
		referenced.classId = RelOid_pg_proc;
		referenced.objectId = oper->oprcode;
		referenced.objectSubId = 0;
		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
	}

	/* Dependency on restriction selectivity function */
	if (OidIsValid(oper->oprrest))
	{
		referenced.classId = RelOid_pg_proc;
		referenced.objectId = oper->oprrest;
		referenced.objectSubId = 0;
		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
	}

	/* Dependency on join selectivity function */
	if (OidIsValid(oper->oprjoin))
	{
		referenced.classId = RelOid_pg_proc;
		referenced.objectId = oper->oprjoin;
		referenced.objectSubId = 0;
		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
	}
}