analyze.c 19.7 KB
Newer Older
1 2 3 4 5
/*-------------------------------------------------------------------------
 *
 * analyze.c
 *	  the postgres optimizer analyzer
 *
6
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7 8 9 10
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
11
 *	  $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.16 2001/03/22 06:16:11 momjian Exp $
12 13 14
 *
 *-------------------------------------------------------------------------
 */
15 16
#include "postgres.h"

17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
#include <sys/types.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/indexing.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_type.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "parser/parse_oper.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/syscache.h"

#define swapLong(a,b)	{long tmp; tmp=a; a=b; b=tmp;}
#define swapInt(a,b)	{int tmp; tmp=a; a=b; b=tmp;}
#define swapDatum(a,b)	{Datum tmp; tmp=a; a=b; b=tmp;}
#define VacAttrStatsEqValid(stats) ( stats->f_cmpeq.fn_addr != NULL )
#define VacAttrStatsLtGtValid(stats) ( stats->f_cmplt.fn_addr != NULL && \
								   stats->f_cmpgt.fn_addr != NULL && \
								   RegProcedureIsValid(stats->outfunc) )


static void attr_stats(Relation onerel, int attr_cnt, VacAttrStats *vacattrstats, HeapTuple tuple);
static void bucketcpy(Form_pg_attribute attr, Datum value, Datum *bucket, int *bucket_len);
static void update_attstats(Oid relid, int natts, VacAttrStats *vacattrstats);
static void del_stats(Oid relid, int attcnt, int *attnums);


/*
 *	analyze_rel() -- analyze relation
 */
void
analyze_rel(Oid relid, List *anal_cols2, int MESSAGE_LEVEL)
{
60
	HeapTuple	tuple;
61 62 63 64 65 66 67 68 69 70 71 72 73 74
	Relation	onerel;
	int32		i;
	int			attr_cnt,
			   *attnums = NULL;
	Form_pg_attribute *attr;
	VacAttrStats *vacattrstats;
	HeapScanDesc scan;

	StartTransactionCommand();

	/*
	 * Check for user-requested abort.	Note we want this to be inside a
	 * transaction, so xact.c doesn't issue useless NOTICE.
	 */
75
	CHECK_FOR_INTERRUPTS();
76 77 78 79 80

	/*
	 * Race condition -- if the pg_class tuple has gone away since the
	 * last time we saw it, we don't need to vacuum it.
	 */
81 82 83 84 85 86 87 88
	tuple = SearchSysCache(RELOID,
						   ObjectIdGetDatum(relid),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
	{
		CommitTransactionCommand();
		return;
	}
89

90
	/*
91 92
	 * We can VACUUM ANALYZE any table except pg_statistic. see
	 * update_relstats
93
	 */
94 95
	if (strcmp(NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname),
			   StatisticRelationName) == 0)
96
	{
97
		ReleaseSysCache(tuple);
98 99 100
		CommitTransactionCommand();
		return;
	}
101
	ReleaseSysCache(tuple);
102 103 104

	onerel = heap_open(relid, AccessShareLock);

105
	if (!pg_ownercheck(GetUserId(), RelationGetRelationName(onerel),
106 107
					   RELNAME))
	{
108 109 110 111 112 113

		/*
		 * we already did an elog during vacuum elog(NOTICE, "Skipping
		 * \"%s\" --- only table owner can VACUUM it",
		 * RelationGetRelationName(onerel));
		 */
114
		heap_close(onerel, NoLock);
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
		CommitTransactionCommand();
		return;
	}

	elog(MESSAGE_LEVEL, "Analyzing...");

	attr_cnt = onerel->rd_att->natts;
	attr = onerel->rd_att->attrs;

	if (anal_cols2 != NIL)
	{
		int			tcnt = 0;
		List	   *le;

		if (length(anal_cols2) > attr_cnt)
			elog(ERROR, "vacuum: too many attributes specified for relation %s",
				 RelationGetRelationName(onerel));
		attnums = (int *) palloc(attr_cnt * sizeof(int));
		foreach(le, anal_cols2)
		{
			char	   *col = (char *) lfirst(le);

			for (i = 0; i < attr_cnt; i++)
			{
				if (namestrcmp(&(attr[i]->attname), col) == 0)
					break;
			}
142
			if (i < attr_cnt)	/* found */
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
				attnums[tcnt++] = i;
			else
			{
				elog(ERROR, "vacuum: there is no attribute %s in %s",
					 col, RelationGetRelationName(onerel));
			}
		}
		attr_cnt = tcnt;
	}

	vacattrstats = (VacAttrStats *) palloc(attr_cnt * sizeof(VacAttrStats));

	for (i = 0; i < attr_cnt; i++)
	{
		Operator	func_operator;
		VacAttrStats *stats;

		stats = &vacattrstats[i];
		stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
162 163
		memcpy(stats->attr, attr[((attnums) ? attnums[i] : i)],
			   ATTRIBUTE_TUPLE_SIZE);
164 165 166 167 168 169 170 171
		stats->best = stats->guess1 = stats->guess2 = 0;
		stats->max = stats->min = 0;
		stats->best_len = stats->guess1_len = stats->guess2_len = 0;
		stats->max_len = stats->min_len = 0;
		stats->initialized = false;
		stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
		stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;

172 173 174 175
		func_operator = compatible_oper("=",
										stats->attr->atttypid,
										stats->attr->atttypid,
										true);
176 177
		if (func_operator != NULL)
		{
178
			fmgr_info(oprfuncid(func_operator), &(stats->f_cmpeq));
179
			ReleaseSysCache(func_operator);
180 181 182 183
		}
		else
			stats->f_cmpeq.fn_addr = NULL;

184 185 186 187
		func_operator = compatible_oper("<",
										stats->attr->atttypid,
										stats->attr->atttypid,
										true);
188 189
		if (func_operator != NULL)
		{
190
			fmgr_info(oprfuncid(func_operator), &(stats->f_cmplt));
191
			stats->op_cmplt = oprid(func_operator);
192
			ReleaseSysCache(func_operator);
193 194 195 196 197 198 199
		}
		else
		{
			stats->f_cmplt.fn_addr = NULL;
			stats->op_cmplt = InvalidOid;
		}

200 201 202 203
		func_operator = compatible_oper(">",
										stats->attr->atttypid,
										stats->attr->atttypid,
										true);
204 205
		if (func_operator != NULL)
		{
206
			fmgr_info(oprfuncid(func_operator), &(stats->f_cmpgt));
207
			ReleaseSysCache(func_operator);
208 209 210 211
		}
		else
			stats->f_cmpgt.fn_addr = NULL;

212 213 214 215
		tuple = SearchSysCache(TYPEOID,
							   ObjectIdGetDatum(stats->attr->atttypid),
							   0, 0, 0);
		if (HeapTupleIsValid(tuple))
216
		{
217 218 219
			stats->outfunc = ((Form_pg_type) GETSTRUCT(tuple))->typoutput;
			stats->typelem = ((Form_pg_type) GETSTRUCT(tuple))->typelem;
			ReleaseSysCache(tuple);
220 221 222 223 224 225 226 227 228 229
		}
		else
		{
			stats->outfunc = InvalidOid;
			stats->typelem = InvalidOid;
		}
	}
	/* delete existing pg_statistic rows for relation */
	del_stats(relid, ((attnums) ? attr_cnt : 0), attnums);

230
	/* scan relation to gather statistics */
231 232 233 234 235 236 237
	scan = heap_beginscan(onerel, false, SnapshotNow, 0, NULL);

	while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
		attr_stats(onerel, attr_cnt, vacattrstats, tuple);

	heap_endscan(scan);

238 239
	/* close rel, but keep lock so it doesn't go away before commit */
	heap_close(onerel, NoLock);
240 241 242 243 244 245 246 247

	/* update statistics in pg_class */
	update_attstats(relid, attr_cnt, vacattrstats);

	CommitTransactionCommand();
}

/*
248
 *	attr_stats() -- compute column statistics used by the planner
249 250 251 252 253 254
 *
 *	We compute the column min, max, null and non-null counts.
 *	Plus we attempt to find the count of the value that occurs most
 *	frequently in each column.	These figures are used to compute
 *	the selectivity of the column.
 *
255
 *	We use a three-bucket cache to get the most frequent item.
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
 *	The 'guess' buckets count hits.  A cache miss causes guess1
 *	to get the most hit 'guess' item in the most recent cycle, and
 *	the new item goes into guess2.	Whenever the total count of hits
 *	of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
 *
 *	This method works perfectly for columns with unique values, and columns
 *	with only two unique values, plus nulls.
 *
 *	It becomes less perfect as the number of unique values increases and
 *	their distribution in the table becomes more random.
 *
 */
static void
attr_stats(Relation onerel, int attr_cnt, VacAttrStats *vacattrstats, HeapTuple tuple)
{
	int			i;
	TupleDesc	tupDesc = onerel->rd_att;

	for (i = 0; i < attr_cnt; i++)
	{
		VacAttrStats *stats = &vacattrstats[i];
277
		Datum		origvalue;
278 279 280 281 282 283
		Datum		value;
		bool		isnull;
		bool		value_hit;

		if (!VacAttrStatsEqValid(stats))
			continue;
284 285 286 287 288

#ifdef	_DROP_COLUMN_HACK__
		if (COLUMN_IS_DROPPED(stats->attr))
			continue;
#endif	 /* _DROP_COLUMN_HACK__ */
289

290 291
		origvalue = heap_getattr(tuple, stats->attr->attnum,
								 tupDesc, &isnull);
292 293

		if (isnull)
294
		{
295
			stats->null_cnt++;
296 297 298
			continue;
		}
		stats->nonnull_cnt++;
299 300

		/*
301 302 303
		 * If the value is toasted, detoast it to avoid repeated
		 * detoastings and resultant memory leakage inside the comparison
		 * routines.
304 305 306 307 308 309
		 */
		if (!stats->attr->attbyval && stats->attr->attlen == -1)
			value = PointerGetDatum(PG_DETOAST_DATUM(origvalue));
		else
			value = origvalue;

310
		if (!stats->initialized)
311
		{
312 313 314 315 316 317
			bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
			/* best_cnt gets incremented below */
			bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
			stats->guess1_cnt = stats->guess1_hits = 1;
			bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
			stats->guess2_hits = 1;
318 319
			if (VacAttrStatsLtGtValid(stats))
			{
320 321 322
				bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
				bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
				/* min_cnt, max_cnt get incremented below */
323
			}
324 325
			stats->initialized = true;
		}
326

327 328 329 330
		if (VacAttrStatsLtGtValid(stats))
		{
			if (DatumGetBool(FunctionCall2(&stats->f_cmplt,
										   value, stats->min)))
331
			{
332 333
				bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
				stats->min_cnt = 1;
334
			}
335 336 337 338 339 340
			else if (DatumGetBool(FunctionCall2(&stats->f_cmpeq,
												value, stats->min)))
				stats->min_cnt++;

			if (DatumGetBool(FunctionCall2(&stats->f_cmpgt,
										   value, stats->max)))
341
			{
342 343
				bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
				stats->max_cnt = 1;
344
			}
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
			else if (DatumGetBool(FunctionCall2(&stats->f_cmpeq,
												value, stats->max)))
				stats->max_cnt++;
		}

		value_hit = true;
		if (DatumGetBool(FunctionCall2(&stats->f_cmpeq,
									   value, stats->best)))
			stats->best_cnt++;
		else if (DatumGetBool(FunctionCall2(&stats->f_cmpeq,
											value, stats->guess1)))
		{
			stats->guess1_cnt++;
			stats->guess1_hits++;
		}
		else if (DatumGetBool(FunctionCall2(&stats->f_cmpeq,
											value, stats->guess2)))
			stats->guess2_hits++;
		else
			value_hit = false;

		if (stats->guess2_hits > stats->guess1_hits)
		{
			swapDatum(stats->guess1, stats->guess2);
			swapInt(stats->guess1_len, stats->guess2_len);
			swapLong(stats->guess1_hits, stats->guess2_hits);
			stats->guess1_cnt = stats->guess1_hits;
		}
		if (stats->guess1_cnt > stats->best_cnt)
		{
			swapDatum(stats->best, stats->guess1);
			swapInt(stats->best_len, stats->guess1_len);
			swapLong(stats->best_cnt, stats->guess1_cnt);
			stats->guess1_hits = 1;
			stats->guess2_hits = 1;
		}
		if (!value_hit)
		{
			bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
			stats->guess1_hits = 1;
			stats->guess2_hits = 1;
386
		}
387 388 389 390

		/* Clean up detoasted copy, if any */
		if (value != origvalue)
			pfree(DatumGetPointer(value));
391 392 393 394
	}
}

/*
395
 *	bucketcpy() -- copy a new value into one of the statistics buckets
396 397 398 399
 */
static void
bucketcpy(Form_pg_attribute attr, Datum value, Datum *bucket, int *bucket_len)
{
400
	if (attr->attbyval)
401 402 403 404 405
		*bucket = value;
	else
	{
		int			len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));

406
		/* Avoid unnecessary palloc() traffic... */
407 408 409 410 411 412 413
		if (len > *bucket_len)
		{
			if (*bucket_len != 0)
				pfree(DatumGetPointer(*bucket));
			*bucket = PointerGetDatum(palloc(len));
			*bucket_len = len;
		}
414
		memcpy(DatumGetPointer(*bucket), DatumGetPointer(value), len);
415 416 417 418 419 420 421
	}
}


/*
 *	update_attstats() -- update attribute statistics for one relation
 *
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
 *		Statistics are stored in several places: the pg_class row for the
 *		relation has stats about the whole relation, the pg_attribute rows
 *		for each attribute store "dispersion", and there is a pg_statistic
 *		row for each (non-system) attribute.  (Dispersion probably ought to
 *		be moved to pg_statistic, but it's not worth doing unless there's
 *		another reason to have to change pg_attribute.)  The pg_class values
 *		are updated by VACUUM, not here.
 *
 *		We violate no-overwrite semantics here by storing new values for
 *		the dispersion column directly into the pg_attribute tuple that's
 *		already on the page.  The reason for this is that if we updated
 *		these tuples in the usual way, vacuuming pg_attribute itself
 *		wouldn't work very well --- by the time we got done with a vacuum
 *		cycle, most of the tuples in pg_attribute would've been obsoleted.
 *		Updating pg_attribute's own statistics would be especially tricky.
 *		Of course, this only works for fixed-size never-null columns, but
 *		dispersion is.
 *
440
 *		pg_statistic rows are just added normally.	This means that
441 442
 *		pg_statistic will probably contain some deleted rows at the
 *		completion of a vacuum cycle, unless it happens to get vacuumed last.
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
 *
 *		To keep things simple, we punt for pg_statistic, and don't try
 *		to compute or store rows for pg_statistic itself in pg_statistic.
 *		This could possibly be made to work, but it's not worth the trouble.
 */
static void
update_attstats(Oid relid, int natts, VacAttrStats *vacattrstats)
{
	Relation	ad,
				sd;
	HeapScanDesc scan;
	HeapTuple	atup,
				stup;
	ScanKeyData askey;
	Form_pg_attribute attp;

	ad = heap_openr(AttributeRelationName, RowExclusiveLock);
	sd = heap_openr(StatisticRelationName, RowExclusiveLock);

	/* Find pg_attribute rows for this relation */
	ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
						   F_INT4EQ, relid);

	scan = heap_beginscan(ad, false, SnapshotNow, 1, &askey);

	while (HeapTupleIsValid(atup = heap_getnext(scan, 0)))
	{
		int			i;
		VacAttrStats *stats;

		attp = (Form_pg_attribute) GETSTRUCT(atup);
474
		if (attp->attnum <= 0)	/* skip system attributes for now */
475 476 477 478 479 480 481 482
			continue;

		for (i = 0; i < natts; i++)
		{
			if (attp->attnum == vacattrstats[i].attr->attnum)
				break;
		}
		if (i >= natts)
483
			continue;			/* skip attr if no stats collected */
484 485 486 487
		stats = &(vacattrstats[i]);

		if (VacAttrStatsEqValid(stats))
		{
488 489
			float4		selratio;		/* average ratio of rows selected
										 * for a random constant */
490

491
			/* Compute dispersion */
492 493 494 495
			if (stats->nonnull_cnt == 0 && stats->null_cnt == 0)
			{

				/*
496
				 * empty relation, so put a dummy value in attdispersion
497 498 499 500 501
				 */
				selratio = 0;
			}
			else if (stats->null_cnt <= 1 && stats->best_cnt == 1)
			{
502

503
				/*
504 505
				 * looks like we have a unique-key attribute --- flag this
				 * with special -1.0 flag value.
506
				 *
507 508 509 510
				 * The correct dispersion is 1.0/numberOfRows, but since the
				 * relation row count can get updated without recomputing
				 * dispersion, we want to store a "symbolic" value and
				 * figure 1.0/numberOfRows on the fly.
511 512 513 514 515 516
				 */
				selratio = -1;
			}
			else
			{
				if (VacAttrStatsLtGtValid(stats) &&
517
					stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
518 519 520
				{

					/*
521
					 * exact result when there are just 1 or 2 values...
522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548
					 */
					double		min_cnt_d = stats->min_cnt,
								max_cnt_d = stats->max_cnt,
								null_cnt_d = stats->null_cnt;
					double		total = ((double) stats->nonnull_cnt) + null_cnt_d;

					selratio = (min_cnt_d * min_cnt_d + max_cnt_d * max_cnt_d + null_cnt_d * null_cnt_d) / (total * total);
				}
				else
				{
					double		most = (double) (stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
					double		total = ((double) stats->nonnull_cnt) + ((double) stats->null_cnt);

					/*
					 * we assume count of other values are 20% of best
					 * count in table
					 */
					selratio = (most * most + 0.20 * most * (total - most)) / (total * total);
				}
				/* Make sure calculated values are in-range */
				if (selratio < 0.0)
					selratio = 0.0;
				else if (selratio > 1.0)
					selratio = 1.0;
			}

			/* overwrite the existing statistics in the tuple */
549
			attp->attdispersion = selratio;
550 551 552 553 554 555 556

			/* invalidate the tuple in the cache and write the buffer */
			RelationInvalidateHeapTuple(ad, atup);
			WriteNoReleaseBuffer(scan->rs_cbuf);

			/*
			 * Create pg_statistic tuples for the relation, if we have
557 558 559
			 * gathered the right data.  del_stats() previously deleted
			 * all the pg_statistic tuples for the rel, so we just have to
			 * insert new ones here.
560
			 *
561 562
			 * Note analyze_rel() has seen to it that we won't come here when
			 * vacuuming pg_statistic itself.
563 564 565
			 */
			if (VacAttrStatsLtGtValid(stats) && stats->initialized)
			{
566 567
				float4		nullratio;
				float4		bestratio;
568 569 570 571
				FmgrInfo	out_function;
				char	   *out_string;
				double		best_cnt_d = stats->best_cnt,
							null_cnt_d = stats->null_cnt,
572
							nonnull_cnt_d = stats->nonnull_cnt; /* prevent overflow */
573 574
				Datum		values[Natts_pg_statistic];
				char		nulls[Natts_pg_statistic];
575
				Relation	irelations[Num_pg_statistic_indices];
576 577 578 579 580 581 582 583 584

				nullratio = null_cnt_d / (nonnull_cnt_d + null_cnt_d);
				bestratio = best_cnt_d / (nonnull_cnt_d + null_cnt_d);

				fmgr_info(stats->outfunc, &out_function);

				for (i = 0; i < Natts_pg_statistic; ++i)
					nulls[i] = ' ';

585 586
				/*
				 * initialize values[]
587 588
				 */
				i = 0;
589 590 591 592 593
				values[i++] = ObjectIdGetDatum(relid);	/* starelid */
				values[i++] = Int16GetDatum(attp->attnum);		/* staattnum */
				values[i++] = ObjectIdGetDatum(stats->op_cmplt);		/* staop */
				values[i++] = Float4GetDatum(nullratio);		/* stanullfrac */
				values[i++] = Float4GetDatum(bestratio);		/* stacommonfrac */
594
				out_string = DatumGetCString(FunctionCall3(&out_function,
595 596 597 598 599
														   stats->best,
										ObjectIdGetDatum(stats->typelem),
								 Int32GetDatum(stats->attr->atttypmod)));
				values[i++] = DirectFunctionCall1(textin,		/* stacommonval */
											CStringGetDatum(out_string));
600
				pfree(out_string);
601
				out_string = DatumGetCString(FunctionCall3(&out_function,
602 603 604 605 606
														   stats->min,
										ObjectIdGetDatum(stats->typelem),
								 Int32GetDatum(stats->attr->atttypmod)));
				values[i++] = DirectFunctionCall1(textin,		/* staloval */
											CStringGetDatum(out_string));
607
				pfree(out_string);
608
				out_string = DatumGetCString(FunctionCall3(&out_function,
609 610 611 612 613
														   stats->max,
										ObjectIdGetDatum(stats->typelem),
								 Int32GetDatum(stats->attr->atttypmod)));
				values[i++] = DirectFunctionCall1(textin,		/* stahival */
											CStringGetDatum(out_string));
614 615 616 617
				pfree(out_string);

				stup = heap_formtuple(sd->rd_att, values, nulls);

618 619
				/* store tuple and update indexes too */
				heap_insert(sd, stup);
620

621 622 623
				CatalogOpenIndices(Num_pg_statistic_indices, Name_pg_statistic_indices, irelations);
				CatalogIndexInsert(irelations, Num_pg_statistic_indices, sd, stup);
				CatalogCloseIndices(Num_pg_statistic_indices, irelations);
624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672

				/* release allocated space */
				pfree(DatumGetPointer(values[Anum_pg_statistic_stacommonval - 1]));
				pfree(DatumGetPointer(values[Anum_pg_statistic_staloval - 1]));
				pfree(DatumGetPointer(values[Anum_pg_statistic_stahival - 1]));
				heap_freetuple(stup);
			}
		}
	}
	heap_endscan(scan);
	/* close rels, but hold locks till upcoming commit */
	heap_close(ad, NoLock);
	heap_close(sd, NoLock);
}

/*
 *	del_stats() -- delete pg_statistic rows for a relation
 *
 *	If a list of attribute numbers is given, only zap stats for those attrs.
 */
static void
del_stats(Oid relid, int attcnt, int *attnums)
{
	Relation	pgstatistic;
	HeapScanDesc scan;
	HeapTuple	tuple;
	ScanKeyData key;

	pgstatistic = heap_openr(StatisticRelationName, RowExclusiveLock);

	ScanKeyEntryInitialize(&key, 0x0, Anum_pg_statistic_starelid,
						   F_OIDEQ, ObjectIdGetDatum(relid));
	scan = heap_beginscan(pgstatistic, false, SnapshotNow, 1, &key);

	while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
	{
		if (attcnt > 0)
		{
			Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT(tuple);
			int			i;

			for (i = 0; i < attcnt; i++)
			{
				if (pgs->staattnum == attnums[i] + 1)
					break;
			}
			if (i >= attcnt)
				continue;		/* don't delete it */
		}
673
		simple_heap_delete(pgstatistic, &tuple->t_self);
674 675 676 677 678 679 680 681 682 683 684 685
	}

	heap_endscan(scan);

	/*
	 * Close rel, but *keep* lock; we will need to reacquire it later, so
	 * there's a possibility of deadlock against another VACUUM process if
	 * we let go now.  Keeping the lock shouldn't delay any common
	 * operation other than an attempted VACUUM of pg_statistic itself.
	 */
	heap_close(pgstatistic, NoLock);
}