/*-------------------------------------------------------------------------
 *
 * clog.c
 *		PostgreSQL transaction-commit-log manager
 *
 * This module replaces the old "pg_log" access code, which treated pg_log
 * essentially like a relation, in that it went through the regular buffer
 * manager.  The problem with that was that there wasn't any good way to
 * recycle storage space for transactions so old that they'll never be
 * looked up again.  Now we use specialized access code so that the commit
 * log can be broken into relatively small, independent segments.
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * $Header: /cvsroot/pgsql/src/backend/access/transam/clog.c,v 1.4 2001/09/29 04:02:21 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <fcntl.h>
#include <dirent.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include "access/clog.h"
#include "storage/lwlock.h"
#include "miscadmin.h"

/*
 * Defines for CLOG page and segment sizes.  A page is the same BLCKSZ
 * as is used everywhere else in Postgres.  The CLOG segment size can be
 * chosen somewhat arbitrarily; we make it 1 million transactions by default,
 * or 256Kb.
 *
 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
 * CLOG page numbering also wraps around at 0xFFFFFFFF/CLOG_XACTS_PER_PAGE,
 * and CLOG segment numbering at 0xFFFFFFFF/CLOG_XACTS_PER_SEGMENT.  We need
 * take no explicit notice of that fact in this module, except when comparing
 * segment and page numbers in TruncateCLOG (see CLOGPagePrecedes).
 */

#define CLOG_BLCKSZ			BLCKSZ

/* We need two bits per xact, so four xacts fit in a byte */
#define CLOG_BITS_PER_XACT	2
#define CLOG_XACTS_PER_BYTE	4
#define CLOG_XACTS_PER_PAGE	(CLOG_BLCKSZ * CLOG_XACTS_PER_BYTE)
/* Mask to extract one xact's status bits after shifting them into place */
#define CLOG_XACT_BITMASK	((1 << CLOG_BITS_PER_XACT) - 1)

#define CLOG_XACTS_PER_SEGMENT	0x100000
#define CLOG_PAGES_PER_SEGMENT	(CLOG_XACTS_PER_SEGMENT / CLOG_XACTS_PER_PAGE)

/* Map a TransactionId to its CLOG page, byte within page, and bit index */
#define TransactionIdToPage(xid)	((xid) / (TransactionId) CLOG_XACTS_PER_PAGE)
#define TransactionIdToPgIndex(xid)	((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
#define TransactionIdToByte(xid)	(TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
#define TransactionIdToBIndex(xid)	((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)


/*----------
 * Shared-memory data structures for CLOG control
 *
 * We use a simple least-recently-used scheme to manage a pool of page
 * buffers for the CLOG.  Under ordinary circumstances we expect that write
 * traffic will occur mostly to the latest CLOG page (and to the just-prior
 * page, soon after a page transition).  Read traffic will probably touch
 * a larger span of pages, but in any case a fairly small number of page
 * buffers should be sufficient.  So, we just search the buffers using plain
 * linear search; there's no need for a hashtable or anything fancy.
 * The management algorithm is straight LRU except that we will never swap
 * out the latest page (since we know it's going to be hit again eventually).
 *
 * We use an overall LWLock to protect the shared data structures, plus
 * per-buffer LWLocks that synchronize I/O for each buffer.  A process
 * that is reading in or writing out a page buffer does not hold the control
 * lock, only the per-buffer lock for the buffer it is working on.
 *
 * To change the page number or state of a buffer, one must normally hold
 * the control lock.  (The sole exception to this rule is that a writer
 * process changes the state from DIRTY to WRITE_IN_PROGRESS while holding
 * only the per-buffer lock.)  If the buffer's state is neither EMPTY nor
 * CLEAN, then there may be processes doing (or waiting to do) I/O on the
 * buffer, so the page number may not be changed, and the only allowed state
 * transition is to change WRITE_IN_PROGRESS to DIRTY after dirtying the page.
 * To do any other state transition involving a buffer with potential I/O
 * processes, one must hold both the per-buffer lock and the control lock.
 * (Note the control lock must be acquired second; do not wait on a buffer
 * lock while holding the control lock.)  A process wishing to read a page
 * marks the buffer state as READ_IN_PROGRESS, then drops the control lock,
 * acquires the per-buffer lock, and rechecks the state before proceeding.
 * This recheck takes care of the possibility that someone else already did
 * the read, while the early marking prevents someone else from trying to
 * read the same page into a different buffer.
 *
 * Note we are assuming that read and write of the state value is atomic,
 * since I/O processes may examine and change the state while not holding
 * the control lock.
 *
 * As with the regular buffer manager, it is possible for another process
 * to re-dirty a page that is currently being written out.  This is handled
 * by setting the page's state from WRITE_IN_PROGRESS to DIRTY.  The writing
 * process must notice this and not mark the page CLEAN when it's done.
 *
 * XLOG interactions: this module generates an XLOG record whenever a new
 * CLOG page is initialized to zeroes.  Other writes of CLOG come from
 * recording of transaction commit or abort in xact.c, which generates its
 * own XLOG records for these events and will re-perform the status update
 * on redo; so we need make no additional XLOG entry here.  Also, the XLOG
 * is guaranteed flushed through the XLOG commit record before we are called
 * to log a commit, so the WAL rule "write xlog before data" is satisfied
 * automatically for commits, and we don't really care for aborts.  Therefore,
 * we don't need to mark XLOG pages with LSN information; we have enough
 * synchronization already.
 *----------
 */

/* Life-cycle state of one shared CLOG page buffer (see design notes above) */
typedef enum
{
	CLOG_PAGE_EMPTY,			/* CLOG buffer is not in use */
	CLOG_PAGE_READ_IN_PROGRESS,	/* CLOG page is being read in */
	CLOG_PAGE_CLEAN,			/* CLOG page is valid and not dirty */
	CLOG_PAGE_DIRTY,			/* CLOG page is valid but needs write */
	CLOG_PAGE_WRITE_IN_PROGRESS	/* CLOG page is being written out */
} ClogPageStatus;

/*
 * Shared-memory state for CLOG.
 *
 * All fields are protected by CLogControlLock, except as noted in the
 * design comments above (page_status transitions during I/O).
 */
typedef struct ClogCtlData
{
	/*
	 * Info for each buffer slot.  Page number is undefined when status is
	 * EMPTY.  lru_count is essentially the number of operations since last
	 * use of this page; the page with highest lru_count is the best candidate
	 * to replace.
	 */
	char	   *page_buffer[NUM_CLOG_BUFFERS];
	ClogPageStatus	page_status[NUM_CLOG_BUFFERS];
	int			page_number[NUM_CLOG_BUFFERS];
	unsigned int	page_lru_count[NUM_CLOG_BUFFERS];
	/*
	 * latest_page_number is the page number of the current end of the
	 * CLOG; this is not critical data, since we use it only to avoid
	 * swapping out the latest page.
	 */
	int			latest_page_number;
} ClogCtlData;

static ClogCtlData *ClogCtl = NULL;

/*
 * ClogBufferLocks is set during CLOGShmemInit and does not change thereafter.
 * The value is automatically inherited by backends via fork, and
 * doesn't need to be in shared memory.
 */
static LWLockId ClogBufferLocks[NUM_CLOG_BUFFERS]; /* Per-buffer I/O locks */

/*
 * ClogDir is set during CLOGShmemInit and does not change thereafter.
 * The value is automatically inherited by backends via fork, and
 * doesn't need to be in shared memory.
 */
static char ClogDir[MAXPGPATH];

/* Build the pathname of a CLOG segment file from its segment number */
#define ClogFileName(path, seg)	\
	snprintf(path, MAXPGPATH, "%s/%04X", ClogDir, seg)

/*
 * Macro to mark a buffer slot "most recently used".
 *
 * lru_count counts operations since last use, so we age every slot by
 * one and then reset the touched slot to zero.  Caller must hold the
 * control lock.
 */
#define ClogRecentlyUsed(slotno)	\
	do { \
		int		iilru; \
		for (iilru = 0; iilru < NUM_CLOG_BUFFERS; iilru++) \
			ClogCtl->page_lru_count[iilru]++; \
		ClogCtl->page_lru_count[slotno] = 0; \
	} while (0)


static int	ZeroCLOGPage(int pageno, bool writeXlog);
static int	ReadCLOGPage(int pageno);
static void WriteCLOGPage(int slotno);
static void CLOGPhysicalReadPage(int pageno, int slotno);
static void CLOGPhysicalWritePage(int pageno, int slotno);
static int	SelectLRUCLOGPage(int pageno);
static bool ScanCLOGDirectory(int cutoffPage, bool doDeletions);
static bool CLOGPagePrecedes(int page1, int page2);
static void WriteZeroPageXlogRec(int pageno);


/*
 * Record the final state of a transaction in the commit log.
 *
 * NB: this is a low-level routine and is NOT the preferred entry point
 * for most uses; TransactionLogUpdate() in transam.c is the intended caller.
 */
void
TransactionIdSetStatus(TransactionId xid, XidStatus status)
{
	int			pageno = TransactionIdToPage(xid);
	int			byteno = TransactionIdToByte(xid);
	int			bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
	int			slotno;
	char	   *statbyte;

	Assert(status == TRANSACTION_STATUS_COMMITTED ||
		   status == TRANSACTION_STATUS_ABORTED);

	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	/* Fetch the page holding this xid's status bits */
	slotno = ReadCLOGPage(pageno);
	statbyte = ClogCtl->page_buffer[slotno] + byteno;

	/* Current state should be 0 or target state */
	Assert(((*statbyte >> bshift) & CLOG_XACT_BITMASK) == 0 ||
		   ((*statbyte >> bshift) & CLOG_XACT_BITMASK) == status);

	/* OR in the two status bits, and remember the page needs flushing */
	*statbyte |= (status << bshift);
	ClogCtl->page_status[slotno] = CLOG_PAGE_DIRTY;

	LWLockRelease(CLogControlLock);
}

/*
 * Interrogate the state of a transaction in the commit log.
 *
 * NB: this is a low-level routine and is NOT the preferred entry point
 * for most uses; TransactionLogTest() in transam.c is the intended caller.
 */
XidStatus
TransactionIdGetStatus(TransactionId xid)
{
	int			pageno = TransactionIdToPage(xid);
	int			byteno = TransactionIdToByte(xid);
	int			bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
	XidStatus	result;
	int			slotno;
	char	   *statbyte;

	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	/* Pull in the page and extract the two status bits for this xid */
	slotno = ReadCLOGPage(pageno);
	statbyte = ClogCtl->page_buffer[slotno] + byteno;
	result = (*statbyte >> bshift) & CLOG_XACT_BITMASK;

	LWLockRelease(CLogControlLock);

	return result;
}


/*
 * Initialization of shared memory for CLOG
 */

/*
 * Report the shared-memory space needed: the control struct plus one
 * BLCKSZ page buffer per slot, rounded up to alignment.
 */
int
CLOGShmemSize(void)
{
	int			nbytes = sizeof(ClogCtlData);

	nbytes += CLOG_BLCKSZ * NUM_CLOG_BUFFERS;
	return MAXALIGN(nbytes);
}

/*
 * Allocate and initialize the CLOG shared-memory area, assign per-buffer
 * I/O locks, and record the CLOG directory path.  Called once at startup.
 */
void
CLOGShmemInit(void)
{
	bool		found;
	char	   *bufptr;
	int			slotno;

	/* this must agree with space requested by CLOGShmemSize() */
	ClogCtl = (ClogCtlData *)
		ShmemInitStruct("CLOG Ctl",
						MAXALIGN(sizeof(ClogCtlData) +
								 CLOG_BLCKSZ * NUM_CLOG_BUFFERS),
						&found);
	Assert(!found);

	memset(ClogCtl, 0, sizeof(ClogCtlData));

	/* the page buffers follow the control struct in the same chunk */
	bufptr = ((char *) ClogCtl) + sizeof(ClogCtlData);

	for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
	{
		ClogCtl->page_buffer[slotno] = bufptr;
		ClogCtl->page_status[slotno] = CLOG_PAGE_EMPTY;
		ClogBufferLocks[slotno] = LWLockAssign();
		bufptr += CLOG_BLCKSZ;
	}

	/* ClogCtl->latest_page_number will be set later */

	/* Init CLOG directory path */
	snprintf(ClogDir, MAXPGPATH, "%s/pg_clog", DataDir);
}

/*
 * This func must be called ONCE on system install.  It creates
 * the initial CLOG segment.  (The CLOG directory is assumed to
 * have been created by the initdb shell script, and CLOGShmemInit
 * must have been called already.)
 */
void
BootStrapCLOG(void)
{
	int			slotno;

	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	/* Create and zero the first page of the commit log */
	/* (writeXlog = false: no XLOG record needed at bootstrap time) */
	slotno = ZeroCLOGPage(0, false);

	/* Make sure it's written out */
	WriteCLOGPage(slotno);
	Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);

	LWLockRelease(CLogControlLock);
}

/*
 * Initialize (or reinitialize) a page of CLOG to zeroes.
 * If writeXlog is TRUE, also emit an XLOG record saying we did this.
 *
 * The page is not actually written, just set up in shared memory.
 * The slot number of the new page is returned.
 *
 * Control lock must be held at entry, and will be held at exit.
 */
static int
ZeroCLOGPage(int pageno, bool writeXlog)
{
	int			slotno = SelectLRUCLOGPage(pageno);

	/* The chosen slot must be free, clean, or already holding this page */
	Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY ||
		   ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN ||
		   ClogCtl->page_number[slotno] == pageno);

	/* Take over the slot: it now holds this page, and it needs a write */
	ClogCtl->page_number[slotno] = pageno;
	ClogCtl->page_status[slotno] = CLOG_PAGE_DIRTY;
	ClogRecentlyUsed(slotno);

	/* Zero out the page contents */
	MemSet(ClogCtl->page_buffer[slotno], 0, CLOG_BLCKSZ);

	/* Assume this page is now the latest active page */
	ClogCtl->latest_page_number = pageno;

	if (writeXlog)
		WriteZeroPageXlogRec(pageno);

	return slotno;
}

/*
 * Find a CLOG page in a shared buffer, reading it in if necessary.
 * The page number must correspond to an already-initialized page.
 *
 * Return value is the shared-buffer slot number now holding the page.
 * The buffer's LRU access info is updated.
 *
 * Control lock must be held at entry, and will be held at exit.
 */
static int
ReadCLOGPage(int pageno)
{
	/* Outer loop handles restart if we lose the buffer to someone else */
	for (;;)
	{
		int			slotno;

		/* See if page already is in memory; if not, pick victim slot */
		slotno = SelectLRUCLOGPage(pageno);

		/* Did we find the page in memory? */
		if (ClogCtl->page_number[slotno] == pageno &&
			ClogCtl->page_status[slotno] != CLOG_PAGE_EMPTY)
		{
			/* If page is still being read in, we cannot use it yet */
			if (ClogCtl->page_status[slotno] != CLOG_PAGE_READ_IN_PROGRESS)
			{
				/* otherwise, it's ready to use */
				ClogRecentlyUsed(slotno);
				return slotno;
			}
		}
		else
		{
			/* We found no match; assert we selected a freeable slot */
			Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY ||
				   ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
		}

		/* Mark the slot read-busy (no-op if it already was) */
		ClogCtl->page_number[slotno] = pageno;
		ClogCtl->page_status[slotno] = CLOG_PAGE_READ_IN_PROGRESS;

		/*
		 * Temporarily mark page as recently-used to discourage
		 * SelectLRUCLOGPage from selecting it again for someone else.
		 */
		ClogCtl->page_lru_count[slotno] = 0;

		/*
		 * Release shared lock, grab per-buffer lock instead.  Per the
		 * design notes above, the control lock must always be acquired
		 * AFTER the buffer lock, never while holding it.
		 */
		LWLockRelease(CLogControlLock);
		LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE);

		/*
		 * Check to see if someone else already did the read, or took the
		 * buffer away from us.  If so, restart from the top.
		 */
		if (ClogCtl->page_number[slotno] != pageno ||
			ClogCtl->page_status[slotno] != CLOG_PAGE_READ_IN_PROGRESS)
		{
			LWLockRelease(ClogBufferLocks[slotno]);
			LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
			continue;
		}

		/* Okay, do the read */
		CLOGPhysicalReadPage(pageno, slotno);

		/* Re-acquire shared control lock and update page state */
		LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

		/*
		 * While we held the buffer lock in READ_IN_PROGRESS state, no
		 * other process may change the slot's page number or state.
		 */
		Assert(ClogCtl->page_number[slotno] == pageno &&
			   ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS);

		ClogCtl->page_status[slotno] = CLOG_PAGE_CLEAN;

		LWLockRelease(ClogBufferLocks[slotno]);

		ClogRecentlyUsed(slotno);
		return slotno;
	}
}

/*
 * Write a CLOG page from a shared buffer, if necessary.
 * Does nothing if the specified slot is not dirty.
 *
 * NOTE: only one write attempt is made here.  Hence, it is possible that
 * the page is still dirty at exit (if someone else re-dirtied it during
 * the write).  However, we *do* attempt a fresh write even if the page
 * is already being written; this is for checkpoints.
 *
 * Control lock must be held at entry, and will be held at exit.
 */
static void
WriteCLOGPage(int slotno)
{
	int pageno;

	/* Do nothing if page does not need writing */
	if (ClogCtl->page_status[slotno] != CLOG_PAGE_DIRTY &&
		ClogCtl->page_status[slotno] != CLOG_PAGE_WRITE_IN_PROGRESS)
		return;

	/* Remember which page the slot held when we decided to write */
	pageno = ClogCtl->page_number[slotno];

	/* Release shared lock, grab per-buffer lock instead */
	LWLockRelease(CLogControlLock);
	LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE);

	/*
	 * Check to see if someone else already did the write, or took the
	 * buffer away from us.  If so, do nothing.  NOTE: we really should
	 * never see WRITE_IN_PROGRESS here, since that state should only
	 * occur while the writer is holding the buffer lock.  But accept it
	 * so that we have a recovery path if a writer aborts.
	 */
	if (ClogCtl->page_number[slotno] != pageno ||
		(ClogCtl->page_status[slotno] != CLOG_PAGE_DIRTY &&
		 ClogCtl->page_status[slotno] != CLOG_PAGE_WRITE_IN_PROGRESS))
	{
		LWLockRelease(ClogBufferLocks[slotno]);
		LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
		return;
	}

	/*
	 * Mark the slot write-busy.  After this point, a transaction status
	 * update on this page will mark it dirty again.  NB: we are assuming
	 * that read/write of the page status field is atomic, since we change
	 * the state while not holding control lock.  However, we cannot set
	 * this state any sooner, or we'd possibly fool a previous writer
	 * into thinking he's successfully dumped the page when he hasn't.
	 * (Scenario: other writer starts, page is redirtied, we come along and
	 * set WRITE_IN_PROGRESS again, other writer completes and sets CLEAN
	 * because redirty info has been lost, then we think it's clean too.)
	 */
	ClogCtl->page_status[slotno] = CLOG_PAGE_WRITE_IN_PROGRESS;

	/* Okay, do the write */
	CLOGPhysicalWritePage(pageno, slotno);

	/* Re-acquire shared control lock and update page state */
	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	Assert(ClogCtl->page_number[slotno] == pageno &&
		   (ClogCtl->page_status[slotno] == CLOG_PAGE_WRITE_IN_PROGRESS ||
			ClogCtl->page_status[slotno] == CLOG_PAGE_DIRTY));

	/* Cannot set CLEAN if someone re-dirtied page since write started */
	if (ClogCtl->page_status[slotno] == CLOG_PAGE_WRITE_IN_PROGRESS)
		ClogCtl->page_status[slotno] = CLOG_PAGE_CLEAN;

	LWLockRelease(ClogBufferLocks[slotno]);
}

/*
 * Physical read of a (previously existing) page into a buffer slot
 *
 * For now, assume it's not worth keeping a file pointer open across
 * read/write operations.  We could cache one virtual file pointer ...
 */
static void
CLOGPhysicalReadPage(int pageno, int slotno)
{
	int			segno =   pageno / CLOG_PAGES_PER_SEGMENT;
	int			rpageno = pageno % CLOG_PAGES_PER_SEGMENT;
	int			offset = rpageno * CLOG_BLCKSZ;
	char		path[MAXPGPATH];
	int			fd;

	ClogFileName(path, segno);

	/*
	 * In a crash-and-restart situation, it's possible for us to receive
	 * commands to set the commit status of transactions whose bits are
	 * in already-truncated segments of the commit log (see notes in
	 * CLOGPhysicalWritePage).  Hence, if we are InRecovery, allow the
	 * case where the file doesn't exist, and return zeroes instead.
	 */
	fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
	if (fd < 0)
	{
		if (errno != ENOENT || !InRecovery)
			elog(STOP, "open of %s failed: %m", path);
		elog(DEBUG, "clog file %s doesn't exist, reading as zeroes", path);
		MemSet(ClogCtl->page_buffer[slotno], 0, CLOG_BLCKSZ);
		return;
	}

	if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
		elog(STOP, "lseek of clog file %u, offset %u failed: %m",
			 segno, offset);

	/* clear errno so a short read doesn't report a stale error code */
	errno = 0;
	if (read(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ)
		elog(STOP, "read of clog file %u, offset %u failed: %m",
			 segno, offset);

	/* NOTE(review): close() result ignored here; harmless on a read-only
	 * pass, but see CLOGPhysicalWritePage for the write-side concern */
	close(fd);
}

/*
 * Physical write of a page from a buffer slot
 *
 * For now, assume it's not worth keeping a file pointer open across
 * read/write operations.  We could cache one virtual file pointer ...
 */
static void
CLOGPhysicalWritePage(int pageno, int slotno)
{
	int			segno =   pageno / CLOG_PAGES_PER_SEGMENT;
	int			rpageno = pageno % CLOG_PAGES_PER_SEGMENT;
	int			offset = rpageno * CLOG_BLCKSZ;
	char		path[MAXPGPATH];
	int			fd;

	ClogFileName(path, segno);

	/*
	 * If the file doesn't already exist, we should create it.  It is possible
	 * for this to need to happen when writing a page that's not first in
	 * its segment; we assume the OS can cope with that.  (Note: it might seem
	 * that it'd be okay to create files only when ZeroCLOGPage is called for
	 * the first page of a segment.  However, if after a crash and restart
	 * the REDO logic elects to replay the log from a checkpoint before the
	 * latest one, then it's possible that we will get commands to set
	 * transaction status of transactions that have already been truncated
	 * from the commit log.  Easiest way to deal with that is to accept
	 * references to nonexistent files here and in CLOGPhysicalReadPage.)
	 */
	fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
	if (fd < 0)
	{
		if (errno != ENOENT)
			elog(STOP, "open of %s failed: %m", path);
		fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
						   S_IRUSR | S_IWUSR);
		if (fd < 0)
			elog(STOP, "creation of file %s failed: %m", path);
	}

	if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
		elog(STOP, "lseek of clog file %u, offset %u failed: %m",
			 segno, offset);

	errno = 0;
	if (write(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ)
	{
		/* if write didn't set errno, assume problem is no disk space */
		if (errno == 0)
			errno = ENOSPC;
		elog(STOP, "write of clog file %u, offset %u failed: %m",
			 segno, offset);
	}

	/*
	 * Check close() too: some filesystems (e.g. NFS) defer write errors
	 * until close.  If we ignored a failure here, WriteCLOGPage would
	 * mark the page CLEAN even though it never reached the kernel safely.
	 */
	if (close(fd))
		elog(STOP, "close of clog file %u failed: %m", segno);
}

/*
 * Select the slot to re-use when we need a free slot.
 *
 * The target page number is passed because we need to consider the
 * possibility that some other process reads in the target page while
 * we are doing I/O to free a slot.  Hence, check or recheck to see if
 * any slot already holds the target page, and return that slot if so.
 * Thus, the returned slot is *either* a slot already holding the pageno
 * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
 * or CLEAN).
 *
 * Control lock must be held at entry, and will be held at exit.
 */
static int
SelectLRUCLOGPage(int pageno)
{
	/* Outer loop handles restart after I/O */
	for (;;)
	{
		int			slotno;
		int			bestslot = 0;
		unsigned int bestcount = 0;

		/* See if page already has a buffer assigned */
		for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
		{
			if (ClogCtl->page_number[slotno] == pageno &&
				ClogCtl->page_status[slotno] != CLOG_PAGE_EMPTY)
				return slotno;
		}

		/*
		 * If we find any EMPTY slot, just select that one.
		 * Else locate the least-recently-used slot that isn't the
		 * latest CLOG page.
		 */
		for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
		{
			if (ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY)
				return slotno;
			/* highest lru_count = longest unused = best victim */
			if (ClogCtl->page_lru_count[slotno] > bestcount &&
				ClogCtl->page_number[slotno] != ClogCtl->latest_page_number)
			{
				bestslot = slotno;
				bestcount = ClogCtl->page_lru_count[slotno];
			}
		}

		/*
		 * If the selected page is clean, we're set.
		 */
		if (ClogCtl->page_status[bestslot] == CLOG_PAGE_CLEAN)
			return bestslot;

		/*
		 * We need to do I/O.  Normal case is that we have to write it out,
		 * but it's possible in the worst case to have selected a read-busy
		 * page.  In that case we use ReadCLOGPage to wait for the read to
		 * complete.
		 */
		if (ClogCtl->page_status[bestslot] == CLOG_PAGE_READ_IN_PROGRESS)
			(void) ReadCLOGPage(ClogCtl->page_number[bestslot]);
		else
			WriteCLOGPage(bestslot);

		/*
		 * Now loop back and try again.  This is the easiest way of dealing
		 * with corner cases such as the victim page being re-dirtied while
		 * we wrote it.
		 */
	}
}

/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
 */
void
StartupCLOG(void)
{
	/*
	 * Initialize our idea of the latest page number: the page that will
	 * hold the next XID to be assigned.
	 */
	ClogCtl->latest_page_number = TransactionIdToPage(ShmemVariableCache->nextXid);
}

/*
 * This must be called ONCE during postmaster or standalone-backend shutdown
 */
void
ShutdownCLOG(void)
{
	int			slotno;

714
	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
715 716 717 718 719 720 721 722

	for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
	{
		WriteCLOGPage(slotno);
		Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY ||
			   ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
	}

723
	LWLockRelease(CLogControlLock);
724 725 726 727 728 729 730 731 732 733
}

/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
 */
void
CheckPointCLOG(void)
{
	int			slotno;

734
	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
735 736 737 738 739 740 741 742 743 744

	for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
	{
		WriteCLOGPage(slotno);
		/*
		 * We cannot assert that the slot is clean now, since another
		 * process might have re-dirtied it already.  That's okay.
		 */
	}

745
	LWLockRelease(CLogControlLock);
746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761
}


/*
 * Make sure that CLOG has room for a newly-allocated XID.
 *
 * NB: this is called while holding XidGenLock.  We want it to be very fast
 * most of the time; even when it's not so fast, no actual I/O need happen
 * unless we're forced to write out a dirty clog or xlog page to make room
 * in shared memory.
 */
void
ExtendCLOG(TransactionId newestXact)
{
	/*
	 * Only the first XID of a page requires any work.  But beware: just
	 * after wraparound, the first XID of page zero is
	 * FirstNormalTransactionId, not zero.
	 */
	if (TransactionIdToPgIndex(newestXact) == 0 ||
		TransactionIdEquals(newestXact, FirstNormalTransactionId))
	{
		int			pageno = TransactionIdToPage(newestXact);

		LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

		/* Zero the new page and make an XLOG entry about it */
		ZeroCLOGPage(pageno, true);

		LWLockRelease(CLogControlLock);
	}
}


/*
 * Remove all CLOG segments before the one holding the passed transaction ID
 *
 * When this is called, we know that the database logically contains no
 * reference to transaction IDs older than oldestXact.  However, we must
 * not truncate the CLOG until we have performed a checkpoint, to ensure
 * that no such references remain on disk either; else a crash just after
 * the truncation might leave us with a problem.  Since CLOG segments hold
 * a large number of transactions, the opportunity to actually remove a
 * segment is fairly rare, and so it seems best not to do the checkpoint
 * unless we have confirmed that there is a removable segment.  Therefore
 * we issue the checkpoint command here, not in higher-level code as might
 * seem cleaner.
 */
void
TruncateCLOG(TransactionId oldestXact)
{
	int			cutoffPage;
	int			slotno;

	/*
	 * The cutoff point is the start of the segment containing oldestXact.
	 */
	oldestXact -= oldestXact % CLOG_XACTS_PER_SEGMENT;
	cutoffPage = TransactionIdToPage(oldestXact);

	if (!ScanCLOGDirectory(cutoffPage, false))
		return;					/* nothing to remove */

	/* Perform a CHECKPOINT */
	CreateCheckPoint(false);

	/*
	 * Scan CLOG shared memory and remove any pages preceding the cutoff
	 * page, to ensure we won't rewrite them later.  (Any dirty pages
	 * should have been flushed already during the checkpoint, we're
	 * just being extra careful here.)
	 */
	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

restart:;
	/*
	 * While we are holding the lock, make an important safety check:
	 * the planned cutoff point must be <= the current CLOG endpoint page.
	 * Otherwise we have already wrapped around, and proceeding with the
	 * truncation would risk removing the current CLOG segment.
	 */
	if (CLOGPagePrecedes(ClogCtl->latest_page_number, cutoffPage))
	{
		LWLockRelease(CLogControlLock);
		elog(LOG, "unable to truncate commit log: apparent wraparound");
		return;
	}

	for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
	{
		if (ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY)
			continue;
		if (!CLOGPagePrecedes(ClogCtl->page_number[slotno], cutoffPage))
			continue;
		/*
		 * If page is CLEAN, just change state to EMPTY (expected case).
		 */
		if (ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN)
		{
			ClogCtl->page_status[slotno] = CLOG_PAGE_EMPTY;
			continue;
		}
		/*
		 * Hmm, we have (or may have) I/O operations acting on the page,
		 * so we've got to wait for them to finish and then start again.
		 * This is the same logic as in SelectLRUCLOGPage.  (The restart
		 * is needed because those calls drop and re-take the control
		 * lock, so any slot may have changed meanwhile.)
		 */
		if (ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS)
			(void) ReadCLOGPage(ClogCtl->page_number[slotno]);
		else
			WriteCLOGPage(slotno);
		goto restart;
	}

	LWLockRelease(CLogControlLock);

	/* Now we can remove the old CLOG segment(s) */
	(void) ScanCLOGDirectory(cutoffPage, true);
}

/*
 * TruncateCLOG subroutine: scan CLOG directory for removable segments.
 * Actually remove them iff doDeletions is true.  Return TRUE iff any
 * removable segments were found.  Note: no locking is needed.
 */
static bool
ScanCLOGDirectory(int cutoffPage, bool doDeletions)
{
	bool		found = false;
	DIR		   *dirdesc;
	struct dirent *de;
	char		path[MAXPGPATH];

	dirdesc = opendir(ClogDir);
	if (dirdesc == NULL)
		elog(STOP, "could not open transaction-commit log directory (%s): %m",
			 ClogDir);

	/* clear errno so we can tell end-of-directory from a readdir error */
	errno = 0;
	while ((de = readdir(dirdesc)) != NULL)
	{
		/* Segment files are named with exactly four uppercase hex digits */
		if (strlen(de->d_name) == 4 &&
			strspn(de->d_name, "0123456789ABCDEF") == 4)
		{
			int			segno = (int) strtol(de->d_name, NULL, 16);
			int			segpage = segno * CLOG_PAGES_PER_SEGMENT;

			if (CLOGPagePrecedes(segpage, cutoffPage))
			{
				found = true;
				if (doDeletions)
				{
					elog(LOG, "removing commit log file %s", de->d_name);
					snprintf(path, MAXPGPATH, "%s/%s", ClogDir, de->d_name);
					unlink(path);
				}
			}
		}
		errno = 0;
	}
	if (errno)
		elog(STOP, "could not read transaction-commit log directory (%s): %m",
			 ClogDir);
	closedir(dirdesc);

	return found;
}

/*
 * Decide which of two CLOG page numbers is "older" for truncation purposes.
 *
 * We need to use comparison of TransactionIds here in order to do the right
 * thing with wraparound XID arithmetic.  However, if we are asked about
 * page number zero, we don't want to hand InvalidTransactionId to
 * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
 * offset both xids by FirstNormalTransactionId to avoid that.
 */
static bool
CLOGPagePrecedes(int page1, int page2)
{
	TransactionId xid1;
	TransactionId xid2;

	xid1 = (TransactionId) page1 * CLOG_XACTS_PER_PAGE;
	xid1 += FirstNormalTransactionId;
	xid2 = (TransactionId) page2 * CLOG_XACTS_PER_PAGE;
	xid2 += FirstNormalTransactionId;

	return TransactionIdPrecedes(xid1, xid2);
}


/*
 * Write a ZEROPAGE xlog record
 *
 * Note: xlog record is marked as outside transaction control, since we
 * want it to be redone whether the invoking transaction commits or not.
 * (Besides which, this is normally done just before entering a transaction.)
 */
static void
WriteZeroPageXlogRec(int pageno)
{
	XLogRecData rdata;

	/* the record payload is just the page number */
	rdata.data = (char *) (&pageno);
	rdata.len = sizeof(int);
	rdata.buffer = InvalidBuffer;
	rdata.next = NULL;
	(void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE | XLOG_NO_TRAN, &rdata);
}

/*
 * CLOG resource manager's routines
 */
void
clog_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	int			pageno;
	int			slotno;

	if (info != CLOG_ZEROPAGE)
		return;

	memcpy(&pageno, XLogRecGetData(record), sizeof(int));

	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	/* Re-create the zeroed page and force it out to disk */
	slotno = ZeroCLOGPage(pageno, false);
	WriteCLOGPage(slotno);
	Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);

	LWLockRelease(CLogControlLock);
}

void
clog_undo(XLogRecPtr lsn, XLogRecord *record)
{
	/* intentionally empty: CLOG zero-page records require no undo action */
}

/* Append a human-readable description of a CLOG xlog record to buf */
void
clog_desc(char *buf, uint8 xl_info, char *rec)
{
	uint8		info = xl_info & ~XLR_INFO_MASK;
	int			pageno;

	if (info == CLOG_ZEROPAGE)
	{
		/* payload is the page number that was zeroed */
		memcpy(&pageno, rec, sizeof(int));
		sprintf(buf + strlen(buf), "zeropage: %d", pageno);
	}
	else
		strcat(buf, "UNKNOWN");
}