relcache.c 205 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
 * relcache.c
 *	  POSTGRES relation descriptor cache code
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/utils/cache/relcache.c
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		RelationCacheInitialize			- initialize relcache (to empty)
 *		RelationCacheInitializePhase2	- initialize shared-catalog entries
 *		RelationCacheInitializePhase3	- finish initializing relcache
 *		RelationIdGetRelation			- get a reldesc by relation id
 *		RelationClose					- close an open relation
 *
 * NOTES
 *		The following code contains many undocumented hacks.  Please be
 *		careful....
 */
27 28
#include "postgres.h"

29
#include <sys/file.h>
30
#include <fcntl.h>
31
#include <unistd.h>
32

33 34
#include "access/htup_details.h"
#include "access/multixact.h"
35
#include "access/nbtree.h"
36
#include "access/parallel.h"
37
#include "access/reloptions.h"
38
#include "access/sysattr.h"
39
#include "access/table.h"
40
#include "access/tableam.h"
41
#include "access/tupdesc_details.h"
42
#include "access/xact.h"
43
#include "access/xlog.h"
44
#include "catalog/catalog.h"
Bruce Momjian's avatar
Bruce Momjian committed
45
#include "catalog/indexing.h"
46
#include "catalog/namespace.h"
47
#include "catalog/partition.h"
48
#include "catalog/pg_am.h"
49
#include "catalog/pg_amproc.h"
Bruce Momjian's avatar
Bruce Momjian committed
50
#include "catalog/pg_attrdef.h"
51
#include "catalog/pg_auth_members.h"
52
#include "catalog/pg_authid.h"
53
#include "catalog/pg_constraint.h"
54
#include "catalog/pg_database.h"
55
#include "catalog/pg_namespace.h"
56
#include "catalog/pg_opclass.h"
Bruce Momjian's avatar
Bruce Momjian committed
57
#include "catalog/pg_proc.h"
Peter Eisentraut's avatar
Peter Eisentraut committed
58
#include "catalog/pg_publication.h"
59
#include "catalog/pg_rewrite.h"
60
#include "catalog/pg_shseclabel.h"
61
#include "catalog/pg_statistic_ext.h"
Peter Eisentraut's avatar
Peter Eisentraut committed
62
#include "catalog/pg_subscription.h"
63
#include "catalog/pg_tablespace.h"
64
#include "catalog/pg_trigger.h"
65
#include "catalog/pg_type.h"
66
#include "catalog/schemapg.h"
67
#include "catalog/storage.h"
68
#include "commands/policy.h"
69
#include "commands/trigger.h"
Bruce Momjian's avatar
Bruce Momjian committed
70
#include "miscadmin.h"
71
#include "nodes/makefuncs.h"
72
#include "nodes/nodeFuncs.h"
73
#include "optimizer/optimizer.h"
74
#include "rewrite/rewriteDefine.h"
75
#include "rewrite/rowsecurity.h"
76
#include "storage/lmgr.h"
Bruce Momjian's avatar
Bruce Momjian committed
77
#include "storage/smgr.h"
78
#include "utils/array.h"
79
#include "utils/builtins.h"
80
#include "utils/datum.h"
81
#include "utils/fmgroids.h"
82
#include "utils/inval.h"
83
#include "utils/lsyscache.h"
84
#include "utils/memutils.h"
85
#include "utils/relmapper.h"
Alvaro Herrera's avatar
Alvaro Herrera committed
86
#include "utils/resowner_private.h"
Robert Haas's avatar
Robert Haas committed
87
#include "utils/snapmgr.h"
88
#include "utils/syscache.h"
Bruce Momjian's avatar
Bruce Momjian committed
89

90
#define RELCACHE_INIT_FILEMAGIC		0x573266	/* version ID value */
91

92
/*
 * Whether to bother checking if relation cache memory needs to be freed
 * eagerly.  See also RelationBuildDesc() and pg_config_manual.h.
 *
 * Outcome of the ladder below:
 *	- RECOVER_RELATION_BUILD_MEMORY defined and nonzero: the MAYBE_ symbol
 *	  is defined.
 *	- otherwise RECOVER_RELATION_BUILD_MEMORY is (re)defined as 0, and the
 *	  MAYBE_ symbol is defined only when DISCARD_CACHES_ENABLED is defined.
 */
#if defined(RECOVER_RELATION_BUILD_MEMORY) && (RECOVER_RELATION_BUILD_MEMORY != 0)
#define MAYBE_RECOVER_RELATION_BUILD_MEMORY 1
#else
#define RECOVER_RELATION_BUILD_MEMORY 0
#ifdef DISCARD_CACHES_ENABLED
#define MAYBE_RECOVER_RELATION_BUILD_MEMORY 1
#endif
#endif

105
/*
106
 *		hardcoded tuple descriptors, contents generated by genbki.pl
107
 */
108 109 110 111 112
static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
113 114
static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
115
static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
116
static const FormData_pg_attribute Desc_pg_shseclabel[Natts_pg_shseclabel] = {Schema_pg_shseclabel};
Peter Eisentraut's avatar
Peter Eisentraut committed
117
static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription};
118

119
/*
 *		Hash tables that index the relation cache
 *
 *		We used to index the cache by both name and OID, but now there
 *		is only an index by OID.
 */
125 126 127 128 129 130
/*
 * Entry type of the RelationIdCache hashtable: pairs a relation OID with
 * the relation descriptor cached for it.
 */
typedef struct relidcacheent
{
	/* relation OID; the cache macros search the table by a relation's rd_id */
	Oid			reloid;
	/* descriptor stored and handed back by the cache macros */
	Relation	reldesc;
} RelIdCacheEnt;

131
static HTAB *RelationIdCache;
132

133 134 135 136
/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 */
Bruce Momjian's avatar
Bruce Momjian committed
137
bool		criticalRelcachesBuilt = false;
138

139 140
/*
 * This flag is false until we have prepared the critical relcache entries
 * for shared catalogs (which are the tables needed for login).
 */
bool		criticalSharedRelcachesBuilt = false;

145 146
/*
 * This counter counts relcache inval events received since backend startup
 * (but only for rels that are actually in cache).  Presently, we use it only
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;
152

153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
/*
 * in_progress_list is a stack of ongoing RelationBuildDesc() calls.  CREATE
 * INDEX CONCURRENTLY makes catalog changes under ShareUpdateExclusiveLock.
 * It critically relies on each backend absorbing those changes no later than
 * next transaction start.  Hence, RelationBuildDesc() loops until it finishes
 * without accepting a relevant invalidation.  (Most invalidation consumers
 * don't do this.)
 */
/*
 * One element of in_progress_list: identifies a relation whose descriptor
 * is currently being built and records whether an invalidation has been
 * noted for it.
 */
typedef struct inprogressent
{
	Oid			reloid;			/* OID of relation being built */
	bool		invalidated;	/* whether an invalidation arrived for it */
} InProgressEnt;

static InProgressEnt *in_progress_list;
static int	in_progress_list_len;
static int	in_progress_list_maxlen;

171
/*
 * eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
 * cleanup work.  This list intentionally has limited size; if it overflows,
 * we fall back to scanning the whole hashtable.  There is no value in a very
 * large list because (1) at some point, a hash_seq_search scan is faster than
 * retail lookups, and (2) the value of this is to reduce EOXact work for
 * short transactions, which can't have dirtied all that many tables anyway.
 * EOXactListAdd() does not bother to prevent duplicate list entries, so the
 * cleanup processing must be idempotent.
 */
181 182 183 184 185 186 187 188 189 190 191 192
#define MAX_EOXACT_LIST 32
static Oid	eoxact_list[MAX_EOXACT_LIST];
static int	eoxact_list_len = 0;
static bool eoxact_list_overflowed = false;

/*
 * EOXactListAdd
 *		Remember (rel)->rd_id in eoxact_list[].  Once the array already holds
 *		MAX_EOXACT_LIST entries nothing more is stored; the overflow flag is
 *		raised instead.
 */
#define EOXactListAdd(rel) \
	do { \
		if (eoxact_list_len >= MAX_EOXACT_LIST) \
			eoxact_list_overflowed = true; \
		else \
			eoxact_list[eoxact_list_len++] = (rel)->rd_id; \
	} while (0)
193

194 195 196 197 198 199
/*
 * EOXactTupleDescArray stores TupleDescs that (might) need AtEOXact
 * cleanup work.  The array expands as needed; there is no hashtable because
 * we don't need to access individual items except at EOXact.
 */
static TupleDesc *EOXactTupleDescArray;
static int	NextEOXactTupleDescNum = 0;
static int	EOXactTupleDescArrayLen = 0;
202

203
/*
 *		macros to manipulate the lookup hashtable
 */
206
/*
 * RelationCacheInsert
 *		Enter RELATION into RelationIdCache, keyed by its rd_id.
 *
 * If an entry for that OID already exists, replace_allowed is asserted and
 * the new descriptor takes the old one's place.  The displaced descriptor
 * is destroyed when RelationHasReferenceCountZero() says it is unreferenced;
 * otherwise it is left alone and, outside bootstrap mode, a WARNING about
 * the leak is logged.
 *
 * Note: RELATION is evaluated more than once, so pass an expression without
 * side effects.
 */
#define RelationCacheInsert(RELATION, replace_allowed)	\
do { \
	RelIdCacheEnt *hentry; bool found; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &((RELATION)->rd_id), \
										   HASH_ENTER, &found); \
	if (found) \
	{ \
		/* see comments in RelationBuildDesc and RelationBuildLocalRelation */ \
		Relation _old_rel = hentry->reldesc; \
		Assert(replace_allowed); \
		hentry->reldesc = (RELATION); \
		if (RelationHasReferenceCountZero(_old_rel)) \
			RelationDestroyRelation(_old_rel, false); \
		else if (!IsBootstrapProcessingMode()) \
			elog(WARNING, "leaking still-referenced relcache entry for \"%s\"", \
				 RelationGetRelationName(_old_rel)); \
	} \
	else \
		hentry->reldesc = (RELATION); \
} while(0)

228
/*
 * RelationIdCacheLookup
 *		Look up ID in RelationIdCache and assign the cached descriptor to
 *		RELATION, or NULL when there is no entry.
 *
 * ID has its address taken, so it must be an lvalue; RELATION must be
 * assignable.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &(ID), \
										   HASH_FIND, NULL); \
	if (hentry) \
		RELATION = hentry->reldesc; \
	else \
		RELATION = NULL; \
} while(0)

/*
 * RelationCacheDelete
 *		Remove the RelationIdCache entry keyed by RELATION's rd_id.  A
 *		WARNING is logged if hash_search() reports no such entry.
 *
 * Only the hashtable entry is removed here; the descriptor itself is not
 * touched by this macro.
 */
#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &((RELATION)->rd_id), \
										   HASH_REMOVE, NULL); \
	if (hentry == NULL) \
		elog(WARNING, "failed to delete relcache entry for OID %u", \
			 (RELATION)->rd_id); \
} while(0)
250

251 252 253

/*
 * Special cache for opclass-related information
254
 *
255
 * Note: only default support procs get cached, ie, those with
256
 * lefttype = righttype = opcintype.
257 258 259 260
 */
typedef struct opclasscacheent
{
	Oid			opclassoid;		/* lookup key: OID of opclass */
261
	bool		valid;			/* set true after successful fill-in */
262
	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
263 264
	Oid			opcfamily;		/* OID of opclass's family */
	Oid			opcintype;		/* OID of opclass's declared input type */
265
	RegProcedure *supportProcs; /* OIDs of support procedures */
266 267 268 269 270
} OpClassCacheEnt;

static HTAB *OpClassCache = NULL;


271
/* non-export function prototypes */
272

273
static void RelationDestroyRelation(Relation relation, bool remember_tupdesc);
274
static void RelationClearRelation(Relation relation, bool rebuild);
275

276
static void RelationReloadIndexInfo(Relation relation);
277
static void RelationReloadNailed(Relation relation);
278
static void RelationFlushRelation(Relation relation);
279
static void RememberToFreeTupleDescAtEOX(TupleDesc td);
280 281 282
#ifdef USE_ASSERT_CHECKING
static void AssertPendingSyncConsistency(Relation relation);
#endif
283 284
static void AtEOXact_cleanup(Relation relation, bool isCommit);
static void AtEOSubXact_cleanup(Relation relation, bool isCommit,
Tom Lane's avatar
Tom Lane committed
285
								SubTransactionId mySubid, SubTransactionId parentSubid);
286 287
static bool load_relcache_init_file(bool shared);
static void write_relcache_init_file(bool shared);
Bruce Momjian's avatar
Bruce Momjian committed
288
static void write_item(const void *data, Size len, FILE *fp);
289

290
static void formrdesc(const char *relationName, Oid relationReltype,
Tom Lane's avatar
Tom Lane committed
291
					  bool isshared, int natts, const FormData_pg_attribute *attrs);
292

Robert Haas's avatar
Robert Haas committed
293
static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic);
294
static Relation AllocateRelationDesc(Form_pg_class relp);
295
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
296
static void RelationBuildTupleDesc(Relation relation);
297
static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
298
static void RelationInitPhysicalAddr(Relation relation);
299
static void load_critical_index(Oid indexoid, Oid heapoid);
300
static TupleDesc GetPgClassDescriptor(void);
301
static TupleDesc GetPgIndexDescriptor(void);
302 303
static void AttrDefaultFetch(Relation relation, int ndef);
static int	AttrDefaultCmp(const void *a, const void *b);
304
static void CheckConstraintFetch(Relation relation);
305
static int	CheckConstraintCmp(const void *a, const void *b);
306
static void InitIndexAmRoutine(Relation relation);
307
static void IndexSupportInitialize(oidvector *indclass,
Tom Lane's avatar
Tom Lane committed
308 309 310 311 312
								   RegProcedure *indexSupport,
								   Oid *opFamily,
								   Oid *opcInType,
								   StrategyNumber maxSupportNumber,
								   AttrNumber maxAttributeNumber);
313
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
Tom Lane's avatar
Tom Lane committed
314
										  StrategyNumber numSupport);
315
static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
316
static void unlink_initfile(const char *initfilename, int elevel);
317

318

319
/*
320
 *		ScanPgRelation
321
 *
322 323 324
 *		This is used by RelationBuildDesc to find a pg_class
 *		tuple matching targetRelId.  The caller must hold at least
 *		AccessShareLock on the target relid to prevent concurrent-update
325 326 327 328
 *		scenarios; it isn't guaranteed that all scans used to build the
 *		relcache entry will use the same snapshot.  If, for example,
 *		an attribute were to be added after scanning pg_class and before
 *		scanning pg_attribute, relnatts wouldn't match.
329 330 331
 *
 *		NB: the returned tuple has been copied into palloc'd storage
 *		and must eventually be freed with heap_freetuple.
332
 */
333
static HeapTuple
Robert Haas's avatar
Robert Haas committed
334
ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic)
335
{
336 337
	HeapTuple	pg_class_tuple;
	Relation	pg_class_desc;
338
	SysScanDesc pg_class_scan;
339
	ScanKeyData key[1];
340
	Snapshot	snapshot = NULL;
341

342 343 344 345 346 347 348 349 350
	/*
	 * If something goes wrong during backend startup, we might find ourselves
	 * trying to read pg_class before we've selected a database.  That ain't
	 * gonna work, so bail out with a useful error message.  If this happens,
	 * it probably means a relcache entry that needs to be nailed isn't.
	 */
	if (!OidIsValid(MyDatabaseId))
		elog(FATAL, "cannot read pg_class without having selected a database");

351
	/*
352
	 * form a scan key
353
	 */
354
	ScanKeyInit(&key[0],
355
				Anum_pg_class_oid,
356 357
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(targetRelId));
358

359
	/*
Bruce Momjian's avatar
Bruce Momjian committed
360
	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
361 362 363
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
364
	 */
365
	pg_class_desc = table_open(RelationRelationId, AccessShareLock);
Robert Haas's avatar
Robert Haas committed
366 367 368 369

	/*
	 * The caller might need a tuple that's newer than the one the historic
	 * snapshot; currently the only case requiring to do so is looking up the
370
	 * relfilenode of non mapped system relations during decoding. That
371
	 * snapshot can't change in the midst of a relcache build, so there's no
372
	 * need to register the snapshot.
Robert Haas's avatar
Robert Haas committed
373 374 375 376
	 */
	if (force_non_historic)
		snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId);

377
	pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
378
									   indexOK && criticalRelcachesBuilt,
Robert Haas's avatar
Robert Haas committed
379
									   snapshot,
380
									   1, key);
381

382
	pg_class_tuple = systable_getnext(pg_class_scan);
383

Hiroshi Inoue's avatar
Hiroshi Inoue committed
384
	/*
385
	 * Must copy tuple before releasing buffer.
Hiroshi Inoue's avatar
Hiroshi Inoue committed
386
	 */
387 388
	if (HeapTupleIsValid(pg_class_tuple))
		pg_class_tuple = heap_copytuple(pg_class_tuple);
389

390 391
	/* all done */
	systable_endscan(pg_class_scan);
392
	table_close(pg_class_desc, AccessShareLock);
393

394
	return pg_class_tuple;
395 396
}

397
/*
398
 *		AllocateRelationDesc
399
 *
400
 *		This is used to allocate memory for a new relation descriptor
401
 *		and initialize the rd_rel field from the given pg_class tuple.
402
 */
403
static Relation
404
AllocateRelationDesc(Form_pg_class relp)
405
{
406
	Relation	relation;
407
	MemoryContext oldcxt;
408
	Form_pg_class relationForm;
409

410 411
	/* Relcache entries must live in CacheMemoryContext */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
412

413
	/*
414
	 * allocate and zero space for new relation descriptor
415
	 */
416
	relation = (Relation) palloc0(sizeof(RelationData));
417

418
	/* make sure relation is marked as having no open file yet */
419
	relation->rd_smgr = NULL;
420

421
	/*
422
	 * Copy the relation tuple form
423
	 *
Bruce Momjian's avatar
Bruce Momjian committed
424 425
	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
	 * variable-length fields (relacl, reloptions) are NOT stored in the
426 427
	 * relcache --- there'd be little point in it, since we don't copy the
	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
Bruce Momjian's avatar
Bruce Momjian committed
428 429 430 431
	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
	 * it from the syscache if you need it.  The same goes for the original
	 * form of reloptions (however, we do store the parsed form of reloptions
	 * in rd_options).
432 433
	 */
	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
434

435
	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
436 437

	/* initialize relation tuple form */
438
	relation->rd_rel = relationForm;
439

440
	/* and allocate attribute tuple form storage */
441
	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts);
442 443
	/* which we mark as a reference-counted tupdesc */
	relation->rd_att->tdrefcount = 1;
444 445 446

	MemoryContextSwitchTo(oldcxt);

447
	return relation;
448 449
}

450
/*
451 452 453 454 455
 * RelationParseRelOptions
 *		Convert pg_class.reloptions into pre-parsed rd_options
 *
 * tuple is the real pg_class tuple (not rd_rel!) for relation
 *
456
 * Note: rd_rel and (if an index) rd_indam must be valid already
457 458
 */
static void
459
RelationParseRelOptions(Relation relation, HeapTuple tuple)
460
{
461
	bytea	   *options;
Alvaro Herrera's avatar
Alvaro Herrera committed
462
	amoptions_function amoptsfn;
463

464
	relation->rd_options = NULL;
465

Alvaro Herrera's avatar
Alvaro Herrera committed
466 467 468 469
	/*
	 * Look up any AM-specific parse function; fall out if relkind should not
	 * have options.
	 */
470 471
	switch (relation->rd_rel->relkind)
	{
472 473
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
474
		case RELKIND_VIEW:
475
		case RELKIND_MATVIEW:
476
		case RELKIND_PARTITIONED_TABLE:
Alvaro Herrera's avatar
Alvaro Herrera committed
477 478 479 480
			amoptsfn = NULL;
			break;
		case RELKIND_INDEX:
		case RELKIND_PARTITIONED_INDEX:
481
			amoptsfn = relation->rd_indam->amoptions;
482 483 484
			break;
		default:
			return;
485 486
	}

487
	/*
Bruce Momjian's avatar
Bruce Momjian committed
488 489 490
	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
	 * we might not have any other for pg_class yet (consider executing this
	 * code for pg_class itself)
491
	 */
Alvaro Herrera's avatar
Alvaro Herrera committed
492
	options = extractRelOptions(tuple, GetPgClassDescriptor(), amoptsfn);
493

494 495 496 497 498 499
	/*
	 * Copy parsed data into CacheMemoryContext.  To guard against the
	 * possibility of leaks in the reloptions code, we want to do the actual
	 * parsing in the caller's memory context and copy the results into
	 * CacheMemoryContext after the fact.
	 */
500 501 502 503 504
	if (options)
	{
		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
												  VARSIZE(options));
		memcpy(relation->rd_options, options, VARSIZE(options));
505
		pfree(options);
506 507 508
	}
}

509
/*
510
 *		RelationBuildTupleDesc
511
 *
512
 *		Form the relation's tuple descriptor from information in
513
 *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
514 515
 */
static void
516
RelationBuildTupleDesc(Relation relation)
517
{
518 519
	HeapTuple	pg_attribute_tuple;
	Relation	pg_attribute_desc;
520 521
	SysScanDesc pg_attribute_scan;
	ScanKeyData skey[2];
522
	int			need;
523
	TupleConstr *constr;
524
	AttrMissing *attrmiss = NULL;
525
	int			ndef = 0;
526

527 528 529 530
	/* fill rd_att's type ID fields (compare heap.c's AddNewRelationTuple) */
	relation->rd_att->tdtypeid =
		relation->rd_rel->reltype ? relation->rd_rel->reltype : RECORDOID;
	relation->rd_att->tdtypmod = -1;	/* just to be sure */
531

532 533
	constr = (TupleConstr *) MemoryContextAllocZero(CacheMemoryContext,
													sizeof(TupleConstr));
Hiroshi Inoue's avatar
Hiroshi Inoue committed
534
	constr->has_not_null = false;
Peter Eisentraut's avatar
Peter Eisentraut committed
535
	constr->has_generated_stored = false;
536

537
	/*
538
	 * Form a scan key that selects only user attributes (attnum > 0).
539 540
	 * (Eliminating system attribute rows at the index level is lots faster
	 * than fetching them.)
541
	 */
542 543 544 545 546 547 548 549
	ScanKeyInit(&skey[0],
				Anum_pg_attribute_attrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	ScanKeyInit(&skey[1],
				Anum_pg_attribute_attnum,
				BTGreaterStrategyNumber, F_INT2GT,
				Int16GetDatum(0));
550

551
	/*
Bruce Momjian's avatar
Bruce Momjian committed
552
	 * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
553 554
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).
555
	 */
556
	pg_attribute_desc = table_open(AttributeRelationId, AccessShareLock);
557
	pg_attribute_scan = systable_beginscan(pg_attribute_desc,
558
										   AttributeRelidNumIndexId,
559
										   criticalRelcachesBuilt,
560
										   NULL,
561
										   2, skey);
562

563
	/*
564
	 * add attribute data to relation->rd_att
565
	 */
566
	need = RelationGetNumberOfAttributes(relation);
567

568
	while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
569
	{
570
		Form_pg_attribute attp;
571
		int			attnum;
572

573
		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
574

575
		attnum = attp->attnum;
576
		if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation))
577
			elog(ERROR, "invalid attribute number %d for relation \"%s\"",
578 579
				 attp->attnum, RelationGetRelationName(relation));

580
		memcpy(TupleDescAttr(relation->rd_att, attnum - 1),
581
			   attp,
582
			   ATTRIBUTE_FIXED_PART_SIZE);
583

584 585
		/* Update constraint/default info */
		if (attp->attnotnull)
586
			constr->has_not_null = true;
Peter Eisentraut's avatar
Peter Eisentraut committed
587 588
		if (attp->attgenerated == ATTRIBUTE_GENERATED_STORED)
			constr->has_generated_stored = true;
589 590
		if (attp->atthasdef)
			ndef++;
591

592
		/* If the column has a "missing" value, put it in the attrmiss array */
593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628
		if (attp->atthasmissing)
		{
			Datum		missingval;
			bool		missingNull;

			/* Do we have a missing value? */
			missingval = heap_getattr(pg_attribute_tuple,
									  Anum_pg_attribute_attmissingval,
									  pg_attribute_desc->rd_att,
									  &missingNull);
			if (!missingNull)
			{
				/* Yes, fetch from the array */
				MemoryContext oldcxt;
				bool		is_null;
				int			one = 1;
				Datum		missval;

				if (attrmiss == NULL)
					attrmiss = (AttrMissing *)
						MemoryContextAllocZero(CacheMemoryContext,
											   relation->rd_rel->relnatts *
											   sizeof(AttrMissing));

				missval = array_get_element(missingval,
											1,
											&one,
											-1,
											attp->attlen,
											attp->attbyval,
											attp->attalign,
											&is_null);
				Assert(!is_null);
				if (attp->attbyval)
				{
					/* for copy by val just copy the datum direct */
629
					attrmiss[attnum - 1].am_value = missval;
630 631 632 633 634
				}
				else
				{
					/* otherwise copy in the correct context */
					oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
635 636 637
					attrmiss[attnum - 1].am_value = datumCopy(missval,
															  attp->attbyval,
															  attp->attlen);
638 639
					MemoryContextSwitchTo(oldcxt);
				}
640
				attrmiss[attnum - 1].am_present = true;
641 642
			}
		}
643 644 645
		need--;
		if (need == 0)
			break;
646
	}
647

648
	/*
649
	 * end the scan and close the attribute relation
650
	 */
651
	systable_endscan(pg_attribute_scan);
652
	table_close(pg_attribute_desc, AccessShareLock);
Hiroshi Inoue's avatar
Hiroshi Inoue committed
653

654
	if (need != 0)
655
		elog(ERROR, "pg_attribute catalog is missing %d attribute(s) for relation OID %u",
656 657
			 need, RelationGetRelid(relation));

658
	/*
659
	 * The attcacheoff values we read from pg_attribute should all be -1
Bruce Momjian's avatar
Bruce Momjian committed
660
	 * ("unknown").  Verify this if assert checking is on.  They will be
661
	 * computed when and if needed during tuple access.
662 663 664
	 */
#ifdef USE_ASSERT_CHECKING
	{
665
		int			i;
666

667
		for (i = 0; i < RelationGetNumberOfAttributes(relation); i++)
668
			Assert(TupleDescAttr(relation->rd_att, i)->attcacheoff == -1);
669 670 671
	}
#endif

672
	/*
673
	 * However, we can easily set the attcacheoff value for the first
Bruce Momjian's avatar
Bruce Momjian committed
674
	 * attribute: it must be zero.  This eliminates the need for special cases
675
	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
676
	 */
677
	if (RelationGetNumberOfAttributes(relation) > 0)
678
		TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
679

680 681 682
	/*
	 * Set up constraint/default info
	 */
683 684 685 686
	if (constr->has_not_null ||
		constr->has_generated_stored ||
		ndef > 0 ||
		attrmiss ||
687
		relation->rd_rel->relchecks > 0)
688
	{
689
		relation->rd_att->constr = constr;
690

691
		if (ndef > 0)			/* DEFAULTs */
692
			AttrDefaultFetch(relation, ndef);
693 694
		else
			constr->num_defval = 0;
695

696 697
		constr->missing = attrmiss;

698
		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
699
			CheckConstraintFetch(relation);
700 701 702 703 704 705 706
		else
			constr->num_check = 0;
	}
	else
	{
		pfree(constr);
		relation->rd_att->constr = NULL;
707
	}
708 709
}

710
/*
 *		RelationBuildRuleLock
 *
 *		Form the relation's rewrite rules from information in
 *		the pg_rewrite system catalog.
 *
 *		On return, rd_rules and rd_rulescxt are either both set (at least
 *		one rule was found) or both NULL (no rules found; the private
 *		context is deleted again).
 *
 * Note: The rule parsetrees are potentially very complex node structures.
 * To allow these trees to be freed when the relcache entry is flushed,
 * we make a private memory context to hold the RuleLock information for
 * each relcache entry that has associated rules.  The context is used
 * just for rule info, not for any other subsidiary data of the relcache
 * entry, because that keeps the update logic in RelationClearRelation()
 * manageable.  The other subsidiary data structures are simple enough
 * to be easy to free explicitly, anyway.
 */
static void
RelationBuildRuleLock(Relation relation)
{
	MemoryContext rulescxt;
	MemoryContext oldcxt;
	HeapTuple	rewrite_tuple;
	Relation	rewrite_desc;
	TupleDesc	rewrite_tupdesc;
	SysScanDesc rewrite_scan;
	ScanKeyData key;
	RuleLock   *rulelock;
	int			numlocks;
	RewriteRule **rules;
	int			maxlocks;

	/*
	 * Make the private context.  Assume it'll not contain much data.
	 */
	rulescxt = AllocSetContextCreate(CacheMemoryContext,
									 "relation rules",
									 ALLOCSET_SMALL_SIZES);
	relation->rd_rulescxt = rulescxt;
	MemoryContextCopyAndSetIdentifier(rulescxt,
									  RelationGetRelationName(relation));

	/*
	 * allocate an array to hold the rewrite rules (the array is extended if
	 * necessary)
	 */
	maxlocks = 4;
	rules = (RewriteRule **)
		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
	numlocks = 0;

	/*
	 * form a scan key
	 */
	ScanKeyInit(&key,
				Anum_pg_rewrite_ev_class,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	/*
	 * open pg_rewrite and begin a scan
	 *
	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
	 * be reading the rules in name order, except possibly during
	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
	 * ensures that rules will be fired in name order.
	 */
	rewrite_desc = table_open(RewriteRelationId, AccessShareLock);
	rewrite_tupdesc = RelationGetDescr(rewrite_desc);
	rewrite_scan = systable_beginscan(rewrite_desc,
									  RewriteRelRulenameIndexId,
									  true, NULL,
									  1, &key);

	while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
	{
		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
		bool		isnull;
		Datum		rule_datum;
		char	   *rule_str;
		RewriteRule *rule;

		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
												  sizeof(RewriteRule));

		rule->ruleId = rewrite_form->oid;

		rule->event = rewrite_form->ev_type - '0';
		rule->enabled = rewrite_form->ev_enabled;
		rule->isInstead = rewrite_form->is_instead;

		/*
		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
		 * rule strings are often large enough to be toasted.  To avoid
		 * leaking memory in the caller's context, do the detoasting here so
		 * we can free the detoasted version.
		 */
		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_action,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->actions = (List *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_qual,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->qual = (Node *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		/*
		 * We want the rule's table references to be checked as though by the
		 * table owner, not the user referencing the rule.  Therefore, scan
		 * through the rule's actions and set the checkAsUser field on all
		 * rtable entries.  We have to look at the qual as well, in case it
		 * contains sublinks.
		 *
		 * The reason for doing this when the rule is loaded, rather than when
		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
		 * grovel through stored rules to update checkAsUser fields. Scanning
		 * the rule tree during load is relatively cheap (compared to
		 * constructing it in the first place), so we do it here.
		 */
		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);

		/* double the pointer array whenever it is full */
		if (numlocks >= maxlocks)
		{
			maxlocks *= 2;
			rules = (RewriteRule **)
				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
		}
		rules[numlocks++] = rule;
	}

	/*
	 * end the scan and close the rewrite relation
	 */
	systable_endscan(rewrite_scan);
	table_close(rewrite_desc, AccessShareLock);

	/*
	 * there might not be any rules (if relhasrules is out-of-date)
	 */
	if (numlocks == 0)
	{
		relation->rd_rules = NULL;
		relation->rd_rulescxt = NULL;
		MemoryContextDelete(rulescxt);
		return;
	}

	/*
	 * form a RuleLock and insert into relation
	 */
	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
	rulelock->numLocks = numlocks;
	rulelock->rules = rules;

	relation->rd_rules = rulelock;
}

879
/*
 *		equalRuleLocks
 *
 *		Determine whether two RuleLocks are equivalent
 *
 *		Two NULL pointers compare equal; NULL versus non-NULL does not.
 *		Otherwise the locks must hold the same number of rules, and the
 *		rules in corresponding slots must agree on ruleId, event, enabled,
 *		isInstead, qual and actions.
 *
 *		Probably this should be in the rules code someplace...
 */
static bool
equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
{
	int			i;

	/*
	 * As of 7.3 we assume the rule ordering is repeatable, because
	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
	 * compare corresponding slots.
	 */
	if (rlock1 != NULL)
	{
		if (rlock2 == NULL)
			return false;
		if (rlock1->numLocks != rlock2->numLocks)
			return false;
		for (i = 0; i < rlock1->numLocks; i++)
		{
			RewriteRule *rule1 = rlock1->rules[i];
			RewriteRule *rule2 = rlock2->rules[i];

			if (rule1->ruleId != rule2->ruleId)
				return false;
			if (rule1->event != rule2->event)
				return false;
			if (rule1->enabled != rule2->enabled)
				return false;
			if (rule1->isInstead != rule2->isInstead)
				return false;
			if (!equal(rule1->qual, rule2->qual))
				return false;
			if (!equal(rule1->actions, rule2->actions))
				return false;
		}
	}
	else if (rlock2 != NULL)
		return false;
	return true;
}

926 927 928 929 930 931 932 933 934
/*
 *		equalPolicy
 *
 *		Report whether two row security policies are equivalent.
 *
 *		Two NULL policies compare equal; a NULL policy never equals a
 *		non-NULL one.  Otherwise the policies must match on polcmd,
 *		hassublinks, policy_name, the length and contents of the roles
 *		array, qual, and with_check_qual.
 */
static bool
equalPolicy(RowSecurityPolicy *policy1, RowSecurityPolicy *policy2)
{
	int			nroles;
	int			idx;
	Oid		   *roles1,
			   *roles2;

	/* Dispose of the cases where at least one side is missing. */
	if (policy1 == NULL)
		return (policy2 == NULL);
	if (policy2 == NULL)
		return false;

	if (policy1->polcmd != policy2->polcmd ||
		policy1->hassublinks != policy2->hassublinks)
		return false;
	if (strcmp(policy1->policy_name, policy2->policy_name) != 0)
		return false;

	/* The role arrays must have the same length (first dimension) ... */
	nroles = ARR_DIMS(policy1->roles)[0];
	if (nroles != ARR_DIMS(policy2->roles)[0])
		return false;

	/* ... and hold the same OIDs in the same positions. */
	roles1 = (Oid *) ARR_DATA_PTR(policy1->roles);
	roles2 = (Oid *) ARR_DATA_PTR(policy2->roles);

	for (idx = 0; idx < nroles; idx++)
	{
		if (roles1[idx] != roles2[idx])
			return false;
	}

	/* Finally compare the two expression trees. */
	return equal(policy1->qual, policy2->qual) &&
		equal(policy1->with_check_qual, policy2->with_check_qual);
}

/*
 *		equalRSDesc
 *
 *		Report whether two RowSecurityDesc's are equivalent.
 *
 *		Two NULL descriptors compare equal; a NULL descriptor never equals a
 *		non-NULL one.  Otherwise the policy lists must have the same length
 *		and pairwise-equal members (per equalPolicy).
 */
static bool
equalRSDesc(RowSecurityDesc *rsdesc1, RowSecurityDesc *rsdesc2)
{
	ListCell   *cell1,
			   *cell2;

	/* If either side is missing, they are equal only if both are. */
	if (rsdesc1 == NULL || rsdesc2 == NULL)
		return (rsdesc1 == NULL && rsdesc2 == NULL);

	if (list_length(rsdesc1->policies) != list_length(rsdesc2->policies))
		return false;

	/*
	 * RelationBuildRowSecurity should build policies in order, so walking
	 * the two lists in step is sufficient.
	 */
	forboth(cell1, rsdesc1->policies, cell2, rsdesc2->policies)
	{
		if (!equalPolicy((RowSecurityPolicy *) lfirst(cell1),
						 (RowSecurityPolicy *) lfirst(cell2)))
			return false;
	}

	return true;
}
1005

1006
/*
1007 1008
 *		RelationBuildDesc
 *
1009
 *		Build a relation descriptor.  The caller must hold at least
1010
 *		AccessShareLock on the target relid.
1011
 *
1012 1013
 *		The new descriptor is inserted into the hash table if insertIt is true.
 *
1014 1015 1016
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
1017
 */
1018
static Relation
1019
RelationBuildDesc(Oid targetRelId, bool insertIt)
1020
{
1021
	int			in_progress_offset;
1022 1023
	Relation	relation;
	Oid			relid;
1024
	HeapTuple	pg_class_tuple;
1025
	Form_pg_class relp;
1026

1027 1028 1029 1030 1031 1032
	/*
	 * This function and its subroutines can allocate a good deal of transient
	 * data in CurrentMemoryContext.  Traditionally we've just leaked that
	 * data, reasoning that the caller's context is at worst of transaction
	 * scope, and relcache loads shouldn't happen so often that it's essential
	 * to recover transient data before end of statement/transaction.  However
1033 1034
	 * that's definitely not true when debug_discard_caches is active, and
	 * perhaps it's not true in other cases.
1035
	 *
1036
	 * When debug_discard_caches is active or when forced to by
1037
	 * RECOVER_RELATION_BUILD_MEMORY=1, arrange to allocate the junk in a
1038 1039 1040
	 * temporary context that we'll free before returning.  Make it a child of
	 * caller's context so that it will get cleaned up appropriately if we
	 * error out partway through.
1041
	 */
1042 1043 1044
#ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
	MemoryContext tmpcxt = NULL;
	MemoryContext oldcxt = NULL;
1045

1046
	if (RECOVER_RELATION_BUILD_MEMORY || debug_discard_caches > 0)
1047 1048 1049 1050 1051 1052
	{
		tmpcxt = AllocSetContextCreate(CurrentMemoryContext,
									   "RelationBuildDesc workspace",
									   ALLOCSET_DEFAULT_SIZES);
		oldcxt = MemoryContextSwitchTo(tmpcxt);
	}
1053 1054
#endif

1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069
	/* Register to catch invalidation messages */
	if (in_progress_list_len >= in_progress_list_maxlen)
	{
		int			allocsize;

		allocsize = in_progress_list_maxlen * 2;
		in_progress_list = repalloc(in_progress_list,
									allocsize * sizeof(*in_progress_list));
		in_progress_list_maxlen = allocsize;
	}
	in_progress_offset = in_progress_list_len++;
	in_progress_list[in_progress_offset].reloid = targetRelId;
retry:
	in_progress_list[in_progress_offset].invalidated = false;

1070
	/*
1071
	 * find the tuple in pg_class corresponding to the given relation id
1072
	 */
Robert Haas's avatar
Robert Haas committed
1073
	pg_class_tuple = ScanPgRelation(targetRelId, true, false);
1074

1075
	/*
1076
	 * if no such tuple exists, return NULL
1077 1078
	 */
	if (!HeapTupleIsValid(pg_class_tuple))
1079
	{
1080 1081 1082 1083 1084 1085 1086
#ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
		if (tmpcxt)
		{
			/* Return to caller's context, and blow away the temporary context */
			MemoryContextSwitchTo(oldcxt);
			MemoryContextDelete(tmpcxt);
		}
1087
#endif
1088 1089
		Assert(in_progress_offset + 1 == in_progress_list_len);
		in_progress_list_len--;
1090
		return NULL;
1091
	}
1092

1093
	/*
1094
	 * get information from the pg_class_tuple
1095 1096
	 */
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1097
	relid = relp->oid;
1098
	Assert(relid == targetRelId);
1099

1100
	/*
1101 1102
	 * allocate storage for the relation descriptor, and copy pg_class_tuple
	 * to relation->rd_rel.
1103
	 */
1104
	relation = AllocateRelationDesc(relp);
1105

1106
	/*
1107
	 * initialize the relation's relation id (relation->rd_id)
1108
	 */
1109
	RelationGetRelid(relation) = relid;
1110

1111
	/*
1112 1113
	 * Normal relations are not nailed into the cache.  Since we don't flush
	 * new relations, it won't be new.  It could be temp though.
1114
	 */
1115
	relation->rd_refcnt = 0;
1116
	relation->rd_isnailed = false;
1117
	relation->rd_createSubid = InvalidSubTransactionId;
1118
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1119 1120
	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_droppedSubid = InvalidSubTransactionId;
1121
	switch (relation->rd_rel->relpersistence)
1122
	{
Robert Haas's avatar
Robert Haas committed
1123
		case RELPERSISTENCE_UNLOGGED:
1124 1125
		case RELPERSISTENCE_PERMANENT:
			relation->rd_backend = InvalidBackendId;
1126
			relation->rd_islocaltemp = false;
1127 1128
			break;
		case RELPERSISTENCE_TEMP:
1129
			if (isTempOrTempToastNamespace(relation->rd_rel->relnamespace))
1130
			{
1131
				relation->rd_backend = BackendIdForTempRelations();
1132 1133
				relation->rd_islocaltemp = true;
			}
1134 1135 1136
			else
			{
				/*
1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147
				 * If it's a temp table, but not one of ours, we have to use
				 * the slow, grotty method to figure out the owning backend.
				 *
				 * Note: it's possible that rd_backend gets set to MyBackendId
				 * here, in case we are looking at a pg_class entry left over
				 * from a crashed backend that coincidentally had the same
				 * BackendId we're using.  We should *not* consider such a
				 * table to be "ours"; this is why we need the separate
				 * rd_islocaltemp flag.  The pg_class entry will get flushed
				 * if/when we clean out the corresponding temp table namespace
				 * in preparation for using it.
1148 1149 1150 1151
				 */
				relation->rd_backend =
					GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
				Assert(relation->rd_backend != InvalidBackendId);
1152
				relation->rd_islocaltemp = false;
1153 1154 1155 1156 1157 1158
			}
			break;
		default:
			elog(ERROR, "invalid relpersistence: %c",
				 relation->rd_rel->relpersistence);
			break;
1159
	}
1160

1161
	/*
1162
	 * initialize the tuple descriptor (relation->rd_att).
1163
	 */
1164
	RelationBuildTupleDesc(relation);
1165

1166
	/*
1167
	 * Fetch rules and triggers that affect this relation
1168
	 */
1169
	if (relation->rd_rel->relhasrules)
1170 1171
		RelationBuildRuleLock(relation);
	else
1172
	{
1173
		relation->rd_rules = NULL;
1174 1175
		relation->rd_rulescxt = NULL;
	}
1176

1177
	if (relation->rd_rel->relhastriggers)
1178 1179 1180 1181
		RelationBuildTriggers(relation);
	else
		relation->trigdesc = NULL;

1182
	if (relation->rd_rel->relrowsecurity)
1183 1184
		RelationBuildRowSecurity(relation);
	else
1185
		relation->rd_rsdesc = NULL;
1186

1187 1188 1189 1190
	/* foreign key data is not loaded till asked for */
	relation->rd_fkeylist = NIL;
	relation->rd_fkeyvalid = false;

1191 1192 1193 1194
	/* partitioning data is not loaded till asked for */
	relation->rd_partkey = NULL;
	relation->rd_partkeycxt = NULL;
	relation->rd_partdesc = NULL;
1195 1196
	relation->rd_partdesc_nodetached = NULL;
	relation->rd_partdesc_nodetached_xmin = InvalidTransactionId;
1197
	relation->rd_pdcxt = NULL;
1198
	relation->rd_pddcxt = NULL;
1199 1200 1201
	relation->rd_partcheck = NIL;
	relation->rd_partcheckvalid = false;
	relation->rd_partcheckcxt = NULL;
1202

1203
	/*
1204
	 * initialize access method information
1205
	 */
1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_INDEX:
		case RELKIND_PARTITIONED_INDEX:
			Assert(relation->rd_rel->relam != InvalidOid);
			RelationInitIndexAccessInfo(relation);
			break;
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_MATVIEW:
			Assert(relation->rd_rel->relam != InvalidOid);
			RelationInitTableAccessMethod(relation);
			break;
		case RELKIND_SEQUENCE:
			Assert(relation->rd_rel->relam == InvalidOid);
			RelationInitTableAccessMethod(relation);
			break;
		case RELKIND_VIEW:
		case RELKIND_COMPOSITE_TYPE:
		case RELKIND_FOREIGN_TABLE:
		case RELKIND_PARTITIONED_TABLE:
			Assert(relation->rd_rel->relam == InvalidOid);
			break;
	}
1230

1231 1232 1233
	/* extract reloptions if any */
	RelationParseRelOptions(relation, pg_class_tuple);

1234
	/*
1235
	 * initialize the relation lock manager information
1236
	 */
Tom Lane's avatar
Tom Lane committed
1237
	RelationInitLockInfo(relation); /* see lmgr.c */
1238

1239 1240 1241 1242
	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);
1243

1244
	/* make sure relation is marked as having no open file yet */
1245
	relation->rd_smgr = NULL;
1246

1247 1248 1249 1250 1251
	/*
	 * now we can free the memory allocated for pg_class_tuple
	 */
	heap_freetuple(pg_class_tuple);

1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266
	/*
	 * If an invalidation arrived mid-build, start over.  Between here and the
	 * end of this function, don't add code that does or reasonably could read
	 * system catalogs.  That range must be free from invalidation processing
	 * for the !insertIt case.  For the insertIt case, RelationCacheInsert()
	 * will enroll this relation in ordinary relcache invalidation processing,
	 */
	if (in_progress_list[in_progress_offset].invalidated)
	{
		RelationDestroyRelation(relation, false);
		goto retry;
	}
	Assert(in_progress_offset + 1 == in_progress_list_len);
	in_progress_list_len--;

1267
	/*
1268
	 * Insert newly created relation into relcache hash table, if requested.
1269 1270 1271 1272 1273 1274 1275 1276 1277
	 *
	 * There is one scenario in which we might find a hashtable entry already
	 * present, even though our caller failed to find it: if the relation is a
	 * system catalog or index that's used during relcache load, we might have
	 * recursively created the same relcache entry during the preceding steps.
	 * So allow RelationCacheInsert to delete any already-present relcache
	 * entry for the same OID.  The already-present entry should have refcount
	 * zero (else somebody forgot to close it); in the event that it doesn't,
	 * we'll elog a WARNING and leak the already-present entry.
1278
	 */
1279
	if (insertIt)
1280
		RelationCacheInsert(relation, true);
1281

1282 1283 1284
	/* It's fully valid */
	relation->rd_isvalid = true;

1285 1286 1287 1288 1289 1290 1291
#ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
	if (tmpcxt)
	{
		/* Return to caller's context, and blow away the temporary context */
		MemoryContextSwitchTo(oldcxt);
		MemoryContextDelete(tmpcxt);
	}
1292 1293
#endif

1294
	return relation;
1295 1296
}

1297 1298
/*
 * Initialize the physical addressing info (RelFileNode) for a relcache entry
 *
 * Fills relation->rd_node (spcNode, dbNode, relNode) from the entry's
 * pg_class data, or from the relation mapper when relfilenode is zero.
 * Relation kinds without storage are left untouched.
 *
 * Note: at the physical level, relations in the pg_global tablespace must
 * be treated as shared, even if relisshared isn't set.  Hence we do not
 * look at relisshared here.
 */
static void
RelationInitPhysicalAddr(Relation relation)
{
	/* remember the previous relNode so a change can be detected below */
	Oid			prevRelNode = relation->rd_node.relNode;

	/* these relation kinds never have storage */
	if (!RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
		return;

	/* tablespace: the explicit setting, else the database's default */
	relation->rd_node.spcNode = relation->rd_rel->reltablespace ?
		relation->rd_rel->reltablespace : MyDatabaseTableSpace;

	/* relations in the global tablespace get dbNode = InvalidOid */
	relation->rd_node.dbNode =
		(relation->rd_node.spcNode == GLOBALTABLESPACE_OID) ?
		InvalidOid : MyDatabaseId;

	if (!relation->rd_rel->relfilenode)
	{
		/* No relfilenode in pg_class: consult the relation mapper */
		relation->rd_node.relNode =
			RelationMapOidToFilenode(relation->rd_id,
									 relation->rd_rel->relisshared);
		if (!OidIsValid(relation->rd_node.relNode))
			elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
				 RelationGetRelationName(relation), relation->rd_id);
	}
	else
	{
		/*
		 * Even when a decoding snapshot that doesn't represent the current
		 * state of the catalog is in use, the filenode must point to the
		 * current file, since the older file will be gone (or truncated).
		 * The new file still contains the older rows, so lookups in it work
		 * correctly.  That would not hold if rewrites were allowed to
		 * change the schema in an incompatible way, but those are prevented
		 * both on catalog tables and on user tables declared as additional
		 * catalog tables.
		 */
		if (HistoricSnapshotActive()
			&& RelationIsAccessibleInLogicalDecoding(relation)
			&& IsTransactionState())
		{
			HeapTuple	currentTuple;
			Form_pg_class currentForm;

			currentTuple = ScanPgRelation(RelationGetRelid(relation),
										  RelationGetRelid(relation) != ClassOidIndexId,
										  true);
			if (!HeapTupleIsValid(currentTuple))
				elog(ERROR, "could not find pg_class entry for %u",
					 RelationGetRelid(relation));
			currentForm = (Form_pg_class) GETSTRUCT(currentTuple);

			/* overwrite the cached values with the freshly read ones */
			relation->rd_rel->reltablespace = currentForm->reltablespace;
			relation->rd_rel->relfilenode = currentForm->relfilenode;
			heap_freetuple(currentTuple);
		}

		relation->rd_node.relNode = relation->rd_rel->relfilenode;
	}

	/*
	 * For RelationNeedsWAL() to answer correctly on parallel workers,
	 * restore rd_firstRelfilenodeSubid.  No subtransactions start or end
	 * while in parallel mode, so the specific SubTransactionId does not
	 * matter.  Nothing to do unless we are a parallel worker and relNode
	 * actually changed.
	 */
	if (!IsParallelWorker() || prevRelNode == relation->rd_node.relNode)
		return;

	relation->rd_firstRelfilenodeSubid =
		RelFileNodeSkippingWAL(relation->rd_node) ?
		TopSubTransactionId : InvalidSubTransactionId;
}

1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401
/*
 * Fill in the IndexAmRoutine for an index relation.
 *
 * Obtains the routine struct from the handler identified by
 * relation->rd_amhandler, copies it into relation->rd_indexcxt, and points
 * relation->rd_indam at the copy.
 *
 * relation's rd_amhandler and rd_indexcxt must be valid already.
 */
static void
InitIndexAmRoutine(Relation relation)
{
	IndexAmRoutine *scratch;
	IndexAmRoutine *permanent;

	/*
	 * Run the amhandler in the current, short-lived memory context, just in
	 * case it leaks anything (it probably won't, but let's be paranoid).
	 */
	scratch = GetIndexAmRoutine(relation->rd_amhandler);

	/* Make the long-lived copy in the relation's rd_indexcxt. */
	permanent = (IndexAmRoutine *)
		MemoryContextAlloc(relation->rd_indexcxt, sizeof(IndexAmRoutine));
	memcpy(permanent, scratch, sizeof(IndexAmRoutine));
	relation->rd_indam = permanent;

	/* The handler's own result is no longer needed. */
	pfree(scratch);
}

1407 1408 1409 1410 1411
/*
 * Initialize index-access-method support data for an index relation
 *
 * Copies the index's pg_index row into the relcache entry, records the
 * access method's handler, creates the private rd_indexcxt context, and
 * fills the per-column opfamily, opcintype, support-procedure, collation
 * and option arrays.  Errors are reported via elog.
 */
void
RelationInitIndexAccessInfo(Relation relation)
{
	HeapTuple	cacheTup;
	Form_pg_am	amForm;
	Datum		collDatum;
	Datum		classDatum;
	Datum		optionDatum;
	bool		isnull;
	oidvector  *collVec;
	oidvector  *classVec;
	int2vector *optionVec;
	MemoryContext indexcxt;
	MemoryContext savedcxt;
	int			indnatts;
	int			indnkeyatts;
	uint16		amsupport;

	/*
	 * Keep a private copy of the index's pg_index entry.  Because pg_index
	 * has variable-length and possibly-null fields, the whole tuple must be
	 * copied honestly rather than treated as a bare Form_pg_index struct.
	 */
	cacheTup = SearchSysCache1(INDEXRELID,
							   ObjectIdGetDatum(RelationGetRelid(relation)));
	if (!HeapTupleIsValid(cacheTup))
		elog(ERROR, "cache lookup failed for index %u",
			 RelationGetRelid(relation));
	savedcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indextuple = heap_copytuple(cacheTup);
	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
	MemoryContextSwitchTo(savedcxt);
	ReleaseSysCache(cacheTup);

	/*
	 * Find the index's access method and remember the OID of its handler
	 * function.
	 */
	cacheTup = SearchSysCache1(AMOID,
							   ObjectIdGetDatum(relation->rd_rel->relam));
	if (!HeapTupleIsValid(cacheTup))
		elog(ERROR, "cache lookup failed for access method %u",
			 relation->rd_rel->relam);
	amForm = (Form_pg_am) GETSTRUCT(cacheTup);
	relation->rd_amhandler = amForm->amhandler;
	ReleaseSysCache(cacheTup);

	/* pg_class and pg_index must agree on the number of attributes */
	indnatts = RelationGetNumberOfAttributes(relation);
	if (indnatts != IndexRelationGetNumberOfAttributes(relation))
		elog(ERROR, "relnatts disagrees with indnatts for index %u",
			 RelationGetRelid(relation));
	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);

	/*
	 * Create the private context that holds index access info.  A context,
	 * rather than just a couple of pallocs, is needed so that subsidiary
	 * info attached to fmgr lookup records is not leaked.
	 */
	indexcxt = AllocSetContextCreate(CacheMemoryContext,
									 "index info",
									 ALLOCSET_SMALL_SIZES);
	relation->rd_indexcxt = indexcxt;
	MemoryContextCopyAndSetIdentifier(indexcxt,
									  RelationGetRelationName(relation));

	/*
	 * With the context in place, the index AM's API struct can be fetched.
	 */
	InitIndexAmRoutine(relation);

	/*
	 * Allocate the per-column arrays.  Opclasses are not used for included
	 * columns, so these are sized by indnkeyatts only.
	 */
	relation->rd_opfamily = (Oid *)
		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
	relation->rd_opcintype = (Oid *)
		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));

	/* support procedure arrays exist only if the AM has support procs */
	amsupport = relation->rd_indam->amsupport;
	if (amsupport > 0)
	{
		int			nsupport = indnatts * amsupport;

		relation->rd_support = (RegProcedure *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
		relation->rd_supportinfo = (FmgrInfo *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
	}
	else
	{
		relation->rd_support = NULL;
		relation->rd_supportinfo = NULL;
	}

	relation->rd_indcollation = (Oid *)
		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));

	relation->rd_indoption = (int16 *)
		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(int16));

	/*
	 * indcollation follows the variable-width indkey field, so it cannot be
	 * reached directly through the C struct; extract the datum the hard
	 * way and copy the key columns' values.
	 */
	collDatum = fastgetattr(relation->rd_indextuple,
							Anum_pg_index_indcollation,
							GetPgIndexDescriptor(),
							&isnull);
	Assert(!isnull);
	collVec = (oidvector *) DatumGetPointer(collDatum);
	memcpy(relation->rd_indcollation, collVec->values, indnkeyatts * sizeof(Oid));

	/*
	 * indclass likewise follows the variable-width indkey field, so it too
	 * has to be extracted the hard way.
	 */
	classDatum = fastgetattr(relation->rd_indextuple,
							 Anum_pg_index_indclass,
							 GetPgIndexDescriptor(),
							 &isnull);
	Assert(!isnull);
	classVec = (oidvector *) DatumGetPointer(classDatum);

	/*
	 * Fill the support procedure OID array together with the opfamily and
	 * opclass input type arrays.  (aminfo and supportinfo are left as
	 * zeroes, and are filled on-the-fly when used)
	 */
	IndexSupportInitialize(classVec, relation->rd_support,
						   relation->rd_opfamily, relation->rd_opcintype,
						   amsupport, indnkeyatts);

	/*
	 * Extract indoption the same way and copy it into the cache entry.
	 */
	optionDatum = fastgetattr(relation->rd_indextuple,
							  Anum_pg_index_indoption,
							  GetPgIndexDescriptor(),
							  &isnull);
	Assert(!isnull);
	optionVec = (int2vector *) DatumGetPointer(optionDatum);
	memcpy(relation->rd_indoption, optionVec->values, indnkeyatts * sizeof(int16));

	/* result intentionally ignored here */
	(void) RelationGetIndexAttOptions(relation, false);

	/*
	 * The expression, predicate and exclusion caches are filled later.
	 */
	relation->rd_indexprs = NIL;
	relation->rd_indpred = NIL;
	relation->rd_exclops = NULL;
	relation->rd_exclprocs = NULL;
	relation->rd_exclstrats = NULL;
	relation->rd_amcache = NULL;
}

1567
/*
1568
 * IndexSupportInitialize
1569
 *		Initializes an index's cached opclass information,
1570
 *		given the index's pg_index.indclass entry.
1571
 *
1572 1573
 * Data is returned into *indexSupport, *opFamily, and *opcInType,
 * which are arrays allocated by the caller.
1574
 *
1575 1576 1577 1578
 * The caller also passes maxSupportNumber and maxAttributeNumber, since these
 * indicate the size of the arrays it has allocated --- but in practice these
 * numbers must always match those obtainable from the system catalog entries
 * for the index and access method.
1579 1580
 */
static void
1581
IndexSupportInitialize(oidvector *indclass,
1582
					   RegProcedure *indexSupport,
1583 1584
					   Oid *opFamily,
					   Oid *opcInType,
1585 1586 1587 1588 1589 1590 1591 1592 1593
					   StrategyNumber maxSupportNumber,
					   AttrNumber maxAttributeNumber)
{
	int			attIndex;

	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
	{
		OpClassCacheEnt *opcentry;

1594
		if (!OidIsValid(indclass->values[attIndex]))
1595
			elog(ERROR, "bogus pg_index tuple");
1596 1597

		/* look up the info for this opclass, using a cache */
1598
		opcentry = LookupOpclassInfo(indclass->values[attIndex],
1599 1600
									 maxSupportNumber);

1601
		/* copy cached data into relcache entry */
1602 1603
		opFamily[attIndex] = opcentry->opcfamily;
		opcInType[attIndex] = opcentry->opcintype;
1604
		if (maxSupportNumber > 0)
1605
			memcpy(&indexSupport[attIndex * maxSupportNumber],
1606
				   opcentry->supportProcs,
1607
				   maxSupportNumber * sizeof(RegProcedure));
1608 1609 1610 1611 1612 1613 1614 1615 1616
	}
}

/*
 * LookupOpclassInfo
 *
 * Maintains a per-opclass cache of the information that
 * IndexSupportInitialize() needs.  This beats relying on the catalog cache,
 * because all the info about a particular opclass can be loaded in a single
 * indexscan of pg_amproc.
 *
 * The expected range of support function numbers (from pg_am) is passed in
 * rather than looked up, mainly because the caller will have it already.
 *
 * Note there is no provision for flushing the cache.  This is OK at the
 * moment because there is no way to ALTER any interesting properties of an
 * existing opclass --- all you can do is drop it, which will result in
 * a useless but harmless dead entry in the cache.  To support altering
 * opclass membership (not the same as opfamily membership!), we'd need to
 * be able to flush this cache as well as the contents of relcache entries
 * for indexes.
 */
static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numSupport)
{
	OpClassCacheEnt *entry;
	bool		preexisting;
	Relation	catalog;
	SysScanDesc scan;
	ScanKeyData keys[3];
	HeapTuple	tup;
	bool		useIndex;

	if (OpClassCache == NULL)
	{
		/* First call: create the opclass cache hash table */
		HASHCTL		ctl;

		/* CacheMemoryContext must exist as well */
		if (!CacheMemoryContext)
			CreateCacheMemoryContext();

		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(OpClassCacheEnt);
		OpClassCache = hash_create("Operator class cache", 64,
								   &ctl, HASH_ELEM | HASH_BLOBS);
	}

	entry = (OpClassCacheEnt *) hash_search(OpClassCache,
											(void *) &operatorClassOid,
											HASH_ENTER, &preexisting);

	if (preexisting)
	{
		Assert(numSupport == entry->numSupport);
	}
	else
	{
		/* Set up a brand-new entry */
		entry->valid = false;	/* until known OK */
		entry->numSupport = numSupport;
		entry->supportProcs = NULL; /* filled below */
	}

	/*
	 * When aggressively testing cache-flush hazards, the operator class
	 * cache is disabled and the info is reloaded on every call.  This models
	 * no real-world behavior, since the cache entries are never invalidated
	 * otherwise.  It can however help to detect bugs in the cache loading
	 * logic itself, such as reliance on a non-nailed index.  Given the
	 * limited use-case and the great deal of expense this adds, it is
	 * enabled only for high values of debug_discard_caches.
	 */
#ifdef DISCARD_CACHES_ENABLED
	if (debug_discard_caches > 2)
		entry->valid = false;
#endif

	/* A valid entry can be handed back as-is. */
	if (entry->valid)
		return entry;

	/*
	 * The entry has to be filled in.  Start by allocating the support
	 * procedure array, unless a previous attempt already did so.
	 */
	if (entry->supportProcs == NULL && numSupport > 0)
		entry->supportProcs = (RegProcedure *)
			MemoryContextAllocZero(CacheMemoryContext,
								   numSupport * sizeof(RegProcedure));

	/*
	 * To avoid infinite recursion during startup, force heap scans when
	 * looking up info for the opclasses used by the very indexes we would
	 * like to reference here.
	 */
	useIndex = criticalRelcachesBuilt ||
		(operatorClassOid != OID_BTREE_OPS_OID &&
		 operatorClassOid != INT2_BTREE_OPS_OID);

	/*
	 * Fetch the pg_opclass row to learn the opclass's opfamily and
	 * opcintype, which are needed to look up related operators and
	 * functions.  It'd be convenient to use the syscache here, but that
	 * probably doesn't work while bootstrapping.
	 */
	ScanKeyInit(&keys[0],
				Anum_pg_opclass_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(operatorClassOid));
	catalog = table_open(OperatorClassRelationId, AccessShareLock);
	scan = systable_beginscan(catalog, OpclassOidIndexId, useIndex,
							  NULL, 1, keys);

	tup = systable_getnext(scan);
	if (!HeapTupleIsValid(tup))
		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
	else
	{
		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(tup);

		entry->opcfamily = opclassform->opcfamily;
		entry->opcintype = opclassform->opcintype;
	}

	systable_endscan(scan);
	table_close(catalog, AccessShareLock);

	/*
	 * Scan pg_amproc for the opclass's support procs.  Only the default
	 * ones (those with lefttype = righttype = opcintype) are fetched.
	 */
	if (numSupport > 0)
	{
		ScanKeyInit(&keys[0],
					Anum_pg_amproc_amprocfamily,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(entry->opcfamily));
		ScanKeyInit(&keys[1],
					Anum_pg_amproc_amproclefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(entry->opcintype));
		ScanKeyInit(&keys[2],
					Anum_pg_amproc_amprocrighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(entry->opcintype));
		catalog = table_open(AccessMethodProcedureRelationId, AccessShareLock);
		scan = systable_beginscan(catalog, AccessMethodProcedureIndexId, useIndex,
								  NULL, 3, keys);

		for (tup = systable_getnext(scan);
			 HeapTupleIsValid(tup);
			 tup = systable_getnext(scan))
		{
			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(tup);

			/* amprocnum is 1-based and must fit the allocated array */
			if (amprocform->amprocnum <= 0 ||
				(StrategyNumber) amprocform->amprocnum > numSupport)
				elog(ERROR, "invalid amproc number %d for opclass %u",
					 amprocform->amprocnum, operatorClassOid);

			entry->supportProcs[amprocform->amprocnum - 1] =
				amprocform->amproc;
		}

		systable_endscan(scan);
		table_close(catalog, AccessShareLock);
	}

	entry->valid = true;
	return entry;
}

1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790
/*
 * Fill in the TableAmRoutine for a relation
 *
 * Calls GetTableAmRoutine() on relation->rd_amhandler and stores the result
 * in relation->rd_tableam.
 *
 * relation's rd_amhandler must be valid already.
 */
static void
InitTableAmRoutine(Relation relation)
{
	relation->rd_tableam = GetTableAmRoutine(relation->rd_amhandler);
}

/*
 * Initialize table access method support for a table like relation
 *
 * Determines the table AM handler for the relation, stores it in
 * rd_amhandler, and then calls InitTableAmRoutine() to fill in rd_tableam.
 *
 * Sequences and catalog relations are hardwired to the heap AM handler.
 * For any other relation the handler is taken from the AMOID syscache entry
 * for rd_rel->relam; an error is raised if that entry cannot be found.
 */
void
RelationInitTableAccessMethod(Relation relation)
{
	HeapTuple	tuple;
	Form_pg_am	aform;

	if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
	{
		/*
		 * Sequences are currently accessed like heap tables, but it doesn't
		 * seem prudent to show that in the catalog. So just overwrite it
		 * here.
		 */
		relation->rd_amhandler = F_HEAP_TABLEAM_HANDLER;
	}
	else if (IsCatalogRelation(relation))
	{
		/*
		 * Avoid doing a syscache lookup for catalog tables.
		 */
		Assert(relation->rd_rel->relam == HEAP_TABLE_AM_OID);
		relation->rd_amhandler = F_HEAP_TABLEAM_HANDLER;
	}
	else
	{
		/*
		 * Look up the table access method, save the OID of its handler
		 * function.
		 */
		Assert(relation->rd_rel->relam != InvalidOid);
		tuple = SearchSysCache1(AMOID,
								ObjectIdGetDatum(relation->rd_rel->relam));
		if (!HeapTupleIsValid(tuple))
			elog(ERROR, "cache lookup failed for access method %u",
				 relation->rd_rel->relam);
		aform = (Form_pg_am) GETSTRUCT(tuple);
		relation->rd_amhandler = aform->amhandler;
		ReleaseSysCache(tuple);
	}

	/*
	 * Now we can fetch the table AM's API struct
	 */
	InitTableAmRoutine(relation);
}

/*
 *		formrdesc
 *
1842 1843
 *		This is a special cut-down version of RelationBuildDesc(),
 *		used while initializing the relcache.
1844
 *		The relation descriptor is built just from the supplied parameters,
1845 1846
 *		without actually looking at any system table entries.  We cheat
 *		quite a lot since we only need to work for a few basic system
1847 1848
 *		catalogs.
 *
1849
 * The catalogs this is used for can't have constraints (except attnotnull),
1850
 * default values, rules, or triggers, since we don't cope with any of that.
1851 1852 1853
 * (Well, actually, this only matters for properties that need to be valid
 * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
 * these properties matter then...)
1854
 *
1855
 * NOTE: we assume we are already switched into CacheMemoryContext.
1856 1857
 */
static void
1858
formrdesc(const char *relationName, Oid relationReltype,
1859
		  bool isshared,
1860
		  int natts, const FormData_pg_attribute *attrs)
1861
{
1862
	Relation	relation;
1863
	int			i;
1864
	bool		has_not_null;
1865

1866
	/*
1867
	 * allocate new relation desc, clear all fields of reldesc
1868
	 */
1869
	relation = (Relation) palloc0(sizeof(RelationData));
1870 1871

	/* make sure relation is marked as having no open file yet */
1872
	relation->rd_smgr = NULL;
1873

1874
	/*
1875
	 * initialize reference count: 1 because it is nailed in cache
1876
	 */
1877
	relation->rd_refcnt = 1;
1878

1879
	/*
1880 1881
	 * all entries built with this routine are nailed-in-cache; none are for
	 * new or temp relations.
1882
	 */
1883
	relation->rd_isnailed = true;
1884
	relation->rd_createSubid = InvalidSubTransactionId;
1885
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1886 1887
	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_droppedSubid = InvalidSubTransactionId;
1888
	relation->rd_backend = InvalidBackendId;
1889
	relation->rd_islocaltemp = false;
1890

1891
	/*
1892
	 * initialize relation tuple form
1893
	 *
1894
	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
1895
	 * get us launched.  RelationCacheInitializePhase3() will read the real
Bruce Momjian's avatar
Bruce Momjian committed
1896 1897 1898
	 * data from pg_class and replace what we've done here.  Note in
	 * particular that relowner is left as zero; this cues
	 * RelationCacheInitializePhase3 that the real data isn't there yet.
1899
	 */
1900
	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1901

1902 1903
	namestrcpy(&relation->rd_rel->relname, relationName);
	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1904
	relation->rd_rel->reltype = relationReltype;
1905 1906

	/*
1907
	 * It's important to distinguish between shared and non-shared relations,
1908
	 * even at bootstrap time, to make sure we know where they are stored.
1909
	 */
1910 1911 1912
	relation->rd_rel->relisshared = isshared;
	if (isshared)
		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
1913

1914 1915
	/* formrdesc is used only for permanent relations */
	relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
1916

1917 1918 1919
	/* ... and they're always populated, too */
	relation->rd_rel->relispopulated = true;

1920
	relation->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
1921
	relation->rd_rel->relpages = 0;
1922
	relation->rd_rel->reltuples = -1;
1923
	relation->rd_rel->relallvisible = 0;
1924
	relation->rd_rel->relkind = RELKIND_RELATION;
1925
	relation->rd_rel->relnatts = (int16) natts;
1926
	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
1927

1928
	/*
1929
	 * initialize attribute tuple form
1930
	 *
Bruce Momjian's avatar
Bruce Momjian committed
1931
	 * Unlike the case with the relation tuple, this data had better be right
1932 1933
	 * because it will never be replaced.  The data comes from
	 * src/include/catalog/ headers via genbki.pl.
1934
	 */
1935
	relation->rd_att = CreateTemplateTupleDesc(natts);
1936 1937
	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */

1938
	relation->rd_att->tdtypeid = relationReltype;
1939
	relation->rd_att->tdtypmod = -1;	/* just to be sure */
1940

1941
	/*
1942
	 * initialize tuple desc info
1943
	 */
1944
	has_not_null = false;
1945 1946
	for (i = 0; i < natts; i++)
	{
1947
		memcpy(TupleDescAttr(relation->rd_att, i),
1948
			   &attrs[i],
1949
			   ATTRIBUTE_FIXED_PART_SIZE);
1950
		has_not_null |= attrs[i].attnotnull;
1951
		/* make sure attcacheoff is valid */
1952
		TupleDescAttr(relation->rd_att, i)->attcacheoff = -1;
1953 1954
	}

1955
	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1956
	TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
1957

1958 1959 1960 1961 1962 1963 1964 1965 1966
	/* mark not-null status */
	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		relation->rd_att->constr = constr;
	}

1967
	/*
1968
	 * initialize relation id from info in att array (my, this is ugly)
1969
	 */
1970
	RelationGetRelid(relation) = TupleDescAttr(relation->rd_att, 0)->attrelid;
1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982

	/*
	 * All relations made with formrdesc are mapped.  This is necessarily so
	 * because there is no other way to know what filenode they currently
	 * have.  In bootstrap mode, add them to the initial relation mapper data,
	 * specifying that the initial filenode is the same as the OID.
	 */
	relation->rd_rel->relfilenode = InvalidOid;
	if (IsBootstrapProcessingMode())
		RelationMapUpdateMap(RelationGetRelid(relation),
							 RelationGetRelid(relation),
							 isshared, true);
1983

1984
	/*
1985
	 * initialize the relation lock manager information
1986
	 */
Tom Lane's avatar
Tom Lane committed
1987
	RelationInitLockInfo(relation); /* see lmgr.c */
1988

1989 1990 1991 1992
	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);
1993

1994 1995 1996 1997 1998 1999
	/*
	 * initialize the table am handler
	 */
	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
	relation->rd_tableam = GetHeapamTableAmRoutine();

2000
	/*
2001
	 * initialize the rel-has-index flag, using hardwired knowledge
2002
	 */
2003 2004 2005 2006 2007 2008
	if (IsBootstrapProcessingMode())
	{
		/* In bootstrap mode, we have no indexes */
		relation->rd_rel->relhasindex = false;
	}
	else
2009
	{
2010 2011
		/* Otherwise, all the rels formrdesc is used for have indexes */
		relation->rd_rel->relhasindex = true;
2012 2013
	}

2014
	/*
2015
	 * add new reldesc to relcache
2016
	 */
2017
	RelationCacheInsert(relation, false);
2018 2019 2020

	/* It's fully valid */
	relation->rd_isvalid = true;
2021 2022 2023 2024
}


/* ----------------------------------------------------------------
 *				 Relation Descriptor Lookup Interface
 * ----------------------------------------------------------------
 */

2029
/*
2030
 *		RelationIdGetRelation
2031
 *
2032
 *		Lookup a reldesc by OID; make one if not already in cache.
2033
 *
2034 2035 2036
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
2037
 *
2038 2039 2040 2041
 *		NB: caller should already have at least AccessShareLock on the
 *		relation ID, else there are nasty race conditions.
 *
 *		NB: relation ref count is incremented, or set to 1 if new entry.
2042 2043
 *		Caller should eventually decrement count.  (Usually,
 *		that happens by calling RelationClose().)
2044 2045
 */
Relation
2046
RelationIdGetRelation(Oid relationId)
2047
{
2048
	Relation	rd;
2049

2050 2051 2052
	/* Make sure we're in an xact, even if this ends up being a cache hit */
	Assert(IsTransactionState());

2053 2054 2055
	/*
	 * first try to find reldesc in the cache
	 */
2056 2057 2058
	RelationIdCacheLookup(relationId, rd);

	if (RelationIsValid(rd))
2059
	{
2060 2061 2062 2063 2064 2065 2066
		/* return NULL for dropped relations */
		if (rd->rd_droppedSubid != InvalidSubTransactionId)
		{
			Assert(!rd->rd_isvalid);
			return NULL;
		}

2067
		RelationIncrementReferenceCount(rd);
2068
		/* revalidate cache entry if necessary */
2069
		if (!rd->rd_isvalid)
2070 2071 2072 2073 2074 2075
		{
			/*
			 * Indexes only have a limited number of possible schema changes,
			 * and we don't want to use the full-blown procedure because it's
			 * a headache for indexes that reload itself depends on.
			 */
Alvaro Herrera's avatar
Alvaro Herrera committed
2076 2077
			if (rd->rd_rel->relkind == RELKIND_INDEX ||
				rd->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
2078 2079 2080
				RelationReloadIndexInfo(rd);
			else
				RelationClearRelation(rd, true);
2081 2082 2083 2084 2085 2086 2087 2088 2089 2090

			/*
			 * Normally entries need to be valid here, but before the relcache
			 * has been initialized, not enough infrastructure exists to
			 * perform pg_class lookups. The structure of such entries doesn't
			 * change, but we still want to update the rd_rel entry. So
			 * rd_isvalid = false is left in place for a later lookup.
			 */
			Assert(rd->rd_isvalid ||
				   (rd->rd_isnailed && !criticalRelcachesBuilt));
2091
		}
2092
		return rd;
2093
	}
2094

2095
	/*
2096 2097
	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
	 * it.
2098
	 */
2099
	rd = RelationBuildDesc(relationId, true);
2100 2101
	if (RelationIsValid(rd))
		RelationIncrementReferenceCount(rd);
2102 2103 2104 2105
	return rd;
}

/* ----------------------------------------------------------------
 *				cache invalidation support routines
 * ----------------------------------------------------------------
 */

/*
 * RelationIncrementReferenceCount
 *		Bump the reference count of a relcache entry by one.
 *
 * The current resource owner is enlarged before the count is touched, and
 * the new reference is recorded with it afterwards.
 *
 * Note: bootstrap mode has its own weird ideas about relation refcount
 * behavior; we ought to fix it someday, but for now, just disable
 * reference count ownership tracking in bootstrap mode.
 */
void
RelationIncrementReferenceCount(Relation rel)
{
	/* make room in the resource owner before changing the count */
	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);

	rel->rd_refcnt++;

	/* no ownership tracking while bootstrapping */
	if (IsBootstrapProcessingMode())
		return;

	ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationDecrementReferenceCount
 *		Drop one reference from a relcache entry.
 *
 * The count must be positive on entry.  Outside bootstrap mode the
 * reference is also removed from the current resource owner.
 */
void
RelationDecrementReferenceCount(Relation rel)
{
	Assert(rel->rd_refcnt > 0);
	rel->rd_refcnt--;

	/* no ownership tracking while bootstrapping */
	if (IsBootstrapProcessingMode())
		return;

	ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
}

2140
/*
2141 2142
 * RelationClose - close an open relation
 *
2143 2144 2145 2146 2147 2148 2149
 *	Actually, we just decrement the refcount.
 *
 *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
 *	will be freed as soon as their refcount goes to zero.  In combination
 *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
 *	to catch references to already-released relcache entries.  It slows
 *	things down quite a bit, however.
2150 2151 2152 2153
 */
void
RelationClose(Relation relation)
{
2154 2155
	/* Note: no locking manipulations needed */
	RelationDecrementReferenceCount(relation);
2156

2157 2158 2159 2160 2161
	/*
	 * If the relation is no longer open in this session, we can clean up any
	 * stale partition descriptors it has.  This is unlikely, so check to see
	 * if there are child contexts before expending a call to mcxt.c.
	 */
2162 2163 2164 2165 2166 2167 2168 2169 2170 2171
	if (RelationHasReferenceCountZero(relation))
	{
		if (relation->rd_pdcxt != NULL &&
			relation->rd_pdcxt->firstchild != NULL)
			MemoryContextDeleteChildren(relation->rd_pdcxt);

		if (relation->rd_pddcxt != NULL &&
			relation->rd_pddcxt->firstchild != NULL)
			MemoryContextDeleteChildren(relation->rd_pddcxt);
	}
2172

2173
#ifdef RELCACHE_FORCE_RELEASE
2174
	if (RelationHasReferenceCountZero(relation) &&
2175
		relation->rd_createSubid == InvalidSubTransactionId &&
2176
		relation->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
2177 2178
		RelationClearRelation(relation, false);
#endif
2179 2180
}

2181
/*
2182
 * RelationReloadIndexInfo - reload minimal information for an open index
2183
 *
2184 2185 2186 2187 2188 2189 2190
 *	This function is used only for indexes.  A relcache inval on an index
 *	can mean that its pg_class or pg_index row changed.  There are only
 *	very limited changes that are allowed to an existing index's schema,
 *	so we can update the relcache entry without a complete rebuild; which
 *	is fortunate because we can't rebuild an index entry that is "nailed"
 *	and/or in active use.  We support full replacement of the pg_class row,
 *	as well as updates of a few simple fields of the pg_index row.
2191
 *
2192
 *	We can't necessarily reread the catalog rows right away; we might be
2193 2194
 *	in a failed transaction when we receive the SI notification.  If so,
 *	RelationClearRelation just marks the entry as invalid by setting
2195
 *	rd_isvalid to false.  This routine is called to fix the entry when it
2196
 *	is next needed.
2197 2198 2199 2200
 *
 *	We assume that at the time we are called, we have at least AccessShareLock
 *	on the target index.  (Note: in the calls from RelationClearRelation,
 *	this is legitimate because we know the rel has positive refcount.)
2201 2202 2203 2204 2205 2206
 *
 *	If the target index is an index on pg_class or pg_index, we'd better have
 *	previously gotten at least AccessShareLock on its underlying catalog,
 *	else we are at risk of deadlock against someone trying to exclusive-lock
 *	the heap and index in that order.  This is ensured in current usage by
 *	only applying this to indexes being opened or having positive refcount.
Hiroshi Inoue's avatar
Hiroshi Inoue committed
2207 2208
 */
static void
2209
RelationReloadIndexInfo(Relation relation)
Hiroshi Inoue's avatar
Hiroshi Inoue committed
2210
{
2211
	bool		indexOK;
Hiroshi Inoue's avatar
Hiroshi Inoue committed
2212
	HeapTuple	pg_class_tuple;
2213
	Form_pg_class relp;
Hiroshi Inoue's avatar
Hiroshi Inoue committed
2214

2215
	/* Should be called only for invalidated, live indexes */
Alvaro Herrera's avatar
Alvaro Herrera committed
2216 2217
	Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
			relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
2218 2219
		   !relation->rd_isvalid &&
		   relation->rd_droppedSubid == InvalidSubTransactionId);
2220 2221 2222

	/* Ensure it's closed at smgr level */
	RelationCloseSmgr(relation);
2223

2224
	/* Must free any AM cached data upon relcache flush */
2225 2226 2227 2228 2229
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	relation->rd_amcache = NULL;

	/*
Bruce Momjian's avatar
Bruce Momjian committed
2230 2231 2232 2233 2234
	 * If it's a shared index, we might be called before backend startup has
	 * finished selecting a database, in which case we have no way to read
	 * pg_class yet.  However, a shared index can never have any significant
	 * schema updates, so it's okay to ignore the invalidation signal.  Just
	 * mark it valid and return without doing anything more.
2235 2236 2237 2238 2239 2240 2241
	 */
	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
	{
		relation->rd_isvalid = true;
		return;
	}

2242
	/*
2243 2244
	 * Read the pg_class row
	 *
2245 2246
	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
	 * for pg_class_oid_index ...
2247
	 */
2248
	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
Robert Haas's avatar
Robert Haas committed
2249
	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false);
Hiroshi Inoue's avatar
Hiroshi Inoue committed
2250
	if (!HeapTupleIsValid(pg_class_tuple))
2251
		elog(ERROR, "could not find pg_class tuple for index %u",
2252
			 RelationGetRelid(relation));
Hiroshi Inoue's avatar
Hiroshi Inoue committed
2253
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2254
	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2255
	/* Reload reloptions in case they changed */
2256 2257
	if (relation->rd_options)
		pfree(relation->rd_options);
2258 2259
	RelationParseRelOptions(relation, pg_class_tuple);
	/* done with pg_class tuple */
Hiroshi Inoue's avatar
Hiroshi Inoue committed
2260
	heap_freetuple(pg_class_tuple);
2261 2262
	/* We must recalculate physical address in case it changed */
	RelationInitPhysicalAddr(relation);
2263

2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276
	/*
	 * For a non-system index, there are fields of the pg_index row that are
	 * allowed to change, so re-read that row and update the relcache entry.
	 * Most of the info derived from pg_index (such as support function lookup
	 * info) cannot change, and indeed the whole point of this routine is to
	 * update the relcache entry without clobbering that data; so wholesale
	 * replacement is not appropriate.
	 */
	if (!IsSystemRelation(relation))
	{
		HeapTuple	tuple;
		Form_pg_index index;

2277
		tuple = SearchSysCache1(INDEXRELID,
Bruce Momjian's avatar
Bruce Momjian committed
2278
								ObjectIdGetDatum(RelationGetRelid(relation)));
2279
		if (!HeapTupleIsValid(tuple))
Bruce Momjian's avatar
Bruce Momjian committed
2280 2281
			elog(ERROR, "cache lookup failed for index %u",
				 RelationGetRelid(relation));
2282 2283
		index = (Form_pg_index) GETSTRUCT(tuple);

2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294
		/*
		 * Basically, let's just copy all the bool fields.  There are one or
		 * two of these that can't actually change in the current code, but
		 * it's not worth it to track exactly which ones they are.  None of
		 * the array fields are allowed to change, though.
		 */
		relation->rd_index->indisunique = index->indisunique;
		relation->rd_index->indisprimary = index->indisprimary;
		relation->rd_index->indisexclusion = index->indisexclusion;
		relation->rd_index->indimmediate = index->indimmediate;
		relation->rd_index->indisclustered = index->indisclustered;
2295
		relation->rd_index->indisvalid = index->indisvalid;
2296 2297
		relation->rd_index->indcheckxmin = index->indcheckxmin;
		relation->rd_index->indisready = index->indisready;
2298 2299 2300
		relation->rd_index->indislive = index->indislive;

		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
2301 2302
		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
							   HeapTupleHeaderGetXmin(tuple->t_data));
2303 2304 2305 2306

		ReleaseSysCache(tuple);
	}

2307
	/* Okay, now it's valid again */
2308
	relation->rd_isvalid = true;
Hiroshi Inoue's avatar
Hiroshi Inoue committed
2309
}
2310

2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385
/*
 * RelationReloadNailed - reload minimal information for nailed relations.
 *
 * The structure of a nailed relation can never change (which is good, because
 * we rely on knowing their structure to be able to read catalog content). But
 * some parts, e.g. pg_class.relfrozenxid, are still important to have
 * accurate content for. Therefore those need to be reloaded after the arrival
 * of invalidations.
 */
static void
RelationReloadNailed(Relation relation)
{
	Assert(relation->rd_isnailed);

	/*
	 * Redo RelationInitPhysicalAddr in case it is a mapped relation whose
	 * mapping changed.
	 */
	RelationInitPhysicalAddr(relation);

	/* flag as needing to be revalidated */
	relation->rd_isvalid = false;

	/*
	 * Can only reread catalog contents if in a transaction.  If the relation
	 * is currently open (not counting the nailed refcount), do so
	 * immediately. Otherwise we've already marked the entry as possibly
	 * invalid, and it'll be fixed when next opened.
	 */
	if (!IsTransactionState() || relation->rd_refcnt <= 1)
		return;

	if (relation->rd_rel->relkind == RELKIND_INDEX)
	{
		/*
		 * If it's a nailed-but-not-mapped index, then we need to re-read the
		 * pg_class row to see if its relfilenode changed.
		 */
		RelationReloadIndexInfo(relation);
	}
	else
	{
		/*
		 * Reload a non-index entry.  We can't easily do so if relcaches
		 * aren't yet built, but that's fine because at that stage the
		 * attributes that need to be current (like relfrozenxid) aren't yet
		 * accessed.  To ensure the entry will later be revalidated, we leave
		 * it in invalid state, but allow use (cf. RelationIdGetRelation()).
		 */
		if (criticalRelcachesBuilt)
		{
			HeapTuple	pg_class_tuple;
			Form_pg_class relp;

			/*
			 * NB: Mark the entry as valid before starting to scan, to avoid
			 * self-recursion when re-building pg_class.
			 */
			relation->rd_isvalid = true;

			pg_class_tuple = ScanPgRelation(RelationGetRelid(relation),
											true, false);
			relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
			memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
			heap_freetuple(pg_class_tuple);

			/*
			 * Again mark as valid, to protect against concurrently arriving
			 * invalidations.
			 */
			relation->rd_isvalid = true;
		}
	}
}

2386 2387 2388 2389 2390 2391 2392
/*
 * RelationDestroyRelation
 *
 *	Physically delete a relation cache entry and all subsidiary data.
 *	Caller must already have unhooked the entry from the hash table.
 */
static void
2393
RelationDestroyRelation(Relation relation, bool remember_tupdesc)
2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404
{
	Assert(RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  (This was probably done by caller, but let's
	 * just be real sure.)
	 */
	RelationCloseSmgr(relation);

	/*
Bruce Momjian's avatar
Bruce Momjian committed
2405 2406
	 * Free all the subsidiary data structures of the relcache entry, then the
	 * entry itself.
2407 2408 2409 2410 2411 2412
	 */
	if (relation->rd_rel)
		pfree(relation->rd_rel);
	/* can't use DecrTupleDescRefCount here */
	Assert(relation->rd_att->tdrefcount > 0);
	if (--relation->rd_att->tdrefcount == 0)
2413 2414 2415
	{
		/*
		 * If we Rebuilt a relcache entry during a transaction then its
Bruce Momjian's avatar
Bruce Momjian committed
2416 2417 2418 2419 2420
		 * possible we did that because the TupDesc changed as the result of
		 * an ALTER TABLE that ran at less than AccessExclusiveLock. It's
		 * possible someone copied that TupDesc, in which case the copy would
		 * point to free'd memory. So if we rebuild an entry we keep the
		 * TupDesc around until end of transaction, to be safe.
2421 2422 2423 2424 2425 2426
		 */
		if (remember_tupdesc)
			RememberToFreeTupleDescAtEOX(relation->rd_att);
		else
			FreeTupleDesc(relation->rd_att);
	}
2427 2428
	FreeTriggerDesc(relation->trigdesc);
	list_free_deep(relation->rd_fkeylist);
2429
	list_free(relation->rd_indexlist);
2430
	list_free(relation->rd_statlist);
2431
	bms_free(relation->rd_indexattr);
2432
	bms_free(relation->rd_keyattr);
Peter Eisentraut's avatar
Peter Eisentraut committed
2433
	bms_free(relation->rd_pkattr);
2434
	bms_free(relation->rd_idattr);
Peter Eisentraut's avatar
Peter Eisentraut committed
2435 2436
	if (relation->rd_pubactions)
		pfree(relation->rd_pubactions);
2437 2438 2439 2440
	if (relation->rd_options)
		pfree(relation->rd_options);
	if (relation->rd_indextuple)
		pfree(relation->rd_indextuple);
2441 2442 2443 2444
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	if (relation->rd_fdwroutine)
		pfree(relation->rd_fdwroutine);
2445 2446 2447 2448
	if (relation->rd_indexcxt)
		MemoryContextDelete(relation->rd_indexcxt);
	if (relation->rd_rulescxt)
		MemoryContextDelete(relation->rd_rulescxt);
2449 2450
	if (relation->rd_rsdesc)
		MemoryContextDelete(relation->rd_rsdesc->rscxt);
2451 2452 2453 2454
	if (relation->rd_partkeycxt)
		MemoryContextDelete(relation->rd_partkeycxt);
	if (relation->rd_pdcxt)
		MemoryContextDelete(relation->rd_pdcxt);
2455 2456
	if (relation->rd_pddcxt)
		MemoryContextDelete(relation->rd_pddcxt);
2457 2458
	if (relation->rd_partcheckcxt)
		MemoryContextDelete(relation->rd_partcheckcxt);
2459 2460 2461
	pfree(relation);
}

2462
/*
2463
 * RelationClearRelation
2464
 *
2465 2466
 *	 Physically blow away a relation cache entry, or reset it and rebuild
 *	 it from scratch (that is, from catalog entries).  The latter path is
2467 2468
 *	 used when we are notified of a change to an open relation (one with
 *	 refcount > 0).
2469
 *
2470 2471
 *	 NB: when rebuilding, we'd better hold some lock on the relation,
 *	 else the catalog data we need to read could be changing under us.
Bruce Momjian's avatar
Bruce Momjian committed
2472
 *	 Also, a rel to be rebuilt had better have refcnt > 0.  This is because
2473
 *	 a sinval reset could happen while we're accessing the catalogs, and
2474 2475 2476 2477 2478 2479
 *	 the rel would get blown away underneath us by RelationCacheInvalidate
 *	 if it has zero refcnt.
 *
 *	 The "rebuild" parameter is redundant in current usage because it has
 *	 to match the relation's refcnt status, but we keep it as a crosscheck
 *	 that we're doing what the caller expects.
2480
 */
2481
static void
2482
RelationClearRelation(Relation relation, bool rebuild)
2483
{
2484
	/*
2485
	 * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
2486 2487 2488
	 * course it would be an equally bad idea to blow away one with nonzero
	 * refcnt, since that would leave someone somewhere with a dangling
	 * pointer.  All callers are expected to have verified that this holds.
2489 2490 2491 2492 2493
	 */
	Assert(rebuild ?
		   !RelationHasReferenceCountZero(relation) :
		   RelationHasReferenceCountZero(relation));

2494
	/*
2495
	 * Make sure smgr and lower levels close the relation's files, if they
2496
	 * weren't closed already.  If the relation is not getting deleted, the
Bruce Momjian's avatar
Bruce Momjian committed
2497
	 * next smgr access should reopen the files automatically.  This ensures
2498 2499
	 * that the low-level file access state is updated after, say, a vacuum
	 * truncation.
2500
	 */
2501
	RelationCloseSmgr(relation);
2502

2503 2504 2505 2506 2507
	/* Free AM cached data, if any */
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	relation->rd_amcache = NULL;

2508
	/*
2509 2510
	 * Treat nailed-in system relations separately, they always need to be
	 * accessible, so we can't blow them away.
2511 2512
	 */
	if (relation->rd_isnailed)
Hiroshi Inoue's avatar
Hiroshi Inoue committed
2513
	{
2514
		RelationReloadNailed(relation);
2515
		return;
Hiroshi Inoue's avatar
Hiroshi Inoue committed
2516
	}
2517

2518 2519 2520 2521 2522 2523 2524
	/* Mark it invalid until we've finished rebuild */
	relation->rd_isvalid = false;

	/* See RelationForgetRelation(). */
	if (relation->rd_droppedSubid != InvalidSubTransactionId)
		return;

2525 2526 2527 2528
	/*
	 * Even non-system indexes should not be blown away if they are open and
	 * have valid index support information.  This avoids problems with active
	 * use of the index support information.  As with nailed indexes, we
Bruce Momjian's avatar
Bruce Momjian committed
2529
	 * re-read the pg_class row to handle possible physical relocation of the
2530
	 * index, and we check for pg_index updates too.
2531
	 */
Alvaro Herrera's avatar
Alvaro Herrera committed
2532 2533
	if ((relation->rd_rel->relkind == RELKIND_INDEX ||
		 relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
2534 2535 2536
		relation->rd_refcnt > 0 &&
		relation->rd_indexcxt != NULL)
	{
2537 2538
		if (IsTransactionState())
			RelationReloadIndexInfo(relation);
2539 2540 2541
		return;
	}

2542
	/*
2543
	 * If we're really done with the relcache entry, blow it away. But if
2544 2545 2546
	 * someone is still using it, reconstruct the whole deal without moving
	 * the physical RelationData record (so that the someone's pointer is
	 * still valid).
2547
	 */
2548
	if (!rebuild)
2549
	{
2550 2551 2552 2553
		/* Remove it from the hash table */
		RelationCacheDelete(relation);

		/* And release storage */
2554
		RelationDestroyRelation(relation, false);
2555
	}
2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578
	else if (!IsTransactionState())
	{
		/*
		 * If we're not inside a valid transaction, we can't do any catalog
		 * access so it's not possible to rebuild yet.  Just exit, leaving
		 * rd_isvalid = false so that the rebuild will occur when the entry is
		 * next opened.
		 *
		 * Note: it's possible that we come here during subtransaction abort,
		 * and the reason for wanting to rebuild is that the rel is open in
		 * the outer transaction.  In that case it might seem unsafe to not
		 * rebuild immediately, since whatever code has the rel already open
		 * will keep on using the relcache entry as-is.  However, in such a
		 * case the outer transaction should be holding a lock that's
		 * sufficient to prevent any significant change in the rel's schema,
		 * so the existing entry contents should be good enough for its
		 * purposes; at worst we might be behind on statistics updates or the
		 * like.  (See also CheckTableNotInUse() and its callers.)	These same
		 * remarks also apply to the cases above where we exit without having
		 * done RelationReloadIndexInfo() yet.
		 */
		return;
	}
2579 2580
	else
	{
2581
		/*
Bruce Momjian's avatar
Bruce Momjian committed
2582 2583 2584 2585 2586
		 * Our strategy for rebuilding an open relcache entry is to build a
		 * new entry from scratch, swap its contents with the old entry, and
		 * finally delete the new entry (along with any infrastructure swapped
		 * over from the old entry).  This is to avoid trouble in case an
		 * error causes us to lose control partway through.  The old entry
2587
		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
Bruce Momjian's avatar
Bruce Momjian committed
2588
		 * on next access.  Meanwhile it's not any less valid than it was
2589 2590
		 * before, so any code that might expect to continue accessing it
		 * isn't hurt by the rebuild failure.  (Consider for example a
2591
		 * subtransaction that ALTERs a table and then gets canceled partway
2592 2593
		 * through the cache entry rebuild.  The outer transaction should
		 * still see the not-modified cache entry as valid.)  The worst
Bruce Momjian's avatar
Bruce Momjian committed
2594 2595 2596
		 * consequence of an error is leaking the necessarily-unreferenced new
		 * entry, and this shouldn't happen often enough for that to be a big
		 * problem.
2597
		 *
2598
		 * When rebuilding an open relcache entry, we must preserve ref count,
2599 2600 2601 2602 2603 2604 2605
		 * rd_*Subid, and rd_toastoid state.  Also attempt to preserve the
		 * pg_class entry (rd_rel), tupledesc, rewrite-rule, partition key,
		 * and partition descriptor substructures in place, because various
		 * places assume that these structures won't move while they are
		 * working with an open relcache entry.  (Note:  the refcount
		 * mechanism for tupledescs might someday allow us to remove this hack
		 * for the tupledesc.)
2606
		 *
2607 2608
		 * Note that this process does not touch CurrentResourceOwner; which
		 * is good because whatever ref counts the entry may have do not
Bruce Momjian's avatar
Bruce Momjian committed
2609
		 * necessarily belong to that resource owner.
2610
		 */
2611
		Relation	newrel;
2612
		Oid			save_relid = RelationGetRelid(relation);
2613 2614
		bool		keep_tupdesc;
		bool		keep_rules;
2615
		bool		keep_policies;
2616
		bool		keep_partkey;
2617 2618 2619

		/* Build temporary entry, but don't link it into hashtable */
		newrel = RelationBuildDesc(save_relid, false);
2620 2621 2622 2623 2624 2625 2626 2627

		/*
		 * Between here and the end of the swap, don't add code that does or
		 * reasonably could read system catalogs.  That range must be free
		 * from invalidation processing.  See RelationBuildDesc() manipulation
		 * of in_progress_list.
		 */

2628
		if (newrel == NULL)
2629
		{
2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641
			/*
			 * We can validly get here, if we're using a historic snapshot in
			 * which a relation, accessed from outside logical decoding, is
			 * still invisible. In that case it's fine to just mark the
			 * relation as invalid and return - it'll fully get reloaded by
			 * the cache reset at the end of logical decoding (or at the next
			 * access).  During normal processing we don't want to ignore this
			 * case as it shouldn't happen there, as explained below.
			 */
			if (HistoricSnapshotActive())
				return;

2642 2643
			/*
			 * This shouldn't happen as dropping a relation is intended to be
Peter Eisentraut's avatar
Peter Eisentraut committed
2644
			 * impossible if still referenced (cf. CheckTableNotInUse()). But
2645 2646 2647 2648
			 * if we get here anyway, we can't just delete the relcache entry,
			 * as it possibly could get accessed later (as e.g. the error
			 * might get trapped and handled via a subtransaction rollback).
			 */
2649
			elog(ERROR, "relation %u deleted while still in use", save_relid);
2650
		}
2651

2652 2653
		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
2654
		keep_policies = equalRSDesc(relation->rd_rsdesc, newrel->rd_rsdesc);
2655
		/* partkey is immutable once set up, so we can always keep it */
2656
		keep_partkey = (relation->rd_partkey != NULL);
2657 2658 2659

		/*
		 * Perform swapping of the relcache entry contents.  Within this
Bruce Momjian's avatar
Bruce Momjian committed
2660 2661 2662
		 * process the old entry is momentarily invalid, so there *must* be no
		 * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
		 * all-in-line code for safety.
2663
		 *
Bruce Momjian's avatar
Bruce Momjian committed
2664 2665 2666
		 * Since the vast majority of fields should be swapped, our method is
		 * to swap the whole structures and then re-swap those few fields we
		 * didn't want swapped.
2667 2668 2669 2670 2671 2672 2673 2674 2675
		 */
#define SWAPFIELD(fldtype, fldname) \
		do { \
			fldtype _tmp = newrel->fldname; \
			newrel->fldname = relation->fldname; \
			relation->fldname = _tmp; \
		} while (0)

		/* swap all Relation struct fields */
2676
		{
2677 2678 2679 2680 2681
			RelationData tmpstruct;

			memcpy(&tmpstruct, newrel, sizeof(RelationData));
			memcpy(newrel, relation, sizeof(RelationData));
			memcpy(relation, &tmpstruct, sizeof(RelationData));
2682
		}
2683 2684 2685 2686 2687 2688 2689 2690 2691 2692

		/* rd_smgr must not be swapped, due to back-links from smgr level */
		SWAPFIELD(SMgrRelation, rd_smgr);
		/* rd_refcnt must be preserved */
		SWAPFIELD(int, rd_refcnt);
		/* isnailed shouldn't change */
		Assert(newrel->rd_isnailed == relation->rd_isnailed);
		/* creation sub-XIDs must be preserved */
		SWAPFIELD(SubTransactionId, rd_createSubid);
		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
2693 2694
		SWAPFIELD(SubTransactionId, rd_firstRelfilenodeSubid);
		SWAPFIELD(SubTransactionId, rd_droppedSubid);
2695 2696 2697 2698
		/* un-swap rd_rel pointers, swap contents instead */
		SWAPFIELD(Form_pg_class, rd_rel);
		/* ... but actually, we don't have to update newrel->rd_rel */
		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
2699
		/* preserve old tupledesc, rules, policies if no logical change */
2700 2701 2702
		if (keep_tupdesc)
			SWAPFIELD(TupleDesc, rd_att);
		if (keep_rules)
2703
		{
2704 2705
			SWAPFIELD(RuleLock *, rd_rules);
			SWAPFIELD(MemoryContext, rd_rulescxt);
2706
		}
2707
		if (keep_policies)
2708
			SWAPFIELD(RowSecurityDesc *, rd_rsdesc);
2709 2710
		/* toast OID override must be preserved */
		SWAPFIELD(Oid, rd_toastoid);
2711 2712
		/* pgstat_info must be preserved */
		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
2713
		/* preserve old partition key if we have one */
2714 2715 2716 2717 2718
		if (keep_partkey)
		{
			SWAPFIELD(PartitionKey, rd_partkey);
			SWAPFIELD(MemoryContext, rd_partkeycxt);
		}
2719
		if (newrel->rd_pdcxt != NULL || newrel->rd_pddcxt != NULL)
2720 2721 2722
		{
			/*
			 * We are rebuilding a partitioned relation with a non-zero
2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740
			 * reference count, so we must keep the old partition descriptor
			 * around, in case there's a PartitionDirectory with a pointer to
			 * it.  This means we can't free the old rd_pdcxt yet.  (This is
			 * necessary because RelationGetPartitionDesc hands out direct
			 * pointers to the relcache's data structure, unlike our usual
			 * practice which is to hand out copies.  We'd have the same
			 * problem with rd_partkey, except that we always preserve that
			 * once created.)
			 *
			 * To ensure that it's not leaked completely, re-attach it to the
			 * new reldesc, or make it a child of the new reldesc's rd_pdcxt
			 * in the unlikely event that there is one already.  (Compare hack
			 * in RelationBuildPartitionDesc.)  RelationClose will clean up
			 * any such contexts once the reference count reaches zero.
			 *
			 * In the case where the reference count is zero, this code is not
			 * reached, which should be OK because in that case there should
			 * be no PartitionDirectory with a pointer to the old entry.
2741
			 *
Tom Lane's avatar
Tom Lane committed
2742 2743 2744
			 * Note that newrel and relation have already been swapped, so the
			 * "old" partition descriptor is actually the one hanging off of
			 * newrel.
2745
			 */
2746
			relation->rd_partdesc = NULL;	/* ensure rd_partdesc is invalid */
2747 2748
			relation->rd_partdesc_nodetached = NULL;
			relation->rd_partdesc_nodetached_xmin = InvalidTransactionId;
2749 2750 2751 2752
			if (relation->rd_pdcxt != NULL) /* probably never happens */
				MemoryContextSetParent(newrel->rd_pdcxt, relation->rd_pdcxt);
			else
				relation->rd_pdcxt = newrel->rd_pdcxt;
2753 2754 2755 2756
			if (relation->rd_pddcxt != NULL)
				MemoryContextSetParent(newrel->rd_pddcxt, relation->rd_pddcxt);
			else
				relation->rd_pddcxt = newrel->rd_pddcxt;
2757
			/* drop newrel's pointers so we don't destroy it below */
2758
			newrel->rd_partdesc = NULL;
2759 2760
			newrel->rd_partdesc_nodetached = NULL;
			newrel->rd_partdesc_nodetached_xmin = InvalidTransactionId;
2761
			newrel->rd_pdcxt = NULL;
2762
			newrel->rd_pddcxt = NULL;
2763
		}
2764 2765 2766 2767

#undef SWAPFIELD

		/* And now we can throw away the temporary entry */
2768
		RelationDestroyRelation(newrel, !keep_tupdesc);
2769
	}
2770 2771
}

2772
/*
2773 2774 2775
 * RelationFlushRelation
 *
 *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
2776
 *	 This is used when we receive a cache invalidation event for the rel.
2777 2778
 */
static void
2779
RelationFlushRelation(Relation relation)
2780
{
2781
	if (relation->rd_createSubid != InvalidSubTransactionId ||
2782
		relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
2783 2784
	{
		/*
2785
		 * New relcache entries are always rebuilt, not flushed; else we'd
2786 2787
		 * forget the "new" status of the relation.  Ditto for the
		 * new-relfilenode status.
2788
		 *
2789 2790 2791
		 * The rel could have zero refcnt here, so temporarily increment the
		 * refcnt to ensure it's safe to rebuild it.  We can assume that the
		 * current transaction has some lock on the rel already.
2792
		 */
2793 2794 2795
		RelationIncrementReferenceCount(relation);
		RelationClearRelation(relation, true);
		RelationDecrementReferenceCount(relation);
2796 2797 2798 2799
	}
	else
	{
		/*
2800
		 * Pre-existing rels can be dropped from the relcache if not open.
2801
		 */
2802
		bool		rebuild = !RelationHasReferenceCountZero(relation);
2803

2804 2805
		RelationClearRelation(relation, rebuild);
	}
2806 2807
}

2808
/*
2809
 * RelationForgetRelation - caller reports that it dropped the relation
2810 2811
 */
void
2812
RelationForgetRelation(Oid rid)
2813
{
2814
	Relation	relation;
2815 2816 2817

	RelationIdCacheLookup(rid, relation);

2818 2819 2820 2821
	if (!PointerIsValid(relation))
		return;					/* not in cache, nothing to do */

	if (!RelationHasReferenceCountZero(relation))
2822
		elog(ERROR, "relation %u is still open", rid);
2823

2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836
	Assert(relation->rd_droppedSubid == InvalidSubTransactionId);
	if (relation->rd_createSubid != InvalidSubTransactionId ||
		relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
	{
		/*
		 * In the event of subtransaction rollback, we must not forget
		 * rd_*Subid.  Mark the entry "dropped" so RelationClearRelation()
		 * invalidates it in lieu of destroying it.  (If we're in a top
		 * transaction, we could opt to destroy the entry.)
		 */
		relation->rd_droppedSubid = GetCurrentSubTransactionId();
	}

2837
	RelationClearRelation(relation, false);
2838 2839
}

2840
/*
2841
 *		RelationCacheInvalidateEntry
2842 2843 2844
 *
 *		This routine is invoked for SI cache flush messages.
 *
2845 2846
 * Any relcache entry matching the relid must be flushed.  (Note: caller has
 * already determined that the relid belongs to our database or is a shared
2847
 * relation.)
2848 2849 2850 2851 2852 2853
 *
 * We used to skip local relations, on the grounds that they could
 * not be targets of cross-backend SI update messages; but it seems
 * safer to process them, so that our *own* SI update messages will
 * have the same effects during CommandCounterIncrement for both
 * local and nonlocal relations.
2854 2855
 */
void
2856
RelationCacheInvalidateEntry(Oid relationId)
2857
{
2858
	Relation	relation;
2859 2860 2861

	RelationIdCacheLookup(relationId, relation);

2862
	if (PointerIsValid(relation))
2863
	{
2864
		relcacheInvalsReceived++;
2865
		RelationFlushRelation(relation);
2866
	}
2867 2868 2869 2870 2871 2872 2873 2874
	else
	{
		int			i;

		for (i = 0; i < in_progress_list_len; i++)
			if (in_progress_list[i].reloid == relationId)
				in_progress_list[i].invalidated = true;
	}
2875 2876 2877 2878
}

/*
 * RelationCacheInvalidate
2879
 *	 Blow away cached relation descriptors that have zero reference counts,
Bruce Momjian's avatar
Bruce Momjian committed
2880
 *	 and rebuild those with positive reference counts.  Also reset the smgr
2881
 *	 relation cache and re-read relation mapping data.
2882
 *
2883 2884 2885 2886 2887
 *	 Apart from debug_discard_caches, this is currently used only to recover
 *	 from SI message buffer overflow, so we do not touch relations having
 *	 new-in-transaction relfilenodes; they cannot be targets of cross-backend
 *	 SI updates (and our own updates now go through a separate linked list
 *	 that isn't limited by the SI message buffer size).
2888 2889 2890
 *
 *	 We do this in two phases: the first pass deletes deletable items, and
 *	 the second one rebuilds the rebuildable items.  This is essential for
2891
 *	 safety, because hash_seq_search only copes with concurrent deletion of
Bruce Momjian's avatar
Bruce Momjian committed
2892
 *	 the element it is currently visiting.  If a second SI overflow were to
2893 2894 2895 2896
 *	 occur while we are walking the table, resulting in recursive entry to
 *	 this routine, we could crash because the inner invocation blows away
 *	 the entry next to be visited by the outer scan.  But this way is OK,
 *	 because (a) during the first pass we won't process any more SI messages,
2897
 *	 so hash_seq_search will complete safely; (b) during the second pass we
2898
 *	 only hold onto pointers to nondeletable entries.
2899
 *
2900 2901 2902 2903 2904
 *	 The two-phase approach also makes it easy to update relfilenodes for
 *	 mapped relations before we do anything else, and to ensure that the
 *	 second pass processes nailed-in-cache items before other nondeletable
 *	 items.  This should ensure that system catalogs are up to date before
 *	 we attempt to use them to reload information about other open relations.
2905 2906 2907 2908 2909
 *
 *	 After those two phases of work having immediate effects, we normally
 *	 signal any RelationBuildDesc() on the stack to start over.  However, we
 *	 don't do this if called as part of debug_discard_caches.  Otherwise,
 *	 RelationBuildDesc() would become an infinite loop.
2910 2911
 */
void
2912
RelationCacheInvalidate(bool debug_discard)
2913
{
2914
	HASH_SEQ_STATUS status;
2915
	RelIdCacheEnt *idhentry;
2916
	Relation	relation;
2917
	List	   *rebuildFirstList = NIL;
2918
	List	   *rebuildList = NIL;
2919
	ListCell   *l;
2920
	int			i;
2921

2922 2923 2924 2925 2926
	/*
	 * Reload relation mapping data before starting to reconstruct cache.
	 */
	RelationMapInvalidateAll();

2927
	/* Phase 1 */
2928
	hash_seq_init(&status, RelationIdCache);
2929

2930
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2931
	{
2932
		relation = idhentry->reldesc;
2933

2934
		/* Must close all smgr references to avoid leaving dangling ptrs */
2935
		RelationCloseSmgr(relation);
2936

2937 2938 2939 2940 2941 2942 2943
		/*
		 * Ignore new relations; no other backend will manipulate them before
		 * we commit.  Likewise, before replacing a relation's relfilenode, we
		 * shall have acquired AccessExclusiveLock and drained any applicable
		 * pending invalidations.
		 */
		if (relation->rd_createSubid != InvalidSubTransactionId ||
2944
			relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
2945
			continue;
2946

2947 2948
		relcacheInvalsReceived++;

2949
		if (RelationHasReferenceCountZero(relation))
2950 2951
		{
			/* Delete this entry immediately */
2952
			Assert(!relation->rd_isnailed);
2953 2954 2955 2956
			RelationClearRelation(relation, false);
		}
		else
		{
2957 2958 2959 2960 2961 2962 2963 2964 2965 2966
			/*
			 * If it's a mapped relation, immediately update its rd_node in
			 * case its relfilenode changed.  We must do this during phase 1
			 * in case the relation is consulted during rebuild of other
			 * relcache entries in phase 2.  It's safe since consulting the
			 * map doesn't involve any access to relcache entries.
			 */
			if (RelationIsMapped(relation))
				RelationInitPhysicalAddr(relation);

2967 2968
			/*
			 * Add this entry to list of stuff to rebuild in second pass.
2969 2970 2971 2972 2973 2974
			 * pg_class goes to the front of rebuildFirstList while
			 * pg_class_oid_index goes to the back of rebuildFirstList, so
			 * they are done first and second respectively.  Other nailed
			 * relations go to the front of rebuildList, so they'll be done
			 * next in no particular order; and everything else goes to the
			 * back of rebuildList.
2975
			 */
2976 2977 2978 2979 2980
			if (RelationGetRelid(relation) == RelationRelationId)
				rebuildFirstList = lcons(relation, rebuildFirstList);
			else if (RelationGetRelid(relation) == ClassOidIndexId)
				rebuildFirstList = lappend(rebuildFirstList, relation);
			else if (relation->rd_isnailed)
2981
				rebuildList = lcons(relation, rebuildList);
2982 2983
			else
				rebuildList = lappend(rebuildList, relation);
2984
		}
2985
	}
2986

2987
	/*
2988 2989 2990
	 * Now zap any remaining smgr cache entries.  This must happen before we
	 * start to rebuild entries, since that may involve catalog fetches which
	 * will re-open catalog files.
2991 2992 2993
	 */
	smgrcloseall();

2994
	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
2995 2996 2997 2998 2999 3000
	foreach(l, rebuildFirstList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildFirstList);
3001
	foreach(l, rebuildList)
3002
	{
3003 3004
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
3005
	}
3006
	list_free(rebuildList);
3007 3008 3009 3010 3011

	if (!debug_discard)
		/* Any RelationBuildDesc() on the stack must start over. */
		for (i = 0; i < in_progress_list_len; i++)
			in_progress_list[i].invalidated = true;
3012
}
3013

3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032
/*
 * RelationCloseSmgrByOid - close a relcache entry's smgr link
 *
 * Needed in some cases where we are changing a relation's physical mapping.
 * The link will be automatically reopened on next use.
 *
 * Does nothing when the relation has no relcache entry.
 */
void
RelationCloseSmgrByOid(Oid relationId)
{
	Relation	relation;

	RelationIdCacheLookup(relationId, relation);

	/* Only a cached entry can have an smgr link to close */
	if (PointerIsValid(relation))
	{
		RelationCloseSmgr(relation);
	}
}

3033
static void
3034 3035 3036 3037
RememberToFreeTupleDescAtEOX(TupleDesc td)
{
	if (EOXactTupleDescArray == NULL)
	{
Bruce Momjian's avatar
Bruce Momjian committed
3038 3039
		MemoryContext oldcxt;

3040 3041 3042 3043 3044 3045 3046 3047 3048
		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

		EOXactTupleDescArray = (TupleDesc *) palloc(16 * sizeof(TupleDesc));
		EOXactTupleDescArrayLen = 16;
		NextEOXactTupleDescNum = 0;
		MemoryContextSwitchTo(oldcxt);
	}
	else if (NextEOXactTupleDescNum >= EOXactTupleDescArrayLen)
	{
Bruce Momjian's avatar
Bruce Momjian committed
3049
		int32		newlen = EOXactTupleDescArrayLen * 2;
3050 3051 3052 3053

		Assert(EOXactTupleDescArrayLen > 0);

		EOXactTupleDescArray = (TupleDesc *) repalloc(EOXactTupleDescArray,
Tom Lane's avatar
Tom Lane committed
3054
													  newlen * sizeof(TupleDesc));
3055 3056 3057 3058 3059 3060
		EOXactTupleDescArrayLen = newlen;
	}

	EOXactTupleDescArray[NextEOXactTupleDescNum++] = td;
}

3061 3062 3063 3064 3065
#ifdef USE_ASSERT_CHECKING
/*
 * AssertPendingSyncConsistency
 *
 *	Assert that this relcache entry's view of whether the relation skips WAL
 *	matches RelFileNodeSkippingWAL(), and that an entry marked dropped is
 *	invalid and still carries a creation or first-relfilenode subid.
 */
static void
AssertPendingSyncConsistency(Relation relation)
{
	bool		created_with_storage;
	bool		got_new_relfilenode;
	bool		relcache_verdict;

	created_with_storage =
		(relation->rd_createSubid != InvalidSubTransactionId &&
		 RELKIND_HAS_STORAGE(relation->rd_rel->relkind));
	got_new_relfilenode =
		(relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId);

	/* What the relcache entry alone implies about skipping WAL */
	relcache_verdict = RelationIsPermanent(relation) &&
		(created_with_storage || got_new_relfilenode);

	Assert(relcache_verdict == RelFileNodeSkippingWAL(relation->rd_node));

	if (relation->rd_droppedSubid != InvalidSubTransactionId)
	{
		Assert(!relation->rd_isvalid &&
			   (relation->rd_createSubid != InvalidSubTransactionId ||
				relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId));
	}
}

/*
 * AssertPendingSyncs_RelationCache
 *
 *	Assert that relcache.c and storage.c agree on whether to skip WAL.
 */
void
AssertPendingSyncs_RelationCache(void)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;
	Relation   *rels;
	int			maxrels;
	int			nrels;
	RelIdCacheEnt *idhentry;
	int			i;

	/*
	 * Open every relation that this transaction has locked.  If, for some
	 * relation, storage.c is skipping WAL and relcache.c is not skipping WAL,
	 * a CommandCounterIncrement() typically yields a local invalidation
	 * message that destroys the relcache entry.  By recreating such entries
	 * here, we detect the problem.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());
	maxrels = 1;
	rels = palloc(maxrels * sizeof(*rels));
	nrels = 0;
	hash_seq_init(&status, GetLockMethodLocalHash());
	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		Oid			relid;
		Relation	r;

		if (locallock->nLocks <= 0)
			continue;
		if ((LockTagType) locallock->tag.lock.locktag_type !=
			LOCKTAG_RELATION)
			continue;
		relid = ObjectIdGetDatum(locallock->tag.lock.locktag_field2);
		r = RelationIdGetRelation(relid);
		if (!RelationIsValid(r))
			continue;
		if (nrels >= maxrels)
		{
			maxrels *= 2;
			rels = repalloc(rels, maxrels * sizeof(*rels));
		}
		rels[nrels++] = r;
	}

	hash_seq_init(&status, RelationIdCache);
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
		AssertPendingSyncConsistency(idhentry->reldesc);

	for (i = 0; i < nrels; i++)
		RelationClose(rels[i]);
	PopActiveSnapshot();
}
#endif

3139
/*
3140
 * AtEOXact_RelationCache
3141
 *
3142
 *	Clean up the relcache at main-transaction commit or abort.
3143 3144 3145 3146 3147
 *
 * Note: this must be called *before* processing invalidation messages.
 * In the case of abort, we don't want to try to rebuild any invalidated
 * cache entries (since we can't safely do database accesses).  Therefore
 * we must reset refcnts before handling pending invalidations.
3148 3149 3150 3151 3152 3153
 *
 * As of PostgreSQL 8.1, relcache refcnts should get released by the
 * ResourceOwner mechanism.  This routine just does a debugging
 * cross-check that no pins remain.  However, we also need to do special
 * cleanup when the current transaction created any relations or made use
 * of forced index lists.
3154 3155
 */
void
3156
AtEOXact_RelationCache(bool isCommit)
3157
{
3158
	HASH_SEQ_STATUS status;
3159
	RelIdCacheEnt *idhentry;
3160
	int			i;
3161

3162 3163 3164 3165 3166 3167 3168
	/*
	 * Forget in_progress_list.  This is relevant when we're aborting due to
	 * an error during RelationBuildDesc().
	 */
	Assert(in_progress_list_len == 0 || !isCommit);
	in_progress_list_len = 0;

3169
	/*
3170 3171 3172 3173 3174 3175
	 * Unless the eoxact_list[] overflowed, we only need to examine the rels
	 * listed in it.  Otherwise fall back on a hash_seq_search scan.
	 *
	 * For simplicity, eoxact_list[] entries are not deleted till end of
	 * top-level transaction, even though we could remove them at
	 * subtransaction end in some cases, or remove relations from the list if
Bruce Momjian's avatar
Bruce Momjian committed
3176
	 * they are cleared for other reasons.  Therefore we should expect the
3177 3178 3179 3180
	 * case that list entries are not found in the hashtable; if not, there's
	 * nothing to do for them.
	 */
	if (eoxact_list_overflowed)
3181
	{
3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199
		hash_seq_init(&status, RelationIdCache);
		while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
		{
			AtEOXact_cleanup(idhentry->reldesc, isCommit);
		}
	}
	else
	{
		for (i = 0; i < eoxact_list_len; i++)
		{
			idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
													 (void *) &eoxact_list[i],
													 HASH_FIND,
													 NULL);
			if (idhentry != NULL)
				AtEOXact_cleanup(idhentry->reldesc, isCommit);
		}
	}
3200

3201 3202 3203 3204 3205 3206 3207 3208 3209 3210
	if (EOXactTupleDescArrayLen > 0)
	{
		Assert(EOXactTupleDescArray != NULL);
		for (i = 0; i < NextEOXactTupleDescNum; i++)
			FreeTupleDesc(EOXactTupleDescArray[i]);
		pfree(EOXactTupleDescArray);
		EOXactTupleDescArray = NULL;
	}

	/* Now we're out of the transaction and can clear the lists */
3211 3212
	eoxact_list_len = 0;
	eoxact_list_overflowed = false;
3213 3214
	NextEOXactTupleDescNum = 0;
	EOXactTupleDescArrayLen = 0;
3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227
}

/*
 * AtEOXact_cleanup
 *
 *	Clean up a single rel at main-transaction commit or abort
 *
 * NB: this processing must be idempotent, because EOXactListAdd() doesn't
 * bother to prevent duplicate entries in eoxact_list[].
 */
static void
AtEOXact_cleanup(Relation relation, bool isCommit)
{
	bool		clear_relcache;

	/*
	 * The relcache entry's ref count should be back to its normal
	 * not-in-a-transaction state: 0 unless it's nailed in cache.
	 *
	 * In bootstrap mode, this is NOT true, so don't check it --- the
	 * bootstrap code expects relations to stay open across start/commit
	 * transaction calls.  (That seems bogus, but it's not worth fixing.)
	 *
	 * Note: ideally this check would be applied to every relcache entry, not
	 * just those that have eoxact work to do.  But it's not worth forcing a
	 * scan of the whole relcache just for this.  (Moreover, doing so would
	 * mean that assert-enabled testing never tests the hash_search code path
	 * above, which seems a bad idea.)
	 */
#ifdef USE_ASSERT_CHECKING
	if (!IsBootstrapProcessingMode())
	{
		int			expected_refcnt = relation->rd_isnailed ? 1 : 0;

		Assert(relation->rd_refcnt == expected_refcnt);
	}
#endif

	/*
	 * Is the relation live after this transaction ends?
	 *
	 * During commit, clear the relcache entry if it is preserved after
	 * relation drop, in order not to orphan the entry.  During rollback,
	 * clear the relcache entry if the relation is created in the current
	 * transaction since it isn't interesting any longer once we are out of
	 * the transaction.
	 */
	if (isCommit)
		clear_relcache =
			(relation->rd_droppedSubid != InvalidSubTransactionId);
	else
		clear_relcache =
			(relation->rd_createSubid != InvalidSubTransactionId);

	/*
	 * Since we are now out of the transaction, reset the subids to zero. That
	 * also lets RelationClearRelation() drop the relcache entry.
	 */
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_droppedSubid = InvalidSubTransactionId;

	if (!clear_relcache)
		return;

	if (!RelationHasReferenceCountZero(relation))
	{
		/*
		 * Hmm, somewhere there's a (leaked?) reference to the relation.  We
		 * daren't remove the entry for fear of dereferencing a dangling
		 * pointer later.  Bleat, and mark it as not belonging to the current
		 * transaction.  Hopefully it'll get cleaned up eventually.  This
		 * must be just a WARNING to avoid error-during-error-recovery loops.
		 */
		elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
			 RelationGetRelationName(relation));
		return;
	}

	RelationClearRelation(relation, false);
}
3299

3300 3301 3302 3303 3304 3305 3306 3307
/*
 * AtEOSubXact_RelationCache
 *
 *	Clean up the relcache at sub-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 */
void
3308 3309
AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
						  SubTransactionId parentSubid)
3310 3311 3312
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
3313
	int			i;
3314

3315 3316 3317 3318 3319 3320 3321 3322
	/*
	 * Forget in_progress_list.  This is relevant when we're aborting due to
	 * an error during RelationBuildDesc().  We don't commit subtransactions
	 * during RelationBuildDesc().
	 */
	Assert(in_progress_list_len == 0 || !isCommit);
	in_progress_list_len = 0;

3323
	/*
3324 3325 3326
	 * Unless the eoxact_list[] overflowed, we only need to examine the rels
	 * listed in it.  Otherwise fall back on a hash_seq_search scan.  Same
	 * logic as in AtEOXact_RelationCache.
3327
	 */
3328
	if (eoxact_list_overflowed)
3329
	{
3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342 3343 3344 3345 3346 3347 3348 3349
		hash_seq_init(&status, RelationIdCache);
		while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
		{
			AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
								mySubid, parentSubid);
		}
	}
	else
	{
		for (i = 0; i < eoxact_list_len; i++)
		{
			idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
													 (void *) &eoxact_list[i],
													 HASH_FIND,
													 NULL);
			if (idhentry != NULL)
				AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
									mySubid, parentSubid);
		}
	}
3350

3351 3352 3353 3354 3355 3356 3357 3358 3359 3360 3361 3362 3363 3364 3365
	/* Don't reset the list; we still need more cleanup later */
}

/*
 * AtEOSubXact_cleanup
 *
 *	Clean up a single rel at subtransaction commit or abort
 *
 * Each of the entry's rd_createSubid, rd_newRelfilenodeSubid,
 * rd_firstRelfilenodeSubid and rd_droppedSubid fields that equals mySubid is
 * either handed over to parentSubid (on commit) or reset to
 * InvalidSubTransactionId (on abort).  An entry created in this
 * subtransaction that is being aborted, or that was also dropped in it, is
 * removed from the cache outright if nothing references it.
 *
 * NB: this processing must be idempotent, because EOXactListAdd() doesn't
 * bother to prevent duplicate entries in eoxact_list[].
 */
static void
AtEOSubXact_cleanup(Relation relation, bool isCommit,
					SubTransactionId mySubid, SubTransactionId parentSubid)
{
	/*
	 * Is it a relation created in the current subtransaction?
	 *
	 * During subcommit, mark it as belonging to the parent, instead, as long
	 * as it has not been dropped.  Otherwise simply delete the relcache
	 * entry --- it isn't interesting any longer.
	 */
	if (relation->rd_createSubid == mySubid)
	{
		/*
		 * Valid rd_droppedSubid means the corresponding relation is dropped
		 * but the relcache entry is preserved for at-commit pending sync. We
		 * need to drop it explicitly here not to make the entry orphan.
		 */
		Assert(relation->rd_droppedSubid == mySubid ||
			   relation->rd_droppedSubid == InvalidSubTransactionId);
		if (isCommit && relation->rd_droppedSubid == InvalidSubTransactionId)
			relation->rd_createSubid = parentSubid;
		else if (RelationHasReferenceCountZero(relation))
		{
			/* allow the entry to be removed */
			relation->rd_createSubid = InvalidSubTransactionId;
			relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
			relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
			relation->rd_droppedSubid = InvalidSubTransactionId;
			RelationClearRelation(relation, false);
			return;
		}
		else
		{
			/*
			 * Hmm, somewhere there's a (leaked?) reference to the relation.
			 * We daren't remove the entry for fear of dereferencing a
			 * dangling pointer later.  Bleat, and transfer it to the parent
			 * subtransaction so we can try again later.  This must be just a
			 * WARNING to avoid error-during-error-recovery loops.
			 */
			relation->rd_createSubid = parentSubid;
			elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
				 RelationGetRelationName(relation));
		}
	}

	/*
	 * Likewise, update or drop any new-relfilenode-in-subtransaction record
	 * or drop record.
	 */
	if (relation->rd_newRelfilenodeSubid == mySubid)
	{
		if (isCommit)
			relation->rd_newRelfilenodeSubid = parentSubid;
		else
			relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	}

	if (relation->rd_firstRelfilenodeSubid == mySubid)
	{
		if (isCommit)
			relation->rd_firstRelfilenodeSubid = parentSubid;
		else
			relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
	}

	if (relation->rd_droppedSubid == mySubid)
	{
		if (isCommit)
			relation->rd_droppedSubid = parentSubid;
		else
			relation->rd_droppedSubid = InvalidSubTransactionId;
	}
}
/*
3440 3441 3442
 *		RelationBuildLocalRelation
 *			Build a relcache entry for an about-to-be-created relation,
 *			and enter it into the relcache.
3443
 */
3444 3445
Relation
RelationBuildLocalRelation(const char *relname,
3446
						   Oid relnamespace,
3447
						   TupleDesc tupDesc,
3448
						   Oid relid,
3449
						   Oid accessmtd,
3450
						   Oid relfilenode,
3451
						   Oid reltablespace,
3452
						   bool shared_relation,
3453
						   bool mapped_relation,
Robert Haas's avatar
Robert Haas committed
3454 3455
						   char relpersistence,
						   char relkind)
3456
{
3457
	Relation	rel;
3458
	MemoryContext oldcxt;
3459 3460
	int			natts = tupDesc->natts;
	int			i;
3461
	bool		has_not_null;
3462
	bool		nailit;
3463

3464
	AssertArg(natts >= 0);
3465

3466 3467 3468
	/*
	 * check for creation of a rel that must be nailed in cache.
	 *
3469 3470
	 * XXX this list had better match the relations specially handled in
	 * RelationCacheInitializePhase2/3.
3471 3472 3473
	 */
	switch (relid)
	{
3474
		case DatabaseRelationId:
3475 3476
		case AuthIdRelationId:
		case AuthMemRelationId:
3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487
		case RelationRelationId:
		case AttributeRelationId:
		case ProcedureRelationId:
		case TypeRelationId:
			nailit = true;
			break;
		default:
			nailit = false;
			break;
	}

3488 3489
	/*
	 * check that hardwired list of shared rels matches what's in the
Bruce Momjian's avatar
Bruce Momjian committed
3490 3491 3492
	 * bootstrap .bki file.  If you get a failure here during initdb, you
	 * probably need to fix IsSharedRelation() to match whatever you've done
	 * to the set of shared relations.
3493 3494 3495 3496 3497
	 */
	if (shared_relation != IsSharedRelation(relid))
		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
			 relname, relid);

3498 3499 3500
	/* Shared relations had better be mapped, too */
	Assert(mapped_relation || !shared_relation);

3501 3502 3503 3504 3505
	/*
	 * switch to the cache context to create the relcache entry.
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();
3506

3507 3508
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

3509
	/*
3510
	 * allocate a new relation descriptor and fill in basic state fields.
3511
	 */
3512
	rel = (Relation) palloc0(sizeof(RelationData));
3513

3514
	/* make sure relation is marked as having no open file yet */
3515
	rel->rd_smgr = NULL;
3516

3517 3518 3519
	/* mark it nailed if appropriate */
	rel->rd_isnailed = nailit;

3520
	rel->rd_refcnt = nailit ? 1 : 0;
3521

3522
	/* it's being created in this transaction */
3523
	rel->rd_createSubid = GetCurrentSubTransactionId();
3524
	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3525 3526
	rel->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
	rel->rd_droppedSubid = InvalidSubTransactionId;
3527

3528
	/*
3529
	 * create a new tuple descriptor from the one passed in.  We do this
3530 3531 3532 3533
	 * partly to copy it into the cache context, and partly because the new
	 * relation can't have any defaults or constraints yet; they have to be
	 * added in later steps, because they require additions to multiple system
	 * catalogs.  We can copy attnotnull constraints here, however.
3534
	 */
3535
	rel->rd_att = CreateTupleDescCopy(tupDesc);
3536
	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
3537
	has_not_null = false;
3538
	for (i = 0; i < natts; i++)
3539
	{
3540 3541 3542 3543
		Form_pg_attribute satt = TupleDescAttr(tupDesc, i);
		Form_pg_attribute datt = TupleDescAttr(rel->rd_att, i);

		datt->attidentity = satt->attidentity;
Peter Eisentraut's avatar
Peter Eisentraut committed
3544
		datt->attgenerated = satt->attgenerated;
3545 3546
		datt->attnotnull = satt->attnotnull;
		has_not_null |= satt->attnotnull;
3547 3548 3549 3550 3551 3552 3553 3554 3555
	}

	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		rel->rd_att->constr = constr;
	}
3556 3557 3558 3559

	/*
	 * initialize relation tuple form (caller may add/override data later)
	 */
3560
	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
3561

3562 3563
	namestrcpy(&rel->rd_rel->relname, relname);
	rel->rd_rel->relnamespace = relnamespace;
3564

Robert Haas's avatar
Robert Haas committed
3565
	rel->rd_rel->relkind = relkind;
3566 3567
	rel->rd_rel->relnatts = natts;
	rel->rd_rel->reltype = InvalidOid;
3568 3569
	/* needed when bootstrapping: */
	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
3570

3571
	/* set up persistence and relcache fields dependent on it */
3572 3573 3574
	rel->rd_rel->relpersistence = relpersistence;
	switch (relpersistence)
	{
Robert Haas's avatar
Robert Haas committed
3575
		case RELPERSISTENCE_UNLOGGED:
3576 3577
		case RELPERSISTENCE_PERMANENT:
			rel->rd_backend = InvalidBackendId;
3578
			rel->rd_islocaltemp = false;
3579 3580
			break;
		case RELPERSISTENCE_TEMP:
3581
			Assert(isTempOrTempToastNamespace(relnamespace));
3582
			rel->rd_backend = BackendIdForTempRelations();
3583
			rel->rd_islocaltemp = true;
3584 3585 3586 3587 3588 3589
			break;
		default:
			elog(ERROR, "invalid relpersistence: %c", relpersistence);
			break;
	}

3590 3591 3592 3593 3594 3595
	/* if it's a materialized view, it's not populated initially */
	if (relkind == RELKIND_MATVIEW)
		rel->rd_rel->relispopulated = false;
	else
		rel->rd_rel->relispopulated = true;

3596 3597
	/* set replica identity -- system catalogs and non-tables don't have one */
	if (!IsCatalogNamespace(relnamespace) &&
3598 3599 3600
		(relkind == RELKIND_RELATION ||
		 relkind == RELKIND_MATVIEW ||
		 relkind == RELKIND_PARTITIONED_TABLE))
3601 3602 3603 3604
		rel->rd_rel->relreplident = REPLICA_IDENTITY_DEFAULT;
	else
		rel->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;

3605
	/*
3606
	 * Insert relation physical and logical identifiers (OIDs) into the right
Bruce Momjian's avatar
Bruce Momjian committed
3607
	 * places.  For a mapped relation, we set relfilenode to zero and rely on
3608
	 * RelationInitPhysicalAddr to consult the map.
3609
	 */
3610
	rel->rd_rel->relisshared = shared_relation;
3611

3612 3613 3614
	RelationGetRelid(rel) = relid;

	for (i = 0; i < natts; i++)
3615
		TupleDescAttr(rel->rd_att, i)->attrelid = relid;
3616

3617
	rel->rd_rel->reltablespace = reltablespace;
3618

3619 3620 3621 3622
	if (mapped_relation)
	{
		rel->rd_rel->relfilenode = InvalidOid;
		/* Add it to the active mapping information */
3623
		RelationMapUpdateMap(relid, relfilenode, shared_relation, true);
3624 3625
	}
	else
3626
		rel->rd_rel->relfilenode = relfilenode;
3627

3628
	RelationInitLockInfo(rel);	/* see lmgr.c */
3629

3630 3631
	RelationInitPhysicalAddr(rel);

3632 3633
	rel->rd_rel->relam = accessmtd;

3634 3635 3636 3637 3638 3639 3640
	/*
	 * RelationInitTableAccessMethod will do syscache lookups, so we mustn't
	 * run it in CacheMemoryContext.  Fortunately, the remaining steps don't
	 * require a long-lived current context.
	 */
	MemoryContextSwitchTo(oldcxt);

3641 3642 3643 3644 3645 3646
	if (relkind == RELKIND_RELATION ||
		relkind == RELKIND_SEQUENCE ||
		relkind == RELKIND_TOASTVALUE ||
		relkind == RELKIND_MATVIEW)
		RelationInitTableAccessMethod(rel);

3647
	/*
3648 3649 3650 3651 3652 3653 3654
	 * Okay to insert into the relcache hash table.
	 *
	 * Ordinarily, there should certainly not be an existing hash entry for
	 * the same OID; but during bootstrap, when we create a "real" relcache
	 * entry for one of the bootstrap relations, we'll be overwriting the
	 * phony one created with formrdesc.  So allow that to happen for nailed
	 * rels.
3655
	 */
3656
	RelationCacheInsert(rel, nailit);
3657

3658
	/*
Bruce Momjian's avatar
Bruce Momjian committed
3659 3660
	 * Flag relation as needing eoxact cleanup (to clear rd_createSubid). We
	 * can't do this before storing relid in it.
3661 3662 3663
	 */
	EOXactListAdd(rel);

3664 3665 3666
	/* It's fully valid */
	rel->rd_isvalid = true;

3667 3668 3669 3670 3671
	/*
	 * Caller expects us to pin the returned entry.
	 */
	RelationIncrementReferenceCount(rel);

3672
	return rel;
3673 3674
}
/*
 * RelationSetNewRelfilenode
 *
3679 3680
 * Assign a new relfilenode (physical file name), and possibly a new
 * persistence setting, to the relation.
3681 3682 3683 3684 3685 3686 3687 3688 3689 3690
 *
 * This allows a full rewrite of the relation to be done with transactional
 * safety (since the filenode assignment can be rolled back).  Note however
 * that there is no simple way to access the relation's old data for the
 * remainder of the current transaction.  This limits the usefulness to cases
 * such as TRUNCATE or rebuilding an index from scratch.
 *
 * Caller must already hold exclusive lock on the relation.
 */
void
3691
RelationSetNewRelfilenode(Relation relation, char persistence)
3692 3693 3694 3695 3696
{
	Oid			newrelfilenode;
	Relation	pg_class;
	HeapTuple	tuple;
	Form_pg_class classform;
3697 3698
	MultiXactId minmulti = InvalidMultiXactId;
	TransactionId freezeXid = InvalidTransactionId;
3699
	RelFileNode newrnode;
3700 3701

	/* Allocate a new relfilenode */
3702
	newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace, NULL,
3703
									   persistence);
3704 3705

	/*
3706
	 * Get a writable copy of the pg_class tuple for the given relation.
3707
	 */
3708
	pg_class = table_open(RelationRelationId, RowExclusiveLock);
3709

3710 3711
	tuple = SearchSysCacheCopy1(RELOID,
								ObjectIdGetDatum(RelationGetRelid(relation)));
3712 3713 3714 3715 3716 3717 3718 3719 3720 3721
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find tuple for relation %u",
			 RelationGetRelid(relation));
	classform = (Form_pg_class) GETSTRUCT(tuple);

	/*
	 * Schedule unlinking of the old storage at transaction commit.
	 */
	RelationDropStorage(relation);

3722
	/*
3723 3724 3725
	 * Create storage for the main fork of the new relfilenode.  If it's a
	 * table-like object, call into the table AM to do so, which'll also
	 * create the table's init fork if needed.
3726
	 *
3727 3728
	 * NOTE: If relevant for the AM, any conflict in relfilenode value will be
	 * caught here, if GetNewRelFileNode messes up for any reason.
3729
	 */
3730 3731
	newrnode = relation->rd_node;
	newrnode.relNode = newrelfilenode;
3732 3733 3734 3735 3736

	switch (relation->rd_rel->relkind)
	{
		case RELKIND_INDEX:
		case RELKIND_SEQUENCE:
3737
			{
3738
				/* handle these directly, at least for now */
3739 3740 3741 3742 3743
				SMgrRelation srel;

				srel = RelationCreateStorage(newrnode, persistence);
				smgrclose(srel);
			}
3744 3745 3746 3747 3748
			break;

		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_MATVIEW:
3749 3750
			table_relation_set_new_filenode(relation, &newrnode,
											persistence,
3751 3752
											&freezeXid, &minmulti);
			break;
3753 3754 3755 3756 3757 3758

		default:
			/* we shouldn't be called for anything else */
			elog(ERROR, "relation \"%s\" does not have storage",
				 RelationGetRelationName(relation));
			break;
3759
	}
3760

3761
	/*
3762 3763 3764 3765 3766 3767 3768
	 * If we're dealing with a mapped index, pg_class.relfilenode doesn't
	 * change; instead we have to send the update to the relation mapper.
	 *
	 * For mapped indexes, we don't actually change the pg_class entry at all;
	 * this is essential when reindexing pg_class itself.  That leaves us with
	 * possibly-inaccurate values of relpages etc, but those will be fixed up
	 * later.
3769 3770
	 */
	if (RelationIsMapped(relation))
3771 3772 3773 3774 3775 3776 3777 3778 3779 3780 3781 3782 3783 3784 3785 3786 3787 3788
	{
		/* This case is only supported for indexes */
		Assert(relation->rd_rel->relkind == RELKIND_INDEX);

		/* Since we're not updating pg_class, these had better not change */
		Assert(classform->relfrozenxid == freezeXid);
		Assert(classform->relminmxid == minmulti);
		Assert(classform->relpersistence == persistence);

		/*
		 * In some code paths it's possible that the tuple update we'd
		 * otherwise do here is the only thing that would assign an XID for
		 * the current transaction.  However, we must have an XID to delete
		 * files, so make sure one is assigned.
		 */
		(void) GetCurrentTransactionId();

		/* Do the deed */
3789 3790 3791 3792
		RelationMapUpdateMap(RelationGetRelid(relation),
							 newrelfilenode,
							 relation->rd_rel->relisshared,
							 false);
3793 3794 3795 3796

		/* Since we're not updating pg_class, must trigger inval manually */
		CacheInvalidateRelcache(relation);
	}
3797
	else
3798 3799
	{
		/* Normal case, update the pg_class entry */
3800 3801
		classform->relfilenode = newrelfilenode;

3802 3803 3804 3805
		/* relpages etc. never change for sequences */
		if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
		{
			classform->relpages = 0;	/* it's empty until further notice */
3806
			classform->reltuples = -1;
3807 3808 3809 3810 3811
			classform->relallvisible = 0;
		}
		classform->relfrozenxid = freezeXid;
		classform->relminmxid = minmulti;
		classform->relpersistence = persistence;
3812

3813 3814
		CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
	}
3815 3816 3817

	heap_freetuple(tuple);

3818
	table_close(pg_class, RowExclusiveLock);
3819 3820

	/*
3821 3822
	 * Make the pg_class row change or relation map change visible.  This will
	 * cause the relcache entry to get updated, too.
3823 3824 3825
	 */
	CommandCounterIncrement();

3826 3827 3828 3829 3830 3831 3832 3833 3834 3835 3836 3837 3838 3839 3840 3841 3842 3843
	RelationAssumeNewRelfilenode(relation);
}

/*
 * RelationAssumeNewRelfilenode
 *
 * Code that modifies pg_class.reltablespace or pg_class.relfilenode must call
 * this.  The call shall precede any code that might insert WAL records whose
 * replay would modify bytes in the new RelFileNode, and the call shall follow
 * any WAL modifying bytes in the prior RelFileNode.  See struct RelationData.
 * Ideally, call this as near as possible to the CommandCounterIncrement()
 * that makes the pg_class change visible (before it or after it); that
 * minimizes the chance of future development adding a forbidden WAL insertion
 * between RelationAssumeNewRelfilenode() and CommandCounterIncrement().
 *
 * Effect: rd_newRelfilenodeSubid is set to the current subtransaction ID;
 * rd_firstRelfilenodeSubid is set to the same value only if it was not
 * already valid.  The relation is then queued for end-of-transaction cleanup.
 */
void
RelationAssumeNewRelfilenode(Relation relation)
{
	relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
	/* remember only the first subtransaction that assigned a relfilenode */
	if (relation->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
		relation->rd_firstRelfilenodeSubid = relation->rd_newRelfilenodeSubid;

	/* Flag relation as needing eoxact cleanup (to clear these fields) */
	EOXactListAdd(relation);
}
/*
3854
 *		RelationCacheInitialize
3855
 *
3856 3857
 *		This initializes the relation descriptor cache.  At the time
 *		that this is invoked, we can't do database access yet (mainly
3858 3859 3860 3861 3862
 *		because the transaction subsystem is not up); all we are doing
 *		is making an empty cache hashtable.  This must be done before
 *		starting the initialization transaction, because otherwise
 *		AtEOXact_RelationCache would crash if that transaction aborts
 *		before we can get the relcache set up.
3863 3864
 */

3865
#define INITRELCACHESIZE		400
3866 3867

void
3868
RelationCacheInitialize(void)
3869
{
3870
	HASHCTL		ctl;
3871
	int			allocsize;
3872

3873
	/*
3874
	 * make sure cache memory context exists
3875
	 */
3876 3877
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();
3878

3879
	/*
3880
	 * create hashtable that indexes the relcache
3881 3882
	 */
	ctl.keysize = sizeof(Oid);
3883
	ctl.entrysize = sizeof(RelIdCacheEnt);
3884
	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
3885
								  &ctl, HASH_ELEM | HASH_BLOBS);
3886

3887 3888 3889 3890 3891 3892 3893 3894 3895
	/*
	 * reserve enough in_progress_list slots for many cases
	 */
	allocsize = 4;
	in_progress_list =
		MemoryContextAlloc(CacheMemoryContext,
						   allocsize * sizeof(*in_progress_list));
	in_progress_list_maxlen = allocsize;

3896
	/*
3897
	 * relation mapper needs to be initialized too
3898 3899
	 */
	RelationMapInitialize();
3900 3901 3902 3903 3904
}

/*
 *		RelationCacheInitializePhase2
 *
3905 3906
 *		This is called to prepare for access to shared catalogs during startup.
 *		We must at least set up nailed reldescs for pg_database, pg_authid,
3907 3908 3909 3910 3911
 *		pg_auth_members, and pg_shseclabel. Ideally we'd like to have reldescs
 *		for their indexes, too.  We attempt to load this information from the
 *		shared relcache init file.  If that's missing or broken, just make
 *		phony entries for the catalogs themselves.
 *		RelationCacheInitializePhase3 will clean up as needed.
3912 3913 3914
 */
void
RelationCacheInitializePhase2(void)
3915 3916 3917
{
	MemoryContext oldcxt;

3918 3919 3920 3921 3922
	/*
	 * relation mapper needs initialized too
	 */
	RelationMapInitializePhase2();

3923
	/*
3924 3925
	 * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
	 * nothing.
3926 3927 3928 3929 3930 3931 3932 3933 3934 3935
	 */
	if (IsBootstrapProcessingMode())
		return;

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
Bruce Momjian's avatar
Bruce Momjian committed
3936
	 * Try to load the shared relcache cache file.  If unsuccessful, bootstrap
3937
	 * the cache with pre-made descriptors for the critical shared catalogs.
3938 3939 3940
	 */
	if (!load_relcache_init_file(true))
	{
3941
		formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
3942
				  Natts_pg_database, Desc_pg_database);
3943
		formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
3944
				  Natts_pg_authid, Desc_pg_authid);
3945
		formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
3946
				  Natts_pg_auth_members, Desc_pg_auth_members);
3947
		formrdesc("pg_shseclabel", SharedSecLabelRelation_Rowtype_Id, true,
3948
				  Natts_pg_shseclabel, Desc_pg_shseclabel);
Peter Eisentraut's avatar
Peter Eisentraut committed
3949
		formrdesc("pg_subscription", SubscriptionRelation_Rowtype_Id, true,
3950
				  Natts_pg_subscription, Desc_pg_subscription);
3951

Peter Eisentraut's avatar
Peter Eisentraut committed
3952
#define NUM_CRITICAL_SHARED_RELS	5	/* fix if you change list above */
3953 3954 3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973
	}

	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase3
 *
 *		This is called as soon as the catcache and transaction system
 *		are functional and we have determined MyDatabaseId.  At this point
 *		we can actually read data from the database's system catalogs.
 *		We first try to read pre-computed relcache entries from the local
 *		relcache init file.  If that's missing or broken, make phony entries
 *		for the minimum set of nailed-in-cache relations.  Then (unless
 *		bootstrapping) make sure we have entries for the critical system
 *		indexes.  Once we've done all this, we have enough infrastructure to
 *		open any system catalog or use any catcache.  The last step is to
 *		rewrite the cache files if needed.
 */
void
RelationCacheInitializePhase3(void)
3974 3975 3976 3977
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
3978
	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;
3979

3980 3981 3982 3983 3984
	/*
	 * relation mapper needs initialized too
	 */
	RelationMapInitializePhase3();

3985
	/*
3986 3987 3988 3989 3990
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
Bruce Momjian's avatar
Bruce Momjian committed
3991 3992 3993
	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical "nailed-in" system
	 * catalogs.
3994
	 */
3995
	if (IsBootstrapProcessingMode() ||
3996
		!load_relcache_init_file(false))
3997
	{
3998 3999
		needNewCacheFile = true;

4000
		formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
4001
				  Natts_pg_class, Desc_pg_class);
4002
		formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
4003
				  Natts_pg_attribute, Desc_pg_attribute);
4004
		formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
4005
				  Natts_pg_proc, Desc_pg_proc);
4006
		formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
4007
				  Natts_pg_type, Desc_pg_type);
4008

Tom Lane's avatar
Tom Lane committed
4009
#define NUM_CRITICAL_LOCAL_RELS 4	/* fix if you change list above */
4010
	}
4011 4012

	MemoryContextSwitchTo(oldcxt);
4013

4014
	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
4015 4016 4017
	if (IsBootstrapProcessingMode())
		return;

4018
	/*
4019
	 * If we didn't get the critical system indexes loaded into relcache, do
Bruce Momjian's avatar
Bruce Momjian committed
4020
	 * so now.  These are critical because the catcache and/or opclass cache
4021
	 * depend on them for fetches done during relcache load.  Thus, we have an
Bruce Momjian's avatar
Bruce Momjian committed
4022
	 * infinite-recursion problem.  We can break the recursion by doing
4023 4024 4025 4026 4027
	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
	 * performance, we only want to do that until we have the critical indexes
	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
	 * decide whether to do heapscan or indexscan at the key spots, and we set
	 * it true after we've loaded the critical indexes.
4028
	 *
4029 4030 4031 4032 4033 4034
	 * The critical indexes are marked as "nailed in cache", partly to make it
	 * easy for load_relcache_init_file to count them, but mainly because we
	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
	 * true.  (NOTE: perhaps it would be possible to reload them by
	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
	 * though, we just nail 'em in.)
4035 4036 4037 4038
	 *
	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
	 * in the same way as the others, because the critical catalogs don't
	 * (currently) have any rules or triggers, and so these indexes can be
Bruce Momjian's avatar
Bruce Momjian committed
4039
	 * rebuilt without inducing recursion.  However they are used during
4040 4041
	 * relcache load when a rel does have rules or triggers, so we choose to
	 * nail them for performance reasons.
4042
	 */
Bruce Momjian's avatar
Bruce Momjian committed
4043
	if (!criticalRelcachesBuilt)
4044
	{
4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058
		load_critical_index(ClassOidIndexId,
							RelationRelationId);
		load_critical_index(AttributeRelidNumIndexId,
							AttributeRelationId);
		load_critical_index(IndexRelidIndexId,
							IndexRelationId);
		load_critical_index(OpclassOidIndexId,
							OperatorClassRelationId);
		load_critical_index(AccessMethodProcedureIndexId,
							AccessMethodProcedureRelationId);
		load_critical_index(RewriteRelRulenameIndexId,
							RewriteRelationId);
		load_critical_index(TriggerRelidNameIndexId,
							TriggerRelationId);
4059

4060
#define NUM_CRITICAL_LOCAL_INDEXES	7	/* fix if you change list above */
4061 4062 4063 4064

		criticalRelcachesBuilt = true;
	}

4065 4066 4067
	/*
	 * Process critical shared indexes too.
	 *
Bruce Momjian's avatar
Bruce Momjian committed
4068 4069
	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
	 * initial lookup of MyDatabaseId, without which we'll never find any
Bruce Momjian's avatar
Bruce Momjian committed
4070
	 * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
4071 4072
	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
	 * need to nail up some indexes on pg_authid and pg_auth_members for use
4073 4074 4075
	 * during client authentication.  SharedSecLabelObjectIndexId isn't
	 * critical for the core system, but authentication hooks might be
	 * interested in it.
4076 4077 4078
	 */
	if (!criticalSharedRelcachesBuilt)
	{
4079 4080 4081 4082
		load_critical_index(DatabaseNameIndexId,
							DatabaseRelationId);
		load_critical_index(DatabaseOidIndexId,
							DatabaseRelationId);
4083 4084 4085 4086 4087 4088
		load_critical_index(AuthIdRolnameIndexId,
							AuthIdRelationId);
		load_critical_index(AuthIdOidIndexId,
							AuthIdRelationId);
		load_critical_index(AuthMemMemRoleIndexId,
							AuthMemRelationId);
4089 4090
		load_critical_index(SharedSecLabelObjectIndexId,
							SharedSecLabelRelationId);
4091

4092
#define NUM_CRITICAL_SHARED_INDEXES 6	/* fix if you change list above */
4093 4094 4095 4096

		criticalSharedRelcachesBuilt = true;
	}

4097
	/*
4098 4099 4100 4101
	 * Now, scan all the relcache entries and update anything that might be
	 * wrong in the results from formrdesc or the relcache cache file. If we
	 * faked up relcache entries using formrdesc, then read the real pg_class
	 * rows and replace the fake entries with them. Also, if any of the
4102 4103
	 * relcache entries have rules, triggers, or security policies, load that
	 * info the hard way since it isn't recorded in the cache file.
4104
	 *
Bruce Momjian's avatar
Bruce Momjian committed
4105 4106
	 * Whenever we access the catalogs to read data, there is a possibility of
	 * a shared-inval cache flush causing relcache entries to be removed.
4107 4108 4109 4110 4111
	 * Since hash_seq_search only guarantees to still work after the *current*
	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
	 * We handle this by restarting the scan from scratch after each access.
	 * This is theoretically O(N^2), but the number of entries that actually
	 * need to be fixed is small enough that it doesn't matter.
4112
	 */
4113
	hash_seq_init(&status, RelationIdCache);
4114

4115
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
4116
	{
4117
		Relation	relation = idhentry->reldesc;
4118 4119 4120 4121 4122 4123
		bool		restart = false;

		/*
		 * Make sure *this* entry doesn't get flushed while we work with it.
		 */
		RelationIncrementReferenceCount(relation);
4124

4125
		/*
4126
		 * If it's a faked-up entry, read the real pg_class tuple.
4127
		 */
4128
		if (relation->rd_rel->relowner == InvalidOid)
4129 4130 4131
		{
			HeapTuple	htup;
			Form_pg_class relp;
4132

4133
			htup = SearchSysCache1(RELOID,
Tom Lane's avatar
Tom Lane committed
4134
								   ObjectIdGetDatum(RelationGetRelid(relation)));
4135
			if (!HeapTupleIsValid(htup))
4136 4137
				elog(FATAL, "cache lookup failed for relation %u",
					 RelationGetRelid(relation));
4138
			relp = (Form_pg_class) GETSTRUCT(htup);
Bruce Momjian's avatar
Bruce Momjian committed
4139

4140 4141 4142 4143 4144
			/*
			 * Copy tuple to relation->rd_rel. (See notes in
			 * AllocateRelationDesc())
			 */
			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
4145

4146 4147 4148 4149 4150
			/* Update rd_options while we have the tuple */
			if (relation->rd_options)
				pfree(relation->rd_options);
			RelationParseRelOptions(relation, htup);

4151
			/*
4152
			 * Check the values in rd_att were set up correctly.  (We cannot
Bruce Momjian's avatar
Bruce Momjian committed
4153 4154 4155
			 * just copy them over now: formrdesc must have set up the rd_att
			 * data correctly to start with, because it may already have been
			 * copied into one or more catcache entries.)
4156
			 */
4157 4158
			Assert(relation->rd_att->tdtypeid == relp->reltype);
			Assert(relation->rd_att->tdtypmod == -1);
4159

4160
			ReleaseSysCache(htup);
4161 4162 4163 4164 4165 4166 4167

			/* relowner had better be OK now, else we'll loop forever */
			if (relation->rd_rel->relowner == InvalidOid)
				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
					 RelationGetRelationName(relation));

			restart = true;
4168 4169 4170 4171
		}

		/*
		 * Fix data that isn't saved in relcache cache file.
4172 4173 4174 4175 4176
		 *
		 * relhasrules or relhastriggers could possibly be wrong or out of
		 * date.  If we don't actually find any rules or triggers, clear the
		 * local copy of the flag so that we don't get into an infinite loop
		 * here.  We don't make any attempt to fix the pg_class entry, though.
4177 4178
		 */
		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
4179
		{
4180
			RelationBuildRuleLock(relation);
4181 4182 4183 4184
			if (relation->rd_rules == NULL)
				relation->rd_rel->relhasrules = false;
			restart = true;
		}
4185
		if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
4186
		{
4187
			RelationBuildTriggers(relation);
4188 4189 4190 4191 4192
			if (relation->trigdesc == NULL)
				relation->rd_rel->relhastriggers = false;
			restart = true;
		}

4193 4194 4195
		/*
		 * Re-load the row security policies if the relation has them, since
		 * they are not preserved in the cache.  Note that we can never NOT
4196
		 * have a policy while relrowsecurity is true,
4197
		 * RelationBuildRowSecurity will create a single default-deny policy
4198
		 * if there is no policy defined in pg_policy.
4199
		 */
4200
		if (relation->rd_rel->relrowsecurity && relation->rd_rsdesc == NULL)
4201 4202 4203
		{
			RelationBuildRowSecurity(relation);

Bruce Momjian's avatar
Bruce Momjian committed
4204
			Assert(relation->rd_rsdesc != NULL);
4205 4206 4207
			restart = true;
		}

4208
		/* Reload tableam data if needed */
4209 4210 4211 4212 4213 4214 4215 4216 4217 4218 4219 4220
		if (relation->rd_tableam == NULL &&
			(relation->rd_rel->relkind == RELKIND_RELATION ||
			 relation->rd_rel->relkind == RELKIND_SEQUENCE ||
			 relation->rd_rel->relkind == RELKIND_TOASTVALUE ||
			 relation->rd_rel->relkind == RELKIND_MATVIEW))
		{
			RelationInitTableAccessMethod(relation);
			Assert(relation->rd_tableam != NULL);

			restart = true;
		}

4221 4222 4223 4224 4225 4226 4227 4228 4229
		/* Release hold on the relation */
		RelationDecrementReferenceCount(relation);

		/* Now, restart the hashtable scan if needed */
		if (restart)
		{
			hash_seq_term(&status);
			hash_seq_init(&status, RelationIdCache);
		}
4230
	}
4231

4232
	/*
4233 4234
	 * Lastly, write out new relcache cache files if needed.  We don't bother
	 * to distinguish cases where only one of the two needs an update.
4235
	 */
4236 4237 4238
	if (needNewCacheFile)
	{
		/*
4239 4240 4241
		 * Force all the catcaches to finish initializing and thereby open the
		 * catalogs and indexes they use.  This will preload the relcache with
		 * entries for all the most important system catalogs and indexes, so
4242
		 * that the init files will be most useful for future backends.
4243 4244 4245
		 */
		InitCatalogCachePhase2();

4246 4247 4248
		/* now write the files */
		write_relcache_init_file(true);
		write_relcache_init_file(false);
4249 4250 4251
	}
}
/*
 * Load one critical system index into the relcache
4254 4255 4256
 *
 * indexoid is the OID of the target index, heapoid is the OID of the catalog
 * it belongs to.
4257 4258
 */
static void
4259
load_critical_index(Oid indexoid, Oid heapoid)
4260 4261 4262
{
	Relation	ird;

4263 4264 4265 4266 4267 4268 4269
	/*
	 * We must lock the underlying catalog before locking the index to avoid
	 * deadlock, since RelationBuildDesc might well need to read the catalog,
	 * and if anyone else is exclusive-locking this catalog and index they'll
	 * be doing it in that order.
	 */
	LockRelationOid(heapoid, AccessShareLock);
4270
	LockRelationOid(indexoid, AccessShareLock);
4271
	ird = RelationBuildDesc(indexoid, true);
4272 4273 4274 4275 4276
	if (ird == NULL)
		elog(PANIC, "could not open critical system index %u", indexoid);
	ird->rd_isnailed = true;
	ird->rd_refcnt = 1;
	UnlockRelationOid(indexoid, AccessShareLock);
4277
	UnlockRelationOid(heapoid, AccessShareLock);
4278 4279

	(void) RelationGetIndexAttOptions(ird, false);
4280 4281
}

4282
/*
4283
 * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
4284 4285 4286
 * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
 *
 * We need this kluge because we have to be able to access non-fixed-width
4287 4288 4289 4290 4291 4292
 * fields of pg_class and pg_index before we have the standard catalog caches
 * available.  We use predefined data that's set up in just the same way as
 * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
 * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
 * does it have a TupleConstr field.  But it's good enough for the purpose of
 * extracting fields.
4293 4294
 */
static TupleDesc
4295
BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs)
4296
{
4297
	TupleDesc	result;
4298 4299 4300 4301 4302
	MemoryContext oldcxt;
	int			i;

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

4303
	result = CreateTemplateTupleDesc(natts);
Tom Lane's avatar
Tom Lane committed
4304
	result->tdtypeid = RECORDOID;	/* not right, but we don't care */
4305
	result->tdtypmod = -1;
4306

4307
	for (i = 0; i < natts; i++)
4308
	{
4309
		memcpy(TupleDescAttr(result, i), &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
4310
		/* make sure attcacheoff is valid */
4311
		TupleDescAttr(result, i)->attcacheoff = -1;
4312 4313 4314
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
4315
	TupleDescAttr(result, 0)->attcacheoff = 0;
4316 4317 4318 4319 4320

	/* Note: we don't bother to set up a TupleConstr entry */

	MemoryContextSwitchTo(oldcxt);

4321 4322 4323 4324 4325 4326 4327 4328 4329 4330 4331
	return result;
}

static TupleDesc
GetPgClassDescriptor(void)
{
	static TupleDesc pgclassdesc = NULL;

	/* Already done? */
	if (pgclassdesc == NULL)
		pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
4332
											   Desc_pg_class);
4333 4334 4335 4336 4337 4338 4339 4340 4341 4342 4343 4344

	return pgclassdesc;
}

static TupleDesc
GetPgIndexDescriptor(void)
{
	static TupleDesc pgindexdesc = NULL;

	/* Already done? */
	if (pgindexdesc == NULL)
		pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
4345
											   Desc_pg_index);
4346

4347 4348 4349
	return pgindexdesc;
}

4350 4351
/*
 * Load any default attribute value definitions for the relation.
4352 4353 4354 4355 4356 4357
 *
 * ndef is the number of attributes that were marked atthasdef.
 *
 * Note: we don't make it a hard error to be missing some pg_attrdef records.
 * We can limp along as long as nothing needs to use the default value.  Code
 * that fails to find an expected AttrDefault record should throw an error.
4358
 */
4359
static void
4360
AttrDefaultFetch(Relation relation, int ndef)
4361
{
4362
	AttrDefault *attrdef;
4363
	Relation	adrel;
4364
	SysScanDesc adscan;
4365
	ScanKeyData skey;
Hiroshi Inoue's avatar
Hiroshi Inoue committed
4366
	HeapTuple	htup;
4367 4368 4369 4370 4371 4372
	int			found = 0;

	/* Allocate array with room for as many entries as expected */
	attrdef = (AttrDefault *)
		MemoryContextAllocZero(CacheMemoryContext,
							   ndef * sizeof(AttrDefault));
4373

4374
	/* Search pg_attrdef for relevant entries */
4375 4376 4377 4378
	ScanKeyInit(&skey,
				Anum_pg_attrdef_adrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
4379

4380
	adrel = table_open(AttrDefaultRelationId, AccessShareLock);
4381
	adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
4382
								NULL, 1, &skey);
4383

4384
	while (HeapTupleIsValid(htup = systable_getnext(adscan)))
4385
	{
4386
		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
4387 4388
		Datum		val;
		bool		isnull;
4389

4390 4391
		/* protect limited size of array */
		if (found >= ndef)
4392
		{
4393 4394
			elog(WARNING, "unexpected pg_attrdef record found for attribute %d of relation \"%s\"",
				 adform->adnum, RelationGetRelationName(relation));
4395 4396
			break;
		}
4397

4398 4399 4400 4401 4402
		val = fastgetattr(htup,
						  Anum_pg_attrdef_adbin,
						  adrel->rd_att, &isnull);
		if (isnull)
			elog(WARNING, "null adbin for attribute %d of relation \"%s\"",
4403
				 adform->adnum, RelationGetRelationName(relation));
4404 4405 4406 4407 4408 4409 4410 4411 4412 4413
		else
		{
			/* detoast and convert to cstring in caller's context */
			char	   *s = TextDatumGetCString(val);

			attrdef[found].adnum = adform->adnum;
			attrdef[found].adbin = MemoryContextStrdup(CacheMemoryContext, s);
			pfree(s);
			found++;
		}
4414 4415
	}

4416
	systable_endscan(adscan);
4417
	table_close(adrel, AccessShareLock);
4418 4419 4420 4421 4422 4423 4424 4425 4426 4427 4428 4429 4430 4431 4432 4433 4434 4435 4436 4437 4438 4439 4440 4441 4442 4443 4444 4445

	if (found != ndef)
		elog(WARNING, "%d pg_attrdef record(s) missing for relation \"%s\"",
			 ndef - found, RelationGetRelationName(relation));

	/*
	 * Sort the AttrDefault entries by adnum, for the convenience of
	 * equalTupleDescs().  (Usually, they already will be in order, but this
	 * might not be so if systable_getnext isn't using an index.)
	 */
	if (found > 1)
		qsort(attrdef, found, sizeof(AttrDefault), AttrDefaultCmp);

	/* Install array only after it's fully valid */
	relation->rd_att->constr->defval = attrdef;
	relation->rd_att->constr->num_defval = found;
}

/*
 * qsort comparator ordering AttrDefault entries by ascending adnum
 *
 * Returns the difference of the two adnum fields, so the result is negative,
 * zero or positive as the first entry sorts before, with, or after the
 * second.
 */
static int
AttrDefaultCmp(const void *a, const void *b)
{
	const AttrDefault *lhs = (const AttrDefault *) a;
	const AttrDefault *rhs = (const AttrDefault *) b;
	int			diff = lhs->adnum - rhs->adnum;

	return diff;
}

4448 4449
/*
 * Load any check constraints for the relation.
4450 4451 4452
 *
 * As with defaults, if we don't find the expected number of them, just warn
 * here.  The executor should throw an error if an INSERT/UPDATE is attempted.
4453
 */
4454
static void
4455
CheckConstraintFetch(Relation relation)
4456
{
4457 4458
	ConstrCheck *check;
	int			ncheck = relation->rd_rel->relchecks;
4459 4460 4461
	Relation	conrel;
	SysScanDesc conscan;
	ScanKeyData skey[1];
Hiroshi Inoue's avatar
Hiroshi Inoue committed
4462
	HeapTuple	htup;
4463
	int			found = 0;
4464

4465 4466 4467 4468 4469 4470
	/* Allocate array with room for as many entries as expected */
	check = (ConstrCheck *)
		MemoryContextAllocZero(CacheMemoryContext,
							   ncheck * sizeof(ConstrCheck));

	/* Search pg_constraint for relevant entries */
4471 4472 4473 4474
	ScanKeyInit(&skey[0],
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
4475

4476
	conrel = table_open(ConstraintRelationId, AccessShareLock);
4477
	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
4478
								 NULL, 1, skey);
4479

4480
	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4481
	{
4482
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
4483 4484
		Datum		val;
		bool		isnull;
4485 4486 4487 4488 4489

		/* We want check constraints only */
		if (conform->contype != CONSTRAINT_CHECK)
			continue;

4490
		/* protect limited size of array */
4491
		if (found >= ncheck)
4492 4493
		{
			elog(WARNING, "unexpected pg_constraint record found for relation \"%s\"",
4494
				 RelationGetRelationName(relation));
4495 4496
			break;
		}
4497

4498
		check[found].ccvalid = conform->convalidated;
4499
		check[found].ccnoinherit = conform->connoinherit;
4500
		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
4501
												  NameStr(conform->conname));
4502 4503

		/* Grab and test conbin is actually set */
4504
		val = fastgetattr(htup,
4505 4506
						  Anum_pg_constraint_conbin,
						  conrel->rd_att, &isnull);
4507
		if (isnull)
4508
			elog(WARNING, "null conbin for relation \"%s\"",
4509
				 RelationGetRelationName(relation));
4510 4511 4512 4513
		else
		{
			/* detoast and convert to cstring in caller's context */
			char	   *s = TextDatumGetCString(val);
4514

4515 4516 4517 4518
			check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
			pfree(s);
			found++;
		}
4519 4520
	}

4521
	systable_endscan(conscan);
4522
	table_close(conrel, AccessShareLock);
4523 4524

	if (found != ncheck)
4525
		elog(WARNING, "%d pg_constraint record(s) missing for relation \"%s\"",
4526
			 ncheck - found, RelationGetRelationName(relation));
4527

4528 4529 4530 4531 4532 4533 4534 4535 4536 4537
	/*
	 * Sort the records by name.  This ensures that CHECKs are applied in a
	 * deterministic order, and it also makes equalTupleDescs() faster.
	 */
	if (found > 1)
		qsort(check, found, sizeof(ConstrCheck), CheckConstraintCmp);

	/* Install array only after it's fully valid */
	relation->rd_att->constr->check = check;
	relation->rd_att->constr->num_check = found;
4538 4539 4540 4541 4542 4543 4544 4545 4546 4547 4548 4549
}

/*
 * qsort comparator ordering ConstrCheck entries by constraint name
 *
 * The names are compared with strcmp(), whose result is returned as-is.
 */
static int
CheckConstraintCmp(const void *a, const void *b)
{
	const char *lhs_name = ((const ConstrCheck *) a)->ccname;
	const char *rhs_name = ((const ConstrCheck *) b)->ccname;

	return strcmp(lhs_name, rhs_name);
}

4552 4553 4554 4555 4556 4557 4558 4559 4560 4561 4562 4563 4564 4565 4566 4567 4568 4569 4570 4571 4572 4573 4574 4575 4576 4577 4578 4579 4580
/*
 * RelationGetFKeyList -- get a list of foreign key info for the relation
 *
 * The result is a list of ForeignKeyCacheInfo structs, one for each FK
 * constraining the given relation.  The data is a direct copy of relevant
 * fields from pg_constraint.  The list items are in no particular order.
 *
 * CAUTION: when the list comes from the quick-exit path it is part of the
 * relcache's data, and could vanish in a relcache entry reset.  Callers must
 * inspect or copy it before doing anything that might trigger a cache flush,
 * such as system catalog accesses.  copyObject() can be used if desired.
 * (It is defined this way because current callers want to filter and modify
 * the list entries anyway, so copying would be a waste of time.)
 */
List *
RelationGetFKeyList(Relation relation)
{
	List	   *fkeys = NIL;
	List	   *stale_list;
	Relation	pg_constraint_rel;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tup;
	MemoryContext savedcxt;

	/* If the list was computed before, hand back the cached one. */
	if (relation->rd_fkeyvalid)
		return relation->rd_fkeylist;

	/* Fast path: non-partitioned tables without triggers can't have FKs */
	if (relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
		!relation->rd_rel->relhastriggers)
		return NIL;

	/*
	 * The list to be returned is built in the caller's context during the
	 * scan, and is copied into the relcache entry only after the scan has
	 * completed successfully.  That way no cache-context memory is leaked
	 * if some sort of error happens partway through.
	 */

	/* Set up a scan of pg_constraint for rows with conrelid = this rel. */
	ScanKeyInit(&key,
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	pg_constraint_rel = table_open(ConstraintRelationId, AccessShareLock);
	scan = systable_beginscan(pg_constraint_rel,
							  ConstraintRelidTypidNameIndexId, true,
							  NULL, 1, &key);

	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
		ForeignKeyCacheInfo *fkinfo;

		/* anything that is not a foreign key is of no interest */
		if (conform->contype != CONSTRAINT_FOREIGN)
			continue;

		fkinfo = makeNode(ForeignKeyCacheInfo);
		fkinfo->conoid = conform->oid;
		fkinfo->conrelid = conform->conrelid;
		fkinfo->confrelid = conform->confrelid;

		DeconstructFkConstraintRow(tup, &fkinfo->nkeys,
								   fkinfo->conkey,
								   fkinfo->confkey,
								   fkinfo->conpfeqop,
								   NULL, NULL);

		/* Append this FK's node to the output list */
		fkeys = lappend(fkeys, fkinfo);
	}

	systable_endscan(scan);
	table_close(pg_constraint_rel, AccessShareLock);

	/* Store a copy of the finished list in the relcache entry. */
	savedcxt = MemoryContextSwitchTo(CacheMemoryContext);
	stale_list = relation->rd_fkeylist;
	relation->rd_fkeylist = copyObject(fkeys);
	relation->rd_fkeyvalid = true;
	MemoryContextSwitchTo(savedcxt);

	/* Free the previous list, if any, so that it is not leaked */
	list_free_deep(stale_list);

	return fkeys;
}

4644 4645 4646 4647 4648 4649
/*
 * RelationGetIndexList -- get a list of OIDs of indexes on this relation
 *
 * The index list is created only if someone requests it.  We scan pg_index
 * to find relevant indexes, and add the list to the relcache entry so that
 * we won't have to compute it again.  Note that shared cache inval of a
 * relcache entry will delete the old list and set rd_indexvalid to false,
 * so that we must recompute the index list on next request.  This handles
 * creation or deletion of an index.
 *
 * Indexes that are marked not indislive are omitted from the returned list.
 * Such indexes are expected to be dropped momentarily, and should not be
 * touched at all by any caller of this function.
 *
 * The returned list is guaranteed to be sorted in order by OID.  This is
 * needed by the executor, since for index types that we obtain exclusive
 * locks on when updating the index, all backends must lock the indexes in
 * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
 * consistent ordering would do, but ordering by OID is easy.
 *
 * Since shared cache inval causes the relcache's copy of the list to go away,
 * we return a copy of the list palloc'd in the caller's context.  The caller
 * may list_free() the returned list after scanning it.  This is necessary
 * since the caller will typically be doing syscache lookups on the relevant
 * indexes, and syscache lookup could cause SI messages to be processed!
 *
 * In exactly the same way, we update rd_pkindex, which is the OID of the
 * relation's primary key index if any, else InvalidOid; and rd_replidindex,
 * which is the pg_class OID of an index to be used as the relation's
 * replication identity index, or InvalidOid if there is no such index.
 */
List *
RelationGetIndexList(Relation relation)
{
	Relation	pg_index_rel;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tup;
	List	   *indexoids = NIL;
	List	   *stale_list;
	char		replident = relation->rd_rel->relreplident;
	Oid			pkeyIndex = InvalidOid;
	Oid			candidateIndex = InvalidOid;
	Oid			replidIndex;
	MemoryContext savedcxt;

	/* If the list was computed before, return a copy of the cached one. */
	if (relation->rd_indexvalid)
		return list_copy(relation->rd_indexlist);

	/*
	 * The list to be returned is built in the caller's context during the
	 * scan, and is copied into the relcache entry only after the scan has
	 * completed successfully.  That way no cache-context memory is leaked
	 * if some sort of error happens partway through.
	 */

	/* Set up a scan of pg_index for rows with indrelid = this rel. */
	ScanKeyInit(&key,
				Anum_pg_index_indrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	pg_index_rel = table_open(IndexRelationId, AccessShareLock);
	scan = systable_beginscan(pg_index_rel, IndexIndrelidIndexId, true,
							  NULL, 1, &key);

	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		Form_pg_index index = (Form_pg_index) GETSTRUCT(tup);

		/*
		 * Skip indexes that are in the process of being dropped.  Leaving
		 * them out keeps them from being searched, inserted into, or
		 * considered in HOT-safety decisions.  It's unsafe to touch such an
		 * index at all since its catalog entries could disappear at any
		 * instant.
		 */
		if (!index->indislive)
			continue;

		/* every live index goes into the result list */
		indexoids = lappend_oid(indexoids, index->indexrelid);

		/*
		 * Only valid, unique, immediate indexes without a predicate can
		 * serve as the primary key or replication identity index, so don't
		 * check any others.
		 */
		if (!(index->indisvalid &&
			  index->indisunique &&
			  index->indimmediate &&
			  heap_attisnull(tup, Anum_pg_index_indpred, NULL)))
			continue;

		/* note the primary key index, if this is it */
		if (index->indisprimary)
			pkeyIndex = index->indexrelid;

		/* note the explicitly chosen replica index, if this is it */
		if (index->indisreplident)
			candidateIndex = index->indexrelid;
	}

	systable_endscan(scan);

	table_close(pg_index_rel, AccessShareLock);

	/* The API promises OID order, so sort the result list. */
	list_sort(indexoids, list_oid_cmp);

	/* Work out which index, if any, is the replication identity index. */
	if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex))
		replidIndex = pkeyIndex;
	else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex))
		replidIndex = candidateIndex;
	else
		replidIndex = InvalidOid;

	/* Store a copy of the finished list in the relcache entry. */
	savedcxt = MemoryContextSwitchTo(CacheMemoryContext);
	stale_list = relation->rd_indexlist;
	relation->rd_indexlist = list_copy(indexoids);
	relation->rd_pkindex = pkeyIndex;
	relation->rd_replidindex = replidIndex;
	relation->rd_indexvalid = true;
	MemoryContextSwitchTo(savedcxt);

	/* Free the previous list, if any, so that it is not leaked */
	list_free(stale_list);

	return indexoids;
}

4773 4774
/*
 * RelationGetStatExtList
 *		get a list of OIDs of statistics objects on this relation
 *
 * The statistics list is created only if someone requests it, in a way
 * similar to RelationGetIndexList().  We scan pg_statistic_ext to find
 * relevant statistics, and add the list to the relcache entry so that we
 * won't have to compute it again.  Note that shared cache inval of a
 * relcache entry will delete the old list and set rd_statvalid to 0,
 * so that we must recompute the statistics list on next request.  This
 * handles creation or deletion of a statistics object.
 *
 * The returned list is guaranteed to be sorted in order by OID, although
 * this is not currently needed.
 *
 * Since shared cache inval causes the relcache's copy of the list to go away,
 * we return a copy of the list palloc'd in the caller's context.  The caller
 * may list_free() the returned list after scanning it.  This is necessary
 * since the caller will typically be doing syscache lookups on the relevant
 * statistics, and syscache lookup could cause SI messages to be processed!
 */
List *
RelationGetStatExtList(Relation relation)
{
	Relation	statextrel;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tup;
	List	   *statoids = NIL;
	List	   *stale_list;
	MemoryContext savedcxt;

	/* If the list was computed before, return a copy of the cached one. */
	if (relation->rd_statvalid)
		return list_copy(relation->rd_statlist);

	/*
	 * The list to be returned is built in the caller's context during the
	 * scan, and is copied into the relcache entry only after the scan has
	 * completed successfully.  That way no cache-context memory is leaked
	 * if some sort of error happens partway through.
	 */

	/*
	 * Set up a scan of pg_statistic_ext for rows with stxrelid = this rel.
	 */
	ScanKeyInit(&key,
				Anum_pg_statistic_ext_stxrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	statextrel = table_open(StatisticExtRelationId, AccessShareLock);
	scan = systable_beginscan(statextrel, StatisticExtRelidIndexId, true,
							  NULL, 1, &key);

	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		Form_pg_statistic_ext statform = (Form_pg_statistic_ext) GETSTRUCT(tup);

		statoids = lappend_oid(statoids, statform->oid);
	}

	systable_endscan(scan);

	table_close(statextrel, AccessShareLock);

	/* The API promises OID order, so sort the result list. */
	list_sort(statoids, list_oid_cmp);

	/* Store a copy of the finished list in the relcache entry. */
	savedcxt = MemoryContextSwitchTo(CacheMemoryContext);
	stale_list = relation->rd_statlist;
	relation->rd_statlist = list_copy(statoids);

	relation->rd_statvalid = true;
	MemoryContextSwitchTo(savedcxt);

	/* Free the previous list, if any, so that it is not leaked */
	list_free(stale_list);

	return statoids;
}

Peter Eisentraut's avatar
Peter Eisentraut committed
4858 4859 4860 4861 4862 4863 4864 4865 4866 4867
/*
 * RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index
 *
 * The result is InvalidOid when the relation has no such index.
 */
Oid
RelationGetPrimaryKeyIndex(Relation relation)
{
	if (!relation->rd_indexvalid)
	{
		/*
		 * RelationGetIndexList does the heavy lifting: it fills in
		 * rd_pkindex as a side effect.  The list it returns is not needed
		 * here, so free it right away.
		 */
		list_free(RelationGetIndexList(relation));
		Assert(relation->rd_indexvalid);
	}

	return relation->rd_pkindex;
}

4879 4880 4881 4882 4883 4884 4885 4886 4887 4888
/*
 * RelationGetReplicaIndex -- get OID of the relation's replica identity index
 *
 * The result is InvalidOid when the relation has no such index.
 */
Oid
RelationGetReplicaIndex(Relation relation)
{
	if (!relation->rd_indexvalid)
	{
		/*
		 * RelationGetIndexList does the heavy lifting: it fills in
		 * rd_replidindex as a side effect.  The list it returns is not
		 * needed here, so free it right away.
		 */
		list_free(RelationGetIndexList(relation));
		Assert(relation->rd_indexvalid);
	}

	return relation->rd_replidindex;
}

4900 4901 4902 4903 4904 4905 4906 4907 4908 4909 4910 4911 4912 4913 4914 4915 4916 4917 4918 4919
/*
 * RelationGetIndexExpressions -- get the index expressions for an index
 *
 * The result of transforming pg_index.indexprs into a node tree is cached in
 * the relcache entry.  NIL is returned if the rel is not an index or has no
 * expressional columns.  Otherwise the returned tree is a copy made in the
 * caller's memory context.  (A pointer to the relcache copy is never handed
 * out, since that copy could disappear due to relcache invalidation.)
 */
List *
RelationGetIndexExpressions(Relation relation)
{
	List	   *exprs;
	Datum		indexprsDatum;
	bool		indexprsIsNull;
	char	   *indexprsStr;
	MemoryContext savedcxt;

	/* If the result was computed before, return a copy of the cached one. */
	if (relation->rd_indexprs)
		return copyObject(relation->rd_indexprs);

	/* No index tuple, or indexprs is null: there is nothing to do. */
	if (relation->rd_indextuple == NULL)
		return NIL;
	if (heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
		return NIL;

	/*
	 * The tree to be returned is built in the caller's context, and is
	 * copied into the relcache entry only after the work has completed
	 * successfully.  This avoids problems if some sort of error happens
	 * partway through.
	 */
	indexprsDatum = heap_getattr(relation->rd_indextuple,
								 Anum_pg_index_indexprs,
								 GetPgIndexDescriptor(),
								 &indexprsIsNull);
	Assert(!indexprsIsNull);
	indexprsStr = TextDatumGetCString(indexprsDatum);
	exprs = (List *) stringToNode(indexprsStr);
	pfree(indexprsStr);

	/*
	 * Run the expressions through eval_const_expressions.  This is not just
	 * an optimization, but is necessary, because the planner will be
	 * comparing them to similarly-processed qual clauses, and may fail to
	 * detect valid matches without this.  We must not use canonicalize_qual,
	 * however, since these aren't qual expressions.
	 */
	exprs = (List *) eval_const_expressions(NULL, (Node *) exprs);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) exprs);

	/* Store a copy of the finished tree in the relcache entry. */
	savedcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indexprs = copyObject(exprs);
	MemoryContextSwitchTo(savedcxt);

	return exprs;
}

4961 4962 4963 4964 4965 4966 4967 4968 4969 4970 4971 4972 4973 4974 4975 4976 4977 4978 4979 4980 4981 4982 4983 4984 4985 4986 4987 4988 4989 4990 4991 4992 4993 4994 4995 4996 4997 4998 4999 5000 5001 5002 5003 5004 5005 5006 5007 5008 5009 5010 5011
/*
 * RelationGetDummyIndexExpressions -- get dummy expressions for an index
 *
 * The result is a list of dummy expressions (just null Const nodes) that
 * have the same types/typmods/collations as the index's real expressions.
 * This is useful in situations where we don't want to run any user-defined
 * code.  NIL is returned if there is no index tuple or indexprs is null.
 */
List *
RelationGetDummyIndexExpressions(Relation relation)
{
	List	   *dummies = NIL;
	List	   *rawExprs;
	Datum		indexprsDatum;
	bool		indexprsIsNull;
	char	   *indexprsStr;
	ListCell   *cell;

	/* No index tuple, or indexprs is null: there is nothing to do. */
	if (relation->rd_indextuple == NULL)
		return NIL;
	if (heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
		return NIL;

	/* Pull the raw node tree(s) out of the index tuple. */
	indexprsDatum = heap_getattr(relation->rd_indextuple,
								 Anum_pg_index_indexprs,
								 GetPgIndexDescriptor(),
								 &indexprsIsNull);
	Assert(!indexprsIsNull);
	indexprsStr = TextDatumGetCString(indexprsDatum);
	rawExprs = (List *) stringToNode(indexprsStr);
	pfree(indexprsStr);

	/* Build one null Const per expression; typlen and typbyval are arbitrary. */
	foreach(cell, rawExprs)
	{
		Node	   *rawExpr = (Node *) lfirst(cell);
		Const	   *dummy;

		dummy = makeConst(exprType(rawExpr),
						  exprTypmod(rawExpr),
						  exprCollation(rawExpr),
						  1,
						  (Datum) 0,
						  true,
						  true);
		dummies = lappend(dummies, dummy);
	}

	return dummies;
}

5012 5013 5014
/*
 * RelationGetIndexPredicate -- get the index predicate for an index
 *
 * The result of transforming pg_index.indpred into an implicit-AND node tree
 * (suitable for use in planning) is cached in the relcache entry.  NIL is
 * returned if the rel is not an index or has no predicate.  Otherwise the
 * returned tree is a copy made in the caller's memory context.  (A pointer
 * to the relcache copy is never handed out, since that copy could disappear
 * due to relcache invalidation.)
 */
List *
RelationGetIndexPredicate(Relation relation)
{
	List	   *pred;
	Datum		indpredDatum;
	bool		indpredIsNull;
	char	   *indpredStr;
	MemoryContext savedcxt;

	/* If the result was computed before, return a copy of the cached one. */
	if (relation->rd_indpred)
		return copyObject(relation->rd_indpred);

	/* No index tuple, or indpred is null: there is nothing to do. */
	if (relation->rd_indextuple == NULL)
		return NIL;
	if (heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred, NULL))
		return NIL;

	/*
	 * The tree to be returned is built in the caller's context, and is
	 * copied into the relcache entry only after the work has completed
	 * successfully.  This avoids problems if some sort of error happens
	 * partway through.
	 */
	indpredDatum = heap_getattr(relation->rd_indextuple,
								Anum_pg_index_indpred,
								GetPgIndexDescriptor(),
								&indpredIsNull);
	Assert(!indpredIsNull);
	indpredStr = TextDatumGetCString(indpredDatum);
	pred = (List *) stringToNode(indpredStr);
	pfree(indpredStr);

	/*
	 * Run the expression through const-simplification and canonicalization.
	 * This is not just an optimization, but is necessary, because the planner
	 * will be comparing it to similarly-processed qual clauses, and may fail
	 * to detect valid matches without this.  This must match the processing
	 * done to qual clauses in preprocess_expression()!  (We can skip the
	 * stuff involving subqueries, however, since we don't allow any in index
	 * predicates.)
	 */
	pred = (List *) eval_const_expressions(NULL, (Node *) pred);

	pred = (List *) canonicalize_qual((Expr *) pred, false);

	/* Then switch to implicit-AND format */
	pred = make_ands_implicit((Expr *) pred);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) pred);

	/* Store a copy of the finished tree in the relcache entry. */
	savedcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indpred = copyObject(pred);
	MemoryContextSwitchTo(savedcxt);

	return pred;
}

5081 5082 5083 5084 5085 5086 5087 5088
/*
 * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
 *
 * The result has a bit set for each attribute used anywhere in the index
 * definitions of all the indexes on this relation.  (This includes not only
 * simple index keys, but attributes used in expressions and partial-index
 * predicates.)
 *
5089
 * Depending on attrKind, a bitmap covering the attnums for all index columns,
5090 5091
 * for all potential foreign key columns, or for all columns in the configured
 * replica identity index is returned.
5092
 *
5093 5094 5095
 * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
 * we can include system attributes (e.g., OID) in the bitmap representation.
 *
5096
 * Caller had better hold at least RowExclusiveLock on the target relation
5097 5098 5099 5100
 * to ensure it is safe (deadlock-free) for us to take locks on the relation's
 * indexes.  Note that since the introduction of CREATE INDEX CONCURRENTLY,
 * that lock level doesn't guarantee a stable set of indexes, so we have to
 * be prepared to retry here in case of a change in the set of indexes.
5101
 *
5102 5103 5104 5105
 * The returned result is palloc'd in the caller's memory context and should
 * be bms_free'd when not needed anymore.
 */
Bitmapset *
5106
RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
5107
{
5108
	Bitmapset  *indexattrs;		/* indexed columns */
Bruce Momjian's avatar
Bruce Momjian committed
5109
	Bitmapset  *uindexattrs;	/* columns in unique indexes */
Peter Eisentraut's avatar
Peter Eisentraut committed
5110
	Bitmapset  *pkindexattrs;	/* columns in the primary index */
Bruce Momjian's avatar
Bruce Momjian committed
5111
	Bitmapset  *idindexattrs;	/* columns in the replica identity */
Bruce Momjian's avatar
Bruce Momjian committed
5112
	List	   *indexoidlist;
5113
	List	   *newindexoidlist;
Peter Eisentraut's avatar
Peter Eisentraut committed
5114
	Oid			relpkindex;
5115
	Oid			relreplindex;
Bruce Momjian's avatar
Bruce Momjian committed
5116
	ListCell   *l;
5117 5118 5119 5120
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexattr != NULL)
5121
	{
Bruce Momjian's avatar
Bruce Momjian committed
5122
		switch (attrKind)
5123
		{
5124
			case INDEX_ATTR_BITMAP_ALL:
5125
				return bms_copy(relation->rd_indexattr);
5126 5127
			case INDEX_ATTR_BITMAP_KEY:
				return bms_copy(relation->rd_keyattr);
Peter Eisentraut's avatar
Peter Eisentraut committed
5128 5129
			case INDEX_ATTR_BITMAP_PRIMARY_KEY:
				return bms_copy(relation->rd_pkattr);
5130 5131
			case INDEX_ATTR_BITMAP_IDENTITY_KEY:
				return bms_copy(relation->rd_idattr);
5132 5133 5134
			default:
				elog(ERROR, "unknown attrKind %u", attrKind);
		}
5135
	}
5136 5137 5138 5139 5140 5141

	/* Fast path if definitely no indexes */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/*
5142
	 * Get cached list of index OIDs. If we have to start over, we do so here.
5143
	 */
5144
restart:
5145 5146 5147 5148 5149 5150
	indexoidlist = RelationGetIndexList(relation);

	/* Fall out if no indexes (but relhasindex was set) */
	if (indexoidlist == NIL)
		return NULL;

5151
	/*
Tom Lane's avatar
Tom Lane committed
5152
	 * Copy the rd_pkindex and rd_replidindex values computed by
Peter Eisentraut's avatar
Peter Eisentraut committed
5153 5154
	 * RelationGetIndexList before proceeding.  This is needed because a
	 * relcache flush could occur inside index_open below, resetting the
5155 5156
	 * fields managed by RelationGetIndexList.  We need to do the work with
	 * stable values of these fields.
5157
	 */
Peter Eisentraut's avatar
Peter Eisentraut committed
5158
	relpkindex = relation->rd_pkindex;
5159 5160
	relreplindex = relation->rd_replidindex;

5161 5162
	/*
	 * For each index, add referenced attributes to indexattrs.
5163 5164 5165 5166 5167 5168 5169
	 *
	 * Note: we consider all indexes returned by RelationGetIndexList, even if
	 * they are not indisready or indisvalid.  This is important because an
	 * index for which CREATE INDEX CONCURRENTLY has just started must be
	 * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
	 * CONCURRENTLY is far enough along that we should ignore the index, it
	 * won't be returned at all by RelationGetIndexList.
5170 5171
	 */
	indexattrs = NULL;
5172
	uindexattrs = NULL;
Peter Eisentraut's avatar
Peter Eisentraut committed
5173
	pkindexattrs = NULL;
5174
	idindexattrs = NULL;
5175 5176 5177 5178
	foreach(l, indexoidlist)
	{
		Oid			indexOid = lfirst_oid(l);
		Relation	indexDesc;
5179 5180 5181 5182
		Datum		datum;
		bool		isnull;
		Node	   *indexExpressions;
		Node	   *indexPredicate;
Bruce Momjian's avatar
Bruce Momjian committed
5183
		int			i;
Bruce Momjian's avatar
Bruce Momjian committed
5184
		bool		isKey;		/* candidate key */
Peter Eisentraut's avatar
Peter Eisentraut committed
5185
		bool		isPK;		/* primary key */
Bruce Momjian's avatar
Bruce Momjian committed
5186
		bool		isIDKey;	/* replica identity index */
5187

5188 5189
		indexDesc = index_open(indexOid, AccessShareLock);

5190 5191 5192 5193 5194 5195 5196 5197 5198 5199 5200 5201 5202 5203 5204 5205 5206 5207 5208 5209 5210 5211
		/*
		 * Extract index expressions and index predicate.  Note: Don't use
		 * RelationGetIndexExpressions()/RelationGetIndexPredicate(), because
		 * those might run constant expressions evaluation, which needs a
		 * snapshot, which we might not have here.  (Also, it's probably more
		 * sound to collect the bitmaps before any transformations that might
		 * eliminate columns, but the practical impact of this is limited.)
		 */

		datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indexprs,
							 GetPgIndexDescriptor(), &isnull);
		if (!isnull)
			indexExpressions = stringToNode(TextDatumGetCString(datum));
		else
			indexExpressions = NULL;

		datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indpred,
							 GetPgIndexDescriptor(), &isnull);
		if (!isnull)
			indexPredicate = stringToNode(TextDatumGetCString(datum));
		else
			indexPredicate = NULL;
5212

5213
		/* Can this index be referenced by a foreign key? */
5214 5215 5216
		isKey = indexDesc->rd_index->indisunique &&
			indexExpressions == NULL &&
			indexPredicate == NULL;
5217

Peter Eisentraut's avatar
Peter Eisentraut committed
5218 5219 5220
		/* Is this a primary key? */
		isPK = (indexOid == relpkindex);

5221
		/* Is this index the configured (or default) replica identity? */
5222
		isIDKey = (indexOid == relreplindex);
5223

5224
		/* Collect simple attribute references */
5225
		for (i = 0; i < indexDesc->rd_index->indnatts; i++)
5226
		{
5227
			int			attrnum = indexDesc->rd_index->indkey.values[i];
5228

5229 5230 5231 5232 5233 5234 5235 5236
			/*
			 * Since we have covering indexes with non-key columns, we must
			 * handle them accurately here. non-key columns must be added into
			 * indexattrs, since they are in index, and HOT-update shouldn't
			 * miss them. Obviously, non-key columns couldn't be referenced by
			 * foreign key or identity key. Hence we do not include them into
			 * uindexattrs, pkindexattrs and idindexattrs bitmaps.
			 */
5237
			if (attrnum != 0)
5238
			{
5239
				indexattrs = bms_add_member(indexattrs,
Tom Lane's avatar
Tom Lane committed
5240
											attrnum - FirstLowInvalidHeapAttributeNumber);
5241

5242
				if (isKey && i < indexDesc->rd_index->indnkeyatts)
5243
					uindexattrs = bms_add_member(uindexattrs,
Tom Lane's avatar
Tom Lane committed
5244
												 attrnum - FirstLowInvalidHeapAttributeNumber);
5245

5246
				if (isPK && i < indexDesc->rd_index->indnkeyatts)
Peter Eisentraut's avatar
Peter Eisentraut committed
5247
					pkindexattrs = bms_add_member(pkindexattrs,
Tom Lane's avatar
Tom Lane committed
5248
												  attrnum - FirstLowInvalidHeapAttributeNumber);
Peter Eisentraut's avatar
Peter Eisentraut committed
5249

5250
				if (isIDKey && i < indexDesc->rd_index->indnkeyatts)
5251
					idindexattrs = bms_add_member(idindexattrs,
Tom Lane's avatar
Tom Lane committed
5252
												  attrnum - FirstLowInvalidHeapAttributeNumber);
5253
			}
5254 5255
		}

5256
		/* Collect all attributes used in expressions, too */
5257
		pull_varattnos(indexExpressions, 1, &indexattrs);
5258

5259
		/* Collect all attributes in the index predicate, too */
5260
		pull_varattnos(indexPredicate, 1, &indexattrs);
5261 5262 5263 5264

		index_close(indexDesc, AccessShareLock);
	}

5265 5266 5267 5268 5269 5270 5271 5272 5273 5274 5275 5276 5277 5278 5279 5280 5281 5282 5283 5284 5285 5286 5287 5288 5289 5290 5291
	/*
	 * During one of the index_opens in the above loop, we might have received
	 * a relcache flush event on this relcache entry, which might have been
	 * signaling a change in the rel's index list.  If so, we'd better start
	 * over to ensure we deliver up-to-date attribute bitmaps.
	 */
	newindexoidlist = RelationGetIndexList(relation);
	if (equal(indexoidlist, newindexoidlist) &&
		relpkindex == relation->rd_pkindex &&
		relreplindex == relation->rd_replidindex)
	{
		/* Still the same index set, so proceed */
		list_free(newindexoidlist);
		list_free(indexoidlist);
	}
	else
	{
		/* Gotta do it over ... might as well not leak memory */
		list_free(newindexoidlist);
		list_free(indexoidlist);
		bms_free(uindexattrs);
		bms_free(pkindexattrs);
		bms_free(idindexattrs);
		bms_free(indexattrs);

		goto restart;
	}
5292

5293 5294 5295 5296 5297
	/* Don't leak the old values of these bitmaps, if any */
	bms_free(relation->rd_indexattr);
	relation->rd_indexattr = NULL;
	bms_free(relation->rd_keyattr);
	relation->rd_keyattr = NULL;
Peter Eisentraut's avatar
Peter Eisentraut committed
5298 5299
	bms_free(relation->rd_pkattr);
	relation->rd_pkattr = NULL;
5300 5301 5302
	bms_free(relation->rd_idattr);
	relation->rd_idattr = NULL;

5303 5304 5305 5306 5307 5308 5309
	/*
	 * Now save copies of the bitmaps in the relcache entry.  We intentionally
	 * set rd_indexattr last, because that's the one that signals validity of
	 * the values; if we run out of memory before making that copy, we won't
	 * leave the relcache entry looking like the other ones are valid but
	 * empty.
	 */
5310
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
5311
	relation->rd_keyattr = bms_copy(uindexattrs);
Peter Eisentraut's avatar
Peter Eisentraut committed
5312
	relation->rd_pkattr = bms_copy(pkindexattrs);
5313
	relation->rd_idattr = bms_copy(idindexattrs);
5314
	relation->rd_indexattr = bms_copy(indexattrs);
5315 5316 5317
	MemoryContextSwitchTo(oldcxt);

	/* We return our original working copy for caller to play with */
Bruce Momjian's avatar
Bruce Momjian committed
5318
	switch (attrKind)
5319
	{
5320
		case INDEX_ATTR_BITMAP_ALL:
5321
			return indexattrs;
5322 5323
		case INDEX_ATTR_BITMAP_KEY:
			return uindexattrs;
Peter Eisentraut's avatar
Peter Eisentraut committed
5324
		case INDEX_ATTR_BITMAP_PRIMARY_KEY:
5325
			return pkindexattrs;
5326 5327
		case INDEX_ATTR_BITMAP_IDENTITY_KEY:
			return idindexattrs;
5328 5329
		default:
			elog(ERROR, "unknown attrKind %u", attrKind);
5330
			return NULL;
5331
	}
5332 5333
}

5334 5335 5336 5337 5338 5339 5340 5341 5342 5343 5344 5345 5346 5347 5348 5349 5350 5351 5352 5353 5354
/*
 * RelationGetIdentityKeyBitmap -- get a bitmap of replica identity attribute
 * numbers
 *
 * A bitmap of index attribute numbers for the configured replica identity
 * index is returned.  The result is a working copy for the caller; a separate
 * copy is cached in the relcache entry (rd_idattr).  NULL is returned when
 * the relation has no indexes or no replica identity index.
 *
 * See also comments of RelationGetIndexAttrBitmap().
 *
 * This is a special purpose function used during logical replication. Here,
 * unlike RelationGetIndexAttrBitmap(), we don't acquire a lock on the required
 * index as we build the cache entry using a historic snapshot and all the
 * later changes are absorbed while decoding WAL. Due to this reason, we don't
 * need to retry here in case of a change in the set of indexes.
 */
Bitmapset *
RelationGetIdentityKeyBitmap(Relation relation)
{
	Bitmapset  *idindexattrs = NULL;	/* columns in the replica identity */
	Relation	indexDesc;
	int			i;
	Oid			replidindex;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result */
	if (relation->rd_idattr != NULL)
		return bms_copy(relation->rd_idattr);

	/* Fast path if definitely no indexes */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/* Historic snapshot must be set. */
	Assert(HistoricSnapshotActive());

	replidindex = RelationGetReplicaIndex(relation);

	/* Fall out if there is no replica identity index */
	if (!OidIsValid(replidindex))
		return NULL;

	/* Look up the description for the replica identity index */
	indexDesc = RelationIdGetRelation(replidindex);

	/*
	 * Report the OID we actually tried to open, not whatever the relcache
	 * entry's rd_replidindex field happens to contain at this point.
	 */
	if (!RelationIsValid(indexDesc))
		elog(ERROR, "could not open relation with OID %u",
			 replidindex);

	/* Add referenced attributes to idindexattrs */
	for (i = 0; i < indexDesc->rd_index->indnatts; i++)
	{
		int			attrnum = indexDesc->rd_index->indkey.values[i];

		/*
		 * We don't include non-key columns into idindexattrs bitmaps. See
		 * RelationGetIndexAttrBitmap.
		 */
		if (attrnum != 0)
		{
			if (i < indexDesc->rd_index->indnkeyatts)
				idindexattrs = bms_add_member(idindexattrs,
											  attrnum - FirstLowInvalidHeapAttributeNumber);
		}
	}

	RelationClose(indexDesc);

	/* Don't leak the old values of these bitmaps, if any */
	bms_free(relation->rd_idattr);
	relation->rd_idattr = NULL;

	/* Now save copy of the bitmap in the relcache entry */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_idattr = bms_copy(idindexattrs);
	MemoryContextSwitchTo(oldcxt);

	/* We return our original working copy for caller to play with */
	return idindexattrs;
}

5414 5415 5416 5417 5418 5419 5420 5421 5422 5423 5424 5425 5426 5427 5428
/*
 * RelationGetExclusionInfo -- get info about index's exclusion constraint
 *
 * This should be called only for an index that is known to have an
 * associated exclusion constraint.  It returns arrays (palloc'd in caller's
 * context) of the exclusion operator OIDs, their underlying functions'
 * OIDs, and their strategy numbers in the index's opclasses.  We cache
 * all this information since it requires a fair amount of work to get.
 *
 * Each output array has one entry per key attribute of the index.  An error
 * is raised if the pg_constraint row is missing, duplicated, or malformed.
 */
void
RelationGetExclusionInfo(Relation indexRelation,
						 Oid **operators,
						 Oid **procs,
						 uint16 **strategies)
{
	int			indnkeyatts;
	Oid		   *ops;
	Oid		   *funcs;
	uint16	   *strats;
	Relation	conrel;
	SysScanDesc conscan;
	ScanKeyData skey[1];
	HeapTuple	htup;
	bool		found;
	MemoryContext oldcxt;
	int			i;

	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);

	/* Allocate result space in caller context */
	*operators = ops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
	*procs = funcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);

	/* Quick exit if we have the data cached already */
	if (indexRelation->rd_exclstrats != NULL)
	{
		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * indnkeyatts);
		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * indnkeyatts);
		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * indnkeyatts);
		return;
	}

	/*
	 * Search pg_constraint for the constraint associated with the index. To
	 * make this not too painfully slow, we use the index on conrelid; that
	 * will hold the parent relation's OID not the index's own OID.
	 *
	 * Note: if we wanted to rely on the constraint name matching the index's
	 * name, we could just do a direct lookup using pg_constraint's unique
	 * index.  For the moment it doesn't seem worth requiring that.
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(indexRelation->rd_index->indrelid));

	conrel = table_open(ConstraintRelationId, AccessShareLock);
	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
								 NULL, 1, skey);
	found = false;

	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
	{
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
		Datum		val;
		bool		isnull;
		ArrayType  *arr;

		/* We want the exclusion constraint owning the index */
		if (conform->contype != CONSTRAINT_EXCLUSION ||
			conform->conindid != RelationGetRelid(indexRelation))
			continue;

		/* There should be only one */
		if (found)
			elog(ERROR, "unexpected exclusion constraint record found for rel %s",
				 RelationGetRelationName(indexRelation));
		found = true;

		/* Extract the operator OIDS from conexclop */
		val = fastgetattr(htup,
						  Anum_pg_constraint_conexclop,
						  conrel->rd_att, &isnull);
		if (isnull)
			elog(ERROR, "null conexclop for rel %s",
				 RelationGetRelationName(indexRelation));

		arr = DatumGetArrayTypeP(val);	/* ensure not toasted */

		/*
		 * Verify the dimension count before looking at the first dimension's
		 * length: a zero-dimensional array has no dims[] entries to read.
		 */
		if (ARR_NDIM(arr) != 1 ||
			ARR_DIMS(arr)[0] != indnkeyatts ||
			ARR_HASNULL(arr) ||
			ARR_ELEMTYPE(arr) != OIDOID)
			elog(ERROR, "conexclop is not a 1-D Oid array");

		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * indnkeyatts);
	}

	systable_endscan(conscan);
	table_close(conrel, AccessShareLock);

	if (!found)
		elog(ERROR, "exclusion constraint record missing for rel %s",
			 RelationGetRelationName(indexRelation));

	/* We need the func OIDs and strategy numbers too */
	for (i = 0; i < indnkeyatts; i++)
	{
		funcs[i] = get_opcode(ops[i]);
		strats[i] = get_op_opfamily_strategy(ops[i],
											 indexRelation->rd_opfamily[i]);
		/* shouldn't fail, since it was checked at index creation */
		if (strats[i] == InvalidStrategy)
			elog(ERROR, "could not find strategy for operator %u in family %u",
				 ops[i], indexRelation->rd_opfamily[i]);
	}

	/* Save a copy of the results in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * indnkeyatts);
	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * indnkeyatts);
	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * indnkeyatts);
	MemoryContextSwitchTo(oldcxt);
}

Peter Eisentraut's avatar
Peter Eisentraut committed
5544 5545 5546 5547 5548 5549 5550 5551
/*
 * Get publication actions for the given relation.
 *
 * The result is a freshly palloc'd PublicationActions struct that the caller
 * may modify freely.  For a publishable relation, the flags are the OR of the
 * flags of every publication the relation itself is in, every publication
 * any of its partition ancestors is in, and every publication returned by
 * GetAllTablesPublications().  The computed result is also cached in the
 * relcache entry (rd_pubactions) for subsequent calls.
 */
struct PublicationActions *
GetRelationPublicationActions(Relation relation)
{
	List	   *puboids;
	ListCell   *lc;
	MemoryContext oldcxt;
	PublicationActions *pubactions = palloc0(sizeof(PublicationActions));

	/*
	 * If not publishable, it publishes no actions.  (pgoutput_change() will
	 * ignore it.)
	 */
	if (!is_publishable_relation(relation))
		return pubactions;

	/* Use the cached answer if we have one. */
	if (relation->rd_pubactions)
		return memcpy(pubactions, relation->rd_pubactions,
					  sizeof(PublicationActions));

	/* Fetch the publication membership info. */
	puboids = GetRelationPublications(RelationGetRelid(relation));
	if (relation->rd_rel->relispartition)
	{
		/* Add publications that the ancestors are in too. */
		List	   *ancestors = get_partition_ancestors(RelationGetRelid(relation));

		foreach(lc, ancestors)
		{
			Oid			ancestor = lfirst_oid(lc);

			puboids = list_concat_unique_oid(puboids,
											 GetRelationPublications(ancestor));
		}
	}
	puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());

	foreach(lc, puboids)
	{
		Oid			pubid = lfirst_oid(lc);
		HeapTuple	tup;
		Form_pg_publication pubform;

		tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));

		if (!HeapTupleIsValid(tup))
			elog(ERROR, "cache lookup failed for publication %u", pubid);

		pubform = (Form_pg_publication) GETSTRUCT(tup);

		pubactions->pubinsert |= pubform->pubinsert;
		pubactions->pubupdate |= pubform->pubupdate;
		pubactions->pubdelete |= pubform->pubdelete;
		pubactions->pubtruncate |= pubform->pubtruncate;

		ReleaseSysCache(tup);

		/*
		 * If we know everything is replicated, there is no point to check for
		 * other publications.
		 */
		if (pubactions->pubinsert && pubactions->pubupdate &&
			pubactions->pubdelete && pubactions->pubtruncate)
			break;
	}

	if (relation->rd_pubactions)
	{
		pfree(relation->rd_pubactions);
		relation->rd_pubactions = NULL;
	}

	/* Now save copy of the actions in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_pubactions = palloc(sizeof(PublicationActions));
	memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
	MemoryContextSwitchTo(oldcxt);

	return pubactions;
}
5627

5628 5629 5630 5631 5632 5633 5634 5635 5636 5637 5638 5639 5640
/*
 * RelationGetIndexRawAttOptions -- get AM/opclass-specific options for the index
 *
 * Returns a palloc'd array with one Datum per index attribute, holding the
 * result of get_attoptions() for each attribute that has an options support
 * procedure; the remaining entries are zero.  Returns NULL if the index AM
 * has no options procedure number or no attribute has such a procedure.
 */
Datum *
RelationGetIndexRawAttOptions(Relation indexrel)
{
	Oid			indexrelid = RelationGetRelid(indexrel);
	int16		natts = RelationGetNumberOfAttributes(indexrel);
	Datum	   *options = NULL;
	int16		attnum;

	/*
	 * The AM-level check does not depend on the attribute, so do it once
	 * rather than on every loop iteration.
	 */
	if (natts <= 0 || indexrel->rd_indam->amoptsprocnum == 0)
		return NULL;

	for (attnum = 1; attnum <= natts; attnum++)
	{
		if (!OidIsValid(index_getprocid(indexrel, attnum,
										indexrel->rd_indam->amoptsprocnum)))
			continue;

		/* Allocate the (zeroed) result array lazily, on first use */
		if (!options)
			options = palloc0(sizeof(Datum) * natts);

		options[attnum - 1] = get_attoptions(indexrelid, attnum);
	}

	return options;
}

/*
 * CopyIndexAttOptions
 *		Build a palloc'd array of natts entries in the current memory context,
 *		where each non-NULL entry of srcopts is duplicated with datumCopy()
 *		and each NULL entry stays NULL.
 */
static bytea **
CopyIndexAttOptions(bytea **srcopts, int natts)
{
	bytea	  **result = palloc(sizeof(*result) * natts);
	int			attidx;

	for (attidx = 0; attidx < natts; attidx++)
	{
		bytea	   *src = srcopts[attidx];

		if (src == NULL)
			result[attidx] = NULL;
		else
			result[attidx] = (bytea *)
				DatumGetPointer(datumCopy(PointerGetDatum(src), false, -1));
	}

	return result;
}

/*
 * RelationGetIndexAttOptions
 *		get AM/opclass-specific options for an index parsed into a binary form
 */
5677
bytea	  **
5678 5679 5680 5681 5682
RelationGetIndexAttOptions(Relation relation, bool copy)
{
	MemoryContext oldcxt;
	bytea	  **opts = relation->rd_opcoptions;
	Oid			relid = RelationGetRelid(relation);
5683 5684
	int			natts = RelationGetNumberOfAttributes(relation);	/* XXX
																	 * IndexRelationGetNumberOfKeyAttributes */
5685 5686 5687 5688 5689 5690 5691 5692 5693 5694 5695 5696 5697 5698 5699 5700 5701 5702 5703 5704 5705 5706 5707 5708 5709 5710 5711 5712 5713 5714 5715 5716 5717 5718 5719 5720 5721 5722 5723 5724 5725
	int			i;

	/* Try to copy cached options. */
	if (opts)
		return copy ? CopyIndexAttOptions(opts, natts) : opts;

	/* Get and parse opclass options. */
	opts = palloc0(sizeof(*opts) * natts);

	for (i = 0; i < natts; i++)
	{
		if (criticalRelcachesBuilt && relid != AttributeRelidNumIndexId)
		{
			Datum		attoptions = get_attoptions(relid, i + 1);

			opts[i] = index_opclass_options(relation, i + 1, attoptions, false);

			if (attoptions != (Datum) 0)
				pfree(DatumGetPointer(attoptions));
		}
	}

	/* Copy parsed options to the cache. */
	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_opcoptions = CopyIndexAttOptions(opts, natts);
	MemoryContextSwitchTo(oldcxt);

	if (copy)
		return opts;

	for (i = 0; i < natts; i++)
	{
		if (opts[i])
			pfree(opts[i]);
	}

	pfree(opts);

	return relation->rd_opcoptions;
}

5726 5727 5728 5729 5730 5731 5732 5733 5734 5735 5736 5737 5738 5739 5740 5741 5742 5743 5744
/*
 * Routines to support ereport() reports of relation-related errors
 *
 * These could have been put into elog.c, but it seems like a module layering
 * violation to have elog.c calling relcache or syscache stuff --- and we
 * definitely don't want elog.h including rel.h.  So we put them here.
 */

/*
 * errtable --- record the schema name and table name of the given relation
 * in the current errordata.
 */
int
errtable(Relation rel)
{
	char	   *schemaname = get_namespace_name(RelationGetNamespace(rel));

	err_generic_string(PG_DIAG_SCHEMA_NAME, schemaname);
	err_generic_string(PG_DIAG_TABLE_NAME, RelationGetRelationName(rel));

	return 0;					/* return value does not matter */
}

/*
 * errtablecol --- record the schema name, table name and column name
 * of a table column in the current errordata.
 *
 * The column is identified by attribute number --- for most callers, this is
 * easier and less error-prone than looking up the column name themselves.
 */
int
errtablecol(Relation rel, int attnum)
{
	TupleDesc	tupdesc = RelationGetDescr(rel);
	const char *attname;
	bool		in_tupdesc = (attnum > 0 && attnum <= tupdesc->natts);

	/* User attributes come from the tuple descriptor, others from catalogs */
	attname = in_tupdesc ?
		NameStr(TupleDescAttr(tupdesc, attnum - 1)->attname) :
		get_attname(RelationGetRelid(rel), attnum, false);

	return errtablecolname(rel, attname);
}

/*
 * errtablecolname --- record the schema name, table name and column name
 * of a table column in the current errordata, with the column name supplied
 * directly by the caller instead of being looked up from the relation.
 *
 * Don't use this directly unless errtablecol() is inconvenient for some
 * reason.  This might possibly be needed during intermediate states in ALTER
 * TABLE, for instance.
 */
int
errtablecolname(Relation rel, const char *colname)
{
	/* Schema and table name first, then the column on top of that */
	(void) errtable(rel);
	err_generic_string(PG_DIAG_COLUMN_NAME, colname);

	return 0;					/* return value does not matter */
}

/*
 * errtableconstraint --- record the schema name, table name and constraint
 * name of a table-related constraint in the current errordata.
 */
int
errtableconstraint(Relation rel, const char *conname)
{
	/* Schema and table name first, then the constraint on top of that */
	(void) errtable(rel);
	err_generic_string(PG_DIAG_CONSTRAINT_NAME, conname);

	return 0;					/* return value does not matter */
}


5802
/*
 *	load_relcache_init_file, write_relcache_init_file
 *
 *		In late 1992, we started regularly having databases with more than
 *		a thousand classes in them.  With this number of classes, it became
 *		critical to do indexed lookups on the system catalogs.
 *
 *		Bootstrapping these lookups is very hard.  We want to be able to
 *		use an index on pg_attribute, for example, but in order to do so,
 *		we must have read pg_attribute for the attributes in the index,
 *		which implies that we need to use the index.
 *
 *		In order to get around the problem, we do the following:
 *
 *		   +  When the database system is initialized (at initdb time), we
 *			  don't use indexes.  We do sequential scans.
 *
 *		   +  When the backend is started up in normal mode, we load an image
 *			  of the appropriate relation descriptors, in internal format,
 *			  from an initialization file in the data/base/... directory.
 *
 *		   +  If the initialization file isn't there, then we create the
 *			  relation descriptors using sequential scans and write 'em to
 *			  the initialization file for use by subsequent backends.
 *
 *		As of Postgres 9.0, there is one local initialization file in each
 *		database, plus one shared initialization file for shared catalogs.
 *
 *		We could dispense with the initialization files and just build the
 *		critical reldescs the hard way on every backend startup, but that
 *		slows down backend startup noticeably.
 *
 *		We can in fact go further, and save more relcache entries than
 *		just the ones that are absolutely critical; this allows us to speed
 *		up backend startup by not having to build such entries the hard way.
 *		Presently, all the catalog and index entries that are referred to
 *		by catcaches are stored in the initialization files.
 *
 *		The same mechanism that detects when catcache and relcache entries
 *		need to be invalidated (due to catalog updates) also arranges to
 *		unlink the initialization files when the contents may be out of date.
 *		The files will then be rebuilt during the next backend startup.
 */

5846
/*
5847 5848
 * load_relcache_init_file -- attempt to load cache from the shared
 * or local cache init file
5849
 *
5850
 * If successful, return true and set criticalRelcachesBuilt or
5851
 * criticalSharedRelcachesBuilt to true.
5852
 * If not successful, return false.
5853 5854 5855 5856
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static bool
5857
load_relcache_init_file(bool shared)
5858
{
5859 5860 5861 5862 5863 5864 5865
	FILE	   *fp;
	char		initfilename[MAXPGPATH];
	Relation   *rels;
	int			relno,
				num_rels,
				max_rels,
				nailed_rels,
5866 5867
				nailed_indexes,
				magic;
5868
	int			i;
5869

5870 5871 5872 5873 5874 5875
	if (shared)
		snprintf(initfilename, sizeof(initfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	else
		snprintf(initfilename, sizeof(initfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
5876 5877 5878 5879

	fp = AllocateFile(initfilename, PG_BINARY_R);
	if (fp == NULL)
		return false;
5880

5881
	/*
5882 5883 5884
	 * Read the index relcache entries from the file.  Note we will not enter
	 * any of them into the cache if the read fails partway through; this
	 * helps to guard against broken init files.
5885 5886 5887 5888 5889 5890
	 */
	max_rels = 100;
	rels = (Relation *) palloc(max_rels * sizeof(Relation));
	num_rels = 0;
	nailed_rels = nailed_indexes = 0;

5891 5892 5893 5894 5895 5896
	/* check for correct magic number (compatible version) */
	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		goto read_failed;
	if (magic != RELCACHE_INIT_FILEMAGIC)
		goto read_failed;

Bruce Momjian's avatar
Bruce Momjian committed
5897
	for (relno = 0;; relno++)
5898
	{
5899 5900 5901 5902
		Size		len;
		size_t		nread;
		Relation	rel;
		Form_pg_class relform;
5903
		bool		has_not_null;
5904

5905
		/* first read the relation descriptor length */
5906 5907
		nread = fread(&len, 1, sizeof(len), fp);
		if (nread != sizeof(len))
5908 5909 5910
		{
			if (nread == 0)
				break;			/* end of file */
5911
			goto read_failed;
5912
		}
5913

5914 5915
		/* safety check for incompatible relcache layout */
		if (len != sizeof(RelationData))
5916
			goto read_failed;
5917

5918 5919 5920 5921 5922 5923
		/* allocate another relcache header */
		if (num_rels >= max_rels)
		{
			max_rels *= 2;
			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
		}
5924

5925
		rel = rels[num_rels++] = (Relation) palloc(len);
5926

5927
		/* then, read the Relation structure */
5928
		if (fread(rel, 1, len, fp) != len)
5929
			goto read_failed;
5930 5931

		/* next read the relation tuple form */
5932
		if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5933
			goto read_failed;
5934 5935

		relform = (Form_pg_class) palloc(len);
5936
		if (fread(relform, 1, len, fp) != len)
5937
			goto read_failed;
5938

5939
		rel->rd_rel = relform;
5940 5941

		/* initialize attribute tuple forms */
5942
		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts);
5943 5944
		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */

5945 5946
		rel->rd_att->tdtypeid = relform->reltype ? relform->reltype : RECORDOID;
		rel->rd_att->tdtypmod = -1; /* just to be sure */
5947 5948

		/* next read all the attribute tuple form data entries */
5949
		has_not_null = false;
5950 5951
		for (i = 0; i < relform->relnatts; i++)
		{
5952 5953
			Form_pg_attribute attr = TupleDescAttr(rel->rd_att, i);

5954
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5955
				goto read_failed;
5956
			if (len != ATTRIBUTE_FIXED_PART_SIZE)
5957
				goto read_failed;
5958
			if (fread(attr, 1, len, fp) != len)
5959
				goto read_failed;
5960

5961
			has_not_null |= attr->attnotnull;
5962 5963
		}

5964
		/* next read the access method specific field */
5965
		if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5966 5967 5968 5969
			goto read_failed;
		if (len > 0)
		{
			rel->rd_options = palloc(len);
5970
			if (fread(rel->rd_options, 1, len, fp) != len)
5971
				goto read_failed;
5972
			if (len != VARSIZE(rel->rd_options))
Tom Lane's avatar
Tom Lane committed
5973
				goto read_failed;	/* sanity check */
5974 5975 5976 5977 5978 5979
		}
		else
		{
			rel->rd_options = NULL;
		}

5980 5981 5982 5983 5984 5985 5986
		/* mark not-null status */
		if (has_not_null)
		{
			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

			constr->has_not_null = true;
			rel->rd_att->constr = constr;
5987 5988
		}

Alvaro Herrera's avatar
Alvaro Herrera committed
5989 5990 5991 5992
		/*
		 * If it's an index, there's more to do.  Note we explicitly ignore
		 * partitioned indexes here.
		 */
5993 5994 5995
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			MemoryContext indexcxt;
5996 5997
			Oid		   *opfamily;
			Oid		   *opcintype;
5998
			RegProcedure *support;
5999
			int			nsupport;
6000
			int16	   *indoption;
6001
			Oid		   *indcollation;
6002 6003 6004 6005 6006

			/* Count nailed indexes to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_indexes++;

6007
			/* next, read the pg_index tuple */
6008
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6009
				goto read_failed;
6010

6011
			rel->rd_indextuple = (HeapTuple) palloc(len);
6012
			if (fread(rel->rd_indextuple, 1, len, fp) != len)
6013
				goto read_failed;
6014

6015 6016 6017 6018
			/* Fix up internal pointers in the tuple -- see heap_copytuple */
			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);

6019 6020 6021 6022
			/*
			 * prepare index info context --- parameters should match
			 * RelationInitIndexAccessInfo
			 */
6023 6024 6025
			indexcxt = AllocSetContextCreate(CacheMemoryContext,
											 "index info",
											 ALLOCSET_SMALL_SIZES);
6026
			rel->rd_indexcxt = indexcxt;
6027
			MemoryContextCopyAndSetIdentifier(indexcxt,
6028
											  RelationGetRelationName(rel));
6029

6030 6031 6032 6033 6034 6035 6036 6037
			/*
			 * Now we can fetch the index AM's API struct.  (We can't store
			 * that in the init file, since it contains function pointers that
			 * might vary across server executions.  Fortunately, it should be
			 * safe to call the amhandler even while bootstrapping indexes.)
			 */
			InitIndexAmRoutine(rel);

6038
			/* next, read the vector of opfamily OIDs */
6039
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6040 6041 6042
				goto read_failed;

			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
6043
			if (fread(opfamily, 1, len, fp) != len)
6044 6045 6046 6047 6048
				goto read_failed;

			rel->rd_opfamily = opfamily;

			/* next, read the vector of opcintype OIDs */
6049
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6050 6051 6052
				goto read_failed;

			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
6053
			if (fread(opcintype, 1, len, fp) != len)
6054 6055 6056 6057
				goto read_failed;

			rel->rd_opcintype = opcintype;

6058
			/* next, read the vector of support procedure OIDs */
6059
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6060 6061
				goto read_failed;
			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
6062
			if (fread(support, 1, len, fp) != len)
6063 6064 6065 6066
				goto read_failed;

			rel->rd_support = support;

6067 6068 6069 6070 6071 6072 6073 6074 6075 6076
			/* next, read the vector of collation OIDs */
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
				goto read_failed;

			indcollation = (Oid *) MemoryContextAlloc(indexcxt, len);
			if (fread(indcollation, 1, len, fp) != len)
				goto read_failed;

			rel->rd_indcollation = indcollation;

6077
			/* finally, read the vector of indoption values */
6078
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6079 6080 6081
				goto read_failed;

			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
6082
			if (fread(indoption, 1, len, fp) != len)
6083 6084 6085 6086
				goto read_failed;

			rel->rd_indoption = indoption;

6087 6088 6089 6090 6091 6092 6093 6094 6095 6096 6097 6098 6099 6100 6101 6102 6103
			/* finally, read the vector of opcoptions values */
			rel->rd_opcoptions = (bytea **)
				MemoryContextAllocZero(indexcxt, sizeof(*rel->rd_opcoptions) * relform->relnatts);

			for (i = 0; i < relform->relnatts; i++)
			{
				if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
					goto read_failed;

				if (len > 0)
				{
					rel->rd_opcoptions[i] = (bytea *) MemoryContextAlloc(indexcxt, len);
					if (fread(rel->rd_opcoptions[i], 1, len, fp) != len)
						goto read_failed;
				}
			}

6104
			/* set up zeroed fmgr-info vector */
6105
			nsupport = relform->relnatts * rel->rd_indam->amsupport;
6106
			rel->rd_supportinfo = (FmgrInfo *)
6107
				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
6108 6109 6110 6111 6112 6113 6114
		}
		else
		{
			/* Count nailed rels to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_rels++;

6115 6116 6117 6118 6119 6120 6121
			/* Load table AM data */
			if (rel->rd_rel->relkind == RELKIND_RELATION ||
				rel->rd_rel->relkind == RELKIND_SEQUENCE ||
				rel->rd_rel->relkind == RELKIND_TOASTVALUE ||
				rel->rd_rel->relkind == RELKIND_MATVIEW)
				RelationInitTableAccessMethod(rel);

6122
			Assert(rel->rd_index == NULL);
6123
			Assert(rel->rd_indextuple == NULL);
6124
			Assert(rel->rd_indexcxt == NULL);
6125
			Assert(rel->rd_indam == NULL);
6126 6127
			Assert(rel->rd_opfamily == NULL);
			Assert(rel->rd_opcintype == NULL);
6128 6129
			Assert(rel->rd_support == NULL);
			Assert(rel->rd_supportinfo == NULL);
6130
			Assert(rel->rd_indoption == NULL);
6131
			Assert(rel->rd_indcollation == NULL);
6132
			Assert(rel->rd_opcoptions == NULL);
6133 6134 6135 6136
		}

		/*
		 * Rules and triggers are not saved (mainly because the internal
6137
		 * format is complex and subject to change).  They must be rebuilt if
6138
		 * needed by RelationCacheInitializePhase3.  This is not expected to
6139
		 * be a big performance hit since few system catalogs have such. Ditto
6140 6141
		 * for RLS policy data, partition info, index expressions, predicates,
		 * exclusion info, and FDW info.
6142 6143 6144 6145
		 */
		rel->rd_rules = NULL;
		rel->rd_rulescxt = NULL;
		rel->trigdesc = NULL;
6146
		rel->rd_rsdesc = NULL;
6147
		rel->rd_partkey = NULL;
6148
		rel->rd_partkeycxt = NULL;
6149
		rel->rd_partdesc = NULL;
6150 6151
		rel->rd_partdesc_nodetached = NULL;
		rel->rd_partdesc_nodetached_xmin = InvalidTransactionId;
6152
		rel->rd_pdcxt = NULL;
6153
		rel->rd_pddcxt = NULL;
6154
		rel->rd_partcheck = NIL;
6155 6156
		rel->rd_partcheckvalid = false;
		rel->rd_partcheckcxt = NULL;
6157 6158
		rel->rd_indexprs = NIL;
		rel->rd_indpred = NIL;
6159 6160 6161
		rel->rd_exclops = NULL;
		rel->rd_exclprocs = NULL;
		rel->rd_exclstrats = NULL;
6162
		rel->rd_fdwroutine = NULL;
6163 6164 6165 6166

		/*
		 * Reset transient-state fields in the relcache entry
		 */
6167
		rel->rd_smgr = NULL;
6168
		if (rel->rd_isnailed)
6169
			rel->rd_refcnt = 1;
6170
		else
6171
			rel->rd_refcnt = 0;
Tom Lane's avatar
Tom Lane committed
6172
		rel->rd_indexvalid = false;
6173
		rel->rd_indexlist = NIL;
Peter Eisentraut's avatar
Peter Eisentraut committed
6174
		rel->rd_pkindex = InvalidOid;
6175 6176 6177
		rel->rd_replidindex = InvalidOid;
		rel->rd_indexattr = NULL;
		rel->rd_keyattr = NULL;
Peter Eisentraut's avatar
Peter Eisentraut committed
6178
		rel->rd_pkattr = NULL;
6179
		rel->rd_idattr = NULL;
Peter Eisentraut's avatar
Peter Eisentraut committed
6180
		rel->rd_pubactions = NULL;
6181 6182
		rel->rd_statvalid = false;
		rel->rd_statlist = NIL;
Tom Lane's avatar
Tom Lane committed
6183 6184
		rel->rd_fkeyvalid = false;
		rel->rd_fkeylist = NIL;
6185
		rel->rd_createSubid = InvalidSubTransactionId;
6186
		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
6187 6188
		rel->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
		rel->rd_droppedSubid = InvalidSubTransactionId;
6189
		rel->rd_amcache = NULL;
6190
		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
6191

6192
		/*
6193
		 * Recompute lock and physical addressing info.  This is needed in
6194 6195
		 * case the pg_internal.init file was copied from some other database
		 * by CREATE DATABASE.
6196 6197
		 */
		RelationInitLockInfo(rel);
6198
		RelationInitPhysicalAddr(rel);
6199 6200 6201
	}

	/*
6202 6203 6204 6205
	 * We reached the end of the init file without apparent problem.  Did we
	 * get the right number of nailed items?  This is a useful crosscheck in
	 * case the set of critical rels or indexes changes.  However, that should
	 * not happen in a normally-running system, so let's bleat if it does.
6206 6207 6208 6209 6210 6211
	 *
	 * For the shared init file, we're called before client authentication is
	 * done, which means that elog(WARNING) will go only to the postmaster
	 * log, where it's easily missed.  To ensure that developers notice bad
	 * values of NUM_CRITICAL_SHARED_RELS/NUM_CRITICAL_SHARED_INDEXES, we put
	 * an Assert(false) there.
6212
	 */
6213 6214 6215 6216
	if (shared)
	{
		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
6217 6218 6219 6220
		{
			elog(WARNING, "found %d nailed shared rels and %d nailed shared indexes in init file, but expected %d and %d respectively",
				 nailed_rels, nailed_indexes,
				 NUM_CRITICAL_SHARED_RELS, NUM_CRITICAL_SHARED_INDEXES);
6221 6222 6223
			/* Make sure we get developers' attention about this */
			Assert(false);
			/* In production builds, recover by bootstrapping the relcache */
6224
			goto read_failed;
6225
		}
6226 6227 6228 6229 6230
	}
	else
	{
		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
6231 6232 6233 6234
		{
			elog(WARNING, "found %d nailed rels and %d nailed indexes in init file, but expected %d and %d respectively",
				 nailed_rels, nailed_indexes,
				 NUM_CRITICAL_LOCAL_RELS, NUM_CRITICAL_LOCAL_INDEXES);
6235
			/* We don't need an Assert() in this case */
6236
			goto read_failed;
6237
		}
6238
	}
6239 6240 6241 6242 6243 6244 6245 6246

	/*
	 * OK, all appears well.
	 *
	 * Now insert all the new relcache entries into the cache.
	 */
	for (relno = 0; relno < num_rels; relno++)
	{
6247
		RelationCacheInsert(rels[relno], false);
6248
	}
6249

6250 6251 6252
	pfree(rels);
	FreeFile(fp);

6253 6254 6255 6256
	if (shared)
		criticalSharedRelcachesBuilt = true;
	else
		criticalRelcachesBuilt = true;
6257
	return true;
6258

6259
	/*
Bruce Momjian's avatar
Bruce Momjian committed
6260
	 * init file is broken, so do it the hard way.  We don't bother trying to
6261 6262
	 * free the clutter we just allocated; it's not in the relcache so it
	 * won't hurt.
6263
	 */
6264
read_failed:
6265 6266 6267 6268
	pfree(rels);
	FreeFile(fp);

	return false;
6269 6270
}

6271 6272
/*
 * Write out a new initialization file with the current contents
6273
 * of the relcache (either shared rels or local rels, as indicated).
6274
 */
6275
static void
6276
write_relcache_init_file(bool shared)
6277
{
6278
	FILE	   *fp;
6279 6280
	char		tempfilename[MAXPGPATH];
	char		finalfilename[MAXPGPATH];
6281
	int			magic;
6282
	HASH_SEQ_STATUS status;
6283
	RelIdCacheEnt *idhentry;
6284
	int			i;
6285

6286 6287 6288 6289 6290 6291 6292
	/*
	 * If we have already received any relcache inval events, there's no
	 * chance of succeeding so we may as well skip the whole thing.
	 */
	if (relcacheInvalsReceived != 0L)
		return;

6293
	/*
6294
	 * We must write a temporary file and rename it into place. Otherwise,
6295 6296
	 * another backend starting at about the same time might crash trying to
	 * read the partially-complete file.
6297
	 */
6298 6299 6300 6301 6302 6303 6304 6305 6306 6307 6308 6309 6310 6311
	if (shared)
	{
		snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
				 RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	}
	else
	{
		snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
	}
6312

6313 6314 6315 6316
	unlink(tempfilename);		/* in case it exists w/wrong permissions */

	fp = AllocateFile(tempfilename, PG_BINARY_W);
	if (fp == NULL)
6317 6318 6319 6320 6321
	{
		/*
		 * We used to consider this a fatal error, but we might as well
		 * continue with backend startup ...
		 */
6322 6323
		ereport(WARNING,
				(errcode_for_file_access(),
6324
				 errmsg("could not create relation-cache initialization file \"%s\": %m",
6325
						tempfilename),
Tom Lane's avatar
Tom Lane committed
6326
				 errdetail("Continuing anyway, but there's something wrong.")));
6327 6328
		return;
	}
6329

6330
	/*
Bruce Momjian's avatar
Bruce Momjian committed
6331
	 * Write a magic number to serve as a file version identifier.  We can
6332 6333 6334 6335 6336 6337
	 * change the magic number whenever the relcache layout changes.
	 */
	magic = RELCACHE_INIT_FILEMAGIC;
	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		elog(FATAL, "could not write init file");

6338
	/*
6339
	 * Write all the appropriate reldescs (in no particular order).
Hiroshi Inoue's avatar
Hiroshi Inoue committed
6340
	 */
6341
	hash_seq_init(&status, RelationIdCache);
6342

6343
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
6344
	{
6345
		Relation	rel = idhentry->reldesc;
6346
		Form_pg_class relform = rel->rd_rel;
6347

6348 6349 6350 6351
		/* ignore if not correct group */
		if (relform->relisshared != shared)
			continue;

6352 6353 6354
		/*
		 * Ignore if not supposed to be in init file.  We can allow any shared
		 * relation that's been loaded so far to be in the shared init file,
6355 6356 6357 6358 6359 6360
		 * but unshared relations must be ones that should be in the local
		 * file per RelationIdIsInInitFile.  (Note: if you want to change the
		 * criterion for rels to be kept in the init file, see also inval.c.
		 * The reason for filtering here is to be sure that we don't put
		 * anything into the local init file for which a relcache inval would
		 * not cause invalidation of that init file.)
6361
		 */
6362 6363 6364 6365
		if (!shared && !RelationIdIsInInitFile(RelationGetRelid(rel)))
		{
			/* Nailed rels had better get stored. */
			Assert(!rel->rd_isnailed);
6366
			continue;
6367
		}
6368

6369 6370
		/* first write the relcache entry proper */
		write_item(rel, sizeof(RelationData), fp);
6371 6372

		/* next write the relation tuple form */
6373
		write_item(relform, CLASS_TUPLE_SIZE, fp);
6374 6375 6376 6377

		/* next, do all the attribute tuple form data entries */
		for (i = 0; i < relform->relnatts; i++)
		{
6378 6379
			write_item(TupleDescAttr(rel->rd_att, i),
					   ATTRIBUTE_FIXED_PART_SIZE, fp);
6380 6381
		}

6382 6383
		/* next, do the access method specific field */
		write_item(rel->rd_options,
6384
				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
6385
				   fp);
6386

Alvaro Herrera's avatar
Alvaro Herrera committed
6387 6388 6389 6390
		/*
		 * If it's an index, there's more to do. Note we explicitly ignore
		 * partitioned indexes here.
		 */
6391 6392
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
6393 6394
			/* write the pg_index tuple */
			/* we assume this was created by heap_copytuple! */
6395
			write_item(rel->rd_indextuple,
6396 6397
					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
					   fp);
6398

6399 6400 6401 6402 6403 6404 6405 6406 6407 6408
			/* next, write the vector of opfamily OIDs */
			write_item(rel->rd_opfamily,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of opcintype OIDs */
			write_item(rel->rd_opcintype,
					   relform->relnatts * sizeof(Oid),
					   fp);

6409
			/* next, write the vector of support procedure OIDs */
6410
			write_item(rel->rd_support,
6411
					   relform->relnatts * (rel->rd_indam->amsupport * sizeof(RegProcedure)),
6412
					   fp);
6413

6414 6415 6416 6417 6418
			/* next, write the vector of collation OIDs */
			write_item(rel->rd_indcollation,
					   relform->relnatts * sizeof(Oid),
					   fp);

6419 6420 6421 6422
			/* finally, write the vector of indoption values */
			write_item(rel->rd_indoption,
					   relform->relnatts * sizeof(int16),
					   fp);
6423 6424 6425 6426 6427 6428 6429 6430 6431 6432

			Assert(rel->rd_opcoptions);

			/* finally, write the vector of opcoptions values */
			for (i = 0; i < relform->relnatts; i++)
			{
				bytea	   *opt = rel->rd_opcoptions[i];

				write_item(opt, opt ? VARSIZE(opt) : 0, fp);
			}
6433
		}
6434
	}
6435

6436 6437
	if (FreeFile(fp))
		elog(FATAL, "could not write init file");
6438

6439
	/*
6440
	 * Now we have to check whether the data we've so painstakingly
6441 6442 6443 6444 6445
	 * accumulated is already obsolete due to someone else's just-committed
	 * catalog changes.  If so, we just delete the temp file and leave it to
	 * the next backend to try again.  (Our own relcache entries will be
	 * updated by SI message processing, but we can't be sure whether what we
	 * wrote out was up-to-date.)
6446
	 *
6447 6448
	 * This mustn't run concurrently with the code that unlinks an init file
	 * and sends SI messages, so grab a serialization lock for the duration.
6449
	 */
6450 6451 6452 6453 6454 6455
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	/* Make sure we have seen all incoming SI messages */
	AcceptInvalidationMessages();

	/*
6456 6457
	 * If we have received any SI relcache invals since backend start, assume
	 * we may have written out-of-date data.
6458 6459
	 */
	if (relcacheInvalsReceived == 0L)
6460 6461
	{
		/*
6462 6463
		 * OK, rename the temp file to its final name, deleting any
		 * previously-existing init file.
6464
		 *
6465 6466 6467 6468
		 * Note: a failure here is possible under Cygwin, if some other
		 * backend is holding open an unlinked-but-not-yet-gone init file. So
		 * treat this as a noncritical failure; just remove the useless temp
		 * file on failure.
6469
		 */
6470 6471
		if (rename(tempfilename, finalfilename) < 0)
			unlink(tempfilename);
6472 6473 6474 6475
	}
	else
	{
		/* Delete the already-obsolete temp file */
6476 6477
		unlink(tempfilename);
	}
6478 6479

	LWLockRelease(RelCacheInitLock);
6480 6481
}

6482 6483 6484 6485 6486 6487 6488 6489 6490 6491
/*
 * write a chunk of data preceded by its length
 *
 * data: start of the chunk; may be NULL when len is zero
 * len: number of bytes in the chunk
 * fp: init file being written
 *
 * A zero-length chunk is written as just its length word; we skip the
 * payload fwrite() in that case so that a NULL data pointer is never
 * handed to the C library.  Any short write is reported as FATAL along
 * with the operating system's error.
 */
static void
write_item(const void *data, Size len, FILE *fp)
{
	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
		ereport(FATAL,
				(errcode_for_file_access(),
				 errmsg_internal("could not write init file: %m")));
	if (len > 0 && fwrite(data, 1, len, fp) != len)
		ereport(FATAL,
				(errcode_for_file_access(),
				 errmsg_internal("could not write init file: %m")));
}

6492 6493
/*
 * Decide whether the relation with the given OID is one that belongs in a
 * relcache init file.
 *
 * Every nailed rel has to be cached, and for efficiency so should every rel
 * that supports a syscache.  The nailed set is nearly, but not entirely,
 * contained in the syscache-supporting set: the exceptions are relations
 * that RelationCacheInitializePhase2/3 nails for efficiency reasons even
 * though they support no syscache.  Those exceptions are listed explicitly
 * below.
 */
bool
RelationIdIsInInitFile(Oid relationId)
{
	bool		nailed_without_syscache;

	nailed_without_syscache = (relationId == SharedSecLabelRelationId ||
							   relationId == TriggerRelidNameIndexId ||
							   relationId == DatabaseNameIndexId ||
							   relationId == SharedSecLabelObjectIndexId);

	/* The ordinary case: membership follows syscache support. */
	if (!nailed_without_syscache)
		return RelationSupportsSysCache(relationId);

	/*
	 * Should this Assert ever fail, the special case for this relation has
	 * become unnecessary.
	 */
	Assert(!RelationSupportsSysCache(relationId));
	return true;
}

6520 6521 6522
/*
 * Invalidate (remove) the init file during commit of a transaction that
 * changed one or more of the relation cache entries that are kept in the
6523
 * local init file.
6524
 *
6525 6526 6527 6528 6529 6530 6531 6532 6533 6534 6535 6536
 * To be safe against concurrent inspection or rewriting of the init file,
 * we must take RelCacheInitLock, then remove the old init file, then send
 * the SI messages that include relcache inval for such relations, and then
 * release RelCacheInitLock.  This serializes the whole affair against
 * write_relcache_init_file, so that we can be sure that any other process
 * that's concurrently trying to create a new init file won't move an
 * already-stale version into place after we unlink.  Also, because we unlink
 * before sending the SI messages, a backend that's currently starting cannot
 * read the now-obsolete init file and then miss the SI messages that will
 * force it to update its relcache entries.  (This works because the backend
 * startup sequence gets into the sinval array before trying to load the init
 * file.)
6537
 *
6538 6539 6540
 * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
 * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
 * send any pending SI messages between those calls.
6541 6542
 */
void
6543
RelationCacheInitFilePreInvalidate(void)
6544
{
6545 6546
	char		localinitfname[MAXPGPATH];
	char		sharedinitfname[MAXPGPATH];
6547

6548 6549 6550 6551 6552
	if (DatabasePath)
		snprintf(localinitfname, sizeof(localinitfname), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
	snprintf(sharedinitfname, sizeof(sharedinitfname), "global/%s",
			 RELCACHE_INIT_FILENAME);
6553

6554 6555
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

6556 6557 6558 6559 6560 6561 6562 6563 6564
	/*
	 * The files might not be there if no backend has been started since the
	 * last removal.  But complain about failures other than ENOENT with
	 * ERROR.  Fortunately, it's not too late to abort the transaction if we
	 * can't get rid of the would-be-obsolete init file.
	 */
	if (DatabasePath)
		unlink_initfile(localinitfname, ERROR);
	unlink_initfile(sharedinitfname, ERROR);
6565
}
6566

6567 6568 6569 6570 6571 6572
/*
 * Release RelCacheInitLock, which RelationCacheInitFilePreInvalidate
 * acquired before unlinking the init files.  To be called once the caller
 * has sent its pending SI messages.
 */
void
RelationCacheInitFilePostInvalidate(void)
{
	LWLockRelease(RelCacheInitLock);
}

6573
/*
6574
 * Remove the init files during postmaster startup.
6575
 *
6576
 * We used to keep the init files across restarts, but that is unsafe in PITR
6577
 * scenarios, and even in simple crash-recovery cases there are windows for
Bruce Momjian's avatar
Bruce Momjian committed
6578
 * the init files to become out-of-sync with the database.  So now we just
6579 6580
 * remove them during startup and expect the first backend launch to rebuild
 * them.  Of course, this has to happen in each database of the cluster.
6581 6582
 */
void
6583 6584 6585 6586 6587
RelationCacheInitFileRemove(void)
{
	const char *tblspcdir = "pg_tblspc";
	DIR		   *dir;
	struct dirent *de;
6588
	char		path[MAXPGPATH + 10 + sizeof(TABLESPACE_VERSION_DIRECTORY)];
6589 6590 6591

	snprintf(path, sizeof(path), "global/%s",
			 RELCACHE_INIT_FILENAME);
6592
	unlink_initfile(path, LOG);
6593 6594 6595 6596 6597 6598 6599

	/* Scan everything in the default tablespace */
	RelationCacheInitFileRemoveInDir("base");

	/* Scan the tablespace link directory to find non-default tablespaces */
	dir = AllocateDir(tblspcdir);

6600
	while ((de = ReadDirExtended(dir, tblspcdir, LOG)) != NULL)
6601 6602 6603 6604
	{
		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
		{
			/* Scan the tablespace dir for per-database dirs */
6605 6606
			snprintf(path, sizeof(path), "%s/%s/%s",
					 tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
6607 6608 6609 6610 6611 6612 6613 6614 6615 6616
			RelationCacheInitFileRemoveInDir(path);
		}
	}

	FreeDir(dir);
}

/* Process one per-tablespace directory for RelationCacheInitFileRemove */
static void
RelationCacheInitFileRemoveInDir(const char *tblspcpath)
6617
{
6618 6619
	DIR		   *dir;
	struct dirent *de;
6620
	char		initfilename[MAXPGPATH * 2];
6621

6622 6623 6624
	/* Scan the tablespace directory to find per-database directories */
	dir = AllocateDir(tblspcpath);

6625
	while ((de = ReadDirExtended(dir, tblspcpath, LOG)) != NULL)
6626 6627 6628 6629 6630 6631
	{
		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
		{
			/* Try to remove the init file in each database */
			snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
					 tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
6632
			unlink_initfile(initfilename, LOG);
6633 6634 6635 6636 6637 6638 6639
		}
	}

	FreeDir(dir);
}

/*
 * Unlink one init file.
 *
 * initfilename: path of the file to remove
 * elevel: error level at which to report a failed unlink
 *
 * A missing file (ENOENT) is not treated as a problem, since the file
 * might simply not be there; any other failure is reported at elevel.
 */
static void
unlink_initfile(const char *initfilename, int elevel)
{
	if (unlink(initfilename) < 0 && errno != ENOENT)
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not remove cache file \"%s\": %m",
						initfilename)));
}