bufmgr.c 51.1 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * bufmgr.c
4
 *	  buffer manager interface routines
5 6 7 8 9
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
10
 *	  $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.63 1999/09/24 00:24:29 tgl Exp $
11 12 13 14 15 16
 *
 *-------------------------------------------------------------------------
 */
/*
 *
 * BufferAlloc() -- lookup a buffer in the buffer table.  If
17 18 19 20
 *		it isn't there add it, but do not read it into memory.
 *		This is used when we are about to reinitialize the
 *		buffer so don't care what the current disk contents are.
 *		BufferAlloc() pins the new buffer in memory.
21 22
 *
 * ReadBuffer() -- same as BufferAlloc() but reads the data
23
 *		on a buffer cache miss.
24 25 26 27
 *
 * ReleaseBuffer() -- unpin the buffer
 *
 * WriteNoReleaseBuffer() -- mark the buffer contents as "dirty"
28 29
 *		but don't unpin.  The disk IO is delayed until buffer
 *		replacement if WriteMode is BUFFER_LATE_WRITE.
30
 *
31
 * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
32 33 34 35
 *
 * FlushBuffer() -- as above but never delayed write.
 *
 * BufferSync() -- flush all dirty buffers in the buffer pool.
36
 *
37 38
 * InitBufferPool() -- Init the buffer module.
 *
39 40 41
 * See other files:
 *		freelist.c -- chooses victim for buffer replacement
 *		buf_table.c -- manages the buffer lookup table
42
 */
43
#include <sys/types.h>
44 45 46 47
#include <sys/file.h>
#include <math.h>
#include <signal.h>

Marc G. Fournier's avatar
Marc G. Fournier committed
48
#include "postgres.h"
Bruce Momjian's avatar
Bruce Momjian committed
49 50
#include "executor/execdebug.h"
#include "miscadmin.h"
51
#include "storage/s_lock.h"
52
#include "storage/smgr.h"
Bruce Momjian's avatar
Bruce Momjian committed
53
#include "utils/relcache.h"
54 55

extern SPINLOCK BufMgrLock;
56 57 58 59 60 61
extern long int ReadBufferCount;
extern long int ReadLocalBufferCount;
extern long int BufferHitCount;
extern long int LocalBufferHitCount;
extern long int BufferFlushCount;
extern long int LocalBufferFlushCount;
62

63 64 65 66 67 68 69 70 71
/*
 * It's used to avoid disk writes for read-only transactions
 * (i.e. when no one shared buffer was changed by transaction).
 * We set it to true in WriteBuffer/WriteNoReleaseBuffer when
 * marking shared buffer as dirty. We set it to false in xact.c
 * after transaction is committed/aborted.
 */
bool			SharedBufferChanged = false;

72
static int	WriteMode = BUFFER_LATE_WRITE;		/* Delayed write is
73 74
												 * default */

Bruce Momjian's avatar
Bruce Momjian committed
75
static void WaitIO(BufferDesc *buf, SPINLOCK spinlock);
76

77
#ifndef HAS_TEST_AND_SET
Bruce Momjian's avatar
Bruce Momjian committed
78
static void SignalIO(BufferDesc *buf);
79
extern long *NWaitIOBackendP;	/* defined in buf_init.c */
Bruce Momjian's avatar
Bruce Momjian committed
80

81
#endif	 /* HAS_TEST_AND_SET */
82

83
static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
84
						 bool bufferLockHeld);
85
static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
86
			bool *foundPtr, bool bufferLockHeld);
87 88
static int	FlushBuffer(Buffer buffer, bool release);
static void BufferSync(void);
Bruce Momjian's avatar
Bruce Momjian committed
89
static int	BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld);
Bruce Momjian's avatar
Bruce Momjian committed
90
void		PrintBufferDescs(void);
91 92 93

/* ---------------------------------------------------
 * RelationGetBufferWithBuffer
94 95
 *		see if the given buffer is what we want
 *		if yes, we don't need to bother the buffer manager
96 97 98 99
 * ---------------------------------------------------
 */
Buffer
RelationGetBufferWithBuffer(Relation relation,
100 101
							BlockNumber blockNumber,
							Buffer buffer)
102
{
103
	BufferDesc *bufHdr;
104 105 106 107 108

	if (BufferIsValid(buffer))
	{
		if (!BufferIsLocal(buffer))
		{
109
			LockRelId  *lrelId = & relation->rd_lockInfo.lockRelId;
110

111 112 113
			bufHdr = &BufferDescriptors[buffer - 1];
			SpinAcquire(BufMgrLock);
			if (bufHdr->tag.blockNum == blockNumber &&
114 115
				bufHdr->tag.relId.relId == lrelId->relId &&
				bufHdr->tag.relId.dbId == lrelId->dbId)
116 117
			{
				SpinRelease(BufMgrLock);
118
				return buffer;
119
			}
120
			return ReadBufferWithBufferLock(relation, blockNumber, true);
121 122 123 124
		}
		else
		{
			bufHdr = &LocalBufferDescriptors[-buffer - 1];
125
			if (bufHdr->tag.relId.relId == RelationGetRelid(relation) &&
126
				bufHdr->tag.blockNum == blockNumber)
127
				return buffer;
128
		}
129
	}
130
	return ReadBuffer(relation, blockNumber);
131 132 133 134
}

/*
 * ReadBuffer -- returns a buffer containing the requested
135 136 137
 *		block of the requested relation.  If the blknum
 *		requested is P_NEW, extend the relation file and
 *		allocate a new block.
138 139
 *
 * Returns: the buffer number for the buffer containing
140
 *		the block read or NULL on an error.
141 142
 *
 * Assume when this function is called, that reln has been
143
 *		opened already.
144 145
 */

146 147
#undef ReadBuffer				/* conflicts with macro when BUFMGR_DEBUG
								 * defined */
148 149

/*
 * ReadBuffer
 *		public entry point: fetch a block of a relation, acquiring the
 *		buffer-manager spinlock internally.
 */
Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
	return ReadBufferWithBufferLock(reln, blockNum, false);
}

/*
 * is_userbuffer
 *		true iff the buffer holds a block of a non-system relation.
 *
 * XXX caller must have already acquired BufMgrLock
 */
#ifdef NOT_USED
static bool
is_userbuffer(Buffer buffer)
{
	BufferDesc *buf = &BufferDescriptors[buffer - 1];

	return !IsSystemRelationName(buf->sb_relname);
}

#endif
176

177
#ifdef NOT_USED
/*
 * ReadBuffer_Debug
 *		debugging wrapper around ReadBuffer that traces pin activity on
 *		user-relation shared buffers to stderr.
 */
Buffer
ReadBuffer_Debug(char *file,
				 int line,
				 Relation reln,
				 BlockNumber blockNum)
{
	Buffer		buffer = ReadBufferWithBufferLock(reln, blockNum, false);

	if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
	{
		BufferDesc *buf = &BufferDescriptors[buffer - 1];

		fprintf(stderr, "PIN(RD) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
				buffer, buf->sb_relname, buf->tag.blockNum,
				PrivateRefCount[buffer - 1], file, line);
	}
	return buffer;
}

#endif
200 201

/*
202 203 204 205 206
 * ReadBufferWithBufferLock -- does the work of
 *		ReadBuffer() but with the possibility that
 *		the buffer lock has already been held. this
 *		is yet another effort to reduce the number of
 *		semops in the system.
207
 */
208
static Buffer
209
ReadBufferWithBufferLock(Relation reln,
210 211
						 BlockNumber blockNum,
						 bool bufferLockHeld)
212
{
213 214 215 216 217
	BufferDesc *bufHdr;
	int			extend;			/* extending the file by one block */
	int			status;
	bool		found;
	bool		isLocalBuf;
218 219

	extend = (blockNum == P_NEW);
220
	isLocalBuf = reln->rd_myxactonly;
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242

	if (isLocalBuf)
	{
		ReadLocalBufferCount++;
		bufHdr = LocalBufferAlloc(reln, blockNum, &found);
		if (found)
			LocalBufferHitCount++;
	}
	else
	{
		ReadBufferCount++;

		/*
		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested
		 * block is not currently in memory.
		 */
		bufHdr = BufferAlloc(reln, blockNum, &found, bufferLockHeld);
		if (found)
			BufferHitCount++;
	}

	if (!bufHdr)
243
		return InvalidBuffer;
244 245 246 247 248 249 250 251 252 253 254 255 256

	/* if its already in the buffer pool, we're done */
	if (found)
	{

		/*
		 * This happens when a bogus buffer was returned previously and is
		 * floating around in the buffer pool.	A routine calling this
		 * would want this extended.
		 */
		if (extend)
		{
			/* new buffers are zero-filled */
Bruce Momjian's avatar
Bruce Momjian committed
257
			MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
Bruce Momjian's avatar
Bruce Momjian committed
258
			smgrextend(DEFAULT_SMGR, reln,
259 260
					   (char *) MAKE_PTR(bufHdr->data));
		}
261
		return BufferDescriptorGetBuffer(bufHdr);
262 263 264

	}

265
	/*
266 267
	 * if we have gotten to this point, the reln pointer must be ok and
	 * the relation file must be open.
268
	 */
269 270 271
	if (extend)
	{
		/* new buffers are zero-filled */
Bruce Momjian's avatar
Bruce Momjian committed
272
		MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
Bruce Momjian's avatar
Bruce Momjian committed
273
		status = smgrextend(DEFAULT_SMGR, reln,
274
							(char *) MAKE_PTR(bufHdr->data));
275
	}
276 277
	else
	{
Bruce Momjian's avatar
Bruce Momjian committed
278
		status = smgrread(DEFAULT_SMGR, reln, blockNum,
279 280 281 282
						  (char *) MAKE_PTR(bufHdr->data));
	}

	if (isLocalBuf)
283
		return BufferDescriptorGetBuffer(bufHdr);
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298

	/* lock buffer manager again to update IO IN PROGRESS */
	SpinAcquire(BufMgrLock);

	if (status == SM_FAIL)
	{
		/* IO Failed.  cleanup the data structures and go home */

		if (!BufTableDelete(bufHdr))
		{
			SpinRelease(BufMgrLock);
			elog(FATAL, "BufRead: buffer table broken after IO error\n");
		}
		/* remember that BufferAlloc() pinned the buffer */
		UnpinBuffer(bufHdr);
299

300 301 302 303 304 305
		/*
		 * Have to reset the flag so that anyone waiting for the buffer
		 * can tell that the contents are invalid.
		 */
		bufHdr->flags |= BM_IO_ERROR;
		bufHdr->flags &= ~BM_IO_IN_PROGRESS;
306
	}
307 308 309 310 311 312 313 314
	else
	{
		/* IO Succeeded.  clear the flags, finish buffer update */

		bufHdr->flags &= ~(BM_IO_ERROR | BM_IO_IN_PROGRESS);
	}

	/* If anyone was waiting for IO to complete, wake them up now */
315
#ifdef HAS_TEST_AND_SET
316
	S_UNLOCK(&(bufHdr->io_in_progress_lock));
317
#else
318 319
	if (bufHdr->refcount > 1)
		SignalIO(bufHdr);
320
#endif
321 322 323 324

	SpinRelease(BufMgrLock);

	if (status == SM_FAIL)
325
		return InvalidBuffer;
326

327
	return BufferDescriptorGetBuffer(bufHdr);
328 329 330 331
}

/*
 * BufferAlloc -- Get a buffer from the buffer pool but dont
332
 *		read it.
333 334 335 336 337 338 339
 *
 * Returns: descriptor for buffer
 *
 * When this routine returns, the BufMgrLock is guaranteed NOT be held.
 */
static BufferDesc *
BufferAlloc(Relation reln,
340
			BlockNumber blockNum,
341
			bool *foundPtr,
342
			bool bufferLockHeld)
343
{
344 345 346 347 348
	BufferDesc *buf,
			   *buf2;
	BufferTag	newTag;			/* identity of requested block */
	bool		inProgress;		/* buffer undergoing IO */
	bool		newblock = FALSE;
349 350 351 352 353 354

	/* create a new tag so we can lookup the buffer */
	/* assume that the relation is already open */
	if (blockNum == P_NEW)
	{
		newblock = TRUE;
Bruce Momjian's avatar
Bruce Momjian committed
355
		blockNum = smgrnblocks(DEFAULT_SMGR, reln);
356
	}
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401

	INIT_BUFFERTAG(&newTag, reln, blockNum);

	if (!bufferLockHeld)
		SpinAcquire(BufMgrLock);

	/* see if the block is in the buffer pool already */
	buf = BufTableLookup(&newTag);
	if (buf != NULL)
	{

		/*
		 * Found it.  Now, (a) pin the buffer so no one steals it from the
		 * buffer pool, (b) check IO_IN_PROGRESS, someone may be faulting
		 * the buffer into the buffer pool.
		 */

		PinBuffer(buf);
		inProgress = (buf->flags & BM_IO_IN_PROGRESS);

		*foundPtr = TRUE;
		if (inProgress)
		{
			WaitIO(buf, BufMgrLock);
			if (buf->flags & BM_IO_ERROR)
			{

				/*
				 * wierd race condition:
				 *
				 * We were waiting for someone else to read the buffer. While
				 * we were waiting, the reader boof'd in some way, so the
				 * contents of the buffer are still invalid.  By saying
				 * that we didn't find it, we can make the caller
				 * reinitialize the buffer.  If two processes are waiting
				 * for this block, both will read the block.  The second
				 * one to finish may overwrite any updates made by the
				 * first.  (Assume higher level synchronization prevents
				 * this from happening).
				 *
				 * This is never going to happen, don't worry about it.
				 */
				*foundPtr = FALSE;
			}
		}
402
#ifdef BMTRACE
403
		_bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCFND);
404
#endif	 /* BMTRACE */
405 406 407

		SpinRelease(BufMgrLock);

408
		return buf;
409 410 411 412
	}

	*foundPtr = FALSE;

413
	/*
414 415 416
	 * Didn't find it in the buffer pool.  We'll have to initialize a new
	 * buffer.	First, grab one from the free list.  If it's dirty, flush
	 * it to disk. Remember to unlock BufMgr spinlock while doing the IOs.
417
	 */
418 419 420 421 422 423 424
	inProgress = FALSE;
	for (buf = (BufferDesc *) NULL; buf == (BufferDesc *) NULL;)
	{

		/* GetFreeBuffer will abort if it can't find a free buffer */
		buf = GetFreeBuffer();

425
		/*
426
		 * But it can return buf == NULL if we are in aborting transaction
427
		 * now and so elog(ERROR,...) in GetFreeBuffer will not abort
428
		 * again.
429
		 */
430
		if (buf == NULL)
431
			return NULL;
432 433 434 435 436 437

		/*
		 * There should be exactly one pin on the buffer after it is
		 * allocated -- ours.  If it had a pin it wouldn't have been on
		 * the free list.  No one else could have pinned it between
		 * GetFreeBuffer and here because we have the BufMgrLock.
438
		 */
439 440 441 442 443
		Assert(buf->refcount == 0);
		buf->refcount = 1;
		PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1;

		if (buf->flags & BM_DIRTY)
444
		{
445
			bool		smok;
446 447 448 449 450 451 452 453 454 455 456 457

			/*
			 * Set BM_IO_IN_PROGRESS to keep anyone from doing anything
			 * with the contents of the buffer while we write it out. We
			 * don't really care if they try to read it, but if they can
			 * complete a BufferAlloc on it they can then scribble into
			 * it, and we'd really like to avoid that while we are
			 * flushing the buffer.  Setting this flag should block them
			 * in WaitIO until we're done.
			 */
			inProgress = TRUE;
			buf->flags |= BM_IO_IN_PROGRESS;
458
#ifdef HAS_TEST_AND_SET
Marc G. Fournier's avatar
Marc G. Fournier committed
459

460 461 462 463 464 465 466
			/*
			 * All code paths that acquire this lock pin the buffer first;
			 * since no one had it pinned (it just came off the free
			 * list), no one else can have this lock.
			 */
			Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
			S_LOCK(&(buf->io_in_progress_lock));
467
#endif	 /* HAS_TEST_AND_SET */
468 469 470 471 472 473 474 475 476 477 478 479

			/*
			 * Write the buffer out, being careful to release BufMgrLock
			 * before starting the I/O.
			 *
			 * This #ifndef is here because a few extra semops REALLY kill
			 * you on machines that don't have spinlocks.  If you don't
			 * operate with much concurrency, well...
			 */
			smok = BufferReplace(buf, true);
#ifndef OPTIMIZE_SINGLE
			SpinAcquire(BufMgrLock);
480
#endif	 /* OPTIMIZE_SINGLE */
481 482 483 484 485 486 487 488 489 490 491 492 493

			if (smok == FALSE)
			{
				elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s",
					 buf->tag.blockNum, buf->sb_dbname, buf->sb_relname);
				inProgress = FALSE;
				buf->flags |= BM_IO_ERROR;
				buf->flags &= ~BM_IO_IN_PROGRESS;
#ifdef HAS_TEST_AND_SET
				S_UNLOCK(&(buf->io_in_progress_lock));
#else							/* !HAS_TEST_AND_SET */
				if (buf->refcount > 1)
					SignalIO(buf);
494
#endif	 /* !HAS_TEST_AND_SET */
495
				PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
496
				Assert(buf->refcount > 0);
497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540
				buf->refcount--;
				if (buf->refcount == 0)
				{
					AddBufferToFreelist(buf);
					buf->flags |= BM_FREE;
				}
				buf = (BufferDesc *) NULL;
			}
			else
			{

				/*
				 * BM_JUST_DIRTIED cleared by BufferReplace and shouldn't
				 * be setted by anyone.		- vadim 01/17/97
				 */
				if (buf->flags & BM_JUST_DIRTIED)
				{
					elog(FATAL, "BufferAlloc: content of block %u (%s) changed while flushing",
						 buf->tag.blockNum, buf->sb_relname);
				}
				else
					buf->flags &= ~BM_DIRTY;
			}

			/*
			 * Somebody could have pinned the buffer while we were doing
			 * the I/O and had given up the BufMgrLock (though they would
			 * be waiting for us to clear the BM_IO_IN_PROGRESS flag).
			 * That's why this is a loop -- if so, we need to clear the
			 * I/O flags, remove our pin and start all over again.
			 *
			 * People may be making buffers free at any time, so there's no
			 * reason to think that we have an immediate disaster on our
			 * hands.
			 */
			if (buf && buf->refcount > 1)
			{
				inProgress = FALSE;
				buf->flags &= ~BM_IO_IN_PROGRESS;
#ifdef HAS_TEST_AND_SET
				S_UNLOCK(&(buf->io_in_progress_lock));
#else							/* !HAS_TEST_AND_SET */
				if (buf->refcount > 1)
					SignalIO(buf);
541
#endif	 /* !HAS_TEST_AND_SET */
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570
				PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
				buf->refcount--;
				buf = (BufferDesc *) NULL;
			}

			/*
			 * Somebody could have allocated another buffer for the same
			 * block we are about to read in. (While we flush out the
			 * dirty buffer, we don't hold the lock and someone could have
			 * allocated another buffer for the same block. The problem is
			 * we haven't gotten around to insert the new tag into the
			 * buffer table. So we need to check here.		-ay 3/95
			 */
			buf2 = BufTableLookup(&newTag);
			if (buf2 != NULL)
			{

				/*
				 * Found it. Someone has already done what we're about to
				 * do. We'll just handle this as if it were found in the
				 * buffer pool in the first place.
				 */
				if (buf != NULL)
				{
#ifdef HAS_TEST_AND_SET
					S_UNLOCK(&(buf->io_in_progress_lock));
#else							/* !HAS_TEST_AND_SET */
					if (buf->refcount > 1)
						SignalIO(buf);
571
#endif	 /* !HAS_TEST_AND_SET */
572 573
					/* give up the buffer since we don't need it any more */
					PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
574 575 576 577 578 579 580
					Assert(buf->refcount > 0);
					buf->refcount--;
					if (buf->refcount == 0)
					{
						AddBufferToFreelist(buf);
						buf->flags |= BM_FREE;
					}
581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
					buf->flags &= ~BM_IO_IN_PROGRESS;
				}

				PinBuffer(buf2);
				inProgress = (buf2->flags & BM_IO_IN_PROGRESS);

				*foundPtr = TRUE;
				if (inProgress)
				{
					WaitIO(buf2, BufMgrLock);
					if (buf2->flags & BM_IO_ERROR)
						*foundPtr = FALSE;
				}

				SpinRelease(BufMgrLock);

597
				return buf2;
598
			}
Marc G. Fournier's avatar
Marc G. Fournier committed
599
		}
600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618
	}

	/*
	 * At this point we should have the sole pin on a non-dirty buffer and
	 * we may or may not already have the BM_IO_IN_PROGRESS flag set.
	 */

	/*
	 * Change the name of the buffer in the lookup table:
	 *
	 * Need to update the lookup table before the read starts. If someone
	 * comes along looking for the buffer while we are reading it in, we
	 * don't want them to allocate a new buffer.  For the same reason, we
	 * didn't want to erase the buf table entry for the buffer we were
	 * writing back until now, either.
	 */

	if (!BufTableDelete(buf))
	{
619
		SpinRelease(BufMgrLock);
620 621
		elog(FATAL, "buffer wasn't in the buffer table\n");

622
	}
623 624 625

	/* record the database name and relation name for this buffer */
	strcpy(buf->sb_relname, reln->rd_rel->relname.data);
626
	strcpy(buf->sb_dbname, DatabaseName);
627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643

	INIT_BUFFERTAG(&(buf->tag), reln, blockNum);
	if (!BufTableInsert(buf))
	{
		SpinRelease(BufMgrLock);
		elog(FATAL, "Buffer in lookup table twice \n");
	}

	/*
	 * Buffer contents are currently invalid.  Have to mark IO IN PROGRESS
	 * so no one fiddles with them until the read completes.  If this
	 * routine has been called simply to allocate a buffer, no io will be
	 * attempted, so the flag isnt set.
	 */
	if (!inProgress)
	{
		buf->flags |= BM_IO_IN_PROGRESS;
644
#ifdef HAS_TEST_AND_SET
645 646
		Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
		S_LOCK(&(buf->io_in_progress_lock));
647
#endif	 /* HAS_TEST_AND_SET */
648 649
	}

650
#ifdef BMTRACE
651
	_bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCNOTFND);
652
#endif	 /* BMTRACE */
653 654 655

	SpinRelease(BufMgrLock);

656
	return buf;
657 658 659
}

/*
 * WriteBuffer
 *
 *		Pushes buffer contents to disk if WriteMode is BUFFER_FLUSH_WRITE.
 *		Otherwise, marks contents as dirty.
 *
 * Assume that buffer is pinned.  Assume that reln is valid.
 *
 * Side Effects:
 *		Pin count is decremented.
 */

#undef WriteBuffer

int
WriteBuffer(Buffer buffer)
{
	BufferDesc *bufHdr;

	if (WriteMode == BUFFER_FLUSH_WRITE)
		return FlushBuffer(buffer, TRUE);

	/* delayed write: just mark dirty and unpin */
	if (BufferIsLocal(buffer))
		return WriteLocalBuffer(buffer, TRUE);

	if (BAD_BUFFER_ID(buffer))
		return FALSE;

	bufHdr = &BufferDescriptors[buffer - 1];

	/* note that a shared buffer was modified in this transaction */
	SharedBufferChanged = true;

	SpinAcquire(BufMgrLock);
	Assert(bufHdr->refcount > 0);
	bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
	UnpinBuffer(bufHdr);
	SpinRelease(BufMgrLock);
	CommitInfoNeedsSave[buffer - 1] = 0;

	return TRUE;
}
703

704
#ifdef NOT_USED
705 706 707
void
WriteBuffer_Debug(char *file, int line, Buffer buffer)
{
708 709 710
	WriteBuffer(buffer);
	if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
	{
711
		BufferDesc *buf;
712 713 714

		buf = &BufferDescriptors[buffer - 1];
		fprintf(stderr, "UNPIN(WR) %ld relname = %s, blockNum = %d, \
715
refcount = %ld, file: %s, line: %d\n",
716 717 718
				buffer, buf->sb_relname, buf->tag.blockNum,
				PrivateRefCount[buffer - 1], file, line);
	}
719
}
720

721
#endif
722 723

/*
 * DirtyBufferCopy() -- For a given dbid/relid/blockno, if the buffer is
 *						in the cache and is dirty, mark it clean and copy
 *						it to the requested location.  This is a logical
 *						write, installed to support the cache management
 *						code for write-once storage managers (the Sony
 *						jukebox, which takes responsibility for the data
 *						once we mark it clean).
 *
 *	NOTE: used by sony jukebox code in postgres 4.2   - ay 2/95
 */
#ifdef NOT_USED
void
DirtyBufferCopy(Oid dbid, Oid relid, BlockNumber blkno, char *dest)
{
	BufferTag	btag;
	BufferDesc *buf;

	btag.relId.relId = relid;
	btag.relId.dbId = dbid;
	btag.blockNum = blkno;

	SpinAcquire(BufMgrLock);
	buf = BufTableLookup(&btag);

	/* nothing to do unless the block is resident, valid, and dirty */
	if (buf != (BufferDesc *) NULL &&
		(buf->flags & BM_DIRTY) && (buf->flags & BM_VALID))
	{
		/*
		 * hate to do this holding the lock, but release and reacquire
		 * is slower
		 */
		memmove(dest, (char *) MAKE_PTR(buf->data), BLCKSZ);

		buf->flags &= ~BM_DIRTY;
	}
	SpinRelease(BufMgrLock);
}

#endif
775 776 777 778 779 780 781 782 783 784

/*
 * FlushBuffer -- like WriteBuffer, but force the page to disk.
 *
 * 'buffer' is known to be dirty/pinned, so there should not be a
 * problem reading the BufferDesc members without the BufMgrLock
 * (nobody should be able to change tags, flags, etc. out from under
 * us).
 */
static int
785
FlushBuffer(Buffer buffer, bool release)
786
{
787 788 789 790
	BufferDesc *bufHdr;
	Oid			bufdb;
	Relation	bufrel;
	int			status;
791 792

	if (BufferIsLocal(buffer))
793
		return FlushLocalBuffer(buffer, release) ? STATUS_OK : STATUS_ERROR;
794 795

	if (BAD_BUFFER_ID(buffer))
796
		return STATUS_ERROR;
797 798 799 800 801 802 803 804

	bufHdr = &BufferDescriptors[buffer - 1];
	bufdb = bufHdr->tag.relId.dbId;

	Assert(bufdb == MyDatabaseId || bufdb == (Oid) NULL);
	bufrel = RelationIdCacheGetRelation(bufHdr->tag.relId.relId);
	Assert(bufrel != (Relation) NULL);

805 806
	SharedBufferChanged = true;

807 808 809 810 811
	/* To check if block content changed while flushing. - vadim 01/17/97 */
	SpinAcquire(BufMgrLock);
	bufHdr->flags &= ~BM_JUST_DIRTIED;
	SpinRelease(BufMgrLock);

Bruce Momjian's avatar
Bruce Momjian committed
812
	status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
813
					   (char *) MAKE_PTR(bufHdr->data));
814

815
	/* drop relcache refcnt incremented by RelationIdCacheGetRelation */
816
	RelationDecrementReferenceCount(bufrel);
817 818 819

	if (status == SM_FAIL)
	{
820
		elog(ERROR, "FlushBuffer: cannot flush block %u of the relation %s",
821
			 bufHdr->tag.blockNum, bufHdr->sb_relname);
822
		return STATUS_ERROR;
823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
	}
	BufferFlushCount++;

	SpinAcquire(BufMgrLock);

	/*
	 * If this buffer was marked by someone as DIRTY while we were
	 * flushing it out we must not clear DIRTY flag - vadim 01/17/97
	 */
	if (bufHdr->flags & BM_JUST_DIRTIED)
	{
		elog(NOTICE, "FlusfBuffer: content of block %u (%s) changed while flushing",
			 bufHdr->tag.blockNum, bufHdr->sb_relname);
	}
	else
		bufHdr->flags &= ~BM_DIRTY;
	if (release)
		UnpinBuffer(bufHdr);
	SpinRelease(BufMgrLock);
	CommitInfoNeedsSave[buffer - 1] = 0;

844
	return STATUS_OK;
845 846 847 848
}

/*
 * WriteNoReleaseBuffer -- like WriteBuffer, but do not unpin the buffer
 *						   when the operation is complete.
 *
 *		We know that the buffer is for a relation in our private cache,
 *		because this routine is called only to write out buffers that
 *		were changed by the executing backend.
 */
int
WriteNoReleaseBuffer(Buffer buffer)
{
	BufferDesc *bufHdr;

	if (WriteMode == BUFFER_FLUSH_WRITE)
		return FlushBuffer(buffer, FALSE);

	/* delayed write: just mark dirty, keep the pin */
	if (BufferIsLocal(buffer))
		return WriteLocalBuffer(buffer, FALSE);

	if (BAD_BUFFER_ID(buffer))
		return STATUS_ERROR;

	bufHdr = &BufferDescriptors[buffer - 1];

	/* note that a shared buffer was modified in this transaction */
	SharedBufferChanged = true;

	SpinAcquire(BufMgrLock);
	bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
	SpinRelease(BufMgrLock);
	CommitInfoNeedsSave[buffer - 1] = 0;

	return STATUS_OK;
}


#undef ReleaseAndReadBuffer
/*
 * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
887
 *		so that only one semop needs to be called.
888 889 890 891
 *
 */
Buffer
ReleaseAndReadBuffer(Buffer buffer,
892 893
					 Relation relation,
					 BlockNumber blockNum)
894
{
895 896
	BufferDesc *bufHdr;
	Buffer		retbuf;
897 898 899 900 901 902 903 904 905 906 907 908 909

	if (BufferIsLocal(buffer))
	{
		Assert(LocalRefCount[-buffer - 1] > 0);
		LocalRefCount[-buffer - 1]--;
	}
	else
	{
		if (BufferIsValid(buffer))
		{
			bufHdr = &BufferDescriptors[buffer - 1];
			Assert(PrivateRefCount[buffer - 1] > 0);
			PrivateRefCount[buffer - 1]--;
910
			if (PrivateRefCount[buffer - 1] == 0)
911 912
			{
				SpinAcquire(BufMgrLock);
913
				Assert(bufHdr->refcount > 0);
914 915 916 917 918 919 920 921 922 923 924 925 926 927
				bufHdr->refcount--;
				if (bufHdr->refcount == 0)
				{
					AddBufferToFreelist(bufHdr);
					bufHdr->flags |= BM_FREE;
				}
				if (CommitInfoNeedsSave[buffer - 1])
				{
					bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
					CommitInfoNeedsSave[buffer - 1] = 0;
				}
				retbuf = ReadBufferWithBufferLock(relation, blockNum, true);
				return retbuf;
			}
928
		}
929 930
	}

931
	return ReadBuffer(relation, blockNum);
932 933 934 935 936
}

/*
 * BufferSync -- Flush all dirty buffers in the pool.
 *
937 938 939 940 941 942 943 944 945 946
 *		This is called at transaction commit time.	It does the wrong thing,
 *		right now.	We should flush only our own changes to stable storage,
 *		and we should obey the lock protocol on the buffer manager metadata
 *		as we do it.  Also, we need to be sure that no other transaction is
 *		modifying the page as we flush it.	This is only a problem for objects
 *		that use a non-two-phase locking protocol, like btree indices.	For
 *		those objects, we would like to set a write lock for the duration of
 *		our IO.  Another possibility is to code updates to btree pages
 *		carefully, so that writing them out out of order cannot cause
 *		any unrecoverable errors.
947
 *
948 949
 *		I don't want to think hard about this right now, so I will try
 *		to come back to it later.
950 951 952
 */
static void
BufferSync()
953
{
954 955 956 957 958 959
	int			i;
	Oid			bufdb;
	Oid			bufrel;
	Relation	reln;
	BufferDesc *bufHdr;
	int			status;
960 961 962 963 964

	SpinAcquire(BufMgrLock);
	for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
	{
		if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
Marc G. Fournier's avatar
Marc G. Fournier committed
965
		{
966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987
			bufdb = bufHdr->tag.relId.dbId;
			bufrel = bufHdr->tag.relId.relId;
			if (bufdb == MyDatabaseId || bufdb == (Oid) 0)
			{
				reln = RelationIdCacheGetRelation(bufrel);

				/*
				 * We have to pin buffer to keep anyone from stealing it
				 * from the buffer pool while we are flushing it or
				 * waiting in WaitIO. It's bad for GetFreeBuffer in
				 * BufferAlloc, but there is no other way to prevent
				 * writing into disk block data from some other buffer,
				 * getting smgr status of some other block and clearing
				 * BM_DIRTY of ...			  - VAdim 09/16/96
				 */
				PinBuffer(bufHdr);
				if (bufHdr->flags & BM_IO_IN_PROGRESS)
				{
					WaitIO(bufHdr, BufMgrLock);
					UnpinBuffer(bufHdr);
					if (bufHdr->flags & BM_IO_ERROR)
					{
988
						elog(ERROR, "BufferSync: write error %u for %s",
989 990
							 bufHdr->tag.blockNum, bufHdr->sb_relname);
					}
991
					/* drop refcnt from RelationIdCacheGetRelation */
992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008
					if (reln != (Relation) NULL)
						RelationDecrementReferenceCount(reln);
					continue;
				}

				/*
				 * To check if block content changed while flushing (see
				 * below). - vadim 01/17/97
				 */
				bufHdr->flags &= ~BM_JUST_DIRTIED;

				/*
				 * If we didn't have the reldesc in our local cache, flush
				 * this page out using the 'blind write' storage manager
				 * routine.  If we did find it, use the standard
				 * interface.
				 */
Marc G. Fournier's avatar
Marc G. Fournier committed
1009

1010
#ifndef OPTIMIZE_SINGLE
1011
				SpinRelease(BufMgrLock);
1012
#endif	 /* OPTIMIZE_SINGLE */
1013 1014
				if (reln == (Relation) NULL)
				{
Bruce Momjian's avatar
Bruce Momjian committed
1015
					status = smgrblindwrt(DEFAULT_SMGR, bufHdr->sb_dbname,
1016 1017 1018 1019 1020 1021
									   bufHdr->sb_relname, bufdb, bufrel,
										  bufHdr->tag.blockNum,
										(char *) MAKE_PTR(bufHdr->data));
				}
				else
				{
Bruce Momjian's avatar
Bruce Momjian committed
1022
					status = smgrwrite(DEFAULT_SMGR, reln,
1023 1024 1025
									   bufHdr->tag.blockNum,
									   (char *) MAKE_PTR(bufHdr->data));
				}
1026
#ifndef OPTIMIZE_SINGLE
1027
				SpinAcquire(BufMgrLock);
1028
#endif	 /* OPTIMIZE_SINGLE */
1029 1030 1031 1032 1033

				UnpinBuffer(bufHdr);
				if (status == SM_FAIL)
				{
					bufHdr->flags |= BM_IO_ERROR;
1034
					elog(ERROR, "BufferSync: cannot write %u for %s",
1035 1036 1037 1038 1039 1040 1041 1042 1043
						 bufHdr->tag.blockNum, bufHdr->sb_relname);
				}
				BufferFlushCount++;

				/*
				 * If this buffer was marked by someone as DIRTY while we
				 * were flushing it out we must not clear DIRTY flag -
				 * vadim 01/17/97
				 */
1044
				if (!(bufHdr->flags & BM_JUST_DIRTIED))
1045
					bufHdr->flags &= ~BM_DIRTY;
1046
				/* drop refcnt from RelationIdCacheGetRelation */
1047 1048 1049
				if (reln != (Relation) NULL)
					RelationDecrementReferenceCount(reln);
			}
1050
		}
1051
	}
1052
	SpinRelease(BufMgrLock);
1053

1054
	LocalBufferSync();
1055 1056 1057 1058 1059
}


/*
 * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf'
 *		is cleared.  Because IO_IN_PROGRESS conflicts are
 *		expected to be rare, there is only one BufferIO
 *		lock in the entire system.  All processes block
 *		on this semaphore when they try to use a buffer
 *		that someone else is faulting in.  Whenever a
 *		process finishes an IO and someone is waiting for
 *		the buffer, BufferIO is signaled (SignalIO).  All
 *		waiting processes then wake up and check to see
 *		if their buffer is now ready.  This implementation
 *		is simple, but efficient enough if WaitIO is
 *		rarely called by multiple processes simultaneously.
 *
 *	ProcSleep atomically releases the spinlock and goes to
 *		sleep.
 *
 *	Note: there is an easy fix if the queue becomes long.
 *		save the id of the buffer we are waiting for in
 *		the queue structure.  That way signal can figure
 *		out which proc to wake up.
 */
#ifdef HAS_TEST_AND_SET
static void
WaitIO(BufferDesc *buf, SPINLOCK spinlock)
{
	/*
	 * Drop the bufmgr spinlock while blocking on the buffer's
	 * io_in_progress_lock; reacquire it before returning so the caller's
	 * locking state is unchanged.
	 */
	SpinRelease(spinlock);
	S_LOCK(&(buf->io_in_progress_lock));
	S_UNLOCK(&(buf->io_in_progress_lock));
	SpinAcquire(spinlock);
}

1090
#else							/* HAS_TEST_AND_SET */
1091
IpcSemaphoreId WaitIOSemId;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1092
IpcSemaphoreId WaitCLSemId;
1093 1094

static void
Bruce Momjian's avatar
Bruce Momjian committed
1095
WaitIO(BufferDesc *buf, SPINLOCK spinlock)
1096
{
1097
	bool		inProgress;
1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110

	for (;;)
	{

		/* wait until someone releases IO lock */
		(*NWaitIOBackendP)++;
		SpinRelease(spinlock);
		IpcSemaphoreLock(WaitIOSemId, 0, 1);
		SpinAcquire(spinlock);
		inProgress = (buf->flags & BM_IO_IN_PROGRESS);
		if (!inProgress)
			break;
	}
1111 1112 1113
}

/*
Bruce Momjian's avatar
Bruce Momjian committed
1114
 * SignalIO
1115 1116
 */
static void
Bruce Momjian's avatar
Bruce Momjian committed
1117
SignalIO(BufferDesc *buf)
1118
{
1119 1120 1121 1122
	/* somebody better be waiting. */
	Assert(buf->refcount > 1);
	IpcSemaphoreUnlock(WaitIOSemId, 0, *NWaitIOBackendP);
	*NWaitIOBackendP = 0;
1123 1124
}

1125
#endif	 /* HAS_TEST_AND_SET */
1126

1127
long		NDirectFileRead;	/* some I/O's are direct file access.
								 * bypass bufmgr */
long		NDirectFileWrite;	/* e.g., I/O in psort and hashjoin. */
1130 1131

void
1132
PrintBufferUsage(FILE *statfp)
1133
{
1134 1135
	float		hitrate;
	float		localhitrate;
1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152

	if (ReadBufferCount == 0)
		hitrate = 0.0;
	else
		hitrate = (float) BufferHitCount *100.0 / ReadBufferCount;

	if (ReadLocalBufferCount == 0)
		localhitrate = 0.0;
	else
		localhitrate = (float) LocalBufferHitCount *100.0 / ReadLocalBufferCount;

	fprintf(statfp, "!\tShared blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
			ReadBufferCount - BufferHitCount, BufferFlushCount, hitrate);
	fprintf(statfp, "!\tLocal  blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
			ReadLocalBufferCount - LocalBufferHitCount, LocalBufferFlushCount, localhitrate);
	fprintf(statfp, "!\tDirect blocks: %10ld read, %10ld written\n",
			NDirectFileRead, NDirectFileWrite);
1153 1154 1155 1156 1157
}

void
ResetBufferUsage()
{
1158 1159 1160 1161 1162 1163 1164 1165
	BufferHitCount = 0;
	ReadBufferCount = 0;
	BufferFlushCount = 0;
	LocalBufferHitCount = 0;
	ReadLocalBufferCount = 0;
	LocalBufferFlushCount = 0;
	NDirectFileRead = 0;
	NDirectFileWrite = 0;
1166 1167 1168
}

/* ----------------------------------------------
1169
 *		ResetBufferPool
1170
 *
1171
 *		this routine is supposed to be called when a transaction aborts.
1172
 *		it will release all the buffer pins held by the transaction.
1173 1174 1175 1176 1177 1178
 *
 * ----------------------------------------------
 */
void
ResetBufferPool()
{
1179
	int			i;
1180

1181
	for (i = 0; i < NBuffers; i++)
1182
	{
1183
		if (PrivateRefCount[i] != 0)
1184
		{
1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195
			BufferDesc *buf = &BufferDescriptors[i];

			SpinAcquire(BufMgrLock);
			Assert(buf->refcount > 0);
			buf->refcount--;
			if (buf->refcount == 0)
			{
				AddBufferToFreelist(buf);
				buf->flags |= BM_FREE;
			}
			SpinRelease(BufMgrLock);
1196
		}
1197 1198
		PrivateRefCount[i] = 0;
		CommitInfoNeedsSave[i] = 0;
1199 1200
	}

1201
	ResetLocalBufferPool();
1202 1203 1204
}

/* -----------------------------------------------
1205
 *		BufferPoolCheckLeak
1206
 *
1207
 *		check if there is buffer leak
1208 1209 1210 1211 1212 1213
 *
 * -----------------------------------------------
 */
int
BufferPoolCheckLeak()
{
1214
	int			i;
1215
	int			result = 0;
1216 1217 1218

	for (i = 1; i <= NBuffers; i++)
	{
1219
		if (PrivateRefCount[i - 1] != 0)
1220
		{
1221 1222
			BufferDesc *buf = &(BufferDescriptors[i - 1]);

1223
			elog(NOTICE,
Bruce Momjian's avatar
Bruce Momjian committed
1224
				 "Buffer Leak: [%03d] (freeNext=%d, freePrev=%d, \
1225 1226 1227 1228 1229
relname=%s, blockNum=%d, flags=0x%x, refcount=%d %d)",
				 i - 1, buf->freeNext, buf->freePrev,
				 buf->sb_relname, buf->tag.blockNum, buf->flags,
				 buf->refcount, PrivateRefCount[i - 1]);
			result = 1;
1230
		}
1231
	}
1232
	return result;
1233 1234 1235
}

/* ------------------------------------------------
 *		FlushBufferPool
 *
 *		flush all dirty blocks in buffer pool to disk
 *
 *		No-op when StableMainMemoryFlag is set (buffers need no flushing).
 * ------------------------------------------------
 */
void
FlushBufferPool(int StableMainMemoryFlag)
{
	if (!StableMainMemoryFlag)
	{
		BufferSync();
		smgrcommit();
	}
}

/*
 * BufferGetBlockNumber
 *		Returns the block number associated with a buffer.
 *
 * Note:
 *		Assumes that the buffer is valid.
 */
BlockNumber
BufferGetBlockNumber(Buffer buffer)
{
	Assert(BufferIsValid(buffer));

	/* XXX should be a critical section */
	if (BufferIsLocal(buffer))
		return LocalBufferDescriptors[-buffer - 1].tag.blockNum;
	else
		return BufferDescriptors[buffer - 1].tag.blockNum;
}

1271
#ifdef NOT_USED
1272
/*
 * BufferGetRelation
 *		Returns the relation descriptor associated with a buffer.
 *
 * Note:
 *		Assumes buffer is valid.
 */
Relation
BufferGetRelation(Buffer buffer)
{
	Relation	relation;
	Oid			relid;

	Assert(BufferIsValid(buffer));
	Assert(!BufferIsLocal(buffer));		/* not supported for local buffers */

	/* XXX should be a critical section */
	relid = BufferDescriptors[buffer - 1].tag.relId.relId;
	relation = RelationIdGetRelation(relid);
	Assert(relation);

	/* drop relcache refcnt incremented by RelationIdGetRelation */
	RelationDecrementReferenceCount(relation);

	if (RelationHasReferenceCountZero(relation))
	{
		/*
		 * XXX why??
		 */
		RelationIncrementReferenceCount(relation);
	}

	return relation;
}
Bruce Momjian's avatar
Bruce Momjian committed
1308

1309
#endif
1310 1311 1312 1313 1314 1315 1316 1317

/*
 * BufferReplace
 *
 * Flush the buffer corresponding to 'bufHdr'
 *
 */
static int
Bruce Momjian's avatar
Bruce Momjian committed
1318
BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld)
1319
{
1320 1321 1322 1323
	Relation	reln;
	Oid			bufdb,
				bufrel;
	int			status;
1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347

	if (!bufferLockHeld)
		SpinAcquire(BufMgrLock);

	/*
	 * first try to find the reldesc in the cache, if no luck, don't
	 * bother to build the reldesc from scratch, just do a blind write.
	 */

	bufdb = bufHdr->tag.relId.dbId;
	bufrel = bufHdr->tag.relId.relId;

	if (bufdb == MyDatabaseId || bufdb == (Oid) NULL)
		reln = RelationIdCacheGetRelation(bufrel);
	else
		reln = (Relation) NULL;

	/* To check if block content changed while flushing. - vadim 01/17/97 */
	bufHdr->flags &= ~BM_JUST_DIRTIED;

	SpinRelease(BufMgrLock);

	if (reln != (Relation) NULL)
	{
Bruce Momjian's avatar
Bruce Momjian committed
1348
		status = smgrflush(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
1349 1350 1351 1352 1353
						   (char *) MAKE_PTR(bufHdr->data));
	}
	else
	{
		/* blind write always flushes */
Bruce Momjian's avatar
Bruce Momjian committed
1354
		status = smgrblindwrt(DEFAULT_SMGR, bufHdr->sb_dbname,
1355 1356 1357 1358
							  bufHdr->sb_relname, bufdb, bufrel,
							  bufHdr->tag.blockNum,
							  (char *) MAKE_PTR(bufHdr->data));
	}
1359

1360
	/* drop relcache refcnt incremented by RelationIdCacheGetRelation */
1361 1362
	if (reln != (Relation) NULL)
		RelationDecrementReferenceCount(reln);
1363 1364

	if (status == SM_FAIL)
1365
		return FALSE;
1366 1367 1368

	BufferFlushCount++;

1369
	return TRUE;
1370 1371 1372
}

/*
 * RelationGetNumberOfBlocks
 *		Returns the number of blocks in the relation.
 *
 * Note:
 *		XXX may fail for huge relations.
 *		XXX should be elsewhere.
 *		XXX maybe should be hidden
 */
BlockNumber
RelationGetNumberOfBlocks(Relation relation)
{
	/* transaction-local relations track their own block count */
	return ((relation->rd_myxactonly) ? relation->rd_nblocks :
			smgrnblocks(DEFAULT_SMGR, relation));
}

/* ---------------------------------------------------------------------
1389
 *		ReleaseRelationBuffers
1390
 *
1391 1392 1393 1394 1395
 *		this function unmarks all the dirty pages of a relation
 *		in the buffer pool so that at the end of transaction
 *		these pages will not be flushed.
 *		XXX currently it sequentially searches the buffer pool, should be
 *		changed to more clever ways of searching.
1396 1397 1398
 * --------------------------------------------------------------------
 */
void
1399
ReleaseRelationBuffers(Relation rel)
1400
{
1401
	int			i;
1402 1403
	int			holding = 0;
	BufferDesc *buf;
1404

1405
	if (rel->rd_myxactonly)
1406 1407 1408 1409 1410
	{
		for (i = 0; i < NLocBuffer; i++)
		{
			buf = &LocalBufferDescriptors[i];
			if ((buf->flags & BM_DIRTY) &&
1411
				(buf->tag.relId.relId == RelationGetRelid(rel)))
1412 1413 1414
				buf->flags &= ~BM_DIRTY;
		}
		return;
1415
	}
1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426

	for (i = 1; i <= NBuffers; i++)
	{
		buf = &BufferDescriptors[i - 1];
		if (!holding)
		{
			SpinAcquire(BufMgrLock);
			holding = 1;
		}
		if ((buf->flags & BM_DIRTY) &&
			(buf->tag.relId.dbId == MyDatabaseId) &&
1427
			(buf->tag.relId.relId == RelationGetRelid(rel)))
1428 1429 1430 1431 1432 1433 1434 1435 1436
		{
			buf->flags &= ~BM_DIRTY;
			if (!(buf->flags & BM_FREE))
			{
				SpinRelease(BufMgrLock);
				holding = 0;
				ReleaseBuffer(i);
			}
		}
1437
	}
1438 1439
	if (holding)
		SpinRelease(BufMgrLock);
1440 1441 1442
}

/* ---------------------------------------------------------------------
 *		DropBuffers
 *
 *		This function marks all the buffers in the buffer cache for a
 *		particular database as clean.  This is used when we destroy a
 *		database, to avoid trying to flush data to disk when the directory
 *		tree no longer exists.
 *
 *		This is an exceedingly non-public interface.
 * --------------------------------------------------------------------
 */
void
DropBuffers(Oid dbid)
{
	int			i;
	BufferDesc *buf;

	SpinAcquire(BufMgrLock);
	for (i = 1; i <= NBuffers; i++)
	{
		buf = &BufferDescriptors[i - 1];
		if ((buf->tag.relId.dbId == dbid) && (buf->flags & BM_DIRTY))
			buf->flags &= ~BM_DIRTY;
	}
	SpinRelease(BufMgrLock);
}

/* -----------------------------------------------------------------
1470
 *		PrintBufferDescs
1471
 *
1472 1473
 *		this function prints all the buffer descriptors, for debugging
 *		use only.
1474 1475
 * -----------------------------------------------------------------
 */
1476
void
1477 1478
PrintBufferDescs()
{
1479 1480
	int			i;
	BufferDesc *buf = BufferDescriptors;
1481

1482 1483 1484 1485 1486
	if (IsUnderPostmaster)
	{
		SpinAcquire(BufMgrLock);
		for (i = 0; i < NBuffers; ++i, ++buf)
		{
1487
			elog(DEBUG, "[%02d] (freeNext=%d, freePrev=%d, relname=%s, \
1488
blockNum=%d, flags=0x%x, refcount=%d %d)",
1489 1490 1491 1492 1493
				 i, buf->freeNext, buf->freePrev,
				 buf->sb_relname, buf->tag.blockNum, buf->flags,
				 buf->refcount, PrivateRefCount[i]);
		}
		SpinRelease(BufMgrLock);
1494
	}
1495 1496 1497 1498 1499 1500 1501 1502 1503
	else
	{
		/* interactive backend */
		for (i = 0; i < NBuffers; ++i, ++buf)
		{
			printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld)\n",
				   i, buf->sb_relname, buf->tag.blockNum,
				   buf->flags, buf->refcount, PrivateRefCount[i]);
		}
1504 1505 1506 1507 1508 1509
	}
}

void
PrintPinnedBufs()
{
1510 1511
	int			i;
	BufferDesc *buf = BufferDescriptors;
1512 1513 1514 1515 1516 1517

	SpinAcquire(BufMgrLock);
	for (i = 0; i < NBuffers; ++i, ++buf)
	{
		if (PrivateRefCount[i] > 0)
			elog(NOTICE, "[%02d] (freeNext=%d, freePrev=%d, relname=%s, \
1518
blockNum=%d, flags=0x%x, refcount=%d %d)\n",
1519 1520 1521 1522 1523
				 i, buf->freeNext, buf->freePrev, buf->sb_relname,
				 buf->tag.blockNum, buf->flags,
				 buf->refcount, PrivateRefCount[i]);
	}
	SpinRelease(BufMgrLock);
1524 1525 1526 1527 1528 1529 1530 1531 1532 1533
}

/*
 * BufferPoolBlowaway
 *
 * this routine is solely for the purpose of experiments -- sometimes
 * you may want to blowaway whatever is left from the past in buffer
 * pool and start measuring some performance with a clean empty buffer
 * pool.
 */
#ifdef NOT_USED
void
BufferPoolBlowaway()
{
	int			i;

	/* flush everything first so no dirty data is lost */
	BufferSync();
	for (i = 1; i <= NBuffers; i++)
	{
		if (BufferIsValid(i))
		{
			while (BufferIsValid(i))
				ReleaseBuffer(i);
		}
		BufTableDelete(&BufferDescriptors[i - 1]);
	}
}
1551

1552
#endif
1553

1554
/* ---------------------------------------------------------------------
1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565
 *		FlushRelationBuffers
 *
 *		This function removes from the buffer pool all pages of a relation
 *		that have blocknumber >= specified block.  If doFlush is true,
 *		dirty buffers are written out --- otherwise it's an error for any
 *		of the buffers to be dirty.
 *
 *		This is used by VACUUM before truncating the relation to the given
 *		number of blocks.  For VACUUM, we pass doFlush = false since it would
 *		mean a bug in VACUUM if any of the unwanted pages were still dirty.
 *		(TRUNCATE TABLE also uses it in the same way.)
1566
 *
1567 1568 1569 1570 1571 1572 1573 1574 1575
 *		This is also used by RENAME TABLE (with block = 0 and doFlush = true)
 *		to clear out the buffer cache before renaming the physical files of
 *		a relation.  Without that, some other backend might try to do a
 *		blind write of a buffer page (relying on the sb_relname of the buffer)
 *		and fail because it's not got the right filename anymore.
 *
 *		In both cases, the caller should be holding AccessExclusiveLock on
 *		the target relation to ensure that no other backend is busy reading
 *		more blocks of the relation...
1576
 *
1577
 *		Returns: 0 - Ok, -1 - DIRTY, -2 - PINNED
1578
 *
1579 1580 1581 1582 1583
 *		XXX currently it sequentially searches the buffer pool, should be
 *		changed to more clever ways of searching.
 * --------------------------------------------------------------------
 */
int
1584
FlushRelationBuffers(Relation rel, BlockNumber block, bool doFlush)
1585
{
1586 1587
	int			i;
	BufferDesc *buf;
1588

1589
	if (rel->rd_myxactonly)
1590 1591 1592 1593
	{
		for (i = 0; i < NLocBuffer; i++)
		{
			buf = &LocalBufferDescriptors[i];
1594
			if (buf->tag.relId.relId == RelationGetRelid(rel) &&
1595 1596 1597
				buf->tag.blockNum >= block)
			{
				if (buf->flags & BM_DIRTY)
1598
				{
1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613
					if (doFlush)
					{
						if (FlushBuffer(-i-1, false) != STATUS_OK)
						{
							elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it",
								 rel->rd_rel->relname.data, block, buf->tag.blockNum);
							return -1;
						}
					}
					else
					{
						elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty",
							 rel->rd_rel->relname.data, block, buf->tag.blockNum);
						return -1;
					}
1614
				}
1615
				if (LocalRefCount[i] > 0)
1616
				{
1617
					elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is referenced (%d)",
1618
						 rel->rd_rel->relname.data, block,
1619
						 buf->tag.blockNum, LocalRefCount[i]);
1620
					return -2;
1621
				}
1622 1623 1624
				buf->tag.relId.relId = InvalidOid;
			}
		}
1625
		return 0;
1626 1627 1628 1629 1630 1631 1632
	}

	SpinAcquire(BufMgrLock);
	for (i = 0; i < NBuffers; i++)
	{
		buf = &BufferDescriptors[i];
		if (buf->tag.relId.dbId == MyDatabaseId &&
1633
			buf->tag.relId.relId == RelationGetRelid(rel) &&
1634 1635 1636 1637
			buf->tag.blockNum >= block)
		{
			if (buf->flags & BM_DIRTY)
			{
1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657
				if (doFlush)
				{
					SpinRelease(BufMgrLock);
					if (FlushBuffer(i+1, false) != STATUS_OK)
					{
						elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %d, global %d), could not flush it",
							 buf->sb_relname, block, buf->tag.blockNum,
							 PrivateRefCount[i], buf->refcount);
						return -1;
					}
					SpinAcquire(BufMgrLock);
				}
				else
				{
					SpinRelease(BufMgrLock);
					elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %d, global %d)",
						 buf->sb_relname, block, buf->tag.blockNum,
						 PrivateRefCount[i], buf->refcount);
					return -1;
				}
1658 1659 1660 1661
			}
			if (!(buf->flags & BM_FREE))
			{
				SpinRelease(BufMgrLock);
1662 1663 1664
				elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is referenced (private %d, global %d)",
					 buf->sb_relname, block, buf->tag.blockNum,
					 PrivateRefCount[i], buf->refcount);
1665
				return -2;
1666 1667 1668 1669 1670
			}
			BufTableDelete(buf);
		}
	}
	SpinRelease(BufMgrLock);
1671
	return 0;
1672 1673
}

1674 1675 1676 1677
#undef ReleaseBuffer

/*
 * ReleaseBuffer -- remove the pin on a buffer without
 *		marking it dirty.
 *
 * Returns STATUS_OK, or STATUS_ERROR for a bad buffer id.
 */
int
ReleaseBuffer(Buffer buffer)
{
	BufferDesc *bufHdr;

	if (BufferIsLocal(buffer))
	{
		Assert(LocalRefCount[-buffer - 1] > 0);
		LocalRefCount[-buffer - 1]--;
		return STATUS_OK;
	}

	if (BAD_BUFFER_ID(buffer))
		return STATUS_ERROR;

	bufHdr = &BufferDescriptors[buffer - 1];

	Assert(PrivateRefCount[buffer - 1] > 0);
	PrivateRefCount[buffer - 1]--;
	/* only touch shared state when this backend's last pin is dropped */
	if (PrivateRefCount[buffer - 1] == 0)
	{
		SpinAcquire(BufMgrLock);
		Assert(bufHdr->refcount > 0);
		bufHdr->refcount--;
		if (bufHdr->refcount == 0)
		{
			AddBufferToFreelist(bufHdr);
			bufHdr->flags |= BM_FREE;
		}
		/* deferred commit-info dirtying, set by SetBufferCommitInfoNeedsSave */
		if (CommitInfoNeedsSave[buffer - 1])
		{
			bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
			CommitInfoNeedsSave[buffer - 1] = 0;
		}
		SpinRelease(BufMgrLock);
	}

	return STATUS_OK;
}

1721
#ifdef NOT_USED
1722 1723 1724
/*
 * IncrBufferRefCount_Debug -- IncrBufferRefCount plus an optional pin-trace
 *		line on stderr when ShowPinTrace is enabled.
 */
void
IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
{
	IncrBufferRefCount(buffer);
	if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
	{
		BufferDesc *buf = &BufferDescriptors[buffer - 1];

		fprintf(stderr, "PIN(Incr) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
				buffer, buf->sb_relname, buf->tag.blockNum,
				PrivateRefCount[buffer - 1], file, line);
	}
}
1736

1737
#endif
1738

1739
#ifdef NOT_USED
1740 1741 1742
/*
 * ReleaseBuffer_Debug -- ReleaseBuffer plus an optional unpin-trace line
 *		on stderr when ShowPinTrace is enabled.
 */
void
ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
{
	ReleaseBuffer(buffer);
	if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
	{
		BufferDesc *buf = &BufferDescriptors[buffer - 1];

		fprintf(stderr, "UNPIN(Rel) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
				buffer, buf->sb_relname, buf->tag.blockNum,
				PrivateRefCount[buffer - 1], file, line);
	}
}
1754

1755
#endif
1756

1757
#ifdef NOT_USED
1758 1759
/*
 * ReleaseAndReadBuffer_Debug -- ReleaseAndReadBuffer plus optional
 *		unpin/pin trace lines on stderr when ShowPinTrace is enabled.
 *
 * NOTE(review): the trace conditions test BufferIsLocal(buffer) yet index
 * the shared BufferDescriptors array — possibly the tests were meant to be
 * !BufferIsLocal; preserved as-is, confirm before relying on the trace.
 */
int
ReleaseAndReadBuffer_Debug(char *file,
						   int line,
						   Buffer buffer,
						   Relation relation,
						   BlockNumber blockNum)
{
	bool		bufferValid;
	Buffer		b;

	bufferValid = BufferIsValid(buffer);
	b = ReleaseAndReadBuffer(buffer, relation, blockNum);
	if (ShowPinTrace && bufferValid && BufferIsLocal(buffer)
		&& is_userbuffer(buffer))
	{
		BufferDesc *buf = &BufferDescriptors[buffer - 1];

		fprintf(stderr, "UNPIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
				buffer, buf->sb_relname, buf->tag.blockNum,
				PrivateRefCount[buffer - 1], file, line);
	}
	if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
	{
		BufferDesc *buf = &BufferDescriptors[b - 1];

		fprintf(stderr, "PIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
				b, buf->sb_relname, buf->tag.blockNum,
				PrivateRefCount[b - 1], file, line);
	}
	return b;
}
1791

1792
#endif
1793 1794 1795 1796

#ifdef BMTRACE

/*
 *	trace allocations and deallocations in a circular buffer in
 *	shared memory.	check the buffer before doing the allocation,
 *	and die if there's anything fishy.
 */

_bm_trace(Oid dbId, Oid relId, int blkNo, int bufNo, int allocType)
{
	long		start,
				cur;
	bmtrace    *tb;

	start = *CurTraceBuf;

	/* scan backwards (circularly) from the most recent entry */
	if (start > 0)
		cur = start - 1;
	else
		cur = BMT_LIMIT - 1;

	for (;;)
	{
		tb = &TraceBuf[cur];
		if (tb->bmt_op != BMT_NOTUSED)
		{
			if (tb->bmt_buf == bufNo)
			{
				/* most recent record for this buffer must be consistent */
				if ((tb->bmt_op == BMT_DEALLOC)
					|| (tb->bmt_dbid == dbId && tb->bmt_relid == relId
						&& tb->bmt_blkno == blkNo))
					goto okay;

				/* die holding the buffer lock */
				_bm_die(dbId, relId, blkNo, bufNo, allocType, start, cur);
			}
		}

		if (cur == start)
			goto okay;

		if (cur == 0)
			cur = BMT_LIMIT - 1;
		else
			cur--;
	}

okay:
	/* record this operation in the next trace slot */
	tb = &TraceBuf[start];
	tb->bmt_pid = MyProcPid;
	tb->bmt_buf = bufNo;
	tb->bmt_dbid = dbId;
	tb->bmt_relid = relId;
	tb->bmt_blkno = blkNo;
	tb->bmt_op = allocType;

	*CurTraceBuf = (start + 1) % BMT_LIMIT;
}

/*
 * _bm_die -- dump the trace ring to /tmp/death_notice, then kill this
 *		backend with SIGILL.  Called when _bm_trace detects an
 *		inconsistent allocation/deallocation.
 */
_bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
		int allocType, long start, long cur)
{
	FILE	   *fp;
	bmtrace    *tb;
	int			i;

	tb = &TraceBuf[cur];

	if ((fp = AllocateFile("/tmp/death_notice", "w")) == NULL)
		elog(FATAL, "buffer alloc trace error and can't open log file");

	fprintf(fp, "buffer alloc trace detected the following error:\n\n");
	fprintf(fp, "    buffer %d being %s inconsistently with a previous %s\n\n",
		 bufNo, (allocType == BMT_DEALLOC ? "deallocated" : "allocated"),
			(tb->bmt_op == BMT_DEALLOC ? "deallocation" : "allocation"));

	fprintf(fp, "the trace buffer contains:\n");

	/* walk the ring from oldest to newest, flagging the offending entry */
	i = start;
	for (;;)
	{
		tb = &TraceBuf[i];
		if (tb->bmt_op != BMT_NOTUSED)
		{
			fprintf(fp, "     [%3d]%spid %d buf %2d for <%d,%u,%d> ",
					i, (i == cur ? " ---> " : "\t"),
					tb->bmt_pid, tb->bmt_buf,
					tb->bmt_dbid, tb->bmt_relid, tb->bmt_blkno);

			switch (tb->bmt_op)
			{
				case BMT_ALLOCFND:
					fprintf(fp, "allocate (found)\n");
					break;

				case BMT_ALLOCNOTFND:
					fprintf(fp, "allocate (not found)\n");
					break;

				case BMT_DEALLOC:
					fprintf(fp, "deallocate\n");
					break;

				default:
					fprintf(fp, "unknown op type %d\n", tb->bmt_op);
					break;
			}
		}

		i = (i + 1) % BMT_LIMIT;
		if (i == start)
			break;
	}

	fprintf(fp, "\noperation causing error:\n");
	fprintf(fp, "\tpid %d buf %d for <%d,%u,%d> ",
			getpid(), bufNo, dbId, relId, blkNo);

	switch (allocType)
	{
		case BMT_ALLOCFND:
			fprintf(fp, "allocate (found)\n");
			break;

		case BMT_ALLOCNOTFND:
			fprintf(fp, "allocate (not found)\n");
			break;

		case BMT_DEALLOC:
			fprintf(fp, "deallocate\n");
			break;

		default:
			fprintf(fp, "unknown op type %d\n", allocType);
			break;
	}

	FreeFile(fp);

	kill(getpid(), SIGILL);
}

1936
#endif	 /* BMTRACE */
1937

1938 1939
int
SetBufferWriteMode(int mode)
1940
{
1941
	int			old;
1942 1943 1944

	old = WriteMode;
	WriteMode = mode;
1945
	return old;
1946
}
1947

1948 1949
/*
 * SetBufferCommitInfoNeedsSave -- flag a shared buffer so that its
 *		commit-status update gets marked dirty when the buffer is
 *		released (see ReleaseBuffer).  No-op for local buffers.
 */
void
SetBufferCommitInfoNeedsSave(Buffer buffer)
{
	if (!BufferIsLocal(buffer))
		CommitInfoNeedsSave[buffer - 1]++;
}
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964

void
UnlockBuffers()
{
	BufferDesc *buf;
	int			i;

	for (i = 0; i < NBuffers; i++)
	{
		if (BufferLocks[i] == 0)
			continue;
Bruce Momjian's avatar
Bruce Momjian committed
1965 1966

		Assert(BufferIsValid(i + 1));
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981
		buf = &(BufferDescriptors[i]);

#ifdef HAS_TEST_AND_SET
		S_LOCK(&(buf->cntx_lock));
#else
		IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
#endif

		if (BufferLocks[i] & BL_R_LOCK)
		{
			Assert(buf->r_locks > 0);
			(buf->r_locks)--;
		}
		if (BufferLocks[i] & BL_RI_LOCK)
		{
1982 1983 1984 1985 1986 1987
			/* 
			 * Someone else could remove our RI lock when acquiring
			 * W lock. This is possible if we came here from elog(ERROR)
			 * from IpcSemaphore{Lock|Unlock}(WaitCLSemId). And so we
			 * don't do Assert(buf->ri_lock) here.
			 */
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004
			buf->ri_lock = false;
		}
		if (BufferLocks[i] & BL_W_LOCK)
		{
			Assert(buf->w_lock);
			buf->w_lock = false;
		}
#ifdef HAS_TEST_AND_SET
		S_UNLOCK(&(buf->cntx_lock));
#else
		IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
#endif
		BufferLocks[i] = 0;
	}
}

void
Bruce Momjian's avatar
Bruce Momjian committed
2005
LockBuffer(Buffer buffer, int mode)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2006 2007 2008 2009 2010 2011 2012
{
	BufferDesc *buf;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
		return;

Bruce Momjian's avatar
Bruce Momjian committed
2013
	buf = &(BufferDescriptors[buffer - 1]);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2014 2015

#ifdef HAS_TEST_AND_SET
Bruce Momjian's avatar
Bruce Momjian committed
2016
	S_LOCK(&(buf->cntx_lock));
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2017
#else
Bruce Momjian's avatar
Bruce Momjian committed
2018
	IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2019 2020 2021 2022
#endif

	if (mode == BUFFER_LOCK_UNLOCK)
	{
Bruce Momjian's avatar
Bruce Momjian committed
2023
		if (BufferLocks[buffer - 1] & BL_R_LOCK)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2024 2025 2026
		{
			Assert(buf->r_locks > 0);
			Assert(!(buf->w_lock));
Bruce Momjian's avatar
Bruce Momjian committed
2027 2028 2029
			Assert(!(BufferLocks[buffer - 1] & (BL_W_LOCK | BL_RI_LOCK)))
				(buf->r_locks)--;
			BufferLocks[buffer - 1] &= ~BL_R_LOCK;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2030
		}
Bruce Momjian's avatar
Bruce Momjian committed
2031
		else if (BufferLocks[buffer - 1] & BL_W_LOCK)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2032 2033
		{
			Assert(buf->w_lock);
2034
			Assert(buf->r_locks == 0);
Bruce Momjian's avatar
Bruce Momjian committed
2035 2036 2037
			Assert(!(BufferLocks[buffer - 1] & (BL_R_LOCK | BL_RI_LOCK)))
				buf->w_lock = false;
			BufferLocks[buffer - 1] &= ~BL_W_LOCK;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2038 2039 2040 2041 2042 2043 2044 2045
		}
		else
			elog(ERROR, "UNLockBuffer: buffer %u is not locked", buffer);
	}
	else if (mode == BUFFER_LOCK_SHARE)
	{
		unsigned	i = 0;

Bruce Momjian's avatar
Bruce Momjian committed
2046
		Assert(!(BufferLocks[buffer - 1] & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2047 2048 2049 2050 2051 2052 2053 2054
		while (buf->ri_lock || buf->w_lock)
		{
#ifdef HAS_TEST_AND_SET
			S_UNLOCK(&(buf->cntx_lock));
			s_lock_sleep(i++);
			S_LOCK(&(buf->cntx_lock));
#else
			IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
2055 2056
			s_lock_sleep(i++);
			IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2057 2058 2059
#endif
		}
		(buf->r_locks)++;
Bruce Momjian's avatar
Bruce Momjian committed
2060
		BufferLocks[buffer - 1] |= BL_R_LOCK;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2061 2062 2063 2064
	}
	else if (mode == BUFFER_LOCK_EXCLUSIVE)
	{
		unsigned	i = 0;
Bruce Momjian's avatar
Bruce Momjian committed
2065 2066

		Assert(!(BufferLocks[buffer - 1] & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2067 2068
		while (buf->r_locks > 0 || buf->w_lock)
		{
2069
			if (buf->r_locks > 3 || (BufferLocks[buffer - 1] & BL_RI_LOCK))
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2070
			{
2071 2072 2073 2074 2075 2076 2077
				/*
				 * Our RI lock might be removed by concurrent W lock
				 * acquiring (see what we do with RI locks below
				 * when our own W acquiring succeeded) and so
				 * we set RI lock again if we already did this.
				 */
				BufferLocks[buffer - 1] |= BL_RI_LOCK;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2078 2079 2080 2081 2082 2083 2084 2085
				buf->ri_lock = true;
			}
#ifdef HAS_TEST_AND_SET
			S_UNLOCK(&(buf->cntx_lock));
			s_lock_sleep(i++);
			S_LOCK(&(buf->cntx_lock));
#else
			IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
2086 2087
			s_lock_sleep(i++);
			IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2088 2089 2090
#endif
		}
		buf->w_lock = true;
Bruce Momjian's avatar
Bruce Momjian committed
2091 2092
		BufferLocks[buffer - 1] |= BL_W_LOCK;
		if (BufferLocks[buffer - 1] & BL_RI_LOCK)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2093
		{
2094 2095 2096 2097
			/*
			 * It's possible to remove RI locks acquired by another
			 * W lockers here, but they'll take care about it.
			 */
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2098
			buf->ri_lock = false;
Bruce Momjian's avatar
Bruce Momjian committed
2099
			BufferLocks[buffer - 1] &= ~BL_RI_LOCK;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2100 2101 2102 2103 2104 2105
		}
	}
	else
		elog(ERROR, "LockBuffer: unknown lock mode %d", mode);

#ifdef HAS_TEST_AND_SET
Bruce Momjian's avatar
Bruce Momjian committed
2106
	S_UNLOCK(&(buf->cntx_lock));
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2107
#else
Bruce Momjian's avatar
Bruce Momjian committed
2108
	IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2109 2110 2111
#endif

}