/*-------------------------------------------------------------------------
 *
 * xlog.c
 *		PostgreSQL transaction log manager
 *
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.241 2006/06/22 20:42:57 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
14

15 16
#include "postgres.h"

17
#include <ctype.h>
18
#include <fcntl.h>
Tom Lane's avatar
Tom Lane committed
19
#include <signal.h>
20
#include <time.h>
21 22
#include <unistd.h>
#include <sys/stat.h>
23
#include <sys/time.h>
24

25
#include "access/clog.h"
26
#include "access/multixact.h"
27
#include "access/subtrans.h"
28
#include "access/twophase.h"
29
#include "access/xact.h"
30
#include "access/xlog.h"
31
#include "access/xlog_internal.h"
32
#include "access/xlogutils.h"
33
#include "catalog/catversion.h"
Tom Lane's avatar
Tom Lane committed
34
#include "catalog/pg_control.h"
35
#include "miscadmin.h"
36
#include "pgstat.h"
37
#include "postmaster/bgwriter.h"
38
#include "storage/bufpage.h"
39
#include "storage/fd.h"
40
#include "storage/lwlock.h"
41
#include "storage/pmsignal.h"
42
#include "storage/proc.h"
43
#include "storage/procarray.h"
44
#include "storage/spin.h"
45
#include "utils/builtins.h"
46
#include "utils/guc.h"
47
#include "utils/nabstime.h"
48
#include "utils/pg_locale.h"
49
#include "utils/relcache.h"
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
50

51

52
/*
 *	O_DIRECT bypasses the kernel buffers, and we never read those buffers
 *	except during crash recovery, so it is a win to use it in every case
 *	where we sync on each write().  O_DIRECT could also be allowed together
 *	with fsync(), but since skipping the kernel buffer already forces writes
 *	out quickly, it seems best to use it only for O_SYNC; it is hard to
 *	imagine how fsync() could beat O_SYNC for O_DIRECT.
 *
 *	Note that O_DIRECT alone is never enough to force data to the drives:
 *	it merely tries to bypass the kernel cache, so O_SYNC or fsync() is
 *	still required.
 *
 *	PG_O_DIRECT is O_DIRECT where the platform provides it, and 0 (a no-op
 *	when OR'ed into open() flags) everywhere else.
 */
#ifndef O_DIRECT
#define PG_O_DIRECT				0
#else
#define PG_O_DIRECT				O_DIRECT
#endif

68 69 70
/*
 * This chunk of hackery attempts to determine which file sync methods
 * are available on the current platform, and to choose an appropriate
 * default method.	We assume that fsync() is always available, and that
 * configure determined whether fdatasync() is.
 *
 * BARE_OPEN_SYNC_FLAG is the platform's open-time sync flag (O_SYNC, else
 * O_FSYNC) if there is one; OPEN_SYNC_FLAG adds PG_O_DIRECT to it.
 */
#if defined(O_SYNC)
#define BARE_OPEN_SYNC_FLAG		O_SYNC
#elif defined(O_FSYNC)
#define BARE_OPEN_SYNC_FLAG		O_FSYNC
#endif
#ifdef BARE_OPEN_SYNC_FLAG
#define OPEN_SYNC_FLAG			(BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
#endif

/*
 * OPEN_DATASYNC_FLAG is defined only when O_DSYNC exists and is not merely
 * another spelling of the open-sync flag chosen above.
 */
#if defined(O_DSYNC)
#if defined(OPEN_SYNC_FLAG)
/* O_DSYNC is distinct? */
#if O_DSYNC != BARE_OPEN_SYNC_FLAG
#define OPEN_DATASYNC_FLAG		(O_DSYNC | PG_O_DIRECT)
#endif
#else							/* !defined(OPEN_SYNC_FLAG) */
/* Win32 only has O_DSYNC */
#define OPEN_DATASYNC_FLAG		(O_DSYNC | PG_O_DIRECT)
#endif
#endif

/*
 * Pick the default, in order of preference: open_datasync, fdatasync,
 * fsync_writethrough, and finally plain fsync.  DEFAULT_SYNC_FLAGBIT is the
 * open() flag the default method needs (0 when the method syncs by a
 * separate call rather than by an open flag).
 */
#if defined(OPEN_DATASYNC_FLAG)
#define DEFAULT_SYNC_METHOD_STR "open_datasync"
#define DEFAULT_SYNC_METHOD		SYNC_METHOD_OPEN
#define DEFAULT_SYNC_FLAGBIT	OPEN_DATASYNC_FLAG
#elif defined(HAVE_FDATASYNC)
#define DEFAULT_SYNC_METHOD_STR "fdatasync"
#define DEFAULT_SYNC_METHOD		SYNC_METHOD_FDATASYNC
#define DEFAULT_SYNC_FLAGBIT	0
#elif defined(HAVE_FSYNC_WRITETHROUGH_ONLY)
#define DEFAULT_SYNC_METHOD_STR "fsync_writethrough"
#define DEFAULT_SYNC_METHOD		SYNC_METHOD_FSYNC_WRITETHROUGH
#define DEFAULT_SYNC_FLAGBIT	0
#else
#define DEFAULT_SYNC_METHOD_STR "fsync"
#define DEFAULT_SYNC_METHOD		SYNC_METHOD_FSYNC
#define DEFAULT_SYNC_FLAGBIT	0
#endif
112 113


114 115
/*
 * Limitation of buffer-alignment for direct IO depends on OS and filesystem,
 * but XLOG_BLCKSZ is assumed to be enough for it.
 *
 * Without O_DIRECT the ordinary buffer alignment (ALIGNOF_BUFFER) is used.
 */
#ifdef O_DIRECT
#define ALIGNOF_XLOG_BUFFER		XLOG_BLCKSZ
#else
#define ALIGNOF_XLOG_BUFFER		ALIGNOF_BUFFER
#endif


125 126 127 128 129 130
/*
 * Names of the files this module refers to.  Every path here is
 * interpreted relative to $PGDATA.
 */
#define BACKUP_LABEL_FILE "backup_label"
#define RECOVERY_COMMAND_FILE "recovery.conf"
#define RECOVERY_COMMAND_DONE "recovery.done"


Tom Lane's avatar
Tom Lane committed
131 132
/* User-settable parameters */
int			CheckPointSegments = 3;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
133
int			XLOGbuffers = 8;
134
char	   *XLogArchiveCommand = NULL;
135 136
char	   *XLOG_sync_method = NULL;
const char	XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
137
bool		fullPageWrites = true;
Tom Lane's avatar
Tom Lane committed
138

139 140 141 142
#ifdef WAL_DEBUG
bool		XLOG_DEBUG = false;
#endif

143
/*
 * XLOGfileslop is the allowed "fuzz" in the number of preallocated XLOG
 * segments --- we try to have at least XLOGfiles advance segments but no
 * more than XLOGfileslop segments.  It could be turned into a GUC variable
 * of its own, but for now hardwiring it as 2*CheckPointSegments+1 looks
 * sufficient: under normal conditions a checkpoint frees no more than
 * 2*CheckPointSegments log segments, all of which we want to recycle, and
 * the +1 lets boundary cases happen without wasting a
 * delete/create-segment cycle.
 *
 * The macro reads CheckPointSegments each time it is expanded.
 */

#define XLOGfileslop	((2 * CheckPointSegments) + 1)


157
/* these are derived from XLOG_sync_method by assign_xlog_sync_method */
158
int			sync_method = DEFAULT_SYNC_METHOD;
159 160 161 162
static int	open_sync_bit = DEFAULT_SYNC_FLAGBIT;

#define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)

Tom Lane's avatar
Tom Lane committed
163 164

/*
165 166
 * ThisTimeLineID will be same in all backends --- it identifies current
 * WAL timeline for the database system.
Tom Lane's avatar
Tom Lane committed
167
 */
168
TimeLineID	ThisTimeLineID = 0;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
169

170
/* Are we doing recovery from XLOG? */
Tom Lane's avatar
Tom Lane committed
171
bool		InRecovery = false;
Bruce Momjian's avatar
Bruce Momjian committed
172

173
/* Are we recovering using offline XLOG archives? */
Bruce Momjian's avatar
Bruce Momjian committed
174 175
static bool InArchiveRecovery = false;

176
/* Was the last xlog file restored from archive, or local? */
Bruce Momjian's avatar
Bruce Momjian committed
177
static bool restoredFromArchive = false;
178

179 180
/* options taken from recovery.conf */
static char *recoveryRestoreCommand = NULL;
181 182 183
static bool recoveryTarget = false;
static bool recoveryTargetExact = false;
static bool recoveryTargetInclusive = true;
Bruce Momjian's avatar
Bruce Momjian committed
184 185
static TransactionId recoveryTargetXid;
static time_t recoveryTargetTime;
186

187
/* if recoveryStopsHere returns true, it saves actual stop xid/time here */
Bruce Momjian's avatar
Bruce Momjian committed
188 189 190
static TransactionId recoveryStopXid;
static time_t recoveryStopTime;
static bool recoveryStopAfter;
191

192
/* constraint set by read_backup_label */
Bruce Momjian's avatar
Bruce Momjian committed
193
static XLogRecPtr recoveryMinXlogOffset = {0, 0};
194

195 196 197 198 199 200 201 202 203 204 205 206
/*
 * During normal operation, the only timeline we care about is ThisTimeLineID.
 * During recovery, however, things are more complicated.  To simplify life
 * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
 * scan through the WAL history (that is, it is the line that was active when
 * the currently-scanned WAL record was generated).  We also need these
 * timeline values:
 *
 * recoveryTargetTLI: the desired timeline that we want to end in.
 *
 * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
 * its known parents, newest first (so recoveryTargetTLI is always the
 * first list member).	Only these TLIs are expected to be seen in the WAL
 * segments we read, and indeed only these TLIs will be considered as
 * candidate WAL files to open at all.
 *
 * curFileTLI: the TLI appearing in the name of the current input WAL file.
 * (This is not necessarily the same as ThisTimeLineID, because we could
 * be scanning data that was copied from an ancestor timeline when the current
 * file was created.)  During a sequential scan we do not allow this value
 * to decrease.
 */
static TimeLineID recoveryTargetTLI;
static List *expectedTLIs;
static TimeLineID curFileTLI;
220

Tom Lane's avatar
Tom Lane committed
221 222
/*
 * MyLastRecPtr points to the start of the last XLOG record inserted by the
223 224
 * current transaction.  If MyLastRecPtr.xrecoff == 0, then the current
 * xact hasn't yet inserted any transaction-controlled XLOG records.
Tom Lane's avatar
Tom Lane committed
225 226
 *
 * Note that XLOG records inserted outside transaction control are not
227
 * reflected into MyLastRecPtr.  They do, however, cause MyXactMadeXLogEntry
Bruce Momjian's avatar
Bruce Momjian committed
228
 * to be set true.	The latter can be used to test whether the current xact
229 230
 * made any loggable changes (including out-of-xact changes, such as
 * sequence updates).
231 232 233
 *
 * When we insert/update/delete a tuple in a temporary relation, we do not
 * make any XLOG record, since we don't care about recovering the state of
Bruce Momjian's avatar
Bruce Momjian committed
234
 * the temp rel after a crash.	However, we will still need to remember
235 236 237
 * whether our transaction committed or aborted in that case.  So, we must
 * set MyXactMadeTempRelUpdate true to indicate that the XID will be of
 * interest later.
Tom Lane's avatar
Tom Lane committed
238 239
 */
XLogRecPtr	MyLastRecPtr = {0, 0};
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
240

241 242
bool		MyXactMadeXLogEntry = false;

243 244
bool		MyXactMadeTempRelUpdate = false;

Tom Lane's avatar
Tom Lane committed
245 246 247
/*
 * ProcLastRecPtr points to the start of the last XLOG record inserted by the
 * current backend.  It is updated for all inserts, transaction-controlled
 * or not.	ProcLastRecEnd is similar but points to end+1 of last record.
 */
static XLogRecPtr ProcLastRecPtr = {0, 0};

XLogRecPtr	ProcLastRecEnd = {0, 0};

Tom Lane's avatar
Tom Lane committed
254 255 256
/*
 * RedoRecPtr is this backend's local copy of the REDO record pointer
 * (which is almost but not quite the same as a pointer to the most recent
257
 * CHECKPOINT record).	We update this from the shared-memory copy,
Tom Lane's avatar
Tom Lane committed
258
 * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
Bruce Momjian's avatar
Bruce Momjian committed
259
 * hold the Insert lock).  See XLogInsert for details.	We are also allowed
260
 * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
261 262
 * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
 * InitXLOGAccess.
Tom Lane's avatar
Tom Lane committed
263
 */
264
static XLogRecPtr RedoRecPtr;
265

Tom Lane's avatar
Tom Lane committed
266 267 268 269 270 271 272 273 274
/*----------
 * Shared-memory data structures for XLOG control
 *
 * LogwrtRqst indicates a byte position that we need to write and/or fsync
 * the log up to (all records before that point must be written or fsynced).
 * LogwrtResult indicates the byte positions we have already written/fsynced.
 * These structs are identical but are declared separately to indicate their
 * slightly different functions.
 *
 * We do a lot of pushups to minimize the amount of access to lockable
 * shared memory values.  There are actually three shared-memory copies of
 * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
 *		XLogCtl->LogwrtResult is protected by info_lck
 *		XLogCtl->Write.LogwrtResult is protected by WALWriteLock
 *		XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
 * One must hold the associated lock to read or write any of these, but
 * of course no lock is needed to read/write the unshared LogwrtResult.
 *
 * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
 * right", since both are updated by a write or flush operation before
 * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
 * is that it can be examined/modified by code that already holds WALWriteLock
 * without needing to grab info_lck as well.
 *
 * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
 * but is updated when convenient.	Again, it exists for the convenience of
 * code that is already holding WALInsertLock but not the other locks.
 *
 * The unshared LogwrtResult may lag behind any or all of these, and again
 * is updated when convenient.
 *
 * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
 * (protected by info_lck), but we don't need to cache any copies of it.
 *
 * Note that this all works because the request and result positions can only
 * advance forward, never back up, and so we can easily determine which of two
 * values is "more up to date".
 *
 * info_lck is only held long enough to read/update the protected variables,
 * so it's a plain spinlock.  The other locks are held longer (potentially
 * over I/O operations), so we use LWLocks for them.  These locks are:
 *
 * WALInsertLock: must be held to insert a record into the WAL buffers.
 *
 * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
 * XLogFlush).
 *
 * ControlFileLock: must be held to read/update control file or create
 * new log file.
 *
 * CheckpointLock: must be held to do a checkpoint (ensures only one
 * checkpointer at a time; even though the postmaster won't launch
 * parallel checkpoint processes, we need this because manual checkpoints
 * could be launched simultaneously).
 *
 *----------
 */
323

Tom Lane's avatar
Tom Lane committed
324
typedef struct XLogwrtRqst
325
{
Tom Lane's avatar
Tom Lane committed
326 327
	XLogRecPtr	Write;			/* last byte + 1 to write out */
	XLogRecPtr	Flush;			/* last byte + 1 to flush */
328
} XLogwrtRqst;
329

330 331 332 333 334 335
/*
 * The positions up to which the log has already been written / flushed.
 */
typedef struct XLogwrtResult
{
	XLogRecPtr	Write;			/* last byte + 1 written out */
	XLogRecPtr	Flush;			/* last byte + 1 flushed */
} XLogwrtResult;

Tom Lane's avatar
Tom Lane committed
336 337 338
/*
 * Shared state data for XLogInsert.
 */
339 340
typedef struct XLogCtlInsert
{
341 342
	XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
	XLogRecPtr	PrevRecord;		/* start of previously-inserted record */
343
	int			curridx;		/* current block index in cache */
344 345 346
	XLogPageHeader currpage;	/* points to header of block in cache */
	char	   *currpos;		/* current insertion point in cache */
	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
347
	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
348 349
} XLogCtlInsert;

Tom Lane's avatar
Tom Lane committed
350 351 352
/*
 * Shared state data for XLogWrite/XLogFlush.
 */
353 354
typedef struct XLogCtlWrite
{
355
	XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
356
	int			curridx;		/* cache index of next block to write */
357 358
} XLogCtlWrite;

Tom Lane's avatar
Tom Lane committed
359 360 361
/*
 * Total shared-memory state for XLOG.
 */
362 363
typedef struct XLogCtlData
{
364
	/* Protected by WALInsertLock: */
365
	XLogCtlInsert Insert;
Tom Lane's avatar
Tom Lane committed
366
	/* Protected by info_lck: */
367 368
	XLogwrtRqst LogwrtRqst;
	XLogwrtResult LogwrtResult;
369
	/* Protected by WALWriteLock: */
370 371
	XLogCtlWrite Write;

Tom Lane's avatar
Tom Lane committed
372
	/*
373 374 375
	 * These values do not change after startup, although the pointed-to pages
	 * and xlblocks values certainly do.  Permission to read/write the pages
	 * and xlblocks values depends on WALInsertLock and WALWriteLock.
Tom Lane's avatar
Tom Lane committed
376
	 */
377
	char	   *pages;			/* buffers for unwritten XLOG pages */
378
	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + XLOG_BLCKSZ */
379 380
	Size		XLogCacheByte;	/* # bytes in xlog buffers */
	int			XLogCacheBlck;	/* highest allocated xlog buffer index */
381
	TimeLineID	ThisTimeLineID;
Tom Lane's avatar
Tom Lane committed
382

383
	slock_t		info_lck;		/* locks shared LogwrtRqst/LogwrtResult */
384 385
} XLogCtlData;

386
static XLogCtlData *XLogCtl = NULL;
387

388
/*
Tom Lane's avatar
Tom Lane committed
389
 * We maintain an image of pg_control in shared memory.
390
 */
391
static ControlFileData *ControlFile = NULL;
392

Tom Lane's avatar
Tom Lane committed
393 394 395 396 397
/*
 * Macros for managing XLogInsert state.  In most cases, the calling routine
 * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
 * so these are passed as parameters instead of being fetched via XLogCtl.
 */
398

Tom Lane's avatar
Tom Lane committed
399 400
/*
 * Free space remaining in the current xlog page buffer.
 *
 * Insert is a pointer to the insert state; note that it is evaluated twice.
 */
#define INSERT_FREESPACE(Insert)  \
	(XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
Tom Lane's avatar
Tom Lane committed
402 403 404 405 406 407

/*
 * Construct XLogRecPtr value for current insertion point.
 *
 * recptr is assigned field by field: xlogid comes from the xlblocks entry
 * of buffer curridx, and xrecoff is that entry's xrecoff less the free
 * space still left in the current page.  The arguments are evaluated more
 * than once.
 */
#define INSERT_RECPTR(recptr,Insert,curridx)  \
	( \
	  (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
	  (recptr).xrecoff = \
		XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
	)

/*
 * Step a buffer index backward or forward by one, wrapping around at the
 * ends of the range 0 .. XLogCtl->XLogCacheBlck.
 */
#define PrevBufIdx(idx)		\
		(((idx) != 0) ? ((idx) - 1) : XLogCtl->XLogCacheBlck)

#define NextBufIdx(idx)		\
		(((idx) != XLogCtl->XLogCacheBlck) ? ((idx) + 1) : 0)
416

Tom Lane's avatar
Tom Lane committed
417 418 419 420
/*
 * Private, possibly out-of-date copy of shared LogwrtResult.
 * See discussion above.
 */
421
static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
422

Tom Lane's avatar
Tom Lane committed
423 424 425 426 427 428 429 430 431 432
/*
 * openLogFile is -1 or a kernel FD for an open log file segment.
 * When it's open, openLogOff is the current seek offset in the file.
 * openLogId/openLogSeg identify the segment.  These variables are only
 * used to write the XLOG, and so will normally refer to the active segment.
 */
static int	openLogFile = -1;	/* -1 means no segment is open */
static uint32 openLogId = 0;	/* with openLogSeg, identifies the segment */
static uint32 openLogSeg = 0;
static uint32 openLogOff = 0;	/* current seek offset within the segment */
433

Tom Lane's avatar
Tom Lane committed
434 435 436 437 438 439
/*
 * These variables are used similarly to the ones above, but for reading
 * the XLOG.  Note, however, that readOff generally represents the offset
 * of the page just read, not the seek position of the FD itself, which
 * will be just past that page.
 */
static int	readFile = -1;
static uint32 readId = 0;
static uint32 readSeg = 0;
static uint32 readOff = 0;
444

445
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
Tom Lane's avatar
Tom Lane committed
446
static char *readBuf = NULL;
447

448 449 450 451
/* Buffer for current ReadRecord result (expandable) */
static char *readRecordBuf = NULL;
static uint32 readRecordBufSize = 0;

Tom Lane's avatar
Tom Lane committed
452
/* State information for XLOG reading */
453 454
static XLogRecPtr ReadRecPtr;	/* start of last record read */
static XLogRecPtr EndRecPtr;	/* end+1 of last record read */
455
static XLogRecord *nextRecord = NULL;
456
static TimeLineID lastPageTLI = 0;
457

Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
458 459
static bool InRedo = false;

460

461 462
static void XLogArchiveNotify(const char *xlog);
static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
463
static bool XLogArchiveCheckDone(const char *xlog);
464 465
static void XLogArchiveCleanup(const char *xlog);
static void readRecoveryCommandFile(void);
466
static void exitArchiveRecovery(TimeLineID endTLI,
Bruce Momjian's avatar
Bruce Momjian committed
467
					uint32 endLogId, uint32 endLogSeg);
468
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
Tom Lane's avatar
Tom Lane committed
469

470
static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
471
				XLogRecPtr *lsn, BkpBlock *bkpb);
Tom Lane's avatar
Tom Lane committed
472
static bool AdvanceXLInsertBuffer(void);
473
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
474 475
static int XLogFileInit(uint32 log, uint32 seg,
			 bool *use_existent, bool use_lock);
476 477
static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
					   bool find_free, int *max_advance,
478
					   bool use_lock);
479 480
static int	XLogFileOpen(uint32 log, uint32 seg);
static int	XLogFileRead(uint32 log, uint32 seg, int emode);
481
static void	XLogFileClose(void);
482
static bool RestoreArchivedFile(char *path, const char *xlogfname,
Bruce Momjian's avatar
Bruce Momjian committed
483
					const char *recovername, off_t expectedSize);
484 485
static int	PreallocXlogFiles(XLogRecPtr endptr);
static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
486
				int *nsegsremoved, int *nsegsrecycled);
487
static void CleanupBackupHistory(void);
488
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
489
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
490
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
491 492 493 494
static List *readTimeLineHistory(TimeLineID targetTLI);
static bool existsTimeLineHistory(TimeLineID probeTLI);
static TimeLineID findNewestTimeLine(TimeLineID startTLI);
static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
Bruce Momjian's avatar
Bruce Momjian committed
495 496
					 TimeLineID endTLI,
					 uint32 endLogId, uint32 endLogSeg);
Tom Lane's avatar
Tom Lane committed
497 498 499
static void WriteControlFile(void);
static void ReadControlFile(void);
static char *str_time(time_t tnow);
500
static void issue_xlog_fsync(void);
Bruce Momjian's avatar
Bruce Momjian committed
501

502
#ifdef WAL_DEBUG
503
static void xlog_outrec(StringInfo buf, XLogRecord *record);
504
#endif
505 506
static bool read_backup_label(XLogRecPtr *checkPointLoc);
static void remove_backup_label(void);
507
static void rm_redo_error_callback(void *arg);
Tom Lane's avatar
Tom Lane committed
508 509 510 511 512


/*
 * Insert an XLOG record having the specified RMID and info bytes,
 * with the body of the record being the data chunk(s) described by
 * the rdata chain (see xlog.h for notes about rdata).
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 *
 * NB: this routine feels free to scribble on the XLogRecData structs,
 * though not on the data they reference.  This is OK since the XLogRecData
 * structs are always just temporaries in the calling code.
 */
525
XLogRecPtr
526
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
527
{
528 529
	XLogCtlInsert *Insert = &XLogCtl->Insert;
	XLogRecord *record;
Tom Lane's avatar
Tom Lane committed
530
	XLogContRecord *contrecord;
531 532 533
	XLogRecPtr	RecPtr;
	XLogRecPtr	WriteRqst;
	uint32		freespace;
534
	int			curridx;
535 536 537 538 539
	XLogRecData *rdt;
	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];
	bool		dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
	BkpBlock	dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
	XLogRecPtr	dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
540 541 542 543
	XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
	XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
	XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
	pg_crc32	rdata_crc;
544 545 546
	uint32		len,
				write_len;
	unsigned	i;
547
	XLogwrtRqst LogwrtRqst;
548
	bool		updrqst;
549
	bool		doPageWrites;
550
	bool		no_tran = (rmid == RM_XLOG_ID) ? true : false;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
551 552 553 554

	if (info & XLR_INFO_MASK)
	{
		if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
555
			elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK));
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
556 557 558 559
		no_tran = true;
		info &= ~XLR_INFO_MASK;
	}

Tom Lane's avatar
Tom Lane committed
560
	/*
561 562
	 * In bootstrap mode, we don't actually log anything but XLOG resources;
	 * return a phony record pointer.
Tom Lane's avatar
Tom Lane committed
563
	 */
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
564
	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
565 566
	{
		RecPtr.xlogid = 0;
567
		RecPtr.xrecoff = SizeOfXLogLongPHD;		/* start of 1st chkpt record */
568
		return RecPtr;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
569 570
	}

Tom Lane's avatar
Tom Lane committed
571
	/*
572
	 * Here we scan the rdata chain, determine which buffers must be backed
Tom Lane's avatar
Tom Lane committed
573
	 * up, and compute the CRC values for the data.  Note that the record
574 575 576 577
	 * header isn't added into the CRC initially since we don't know the final
	 * length or info bits quite yet.  Thus, the CRC will represent the CRC of
	 * the whole record in the order "rdata, then backup blocks, then record
	 * header".
Tom Lane's avatar
Tom Lane committed
578
	 *
579 580 581 582 583
	 * We may have to loop back to here if a race condition is detected below.
	 * We could prevent the race by doing all this work while holding the
	 * insert lock, but it seems better to avoid doing CRC calculations while
	 * holding the lock.  This means we have to be careful about modifying the
	 * rdata chain until we know we aren't going to loop back again.  The only
584 585 586 587 588
	 * change we allow ourselves to make earlier is to set rdt->data = NULL in
	 * chain items we have decided we will have to back up the whole buffer
	 * for.  This is OK because we will certainly decide the same thing again
	 * for those items if we do it over; doing it here saves an extra pass
	 * over the chain later.
Tom Lane's avatar
Tom Lane committed
589
	 */
590
begin:;
Tom Lane's avatar
Tom Lane committed
591 592 593 594 595 596
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
	{
		dtbuf[i] = InvalidBuffer;
		dtbuf_bkp[i] = false;
	}

597 598 599 600 601 602 603 604
	/*
	 * Decide if we need to do full-page writes in this XLOG record: true if
	 * full_page_writes is on or we have a PITR request for it.  Since we
	 * don't yet have the insert lock, forcePageWrites could change under us,
	 * but we'll recheck it once we have the lock.
	 */
	doPageWrites = fullPageWrites || Insert->forcePageWrites;

605
	INIT_CRC32(rdata_crc);
Tom Lane's avatar
Tom Lane committed
606
	len = 0;
607
	for (rdt = rdata;;)
608 609 610
	{
		if (rdt->buffer == InvalidBuffer)
		{
Tom Lane's avatar
Tom Lane committed
611
			/* Simple data, just include it */
612
			len += rdt->len;
613
			COMP_CRC32(rdata_crc, rdt->data, rdt->len);
614
		}
Tom Lane's avatar
Tom Lane committed
615
		else
616
		{
Tom Lane's avatar
Tom Lane committed
617 618
			/* Find info for buffer */
			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
619
			{
Tom Lane's avatar
Tom Lane committed
620
				if (rdt->buffer == dtbuf[i])
621
				{
622
					/* Buffer already referenced by earlier chain item */
Tom Lane's avatar
Tom Lane committed
623 624 625 626 627
					if (dtbuf_bkp[i])
						rdt->data = NULL;
					else if (rdt->data)
					{
						len += rdt->len;
628
						COMP_CRC32(rdata_crc, rdt->data, rdt->len);
Tom Lane's avatar
Tom Lane committed
629 630
					}
					break;
631
				}
Tom Lane's avatar
Tom Lane committed
632
				if (dtbuf[i] == InvalidBuffer)
633
				{
Tom Lane's avatar
Tom Lane committed
634 635
					/* OK, put it in this slot */
					dtbuf[i] = rdt->buffer;
636 637
					if (XLogCheckBuffer(rdt, doPageWrites,
										&(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
Tom Lane's avatar
Tom Lane committed
638 639 640 641 642 643 644
					{
						dtbuf_bkp[i] = true;
						rdt->data = NULL;
					}
					else if (rdt->data)
					{
						len += rdt->len;
645
						COMP_CRC32(rdata_crc, rdt->data, rdt->len);
Tom Lane's avatar
Tom Lane committed
646 647
					}
					break;
648 649
				}
			}
Tom Lane's avatar
Tom Lane committed
650
			if (i >= XLR_MAX_BKP_BLOCKS)
651
				elog(PANIC, "can backup at most %d blocks per xlog record",
Tom Lane's avatar
Tom Lane committed
652
					 XLR_MAX_BKP_BLOCKS);
653
		}
654
		/* Break out of loop when rdt points to last chain item */
655 656 657 658 659
		if (rdt->next == NULL)
			break;
		rdt = rdt->next;
	}

660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692
	/*
	 * Now add the backup block headers and data into the CRC
	 */
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
	{
		if (dtbuf_bkp[i])
		{
			BkpBlock   *bkpb = &(dtbuf_xlg[i]);
			char	   *page;

			COMP_CRC32(rdata_crc,
					   (char *) bkpb,
					   sizeof(BkpBlock));
			page = (char *) BufferGetBlock(dtbuf[i]);
			if (bkpb->hole_length == 0)
			{
				COMP_CRC32(rdata_crc,
						   page,
						   BLCKSZ);
			}
			else
			{
				/* must skip the hole */
				COMP_CRC32(rdata_crc,
						   page,
						   bkpb->hole_offset);
				COMP_CRC32(rdata_crc,
						   page + (bkpb->hole_offset + bkpb->hole_length),
						   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
			}
		}
	}

Tom Lane's avatar
Tom Lane committed
693
	/*
694 695 696 697 698 699
	 * NOTE: the test for len == 0 here is somewhat fishy, since in theory all
	 * of the rmgr data might have been suppressed in favor of backup blocks.
	 * Currently, all callers of XLogInsert provide at least some
	 * not-in-a-buffer data and so len == 0 should never happen, but that may
	 * not be true forever.  If you need to remove the len == 0 check, also
	 * remove the check for xl_len == 0 in ReadRecord, below.
Tom Lane's avatar
Tom Lane committed
700
	 */
701
	if (len == 0)
702
		elog(PANIC, "invalid xlog record length %u", len);
703

704
	START_CRIT_SECTION();
705

706
	/* update LogwrtResult before doing cache fill check */
707 708 709 710
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile XLogCtlData *xlogctl = XLogCtl;

711
		SpinLockAcquire(&xlogctl->info_lck);
712 713
		LogwrtRqst = xlogctl->LogwrtRqst;
		LogwrtResult = xlogctl->LogwrtResult;
714
		SpinLockRelease(&xlogctl->info_lck);
715
	}
716

717
	/*
718 719
	 * If cache is half filled then try to acquire write lock and do
	 * XLogWrite. Ignore any fractional blocks in performing this check.
720
	 */
721
	LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % XLOG_BLCKSZ;
722 723 724
	if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
		(LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
		 XLogCtl->XLogCacheByte / 2))
Tom Lane's avatar
Tom Lane committed
725
	{
726
		if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
727
		{
728 729 730 731
			/*
			 * Since the amount of data we write here is completely optional
			 * anyway, tell XLogWrite it can be "flexible" and stop at a
			 * convenient boundary.  This allows writes triggered by this
732 733 734
			 * mechanism to synchronize with the cache boundaries, so that in
			 * a long transaction we'll basically dump alternating halves of
			 * the buffer array.
735
			 */
736 737
			LogwrtResult = XLogCtl->Write.LogwrtResult;
			if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
738
				XLogWrite(LogwrtRqst, true);
739
			LWLockRelease(WALWriteLock);
740 741 742
		}
	}

743 744 745
	/* Now wait to get insert lock */
	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);

Tom Lane's avatar
Tom Lane committed
746
	/*
747 748 749
	 * Check to see if my RedoRecPtr is out of date.  If so, may have to go
	 * back and recompute everything.  This can only happen just after a
	 * checkpoint, so it's better to be slow in this case and fast otherwise.
750 751 752 753
	 *
	 * If we aren't doing full-page writes then RedoRecPtr doesn't actually
	 * affect the contents of the XLOG record, so we'll update our local
	 * copy but not force a recomputation.
Tom Lane's avatar
Tom Lane committed
754 755
	 */
	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
756
	{
Tom Lane's avatar
Tom Lane committed
757 758 759
		Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
		RedoRecPtr = Insert->RedoRecPtr;

760
		if (doPageWrites)
761
		{
762
			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
Tom Lane's avatar
Tom Lane committed
763
			{
764 765 766 767 768 769 770 771 772 773 774 775 776
				if (dtbuf[i] == InvalidBuffer)
					continue;
				if (dtbuf_bkp[i] == false &&
					XLByteLE(dtbuf_lsn[i], RedoRecPtr))
				{
					/*
					 * Oops, this buffer now needs to be backed up, but we
					 * didn't think so above.  Start over.
					 */
					LWLockRelease(WALInsertLock);
					END_CRIT_SECTION();
					goto begin;
				}
Tom Lane's avatar
Tom Lane committed
777
			}
778 779 780
		}
	}

781 782 783 784 785 786 787 788 789 790 791 792 793 794
	/*
	 * Also check to see if forcePageWrites was just turned on; if we
	 * weren't already doing full-page writes then go back and recompute.
	 * (If it was just turned off, we could recompute the record without
	 * full pages, but we choose not to bother.)
	 */
	if (Insert->forcePageWrites && !doPageWrites)
	{
		/* Oops, must redo it with full-page data */
		LWLockRelease(WALInsertLock);
		END_CRIT_SECTION();
		goto begin;
	}

Tom Lane's avatar
Tom Lane committed
795
	/*
796 797 798 799
	 * Make additional rdata chain entries for the backup blocks, so that we
	 * don't need to special-case them in the write loop.  Note that we have
	 * now irrevocably changed the input rdata chain.  At the exit of this
	 * loop, write_len includes the backup block data.
Tom Lane's avatar
Tom Lane committed
800
	 *
801 802 803
	 * Also set the appropriate info bits to show which buffers were backed
	 * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
	 * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
Tom Lane's avatar
Tom Lane committed
804 805 806
	 */
	write_len = len;
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
807
	{
808 809 810
		BkpBlock   *bkpb;
		char	   *page;

811
		if (!dtbuf_bkp[i])
812 813
			continue;

Tom Lane's avatar
Tom Lane committed
814
		info |= XLR_SET_BKP_BLOCK(i);
815

816 817 818 819 820
		bkpb = &(dtbuf_xlg[i]);
		page = (char *) BufferGetBlock(dtbuf[i]);

		rdt->next = &(dtbuf_rdt1[i]);
		rdt = rdt->next;
821

822 823
		rdt->data = (char *) bkpb;
		rdt->len = sizeof(BkpBlock);
Tom Lane's avatar
Tom Lane committed
824
		write_len += sizeof(BkpBlock);
825

826 827
		rdt->next = &(dtbuf_rdt2[i]);
		rdt = rdt->next;
828

829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850
		if (bkpb->hole_length == 0)
		{
			rdt->data = page;
			rdt->len = BLCKSZ;
			write_len += BLCKSZ;
			rdt->next = NULL;
		}
		else
		{
			/* must skip the hole */
			rdt->data = page;
			rdt->len = bkpb->hole_offset;
			write_len += bkpb->hole_offset;

			rdt->next = &(dtbuf_rdt3[i]);
			rdt = rdt->next;

			rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
			rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
			write_len += rdt->len;
			rdt->next = NULL;
		}
851 852
	}

853
	/*
854
	 * If there isn't enough space on the current XLOG page for a record
855
	 * header, advance to the next page (leaving the unused space as zeroes).
856
	 */
Tom Lane's avatar
Tom Lane committed
857 858
	updrqst = false;
	freespace = INSERT_FREESPACE(Insert);
859 860
	if (freespace < SizeOfXLogRecord)
	{
Tom Lane's avatar
Tom Lane committed
861
		updrqst = AdvanceXLInsertBuffer();
862 863 864
		freespace = INSERT_FREESPACE(Insert);
	}

Tom Lane's avatar
Tom Lane committed
865
	curridx = Insert->curridx;
866
	record = (XLogRecord *) Insert->currpos;
Tom Lane's avatar
Tom Lane committed
867

868 869
	/* Insert record header */

870
	record->xl_prev = Insert->PrevRecord;
871
	record->xl_xid = GetCurrentTransactionIdIfAny();
872
	record->xl_tot_len = SizeOfXLogRecord + write_len;
Tom Lane's avatar
Tom Lane committed
873
	record->xl_len = len;		/* doesn't include backup blocks */
874
	record->xl_info = info;
875
	record->xl_rmid = rmid;
876

877 878 879 880
	/* Now we can finish computing the record's CRC */
	COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
			   SizeOfXLogRecord - sizeof(pg_crc32));
	FIN_CRC32(rdata_crc);
881 882
	record->xl_crc = rdata_crc;

Tom Lane's avatar
Tom Lane committed
883 884 885
	/* Compute record's XLOG location */
	INSERT_RECPTR(RecPtr, Insert, curridx);

886
#ifdef WAL_DEBUG
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
887 888
	if (XLOG_DEBUG)
	{
889
		StringInfoData	buf;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
890

891 892 893 894
		initStringInfo(&buf);
		appendStringInfo(&buf, "INSERT @ %X/%X: ", 
							RecPtr.xlogid, RecPtr.xrecoff);
		xlog_outrec(&buf, record);
895
		if (rdata->data != NULL)
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
896
		{
897 898
			appendStringInfo(&buf, " - ");
			RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
899
		}
900 901
		elog(LOG, "%s", buf.data);
		pfree(buf.data);
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
902
	}
903
#endif
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
904

Tom Lane's avatar
Tom Lane committed
905 906 907 908 909
	/* Record begin of record in appropriate places */
	if (!no_tran)
		MyLastRecPtr = RecPtr;
	ProcLastRecPtr = RecPtr;
	Insert->PrevRecord = RecPtr;
910
	MyXactMadeXLogEntry = true;
Tom Lane's avatar
Tom Lane committed
911

912
	Insert->currpos += SizeOfXLogRecord;
Tom Lane's avatar
Tom Lane committed
913
	freespace -= SizeOfXLogRecord;
914

Tom Lane's avatar
Tom Lane committed
915 916 917 918
	/*
	 * Append the data, including backup blocks if any
	 */
	while (write_len)
919
	{
920 921 922 923
		while (rdata->data == NULL)
			rdata = rdata->next;

		if (freespace > 0)
924
		{
925 926 927 928 929
			if (rdata->len > freespace)
			{
				memcpy(Insert->currpos, rdata->data, freespace);
				rdata->data += freespace;
				rdata->len -= freespace;
Tom Lane's avatar
Tom Lane committed
930
				write_len -= freespace;
931 932 933 934 935
			}
			else
			{
				memcpy(Insert->currpos, rdata->data, rdata->len);
				freespace -= rdata->len;
Tom Lane's avatar
Tom Lane committed
936
				write_len -= rdata->len;
937 938 939 940
				Insert->currpos += rdata->len;
				rdata = rdata->next;
				continue;
			}
941 942
		}

943
		/* Use next buffer */
Tom Lane's avatar
Tom Lane committed
944 945 946 947 948 949 950
		updrqst = AdvanceXLInsertBuffer();
		curridx = Insert->curridx;
		/* Insert cont-record header */
		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
		contrecord = (XLogContRecord *) Insert->currpos;
		contrecord->xl_rem_len = write_len;
		Insert->currpos += SizeOfXLogContRecord;
951
		freespace = INSERT_FREESPACE(Insert);
952
	}
953

Tom Lane's avatar
Tom Lane committed
954 955
	/* Ensure next record will be properly aligned */
	Insert->currpos = (char *) Insert->currpage +
956
		MAXALIGN(Insert->currpos - (char *) Insert->currpage);
Tom Lane's avatar
Tom Lane committed
957
	freespace = INSERT_FREESPACE(Insert);
958

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
959
	/*
960 961
	 * The recptr I return is the beginning of the *next* record. This will be
	 * stored as LSN for changed data pages...
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
962
	 */
Tom Lane's avatar
Tom Lane committed
963
	INSERT_RECPTR(RecPtr, Insert, curridx);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
964

Tom Lane's avatar
Tom Lane committed
965
	/* Need to update shared LogwrtRqst if some block was filled up */
966
	if (freespace < SizeOfXLogRecord)
967 968
		updrqst = true;			/* curridx is filled and available for writing
								 * out */
969 970
	else
		curridx = PrevBufIdx(curridx);
Tom Lane's avatar
Tom Lane committed
971
	WriteRqst = XLogCtl->xlblocks[curridx];
972

973
	LWLockRelease(WALInsertLock);
974 975 976

	if (updrqst)
	{
977 978 979
		/* use volatile pointer to prevent code rearrangement */
		volatile XLogCtlData *xlogctl = XLogCtl;

980
		SpinLockAcquire(&xlogctl->info_lck);
Tom Lane's avatar
Tom Lane committed
981
		/* advance global request to include new block(s) */
982 983
		if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
			xlogctl->LogwrtRqst.Write = WriteRqst;
Tom Lane's avatar
Tom Lane committed
984
		/* update local result copy while I have the chance */
985
		LogwrtResult = xlogctl->LogwrtResult;
986
		SpinLockRelease(&xlogctl->info_lck);
987 988
	}

989 990
	ProcLastRecEnd = RecPtr;

991
	END_CRIT_SECTION();
992

993
	return RecPtr;
994
}
995

996
/*
997 998 999
 * Determine whether the buffer referenced by an XLogRecData item has to
 * be backed up, and if so fill a BkpBlock struct for it.  In any case
 * save the buffer's LSN at *lsn.
1000
 */
1001
static bool
1002
XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
1003
				XLogRecPtr *lsn, BkpBlock *bkpb)
1004 1005
{
	PageHeader	page;
1006 1007 1008 1009

	page = (PageHeader) BufferGetBlock(rdata->buffer);

	/*
1010 1011 1012
	 * XXX We assume page LSN is first data on *every* page that can be passed
	 * to XLogInsert, whether it otherwise has the standard page layout or
	 * not.
1013 1014 1015
	 */
	*lsn = page->pd_lsn;

1016
	if (doPageWrites &&
1017
		XLByteLE(page->pd_lsn, RedoRecPtr))
1018
	{
1019 1020 1021 1022 1023
		/*
		 * The page needs to be backed up, so set up *bkpb
		 */
		bkpb->node = BufferGetFileNode(rdata->buffer);
		bkpb->block = BufferGetBlockNumber(rdata->buffer);
1024

1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
		if (rdata->buffer_std)
		{
			/* Assume we can omit data between pd_lower and pd_upper */
			uint16		lower = page->pd_lower;
			uint16		upper = page->pd_upper;

			if (lower >= SizeOfPageHeaderData &&
				upper > lower &&
				upper <= BLCKSZ)
			{
				bkpb->hole_offset = lower;
				bkpb->hole_length = upper - lower;
			}
			else
			{
				/* No "hole" to compress out */
				bkpb->hole_offset = 0;
				bkpb->hole_length = 0;
			}
		}
		else
		{
			/* Not a standard page header, don't try to eliminate "hole" */
			bkpb->hole_offset = 0;
			bkpb->hole_length = 0;
		}
1051

1052
		return true;			/* buffer requires backup */
1053
	}
1054 1055

	return false;				/* buffer does not need to be backed up */
1056 1057
}

1058 1059 1060 1061 1062 1063
/*
 * XLogArchiveNotify
 *
 * Create an archive notification file
 *
 * The name of the notification file is the message that will be picked up
1064
 * by the archiver, e.g. we write 0000000100000001000000C6.ready
1065
 * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
1066
 * then when complete, rename it to 0000000100000001000000C6.done
1067 1068 1069 1070 1071
 */
static void
XLogArchiveNotify(const char *xlog)
{
	char		archiveStatusPath[MAXPGPATH];
Bruce Momjian's avatar
Bruce Momjian committed
1072
	FILE	   *fd;
1073 1074 1075 1076

	/* insert an otherwise empty file called <XLOG>.ready */
	StatusFilePath(archiveStatusPath, xlog, ".ready");
	fd = AllocateFile(archiveStatusPath, "w");
Bruce Momjian's avatar
Bruce Momjian committed
1077 1078
	if (fd == NULL)
	{
1079 1080 1081 1082 1083 1084
		ereport(LOG,
				(errcode_for_file_access(),
				 errmsg("could not create archive status file \"%s\": %m",
						archiveStatusPath)));
		return;
	}
Bruce Momjian's avatar
Bruce Momjian committed
1085 1086
	if (FreeFile(fd))
	{
1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106
		ereport(LOG,
				(errcode_for_file_access(),
				 errmsg("could not write archive status file \"%s\": %m",
						archiveStatusPath)));
		return;
	}

	/* Notify archiver that it's got something to do */
	if (IsUnderPostmaster)
		SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);
}

/*
 * Convenience routine to notify using log/seg representation of filename
 *
 * Builds the segment file name for (log, seg) on ThisTimeLineID and hands
 * it to XLogArchiveNotify.
 */
static void
XLogArchiveNotifySeg(uint32 log, uint32 seg)
{
	char		xlog[MAXFNAMELEN];

	XLogFileName(xlog, ThisTimeLineID, log, seg);
	XLogArchiveNotify(xlog);
}

/*
1112
 * XLogArchiveCheckDone
1113
 *
1114 1115 1116 1117
 * This is called when we are ready to delete or recycle an old XLOG segment
 * file or backup history file.  If it is okay to delete it then return true.
 * If it is not time to delete it, make sure a .ready file exists, and return
 * false.
1118 1119
 *
 * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists,
1120 1121 1122 1123
 * then return false; else create <XLOG>.ready and return false.
 *
 * The reason we do things this way is so that if the original attempt to
 * create <XLOG>.ready fails, we'll retry during subsequent checkpoints.
1124 1125
 */
static bool
1126
XLogArchiveCheckDone(const char *xlog)
1127 1128 1129 1130
{
	char		archiveStatusPath[MAXPGPATH];
	struct stat stat_buf;

1131 1132 1133 1134 1135
	/* Always deletable if archiving is off */
	if (!XLogArchivingActive())
		return true;

	/* First check for .done --- this means archiver is done with it */
1136 1137 1138 1139 1140 1141 1142
	StatusFilePath(archiveStatusPath, xlog, ".done");
	if (stat(archiveStatusPath, &stat_buf) == 0)
		return true;

	/* check for .ready --- this means archiver is still busy with it */
	StatusFilePath(archiveStatusPath, xlog, ".ready");
	if (stat(archiveStatusPath, &stat_buf) == 0)
Bruce Momjian's avatar
Bruce Momjian committed
1143
		return false;
1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157

	/* Race condition --- maybe archiver just finished, so recheck */
	StatusFilePath(archiveStatusPath, xlog, ".done");
	if (stat(archiveStatusPath, &stat_buf) == 0)
		return true;

	/* Retry creation of the .ready file */
	XLogArchiveNotify(xlog);
	return false;
}

/*
 * XLogArchiveCleanup
 *
1158
 * Cleanup archive notification file(s) for a particular xlog segment
1159 1160 1161 1162
 */
static void
XLogArchiveCleanup(const char *xlog)
{
Bruce Momjian's avatar
Bruce Momjian committed
1163
	char		archiveStatusPath[MAXPGPATH];
1164

1165
	/* Remove the .done file */
1166 1167 1168
	StatusFilePath(archiveStatusPath, xlog, ".done");
	unlink(archiveStatusPath);
	/* should we complain about failure? */
1169 1170 1171 1172 1173

	/* Remove the .ready file if present --- normally it shouldn't be */
	StatusFilePath(archiveStatusPath, xlog, ".ready");
	unlink(archiveStatusPath);
	/* should we complain about failure? */
1174 1175
}

Tom Lane's avatar
Tom Lane committed
1176 1177 1178 1179 1180
/*
 * Advance the Insert state to the next buffer page, writing out the next
 * buffer if it still contains unwritten data.
 *
 * The global LogwrtRqst.Write pointer needs to be advanced to include the
1181
 * just-filled page.  If we can do this for free (without an extra lock),
Tom Lane's avatar
Tom Lane committed
1182 1183 1184
 * we do so here.  Otherwise the caller must do it.  We return TRUE if the
 * request update still needs to be done, FALSE if we did it internally.
 *
1185
 * Must be called with WALInsertLock held.
Tom Lane's avatar
Tom Lane committed
1186 1187 1188
 */
static bool
AdvanceXLInsertBuffer(void)
1189
{
Tom Lane's avatar
Tom Lane committed
1190 1191
	XLogCtlInsert *Insert = &XLogCtl->Insert;
	XLogCtlWrite *Write = &XLogCtl->Write;
1192
	int			nextidx = NextBufIdx(Insert->curridx);
Tom Lane's avatar
Tom Lane committed
1193 1194 1195
	bool		update_needed = true;
	XLogRecPtr	OldPageRqstPtr;
	XLogwrtRqst WriteRqst;
1196 1197
	XLogRecPtr	NewPageEndPtr;
	XLogPageHeader NewPage;
1198

Tom Lane's avatar
Tom Lane committed
1199 1200 1201
	/* Use Insert->LogwrtResult copy if it's more fresh */
	if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
		LogwrtResult = Insert->LogwrtResult;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
1202

Tom Lane's avatar
Tom Lane committed
1203
	/*
1204 1205 1206
	 * Get ending-offset of the buffer page we need to replace (this may be
	 * zero if the buffer hasn't been used yet).  Fall through if it's already
	 * written out.
Tom Lane's avatar
Tom Lane committed
1207 1208 1209 1210 1211 1212
	 */
	OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
	if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
	{
		/* nope, got work to do... */
		XLogRecPtr	FinishedPageRqstPtr;
1213

Tom Lane's avatar
Tom Lane committed
1214
		FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1215

1216
		/* Before waiting, get info_lck and update LogwrtResult */
1217 1218 1219 1220
		{
			/* use volatile pointer to prevent code rearrangement */
			volatile XLogCtlData *xlogctl = XLogCtl;

1221
			SpinLockAcquire(&xlogctl->info_lck);
1222 1223 1224
			if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
				xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
			LogwrtResult = xlogctl->LogwrtResult;
1225
			SpinLockRelease(&xlogctl->info_lck);
1226
		}
1227 1228 1229 1230 1231 1232 1233 1234 1235

		update_needed = false;	/* Did the shared-request update */

		if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
		{
			/* OK, someone wrote it already */
			Insert->LogwrtResult = LogwrtResult;
		}
		else
1236
		{
1237 1238 1239 1240
			/* Must acquire write lock */
			LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
			LogwrtResult = Write->LogwrtResult;
			if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1241
			{
1242 1243 1244
				/* OK, someone wrote it already */
				LWLockRelease(WALWriteLock);
				Insert->LogwrtResult = LogwrtResult;
Tom Lane's avatar
Tom Lane committed
1245
			}
1246
			else
Tom Lane's avatar
Tom Lane committed
1247 1248
			{
				/*
1249 1250
				 * Have to write buffers while holding insert lock. This is
				 * not good, so only write as much as we absolutely must.
Tom Lane's avatar
Tom Lane committed
1251 1252 1253 1254
				 */
				WriteRqst.Write = OldPageRqstPtr;
				WriteRqst.Flush.xlogid = 0;
				WriteRqst.Flush.xrecoff = 0;
1255
				XLogWrite(WriteRqst, false);
1256
				LWLockRelease(WALWriteLock);
Tom Lane's avatar
Tom Lane committed
1257
				Insert->LogwrtResult = LogwrtResult;
1258 1259 1260 1261
			}
		}
	}

Tom Lane's avatar
Tom Lane committed
1262
	/*
1263 1264
	 * Now the next buffer slot is free and we can set it up to be the next
	 * output page.
Tom Lane's avatar
Tom Lane committed
1265
	 */
1266 1267
	NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
	if (NewPageEndPtr.xrecoff >= XLogFileSize)
1268
	{
Tom Lane's avatar
Tom Lane committed
1269
		/* crossing a logid boundary */
1270
		NewPageEndPtr.xlogid += 1;
1271
		NewPageEndPtr.xrecoff = XLOG_BLCKSZ;
1272
	}
Tom Lane's avatar
Tom Lane committed
1273
	else
1274
		NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
1275
	XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
1276
	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
1277

Tom Lane's avatar
Tom Lane committed
1278
	Insert->curridx = nextidx;
1279
	Insert->currpage = NewPage;
1280 1281

	Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
1282

Tom Lane's avatar
Tom Lane committed
1283
	/*
1284 1285
	 * Be sure to re-zero the buffer so that bytes beyond what we've written
	 * will look like zeroes and not valid XLOG records...
Tom Lane's avatar
Tom Lane committed
1286
	 */
1287
	MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
1288

1289 1290 1291
	/*
	 * Fill the new page's header
	 */
1292 1293
	NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;

1294
	/* NewPage->xlp_info = 0; */	/* done by memset */
1295 1296
	NewPage   ->xlp_tli = ThisTimeLineID;
	NewPage   ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
1297
	NewPage   ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - XLOG_BLCKSZ;
Tom Lane's avatar
Tom Lane committed
1298

1299
	/*
1300
	 * If first page of an XLOG segment file, make it a long header.
1301 1302 1303
	 */
	if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
	{
1304
		XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
1305

1306 1307
		NewLongPage->xlp_sysid = ControlFile->system_identifier;
		NewLongPage->xlp_seg_size = XLogSegSize;
1308
		NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
1309 1310 1311
		NewPage   ->xlp_info |= XLP_LONG_HEADER;

		Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
1312 1313
	}

Tom Lane's avatar
Tom Lane committed
1314
	return update_needed;
1315 1316
}

Tom Lane's avatar
Tom Lane committed
1317 1318 1319
/*
 * Write and/or fsync the log at least as far as WriteRqst indicates.
 *
1320 1321 1322 1323 1324
 * If flexible == TRUE, we don't have to write as far as WriteRqst, but
 * may stop at any convenient boundary (such as a cache or logfile boundary).
 * This option allows us to avoid uselessly issuing multiple writes when a
 * single one would do.
 *
1325
 * Must be called with WALWriteLock held.
Tom Lane's avatar
Tom Lane committed
1326
 */
1327
static void
1328
XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
1329
{
1330
	XLogCtlWrite *Write = &XLogCtl->Write;
Tom Lane's avatar
Tom Lane committed
1331
	bool		ispartialpage;
1332
	bool		finishing_seg;
1333
	bool		use_existent;
1334 1335 1336 1337
	int			curridx;
	int			npages;
	int			startidx;
	uint32		startoffset;
1338

1339 1340 1341
	/* We should always be inside a critical section here */
	Assert(CritSectionCount > 0);

1342
	/*
1343
	 * Update local LogwrtResult (caller probably did this already, but...)
1344
	 */
Tom Lane's avatar
Tom Lane committed
1345 1346
	LogwrtResult = Write->LogwrtResult;

1347 1348 1349
	/*
	 * Since successive pages in the xlog cache are consecutively allocated,
	 * we can usually gather multiple pages together and issue just one
1350 1351 1352 1353 1354
	 * write() call.  npages is the number of pages we have determined can be
	 * written together; startidx is the cache block index of the first one,
	 * and startoffset is the file offset at which it should go. The latter
	 * two variables are only valid when npages > 0, but we must initialize
	 * all of them to keep the compiler quiet.
1355 1356 1357 1358 1359 1360 1361 1362 1363
	 */
	npages = 0;
	startidx = 0;
	startoffset = 0;

	/*
	 * Within the loop, curridx is the cache block index of the page to
	 * consider writing.  We advance Write->curridx only after successfully
	 * writing pages.  (Right now, this refinement is useless since we are
1364 1365
	 * going to PANIC if any error occurs anyway; but someday it may come in
	 * useful.)
1366 1367
	 */
	curridx = Write->curridx;
1368

Tom Lane's avatar
Tom Lane committed
1369
	while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1370
	{
1371
		/*
1372 1373 1374
		 * Make sure we're not ahead of the insert process.  This could happen
		 * if we're passed a bogus WriteRqst.Write that is past the end of the
		 * last page that's been initialized by AdvanceXLInsertBuffer.
1375
		 */
1376
		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
1377
			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1378
				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1379 1380
				 XLogCtl->xlblocks[curridx].xlogid,
				 XLogCtl->xlblocks[curridx].xrecoff);
1381

Tom Lane's avatar
Tom Lane committed
1382
		/* Advance LogwrtResult.Write to end of current buffer page */
1383
		LogwrtResult.Write = XLogCtl->xlblocks[curridx];
Tom Lane's avatar
Tom Lane committed
1384 1385 1386
		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);

		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1387
		{
Tom Lane's avatar
Tom Lane committed
1388
			/*
1389 1390
			 * Switch to new logfile segment.  We cannot have any pending
			 * pages here (since we dump what we have at segment end).
Tom Lane's avatar
Tom Lane committed
1391
			 */
1392
			Assert(npages == 0);
Tom Lane's avatar
Tom Lane committed
1393
			if (openLogFile >= 0)
1394
				XLogFileClose();
Tom Lane's avatar
Tom Lane committed
1395 1396
			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);

1397 1398 1399 1400
			/* create/use new log file */
			use_existent = true;
			openLogFile = XLogFileInit(openLogId, openLogSeg,
									   &use_existent, true);
Tom Lane's avatar
Tom Lane committed
1401
			openLogOff = 0;
1402

Tom Lane's avatar
Tom Lane committed
1403
			/* update pg_control, unless someone else already did */
1404
			LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1405 1406 1407
			if (ControlFile->logId < openLogId ||
				(ControlFile->logId == openLogId &&
				 ControlFile->logSeg < openLogSeg + 1))
Tom Lane's avatar
Tom Lane committed
1408 1409 1410 1411 1412
			{
				ControlFile->logId = openLogId;
				ControlFile->logSeg = openLogSeg + 1;
				ControlFile->time = time(NULL);
				UpdateControlFile();
1413

1414
				/*
1415 1416 1417 1418
				 * Signal bgwriter to start a checkpoint if it's been too long
				 * since the last one.	(We look at local copy of RedoRecPtr
				 * which might be a little out of date, but should be close
				 * enough for this purpose.)
1419
				 *
1420 1421
				 * A straight computation of segment number could overflow 32
				 * bits.  Rather than assuming we have working 64-bit
1422 1423
				 * arithmetic, we compare the highest-order bits separately,
				 * and force a checkpoint immediately when they change.
1424
				 */
1425
				if (IsUnderPostmaster)
1426
				{
1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440
					uint32		old_segno,
								new_segno;
					uint32		old_highbits,
								new_highbits;

					old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
						(RedoRecPtr.xrecoff / XLogSegSize);
					old_highbits = RedoRecPtr.xlogid / XLogSegSize;
					new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile +
						openLogSeg;
					new_highbits = openLogId / XLogSegSize;
					if (new_highbits != old_highbits ||
						new_segno >= old_segno + (uint32) CheckPointSegments)
					{
1441
#ifdef WAL_DEBUG
1442 1443
						if (XLOG_DEBUG)
							elog(LOG, "time for a checkpoint, signaling bgwriter");
1444
#endif
1445
						RequestCheckpoint(false, true);
1446
					}
1447
				}
Tom Lane's avatar
Tom Lane committed
1448
			}
1449
			LWLockRelease(ControlFileLock);
1450 1451
		}

1452
		/* Make sure we have the current logfile open */
Tom Lane's avatar
Tom Lane committed
1453
		if (openLogFile < 0)
1454
		{
Tom Lane's avatar
Tom Lane committed
1455
			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1456
			openLogFile = XLogFileOpen(openLogId, openLogSeg);
Tom Lane's avatar
Tom Lane committed
1457
			openLogOff = 0;
1458 1459
		}

1460 1461 1462 1463 1464
		/* Add current page to the set of pending pages-to-dump */
		if (npages == 0)
		{
			/* first of group */
			startidx = curridx;
1465
			startoffset = (LogwrtResult.Write.xrecoff - XLOG_BLCKSZ) % XLogSegSize;
1466 1467
		}
		npages++;
1468

Tom Lane's avatar
Tom Lane committed
1469
		/*
1470 1471 1472 1473
		 * Dump the set if this will be the last loop iteration, or if we are
		 * at the last page of the cache area (since the next page won't be
		 * contiguous in memory), or if we are at the end of the logfile
		 * segment.
Tom Lane's avatar
Tom Lane committed
1474
		 */
1475
		finishing_seg = !ispartialpage &&
1476
			(startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
1477 1478 1479 1480

		if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
			curridx == XLogCtl->XLogCacheBlck ||
			finishing_seg)
Tom Lane's avatar
Tom Lane committed
1481
		{
1482 1483
			char	   *from;
			Size		nbytes;
1484

1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497
			/* Need to seek in the file? */
			if (openLogOff != startoffset)
			{
				if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
					ereport(PANIC,
							(errcode_for_file_access(),
							 errmsg("could not seek in log file %u, "
									"segment %u to offset %u: %m",
									openLogId, openLogSeg, startoffset)));
				openLogOff = startoffset;
			}

			/* OK to write the page(s) */
1498 1499
			from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
			nbytes = npages * (Size) XLOG_BLCKSZ;
1500 1501 1502 1503 1504 1505 1506 1507 1508
			errno = 0;
			if (write(openLogFile, from, nbytes) != nbytes)
			{
				/* if write didn't set errno, assume no disk space */
				if (errno == 0)
					errno = ENOSPC;
				ereport(PANIC,
						(errcode_for_file_access(),
						 errmsg("could not write to log file %u, segment %u "
Peter Eisentraut's avatar
Peter Eisentraut committed
1509
								"at offset %u, length %lu: %m",
1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531
								openLogId, openLogSeg,
								openLogOff, (unsigned long) nbytes)));
			}

			/* Update state for write */
			openLogOff += nbytes;
			Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
			npages = 0;

			/*
			 * If we just wrote the whole last page of a logfile segment,
			 * fsync the segment immediately.  This avoids having to go back
			 * and re-open prior segments when an fsync request comes along
			 * later. Doing it here ensures that one and only one backend will
			 * perform this fsync.
			 *
			 * This is also the right place to notify the Archiver that the
			 * segment is ready to copy to archival storage.
			 */
			if (finishing_seg)
			{
				issue_xlog_fsync();
1532
				LogwrtResult.Flush = LogwrtResult.Write;		/* end of page */
1533 1534 1535 1536

				if (XLogArchivingActive())
					XLogArchiveNotifySeg(openLogId, openLogSeg);
			}
Tom Lane's avatar
Tom Lane committed
1537
		}
1538

Tom Lane's avatar
Tom Lane committed
1539 1540 1541 1542 1543 1544
		if (ispartialpage)
		{
			/* Only asked to write a partial page */
			LogwrtResult.Write = WriteRqst.Write;
			break;
		}
1545 1546 1547 1548 1549
		curridx = NextBufIdx(curridx);

		/* If flexible, break out of loop as soon as we wrote something */
		if (flexible && npages == 0)
			break;
1550
	}
1551 1552 1553

	Assert(npages == 0);
	Assert(curridx == Write->curridx);
1554

Tom Lane's avatar
Tom Lane committed
1555 1556 1557 1558 1559
	/*
	 * If asked to flush, do so
	 */
	if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
		XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1560
	{
Tom Lane's avatar
Tom Lane committed
1561
		/*
1562 1563 1564
		 * Could get here without iterating above loop, in which case we might
		 * have no open file or the wrong one.	However, we do not need to
		 * fsync more than one file.
Tom Lane's avatar
Tom Lane committed
1565
		 */
1566
		if (sync_method != SYNC_METHOD_OPEN)
Tom Lane's avatar
Tom Lane committed
1567
		{
1568
			if (openLogFile >= 0 &&
1569
				!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1570
				XLogFileClose();
1571 1572 1573
			if (openLogFile < 0)
			{
				XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1574
				openLogFile = XLogFileOpen(openLogId, openLogSeg);
1575 1576 1577
				openLogOff = 0;
			}
			issue_xlog_fsync();
Tom Lane's avatar
Tom Lane committed
1578 1579
		}
		LogwrtResult.Flush = LogwrtResult.Write;
1580 1581
	}

Tom Lane's avatar
Tom Lane committed
1582 1583 1584
	/*
	 * Update shared-memory status
	 *
1585
	 * We make sure that the shared 'request' values do not fall behind the
1586 1587
	 * 'result' values.  This is not absolutely essential, but it saves some
	 * code in a couple of places.
Tom Lane's avatar
Tom Lane committed
1588
	 */
1589 1590 1591 1592
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile XLogCtlData *xlogctl = XLogCtl;

1593
		SpinLockAcquire(&xlogctl->info_lck);
1594 1595 1596 1597 1598
		xlogctl->LogwrtResult = LogwrtResult;
		if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
			xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
		if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
			xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1599
		SpinLockRelease(&xlogctl->info_lck);
1600
	}
1601

Tom Lane's avatar
Tom Lane committed
1602 1603 1604 1605 1606 1607
	Write->LogwrtResult = LogwrtResult;
}

/*
 * Make sure every byte of XLOG up to "record" has reached disk.
 *
 * NOTE: unlike XLogWrite, we are not entered with WALWriteLock held; we
 * acquire it ourselves, and only after the cheaper checks have failed to
 * show that the requested position is already flushed.
 */
void
XLogFlush(XLogRecPtr record)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile XLogCtlData *xlogctl = XLogCtl;
	XLogRecPtr	target;
	XLogwrtRqst request;

	/* Flushing is switched off while replaying WAL */
	if (InRedo)
		return;

	/* Fast path: our local status copy already covers the request */
	if (!XLByteLT(LogwrtResult.Flush, record))
		return;

#ifdef WAL_DEBUG
	if (XLOG_DEBUG)
		elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
			 record.xlogid, record.xrecoff,
			 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
			 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
#endif

	START_CRIT_SECTION();

	/*
	 * An fsync is normally very expensive, so we want each one to cover as
	 * much data as possible.  Whatever additional data has shown up in the
	 * xlog buffers gets written and synced along with the caller's record,
	 * which pushes LogwrtResult.Flush as far forward as we can and may spare
	 * somebody the next fsync.
	 *
	 * Start from the caller's target; it can only grow from here.
	 */
	target = record;

	/* Pick up the shared write request and refresh our local status copy */
	SpinLockAcquire(&xlogctl->info_lck);
	if (XLByteLT(target, xlogctl->LogwrtRqst.Write))
		target = xlogctl->LogwrtRqst.Write;
	LogwrtResult = xlogctl->LogwrtResult;
	SpinLockRelease(&xlogctl->info_lck);

	/* Still not flushed according to the refreshed copy? */
	if (XLByteLT(LogwrtResult.Flush, record))
	{
		/* Queue up for the write lock, then look once more */
		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
		LogwrtResult = XLogCtl->Write.LogwrtResult;

		if (XLByteLT(LogwrtResult.Flush, record))
		{
			/*
			 * If the insert lock is free right now, extend the request to
			 * the current end of inserted data; otherwise settle for what we
			 * already know about.
			 */
			if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
			{
				XLogCtlInsert *ins = &XLogCtl->Insert;
				uint32		unused = INSERT_FREESPACE(ins);

				/* end of current page, backed up by its unused tail ... */
				target = XLogCtl->xlblocks[ins->curridx];
				/* ... unless the page counts as full */
				if (unused >= SizeOfXLogRecord)
					target.xrecoff -= unused;

				LWLockRelease(WALInsertLock);

				request.Write = target;
				request.Flush = target;
			}
			else
			{
				request.Write = target;
				request.Flush = record;
			}

			XLogWrite(request, false);
		}

		LWLockRelease(WALWriteLock);
	}

	END_CRIT_SECTION();

	/*
	 * Failing to reach the requested point at this stage most likely means
	 * the request lies beyond the end of XLOG, which has been observed when
	 * a disk page carries a corrupted LSN.
	 *
	 * This used to be a PANIC, but that made the system less robust, not
	 * more: one damaged data page should not bring everything down, and if
	 * the same page turned up again during recovery the database could not
	 * be restarted at all.  (That has really happened in the field with 7.1
	 * releases.  We cannot get here with InRedo set, but a bad page that is
	 * read in and dirtied during recovery will be flushed by
	 * CreateCheckPoint once recovery finishes.)
	 *
	 * So: ERROR in normal running, and merely WARNING during recovery so
	 * that startup can complete despite a corrupt LSN.  Callers in xact.c
	 * run inside a critical section, which promotes the ERROR to PANIC;
	 * callers in bufmgr.c do not, so a bad LSN on a data page does not force
	 * a restart.
	 */
	if (XLByteLT(LogwrtResult.Flush, record))
		elog(InRecovery ? WARNING : ERROR,
		"xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
			 record.xlogid, record.xrecoff,
			 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
}

Tom Lane's avatar
Tom Lane committed
1724 1725 1726
/*
 * Create a new XLOG file segment, or open a pre-existing one.
 *
1727 1728 1729
 * log, seg: identify segment to be created/opened.
 *
 * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1730
 * pre-existing file will be deleted).	On return, TRUE if a pre-existing
1731 1732
 * file was used.
 *
1733
 * use_lock: if TRUE, acquire ControlFileLock while moving file into
1734
 * place.  This should be TRUE except during bootstrap log creation.  The
1735
 * caller must *not* hold the lock at call.
1736
 *
Tom Lane's avatar
Tom Lane committed
1737
 * Returns FD of opened file.
1738 1739 1740 1741 1742
 *
 * Note: errors here are ERROR not PANIC because we might or might not be
 * inside a critical section (eg, during checkpoint there is no reason to
 * take down the system on failure).  They will promote to PANIC if we are
 * in a critical section.
Tom Lane's avatar
Tom Lane committed
1743
 */
1744
static int
1745 1746
XLogFileInit(uint32 log, uint32 seg,
			 bool *use_existent, bool use_lock)
1747
{
1748
	char		path[MAXPGPATH];
1749
	char		tmppath[MAXPGPATH];
1750
	char		zbuffer[XLOG_BLCKSZ];
1751 1752 1753
	uint32		installed_log;
	uint32		installed_seg;
	int			max_advance;
1754
	int			fd;
1755
	int			nbytes;
1756

1757
	XLogFilePath(path, ThisTimeLineID, log, seg);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1758 1759

	/*
1760
	 * Try to use existent file (checkpoint maker may have created it already)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1761
	 */
1762
	if (*use_existent)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1763
	{
1764 1765
		fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
						   S_IRUSR | S_IWUSR);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1766 1767 1768
		if (fd < 0)
		{
			if (errno != ENOENT)
1769
				ereport(ERROR,
1770
						(errcode_for_file_access(),
1771
						 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1772
								path, log, seg)));
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1773 1774
		}
		else
1775
			return fd;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1776 1777
	}

1778
	/*
1779 1780 1781 1782
	 * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
	 * another process is doing the same thing.  If so, we will end up
	 * pre-creating an extra log segment.  That seems OK, and better than
	 * holding the lock throughout this lengthy process.
1783
	 */
1784
	snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1785 1786

	unlink(tmppath);
1787

1788
	/* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1789
	fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
Tom Lane's avatar
Tom Lane committed
1790
					   S_IRUSR | S_IWUSR);
1791
	if (fd < 0)
1792
		ereport(ERROR,
1793
				(errcode_for_file_access(),
1794
				 errmsg("could not create file \"%s\": %m", tmppath)));
1795

1796
	/*
1797 1798 1799 1800 1801 1802 1803
	 * Zero-fill the file.	We have to do this the hard way to ensure that all
	 * the file space has really been allocated --- on platforms that allow
	 * "holes" in files, just seeking to the end doesn't allocate intermediate
	 * space.  This way, we know that we have all the space and (after the
	 * fsync below) that all the indirect blocks are down on disk.	Therefore,
	 * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
	 * log file.
1804 1805 1806 1807
	 */
	MemSet(zbuffer, 0, sizeof(zbuffer));
	for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
	{
1808
		errno = 0;
1809
		if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
Tom Lane's avatar
Tom Lane committed
1810
		{
1811
			int			save_errno = errno;
Tom Lane's avatar
Tom Lane committed
1812

1813
			/*
1814
			 * If we fail to make the file, delete it to release disk space
1815
			 */
1816
			unlink(tmppath);
1817 1818
			/* if write didn't set errno, assume problem is no disk space */
			errno = save_errno ? save_errno : ENOSPC;
Tom Lane's avatar
Tom Lane committed
1819

1820
			ereport(ERROR,
1821
					(errcode_for_file_access(),
1822
					 errmsg("could not write to file \"%s\": %m", tmppath)));
Tom Lane's avatar
Tom Lane committed
1823
		}
1824
	}
1825

1826
	if (pg_fsync(fd) != 0)
1827
		ereport(ERROR,
1828
				(errcode_for_file_access(),
1829
				 errmsg("could not fsync file \"%s\": %m", tmppath)));
1830

1831
	if (close(fd))
1832
		ereport(ERROR,
1833 1834
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", tmppath)));
Tom Lane's avatar
Tom Lane committed
1835

1836
	/*
1837 1838
	 * Now move the segment into place with its final name.
	 *
1839
	 * If caller didn't want to use a pre-existing file, get rid of any
1840 1841 1842
	 * pre-existing file.  Otherwise, cope with possibility that someone else
	 * has created the file while we were filling ours: if so, use ours to
	 * pre-create a future log segment.
1843
	 */
1844 1845 1846 1847 1848
	installed_log = log;
	installed_seg = seg;
	max_advance = XLOGfileslop;
	if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
								*use_existent, &max_advance,
1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861
								use_lock))
	{
		/* No need for any more future segments... */
		unlink(tmppath);
	}

	/* Set flag to tell caller there was no existent file */
	*use_existent = false;

	/* Now open original target segment (might not be file I just made) */
	fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
					   S_IRUSR | S_IWUSR);
	if (fd < 0)
1862
		ereport(ERROR,
1863
				(errcode_for_file_access(),
1864 1865
		   errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
				  path, log, seg)));
1866

1867
	return fd;
1868 1869
}

1870 1871 1872 1873 1874 1875 1876 1877 1878
/*
 * Build a new XLOG segment file as a byte-for-byte copy of an existing one.
 *
 * log, seg: the segment to create.
 *
 * srcTLI, srclog, srcseg: the segment to copy from (possibly on another
 *		timeline).
 *
 * At present this is called only during recovery, so no locking is needed.
 * We are nonetheless as careful as XLogFileInit about never leaving a bogus
 * file in place: the copy is built under a temp name and only then
 * installed.
 */
static void
XLogFileCopy(uint32 log, uint32 seg,
			 TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
{
	char		srcpath[MAXPGPATH];
	char		tmppath[MAXPGPATH];
	char		block[XLOG_BLCKSZ];
	int			srcfd;
	int			dstfd;
	int			done;

	/* Open the segment we are copying from */
	XLogFilePath(srcpath, srcTLI, srclog, srcseg);
	srcfd = BasicOpenFile(srcpath, O_RDONLY | PG_BINARY, 0);
	if (srcfd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", srcpath)));

	/* The copy goes into a per-process temp file; clear any stale one */
	snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
	unlink(tmppath);

	/* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
	dstfd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
						  S_IRUSR | S_IWUSR);
	if (dstfd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m", tmppath)));

	/* Transfer one full segment, a block at a time */
	done = 0;
	while (done < XLogSegSize)
	{
		int			nread;
		int			nwritten;

		errno = 0;
		nread = (int) read(srcfd, block, sizeof(block));
		if (nread != (int) sizeof(block))
		{
			/* errno distinguishes a real failure from a short source file */
			if (errno != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not read file \"%s\": %m", srcpath)));
			else
				ereport(ERROR,
						(errmsg("not enough data in file \"%s\"", srcpath)));
		}

		errno = 0;
		nwritten = (int) write(dstfd, block, sizeof(block));
		if (nwritten != (int) sizeof(block))
		{
			int			save_errno = errno;

			/* Give the disk space back if the copy cannot be completed */
			unlink(tmppath);
			/* if write didn't set errno, assume problem is no disk space */
			errno = save_errno ? save_errno : ENOSPC;

			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not write to file \"%s\": %m", tmppath)));
		}

		done += sizeof(block);
	}

	if (pg_fsync(dstfd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m", tmppath)));

	if (close(dstfd))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", tmppath)));

	close(srcfd);

	/* Finally give the finished copy its real name */
	if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
		elog(ERROR, "InstallXLogFileSegment should not have failed");
}

1971 1972 1973 1974 1975 1976
/*
 * Install a new XLOG segment file as a current or future log segment.
 *
 * This is used both to install a newly-created segment (which has a temp
 * filename while it's being created) and to recycle an old segment.
 *
1977 1978 1979
 * *log, *seg: identify segment to install as (or first possible target).
 * When find_free is TRUE, these are modified on return to indicate the
 * actual installation location or last segment searched.
1980 1981 1982 1983 1984 1985 1986
 *
 * tmppath: initial name of file to install.  It will be renamed into place.
 *
 * find_free: if TRUE, install the new segment at the first empty log/seg
 * number at or after the passed numbers.  If FALSE, install the new segment
 * exactly where specified, deleting any existing segment file there.
 *
1987 1988 1989 1990
 * *max_advance: maximum number of log/seg slots to advance past the starting
 * point.  Fail if no free slot is found in this range.  On return, reduced
 * by the number of slots skipped over.  (Irrelevant, and may be NULL,
 * when find_free is FALSE.)
1991
 *
1992
 * use_lock: if TRUE, acquire ControlFileLock while moving file into
1993
 * place.  This should be TRUE except during bootstrap log creation.  The
1994
 * caller must *not* hold the lock at call.
1995 1996
 *
 * Returns TRUE if file installed, FALSE if not installed because of
1997
 * exceeding max_advance limit.  (Any other kind of failure causes ereport().)
1998 1999
 */
static bool
2000 2001
InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
					   bool find_free, int *max_advance,
2002 2003 2004
					   bool use_lock)
{
	char		path[MAXPGPATH];
2005
	struct stat stat_buf;
2006

2007
	XLogFilePath(path, ThisTimeLineID, *log, *seg);
2008 2009 2010 2011 2012

	/*
	 * We want to be sure that only one process does this at a time.
	 */
	if (use_lock)
2013
		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2014

2015 2016 2017 2018 2019
	if (!find_free)
	{
		/* Force installation: get rid of any pre-existing segment file */
		unlink(path);
	}
2020 2021
	else
	{
2022
		/* Find a free slot to put it in */
2023
		while (stat(path, &stat_buf) == 0)
2024
		{
2025
			if (*max_advance <= 0)
2026 2027 2028
			{
				/* Failed to find a free slot within specified range */
				if (use_lock)
2029
					LWLockRelease(ControlFileLock);
2030 2031
				return false;
			}
2032 2033 2034
			NextLogSeg(*log, *seg);
			(*max_advance)--;
			XLogFilePath(path, ThisTimeLineID, *log, *seg);
2035 2036 2037 2038 2039 2040 2041
		}
	}

	/*
	 * Prefer link() to rename() here just to be really sure that we don't
	 * overwrite an existing logfile.  However, there shouldn't be one, so
	 * rename() is an acceptable substitute except for the truly paranoid.
2042
	 */
2043
#if HAVE_WORKING_LINK
2044
	if (link(tmppath, path) < 0)
2045
		ereport(ERROR,
2046
				(errcode_for_file_access(),
2047
				 errmsg("could not link file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2048
						tmppath, path, *log, *seg)));
2049
	unlink(tmppath);
2050
#else
2051
	if (rename(tmppath, path) < 0)
2052
		ereport(ERROR,
2053
				(errcode_for_file_access(),
2054
				 errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2055
						tmppath, path, *log, *seg)));
2056
#endif
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2057

2058
	if (use_lock)
2059
		LWLockRelease(ControlFileLock);
2060

2061
	return true;
2062 2063
}

Tom Lane's avatar
Tom Lane committed
2064
/*
2065
 * Open a pre-existing logfile segment for writing.
Tom Lane's avatar
Tom Lane committed
2066
 */
2067
static int
2068
XLogFileOpen(uint32 log, uint32 seg)
2069
{
2070 2071
	char		path[MAXPGPATH];
	int			fd;
2072

2073
	XLogFilePath(path, ThisTimeLineID, log, seg);
2074

2075 2076
	fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
					   S_IRUSR | S_IWUSR);
2077
	if (fd < 0)
2078 2079
		ereport(PANIC,
				(errcode_for_file_access(),
2080 2081
		   errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
				  path, log, seg)));
2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095

	return fd;
}

/*
 * Open a logfile segment for reading (during recovery).
 *
 * Returns the file descriptor on success.  If no timeline yields the file,
 * the failure is reported at level "emode" and -1 is returned.  Any open
 * failure other than "file does not exist" is a PANIC.
 */
static int
XLogFileRead(uint32 log, uint32 seg, int emode)
{
	char		path[MAXPGPATH];
	char		xlogfname[MAXFNAMELEN];
	ListCell   *lc;

	/*
	 * Try each timeline in expectedTLIs in turn, since the segment could
	 * belong to any of them.
	 *
	 * On entry curFileTLI should be the TLI of the preceding file in
	 * sequence, or 0 if there was none.  We never let curFileTLI move
	 * backwards; otherwise, when a parent timeline extends to higher segment
	 * numbers than the child we are after, we could pick up the parent's
	 * file by mistake.
	 */
	foreach(lc, expectedTLIs)
	{
		TimeLineID	tli = (TimeLineID) lfirst_int(lc);
		int			fd;

		/* too-old TLIs are not worth looking at */
		if (tli < curFileTLI)
			break;

		if (!InArchiveRecovery)
			XLogFilePath(path, tli, log, seg);
		else
		{
			XLogFileName(xlogfname, tli, log, seg);
			restoredFromArchive = RestoreArchivedFile(path, xlogfname,
													  "RECOVERYXLOG",
													  XLogSegSize);
		}

		fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
		if (fd < 0)
		{
			/* a missing file just means "try the next timeline" */
			if (errno != ENOENT)
				ereport(PANIC,
						(errcode_for_file_access(),
				errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
					   path, log, seg)));
			continue;
		}

		/* Got it: remember which timeline the file came from */
		curFileTLI = tli;
		return fd;
	}

	/* Nothing found.  To keep it simple, blame the front timeline */
	XLogFilePath(path, recoveryTargetTLI, log, seg);
	errno = ENOENT;
	ereport(emode,
			(errcode_for_file_access(),
		   errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
				  path, log, seg)));
	return -1;
}

2148 2149 2150 2151 2152 2153 2154 2155
/*
 * Close the current logfile segment for writing.
 */
static void
XLogFileClose(void)
{
	Assert(openLogFile >= 0);

2156
#if defined(HAVE_DECL_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
2157
	/*
2158 2159 2160 2161 2162
	 * WAL segment files will not be re-read in normal operation, so we advise
	 * OS to release any cached pages.  But do not do so if WAL archiving is
	 * active, because archiver process could use the cache to read the WAL
	 * segment.
	 *
2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177
	 * While O_DIRECT works for O_SYNC, posix_fadvise() works for fsync()
	 * and O_SYNC, and some platforms only have posix_fadvise().
	 */
	if (!XLogArchivingActive())
		posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
#endif

	if (close(openLogFile))
		ereport(PANIC,
			(errcode_for_file_access(),
			errmsg("could not close log file %u, segment %u: %m",
				   openLogId, openLogSeg)));
	openLogFile = -1;
}

2178
/*
2179
 * Attempt to retrieve the specified file from off-line archival storage.
2180
 * If successful, fill "path" with its complete path (note that this will be
2181 2182
 * a temp file name that doesn't follow the normal naming convention), and
 * return TRUE.
2183
 *
2184 2185 2186
 * If not successful, fill "path" with the name of the normal on-line file
 * (which may or may not actually exist, but we'll try to use it), and return
 * FALSE.
2187 2188 2189 2190
 *
 * For fixed-size files, the caller may pass the expected size as an
 * additional crosscheck on successful recovery.  If the file size is not
 * known, set expectedSize = 0.
2191
 */
2192 2193
static bool
RestoreArchivedFile(char *path, const char *xlogfname,
2194
					const char *recovername, off_t expectedSize)
2195
{
Bruce Momjian's avatar
Bruce Momjian committed
2196 2197 2198 2199
	char		xlogpath[MAXPGPATH];
	char		xlogRestoreCmd[MAXPGPATH];
	char	   *dp;
	char	   *endp;
2200
	const char *sp;
Bruce Momjian's avatar
Bruce Momjian committed
2201
	int			rc;
2202 2203 2204
	struct stat stat_buf;

	/*
2205 2206 2207 2208
	 * When doing archive recovery, we always prefer an archived log file even
	 * if a file of the same name exists in XLOGDIR.  The reason is that the
	 * file in XLOGDIR could be an old, un-filled or partly-filled version
	 * that was copied and restored as part of backing up $PGDATA.
2209
	 *
Bruce Momjian's avatar
Bruce Momjian committed
2210
	 * We could try to optimize this slightly by checking the local copy
2211 2212 2213 2214
	 * lastchange timestamp against the archived copy, but we have no API to
	 * do this, nor can we guarantee that the lastchange timestamp was
	 * preserved correctly when we copied to archive. Our aim is robustness,
	 * so we elect not to do this.
2215
	 *
2216 2217 2218
	 * If we cannot obtain the log file from the archive, however, we will try
	 * to use the XLOGDIR file if it exists.  This is so that we can make use
	 * of log segments that weren't yet transferred to the archive.
2219
	 *
2220 2221 2222 2223
	 * Notice that we don't actually overwrite any files when we copy back
	 * from archive because the recoveryRestoreCommand may inadvertently
	 * restore inappropriate xlogs, or they may be corrupt, so we may wish to
	 * fallback to the segments remaining in current XLOGDIR later. The
2224 2225
	 * copy-from-archive filename is always the same, ensuring that we don't
	 * run out of disk space on long recoveries.
2226
	 */
2227
	snprintf(xlogpath, MAXPGPATH, XLOGDIR "/%s", recovername);
2228 2229

	/*
2230
	 * Make sure there is no existing file named recovername.
2231 2232 2233 2234 2235 2236
	 */
	if (stat(xlogpath, &stat_buf) != 0)
	{
		if (errno != ENOENT)
			ereport(FATAL,
					(errcode_for_file_access(),
Peter Eisentraut's avatar
Peter Eisentraut committed
2237
					 errmsg("could not stat file \"%s\": %m",
2238 2239 2240 2241 2242 2243 2244
							xlogpath)));
	}
	else
	{
		if (unlink(xlogpath) != 0)
			ereport(FATAL,
					(errcode_for_file_access(),
2245
					 errmsg("could not remove file \"%s\": %m",
2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264
							xlogpath)));
	}

	/*
	 * construct the command to be executed
	 */
	dp = xlogRestoreCmd;
	endp = xlogRestoreCmd + MAXPGPATH - 1;
	*endp = '\0';

	for (sp = recoveryRestoreCommand; *sp; sp++)
	{
		if (*sp == '%')
		{
			switch (sp[1])
			{
				case 'p':
					/* %p: full path of target file */
					sp++;
Bruce Momjian's avatar
Bruce Momjian committed
2265
					StrNCpy(dp, xlogpath, endp - dp);
2266
					make_native_path(dp);
2267 2268 2269 2270 2271
					dp += strlen(dp);
					break;
				case 'f':
					/* %f: filename of desired file */
					sp++;
Bruce Momjian's avatar
Bruce Momjian committed
2272
					StrNCpy(dp, xlogfname, endp - dp);
2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296
					dp += strlen(dp);
					break;
				case '%':
					/* convert %% to a single % */
					sp++;
					if (dp < endp)
						*dp++ = *sp;
					break;
				default:
					/* otherwise treat the % as not special */
					if (dp < endp)
						*dp++ = *sp;
					break;
			}
		}
		else
		{
			if (dp < endp)
				*dp++ = *sp;
		}
	}
	*dp = '\0';

	ereport(DEBUG3,
Bruce Momjian's avatar
Bruce Momjian committed
2297
			(errmsg_internal("executing restore command \"%s\"",
2298 2299 2300
							 xlogRestoreCmd)));

	/*
2301
	 * Copy xlog from archival storage to XLOGDIR
2302 2303 2304 2305
	 */
	rc = system(xlogRestoreCmd);
	if (rc == 0)
	{
2306 2307 2308 2309
		/*
		 * command apparently succeeded, but let's make sure the file is
		 * really there now and has the correct size.
		 *
2310 2311 2312 2313 2314
		 * XXX I made wrong-size a fatal error to ensure the DBA would notice
		 * it, but is that too strong?	We could try to plow ahead with a
		 * local copy of the file ... but the problem is that there probably
		 * isn't one, and we'd incorrectly conclude we've reached the end of
		 * WAL and we're done recovering ...
2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338
		 */
		if (stat(xlogpath, &stat_buf) == 0)
		{
			if (expectedSize > 0 && stat_buf.st_size != expectedSize)
				ereport(FATAL,
						(errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
								xlogfname,
								(unsigned long) stat_buf.st_size,
								(unsigned long) expectedSize)));
			else
			{
				ereport(LOG,
						(errmsg("restored log file \"%s\" from archive",
								xlogfname)));
				strcpy(path, xlogpath);
				return true;
			}
		}
		else
		{
			/* stat failed */
			if (errno != ENOENT)
				ereport(FATAL,
						(errcode_for_file_access(),
Peter Eisentraut's avatar
Peter Eisentraut committed
2339
						 errmsg("could not stat file \"%s\": %m",
2340
								xlogpath)));
2341 2342 2343 2344
		}
	}

	/*
Bruce Momjian's avatar
Bruce Momjian committed
2345 2346
	 * remember, we rollforward UNTIL the restore fails so failure here is
	 * just part of the process... that makes it difficult to determine
2347 2348 2349
	 * whether the restore failed because there isn't an archive to restore,
	 * or because the administrator has specified the restore program
	 * incorrectly.  We have to assume the former.
2350
	 */
2351
	ereport(DEBUG2,
2352 2353
		(errmsg("could not restore file \"%s\" from archive: return code %d",
				xlogfname, rc)));
2354 2355

	/*
2356 2357
	 * if an archived file is not available, there might still be a version of
	 * this file in XLOGDIR, so return that as the filename to open.
2358
	 *
Bruce Momjian's avatar
Bruce Momjian committed
2359 2360
	 * In many recovery scenarios we expect this to fail also, but if so that
	 * just means we've reached the end of WAL.
2361
	 */
2362
	snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
2363
	return false;
2364 2365
}

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2366
/*
Tom Lane's avatar
Tom Lane committed
2367 2368 2369
 * Preallocate log files beyond the specified log endpoint, according to
 * the XLOGfile user parameter.
 */
2370
static int
Tom Lane's avatar
Tom Lane committed
2371 2372
PreallocXlogFiles(XLogRecPtr endptr)
{
2373
	int			nsegsadded = 0;
Tom Lane's avatar
Tom Lane committed
2374 2375 2376
	uint32		_logId;
	uint32		_logSeg;
	int			lf;
2377
	bool		use_existent;
Tom Lane's avatar
Tom Lane committed
2378 2379

	XLByteToPrevSeg(endptr, _logId, _logSeg);
Bruce Momjian's avatar
Bruce Momjian committed
2380
	if ((endptr.xrecoff - 1) % XLogSegSize >=
Bruce Momjian's avatar
Bruce Momjian committed
2381
		(uint32) (0.75 * XLogSegSize))
Tom Lane's avatar
Tom Lane committed
2382 2383
	{
		NextLogSeg(_logId, _logSeg);
2384 2385
		use_existent = true;
		lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
Tom Lane's avatar
Tom Lane committed
2386
		close(lf);
2387 2388
		if (!use_existent)
			nsegsadded++;
Tom Lane's avatar
Tom Lane committed
2389
	}
2390
	return nsegsadded;
Tom Lane's avatar
Tom Lane committed
2391 2392 2393 2394
}

/*
 * Remove or move offline all log files older or equal to passed log/seg#
2395 2396 2397
 *
 * endptr is current (or recent) end of xlog; this is used to determine
 * whether we want to recycle rather than delete no-longer-wanted log files.
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2398 2399
 */
static void
2400 2401
MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
				int *nsegsremoved, int *nsegsrecycled)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2402
{
2403 2404
	uint32		endlogId;
	uint32		endlogSeg;
2405
	int			max_advance;
2406 2407
	DIR		   *xldir;
	struct dirent *xlde;
2408
	char		lastoff[MAXFNAMELEN];
2409
	char		path[MAXPGPATH];
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2410

2411 2412 2413
	*nsegsremoved = 0;
	*nsegsrecycled = 0;

2414 2415 2416 2417
	/*
	 * Initialize info about where to try to recycle to.  We allow recycling
	 * segments up to XLOGfileslop segments beyond the current XLOG location.
	 */
2418
	XLByteToPrevSeg(endptr, endlogId, endlogSeg);
2419
	max_advance = XLOGfileslop;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2420

2421
	xldir = AllocateDir(XLOGDIR);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2422
	if (xldir == NULL)
2423
		ereport(ERROR,
2424
				(errcode_for_file_access(),
2425 2426
				 errmsg("could not open transaction log directory \"%s\": %m",
						XLOGDIR)));
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2427

2428
	XLogFileName(lastoff, ThisTimeLineID, log, seg);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2429

2430
	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2431
	{
2432
		/*
2433
		 * We ignore the timeline part of the XLOG segment identifiers in
2434 2435 2436 2437 2438
		 * deciding whether a segment is still needed.	This ensures that we
		 * won't prematurely remove a segment from a parent timeline. We could
		 * probably be a little more proactive about removing segments of
		 * non-parent timelines, but that would be a whole lot more
		 * complicated.
2439
		 *
2440 2441
		 * We use the alphanumeric sorting property of the filenames to decide
		 * which ones are earlier than the lastoff segment.
2442
		 */
2443 2444 2445
		if (strlen(xlde->d_name) == 24 &&
			strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
			strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2446
		{
2447
			if (XLogArchiveCheckDone(xlde->d_name))
2448
			{
2449
				snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2450

2451
				/*
2452 2453
				 * Before deleting the file, see if it can be recycled as a
				 * future log segment.
2454
				 */
2455 2456
				if (InstallXLogFileSegment(&endlogId, &endlogSeg, path,
										   true, &max_advance,
2457 2458
										   true))
				{
2459
					ereport(DEBUG2,
2460 2461
							(errmsg("recycled transaction log file \"%s\"",
									xlde->d_name)));
2462
					(*nsegsrecycled)++;
2463 2464 2465 2466 2467 2468
					/* Needn't recheck that slot on future iterations */
					if (max_advance > 0)
					{
						NextLogSeg(endlogId, endlogSeg);
						max_advance--;
					}
2469 2470 2471 2472
				}
				else
				{
					/* No need for any more future segments... */
2473
					ereport(DEBUG2,
2474 2475
							(errmsg("removing transaction log file \"%s\"",
									xlde->d_name)));
2476
					unlink(path);
2477
					(*nsegsremoved)++;
2478
				}
2479 2480

				XLogArchiveCleanup(xlde->d_name);
2481
			}
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2482 2483
		}
	}
Bruce Momjian's avatar
Bruce Momjian committed
2484

2485
	FreeDir(xldir);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
2486 2487
}

2488
/*
2489 2490 2491
 * Remove previous backup history files.  This also retries creation of
 * .ready files for any backup history files for which XLogArchiveNotify
 * failed earlier.
2492 2493
 */
static void
2494
CleanupBackupHistory(void)
2495 2496 2497 2498 2499
{
	DIR		   *xldir;
	struct dirent *xlde;
	char		path[MAXPGPATH];

2500
	xldir = AllocateDir(XLOGDIR);
2501 2502 2503
	if (xldir == NULL)
		ereport(ERROR,
				(errcode_for_file_access(),
2504 2505
				 errmsg("could not open transaction log directory \"%s\": %m",
						XLOGDIR)));
2506

2507
	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2508 2509 2510 2511 2512 2513
	{
		if (strlen(xlde->d_name) > 24 &&
			strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
			strcmp(xlde->d_name + strlen(xlde->d_name) - strlen(".backup"),
				   ".backup") == 0)
		{
2514
			if (XLogArchiveCheckDone(xlde->d_name))
2515 2516
			{
				ereport(DEBUG2,
2517 2518
				(errmsg("removing transaction log backup history file \"%s\"",
						xlde->d_name)));
2519
				snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2520 2521 2522 2523 2524 2525 2526 2527 2528
				unlink(path);
				XLogArchiveCleanup(xlde->d_name);
			}
		}
	}

	FreeDir(xldir);
}

Tom Lane's avatar
Tom Lane committed
2529 2530 2531 2532
/*
 * Restore the backup blocks present in an XLOG record, if any.
 *
 * We assume all of the record has been read into memory at *record.
2533 2534 2535 2536 2537 2538 2539 2540 2541
 *
 * Note: when a backup block is available in XLOG, we restore it
 * unconditionally, even if the page in the database appears newer.
 * This is to protect ourselves against database pages that were partially
 * or incorrectly written during a crash.  We assume that the XLOG data
 * must be good because it has passed a CRC check, while the database
 * page might not be.  This will force us to replay all subsequent
 * modifications of the page that appear in XLOG, rather than possibly
 * ignoring them as already applied, but that's not a huge drawback.
Tom Lane's avatar
Tom Lane committed
2542
 */
2543 2544 2545 2546 2547 2548 2549 2550 2551 2552
static void
RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
{
	Relation	reln;
	Buffer		buffer;
	Page		page;
	BkpBlock	bkpb;
	char	   *blk;
	int			i;

2553
	blk = (char *) XLogRecGetData(record) + record->xl_len;
Tom Lane's avatar
Tom Lane committed
2554
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2555
	{
Tom Lane's avatar
Tom Lane committed
2556
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2557 2558
			continue;

2559
		memcpy(&bkpb, blk, sizeof(BkpBlock));
2560 2561
		blk += sizeof(BkpBlock);

2562
		reln = XLogOpenRelation(bkpb.node);
2563 2564 2565
		buffer = XLogReadBuffer(reln, bkpb.block, true);
		Assert(BufferIsValid(buffer));
		page = (Page) BufferGetPage(buffer);
2566

2567
		if (bkpb.hole_length == 0)
2568
		{
2569 2570 2571 2572 2573 2574 2575 2576 2577 2578
			memcpy((char *) page, blk, BLCKSZ);
		}
		else
		{
			/* must zero-fill the hole */
			MemSet((char *) page, 0, BLCKSZ);
			memcpy((char *) page, blk, bkpb.hole_offset);
			memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
				   blk + bkpb.hole_offset,
				   BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
2579 2580
		}

2581 2582
		PageSetLSN(page, lsn);
		PageSetTLI(page, ThisTimeLineID);
2583 2584
		MarkBufferDirty(buffer);
		UnlockReleaseBuffer(buffer);
2585

2586
		blk += BLCKSZ - bkpb.hole_length;
2587 2588 2589
	}
}

Tom Lane's avatar
Tom Lane committed
2590 2591 2592 2593 2594 2595 2596
/*
 * CRC-check an XLOG record.  We do not believe the contents of an XLOG
 * record (other than to the minimal extent of computing the amount of
 * data to read in) until we've checked the CRCs.
 *
 * We assume all of the record has been read into memory at *record.
 */
2597 2598 2599
static bool
RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
{
2600
	pg_crc32	crc;
2601 2602
	int			i;
	uint32		len = record->xl_len;
2603
	BkpBlock	bkpb;
2604 2605
	char	   *blk;

2606 2607 2608
	/* First the rmgr data */
	INIT_CRC32(crc);
	COMP_CRC32(crc, XLogRecGetData(record), len);
2609

2610
	/* Add in the backup blocks, if any */
2611
	blk = (char *) XLogRecGetData(record) + len;
Tom Lane's avatar
Tom Lane committed
2612
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2613
	{
2614
		uint32		blen;
2615

Tom Lane's avatar
Tom Lane committed
2616
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2617 2618
			continue;

2619 2620
		memcpy(&bkpb, blk, sizeof(BkpBlock));
		if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
2621
		{
2622
			ereport(emode,
2623 2624 2625
					(errmsg("incorrect hole size in record at %X/%X",
							recptr.xlogid, recptr.xrecoff)));
			return false;
2626
		}
2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648
		blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
		COMP_CRC32(crc, blk, blen);
		blk += blen;
	}

	/* Check that xl_tot_len agrees with our calculation */
	if (blk != (char *) record + record->xl_tot_len)
	{
		ereport(emode,
				(errmsg("incorrect total length in record at %X/%X",
						recptr.xlogid, recptr.xrecoff)));
		return false;
	}

	/* Finally include the record header */
	COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
			   SizeOfXLogRecord - sizeof(pg_crc32));
	FIN_CRC32(crc);

	if (!EQ_CRC32(record->xl_crc, crc))
	{
		ereport(emode,
2649 2650
		(errmsg("incorrect resource manager data checksum in record at %X/%X",
				recptr.xlogid, recptr.xrecoff)));
2651
		return false;
2652 2653
	}

2654
	return true;
2655 2656
}

Tom Lane's avatar
Tom Lane committed
2657 2658 2659 2660 2661 2662
/*
 * Attempt to read an XLOG record.
 *
 * If RecPtr is not NULL, try to read a record at that position.  Otherwise
 * try to read a record just after the last one previously read.
 *
2663 2664
 * If no valid record is available, returns NULL, or fails if emode is PANIC.
 * (emode must be either PANIC or LOG.)
Tom Lane's avatar
Tom Lane committed
2665
 *
2666 2667
 * The record is copied into readRecordBuf, so that on successful return,
 * the returned record pointer always points there.
Tom Lane's avatar
Tom Lane committed
2668
 */
2669
static XLogRecord *
2670
ReadRecord(XLogRecPtr *RecPtr, int emode)
2671
{
2672
	XLogRecord *record;
2673
	char	   *buffer;
2674
	XLogRecPtr	tmpRecPtr = EndRecPtr;
2675
	bool		randAccess = false;
Tom Lane's avatar
Tom Lane committed
2676 2677 2678
	uint32		len,
				total_len;
	uint32		targetPageOff;
2679 2680
	uint32		targetRecOff;
	uint32		pageHeaderSize;
Tom Lane's avatar
Tom Lane committed
2681 2682 2683 2684

	if (readBuf == NULL)
	{
		/*
2685 2686 2687 2688 2689
		 * First time through, permanently allocate readBuf.  We do it this
		 * way, rather than just making a static array, for two reasons: (1)
		 * no need to waste the storage in most instantiations of the backend;
		 * (2) a static char array isn't guaranteed to have any particular
		 * alignment, whereas malloc() will provide MAXALIGN'd storage.
Tom Lane's avatar
Tom Lane committed
2690
		 */
2691
		readBuf = (char *) malloc(XLOG_BLCKSZ);
Tom Lane's avatar
Tom Lane committed
2692 2693
		Assert(readBuf != NULL);
	}
2694

Tom Lane's avatar
Tom Lane committed
2695
	if (RecPtr == NULL)
2696
	{
2697
		RecPtr = &tmpRecPtr;
Tom Lane's avatar
Tom Lane committed
2698
		/* fast case if next record is on same page */
2699 2700 2701 2702 2703
		if (nextRecord != NULL)
		{
			record = nextRecord;
			goto got_record;
		}
Tom Lane's avatar
Tom Lane committed
2704
		/* align old recptr to next page */
2705 2706
		if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
			tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
2707 2708 2709 2710 2711
		if (tmpRecPtr.xrecoff >= XLogFileSize)
		{
			(tmpRecPtr.xlogid)++;
			tmpRecPtr.xrecoff = 0;
		}
2712 2713 2714 2715 2716 2717 2718 2719
		/* We will account for page header size below */
	}
	else
	{
		if (!XRecOffIsValid(RecPtr->xrecoff))
			ereport(PANIC,
					(errmsg("invalid record offset at %X/%X",
							RecPtr->xlogid, RecPtr->xrecoff)));
Bruce Momjian's avatar
Bruce Momjian committed
2720

2721
		/*
2722 2723 2724 2725 2726
		 * Since we are going to a random position in WAL, forget any prior
		 * state about what timeline we were in, and allow it to be any
		 * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
		 * to go backwards (but we can't reset that variable right here, since
		 * we might not change files at all).
2727 2728 2729
		 */
		lastPageTLI = 0;		/* see comment in ValidXLOGHeader */
		randAccess = true;		/* allow curFileTLI to go backwards too */
2730 2731
	}

Tom Lane's avatar
Tom Lane committed
2732
	if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
2733
	{
2734 2735
		close(readFile);
		readFile = -1;
2736
	}
Tom Lane's avatar
Tom Lane committed
2737
	XLByteToSeg(*RecPtr, readId, readSeg);
2738
	if (readFile < 0)
2739
	{
2740 2741 2742 2743 2744
		/* Now it's okay to reset curFileTLI if random fetch */
		if (randAccess)
			curFileTLI = 0;

		readFile = XLogFileRead(readId, readSeg, emode);
2745 2746
		if (readFile < 0)
			goto next_record_is_invalid;
2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765

		/*
		 * Whenever switching to a new WAL segment, we read the first page of
		 * the file and validate its header, even if that's not where the
		 * target record is.  This is so that we can check the additional
		 * identification info that is present in the first page's "long"
		 * header.
		 */
		readOff = 0;
		if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
		{
			ereport(emode,
					(errcode_for_file_access(),
					 errmsg("could not read from log file %u, segment %u, offset %u: %m",
							readId, readSeg, readOff)));
			goto next_record_is_invalid;
		}
		if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
			goto next_record_is_invalid;
2766 2767
	}

2768
	targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
Tom Lane's avatar
Tom Lane committed
2769
	if (readOff != targetPageOff)
2770
	{
Tom Lane's avatar
Tom Lane committed
2771 2772 2773
		readOff = targetPageOff;
		if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
		{
2774 2775
			ereport(emode,
					(errcode_for_file_access(),
2776
					 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
2777
							readId, readSeg, readOff)));
Tom Lane's avatar
Tom Lane committed
2778 2779
			goto next_record_is_invalid;
		}
2780
		if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
Tom Lane's avatar
Tom Lane committed
2781
		{
2782 2783
			ereport(emode,
					(errcode_for_file_access(),
2784
					 errmsg("could not read from log file %u, segment %u at offset %u: %m",
2785
							readId, readSeg, readOff)));
Tom Lane's avatar
Tom Lane committed
2786 2787
			goto next_record_is_invalid;
		}
2788
		if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2789 2790
			goto next_record_is_invalid;
	}
2791
	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2792
	targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
2793 2794 2795
	if (targetRecOff == 0)
	{
		/*
2796 2797 2798
		 * Can only get here in the continuing-from-prev-page case, because
		 * XRecOffIsValid eliminated the zero-page-offset case otherwise. Need
		 * to skip over the new page's header.
2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809
		 */
		tmpRecPtr.xrecoff += pageHeaderSize;
		targetRecOff = pageHeaderSize;
	}
	else if (targetRecOff < pageHeaderSize)
	{
		ereport(emode,
				(errmsg("invalid record offset at %X/%X",
						RecPtr->xlogid, RecPtr->xrecoff)));
		goto next_record_is_invalid;
	}
Tom Lane's avatar
Tom Lane committed
2810
	if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
2811
		targetRecOff == pageHeaderSize)
2812
	{
2813 2814 2815
		ereport(emode,
				(errmsg("contrecord is requested by %X/%X",
						RecPtr->xlogid, RecPtr->xrecoff)));
2816 2817
		goto next_record_is_invalid;
	}
2818
	record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
2819 2820

got_record:;
2821

Tom Lane's avatar
Tom Lane committed
2822
	/*
2823 2824
	 * Currently, xl_len == 0 must be bad data, but that might not be true
	 * forever.  See note in XLogInsert.
Tom Lane's avatar
Tom Lane committed
2825
	 */
2826 2827
	if (record->xl_len == 0)
	{
2828 2829 2830
		ereport(emode,
				(errmsg("record with zero length at %X/%X",
						RecPtr->xlogid, RecPtr->xrecoff)));
2831 2832
		goto next_record_is_invalid;
	}
2833 2834 2835 2836 2837 2838 2839 2840 2841
	if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
		record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
		XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
	{
		ereport(emode,
				(errmsg("invalid record length at %X/%X",
						RecPtr->xlogid, RecPtr->xrecoff)));
		goto next_record_is_invalid;
	}
2842 2843 2844 2845
	if (record->xl_rmid > RM_MAX_ID)
	{
		ereport(emode,
				(errmsg("invalid resource manager ID %u at %X/%X",
2846
						record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
2847 2848
		goto next_record_is_invalid;
	}
2849 2850 2851
	if (randAccess)
	{
		/*
2852 2853
		 * We can't exactly verify the prev-link, but surely it should be less
		 * than the record's own address.
2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866
		 */
		if (!XLByteLT(record->xl_prev, *RecPtr))
		{
			ereport(emode,
					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
							record->xl_prev.xlogid, record->xl_prev.xrecoff,
							RecPtr->xlogid, RecPtr->xrecoff)));
			goto next_record_is_invalid;
		}
	}
	else
	{
		/*
2867 2868 2869
		 * Record's prev-link should exactly match our previous location. This
		 * check guards against torn WAL pages where a stale but valid-looking
		 * WAL record starts on a sector boundary.
2870 2871 2872 2873 2874 2875 2876 2877 2878 2879
		 */
		if (!XLByteEQ(record->xl_prev, ReadRecPtr))
		{
			ereport(emode,
					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
							record->xl_prev.xlogid, record->xl_prev.xrecoff,
							RecPtr->xlogid, RecPtr->xrecoff)));
			goto next_record_is_invalid;
		}
	}
2880

Tom Lane's avatar
Tom Lane committed
2881
	/*
2882
	 * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
2883 2884 2885 2886
	 * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
	 * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
	 * enough for all "normal" records, but very large commit or abort records
	 * might need more space.)
Tom Lane's avatar
Tom Lane committed
2887
	 */
2888
	total_len = record->xl_tot_len;
2889
	if (total_len > readRecordBufSize)
2890
	{
2891 2892
		uint32		newSize = total_len;

2893 2894
		newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
		newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907
		if (readRecordBuf)
			free(readRecordBuf);
		readRecordBuf = (char *) malloc(newSize);
		if (!readRecordBuf)
		{
			readRecordBufSize = 0;
			/* We treat this as a "bogus data" condition */
			ereport(emode,
					(errmsg("record length %u at %X/%X too long",
							total_len, RecPtr->xlogid, RecPtr->xrecoff)));
			goto next_record_is_invalid;
		}
		readRecordBufSize = newSize;
2908
	}
2909 2910

	buffer = readRecordBuf;
2911
	nextRecord = NULL;
2912
	len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
Tom Lane's avatar
Tom Lane committed
2913
	if (total_len > len)
2914
	{
Tom Lane's avatar
Tom Lane committed
2915 2916
		/* Need to reassemble record */
		XLogContRecord *contrecord;
2917
		uint32		gotlen = len;
2918

Tom Lane's avatar
Tom Lane committed
2919
		memcpy(buffer, record, len);
2920
		record = (XLogRecord *) buffer;
Tom Lane's avatar
Tom Lane committed
2921
		buffer += len;
2922
		for (;;)
2923
		{
2924
			readOff += XLOG_BLCKSZ;
Tom Lane's avatar
Tom Lane committed
2925
			if (readOff >= XLogSegSize)
2926 2927
			{
				close(readFile);
Tom Lane's avatar
Tom Lane committed
2928 2929
				readFile = -1;
				NextLogSeg(readId, readSeg);
2930
				readFile = XLogFileRead(readId, readSeg, emode);
2931 2932
				if (readFile < 0)
					goto next_record_is_invalid;
Tom Lane's avatar
Tom Lane committed
2933
				readOff = 0;
2934
			}
2935
			if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
Tom Lane's avatar
Tom Lane committed
2936
			{
2937 2938
				ereport(emode,
						(errcode_for_file_access(),
Tom Lane's avatar
Tom Lane committed
2939
						 errmsg("could not read from log file %u, segment %u, offset %u: %m",
2940
								readId, readSeg, readOff)));
Tom Lane's avatar
Tom Lane committed
2941 2942
				goto next_record_is_invalid;
			}
2943
			if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2944
				goto next_record_is_invalid;
Tom Lane's avatar
Tom Lane committed
2945
			if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
2946
			{
2947 2948 2949
				ereport(emode,
						(errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
								readId, readSeg, readOff)));
2950 2951
				goto next_record_is_invalid;
			}
2952 2953
			pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
			contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
2954
			if (contrecord->xl_rem_len == 0 ||
Tom Lane's avatar
Tom Lane committed
2955
				total_len != (contrecord->xl_rem_len + gotlen))
2956
			{
2957 2958 2959 2960
				ereport(emode,
						(errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
								contrecord->xl_rem_len,
								readId, readSeg, readOff)));
2961 2962
				goto next_record_is_invalid;
			}
2963
			len = XLOG_BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
Tom Lane's avatar
Tom Lane committed
2964
			if (contrecord->xl_rem_len > len)
2965
			{
2966
				memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
Tom Lane's avatar
Tom Lane committed
2967 2968 2969 2970 2971 2972 2973 2974 2975 2976
				gotlen += len;
				buffer += len;
				continue;
			}
			memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
				   contrecord->xl_rem_len);
			break;
		}
		if (!RecordIsValid(record, *RecPtr, emode))
			goto next_record_is_invalid;
2977
		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2978
		if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
2979
			MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
Tom Lane's avatar
Tom Lane committed
2980
		{
2981
			nextRecord = (XLogRecord *) ((char *) contrecord +
2982
					MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
Tom Lane's avatar
Tom Lane committed
2983 2984 2985
		}
		EndRecPtr.xlogid = readId;
		EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
2986 2987
			pageHeaderSize +
			MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
Tom Lane's avatar
Tom Lane committed
2988 2989
		ReadRecPtr = *RecPtr;
		return record;
2990 2991
	}

Tom Lane's avatar
Tom Lane committed
2992 2993 2994
	/* Record does not cross a page boundary */
	if (!RecordIsValid(record, *RecPtr, emode))
		goto next_record_is_invalid;
2995
	if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
Tom Lane's avatar
Tom Lane committed
2996 2997 2998 2999 3000 3001 3002
		MAXALIGN(total_len))
		nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
	EndRecPtr.xlogid = RecPtr->xlogid;
	EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
	ReadRecPtr = *RecPtr;
	memcpy(buffer, record, total_len);
	return (XLogRecord *) buffer;
3003

Tom Lane's avatar
Tom Lane committed
3004 3005 3006 3007 3008
next_record_is_invalid:;
	close(readFile);
	readFile = -1;
	nextRecord = NULL;
	return NULL;
3009 3010
}

3011 3012 3013 3014
/*
 * Check whether the xlog header of a page just read in looks valid.
 *
 * This is just a convenience subroutine to avoid duplicated code in
3015
 * ReadRecord.	It's not intended for use from anywhere else.
3016 3017
 */
static bool
3018
ValidXLOGHeader(XLogPageHeader hdr, int emode)
3019
{
3020 3021
	XLogRecPtr	recaddr;

3022 3023
	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
	{
3024 3025 3026
		ereport(emode,
				(errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
						hdr->xlp_magic, readId, readSeg, readOff)));
3027 3028 3029 3030
		return false;
	}
	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
	{
3031 3032 3033
		ereport(emode,
				(errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
						hdr->xlp_info, readId, readSeg, readOff)));
3034 3035
		return false;
	}
3036
	if (hdr->xlp_info & XLP_LONG_HEADER)
3037
	{
3038
		XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
3039

3040
		if (longhdr->xlp_sysid != ControlFile->system_identifier)
3041
		{
3042 3043
			char		fhdrident_str[32];
			char		sysident_str[32];
3044

3045
			/*
3046 3047
			 * Format sysids separately to keep platform-dependent format code
			 * out of the translatable message string.
3048 3049 3050 3051 3052 3053 3054
			 */
			snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
					 longhdr->xlp_sysid);
			snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
					 ControlFile->system_identifier);
			ereport(emode,
					(errmsg("WAL file is from different system"),
3055 3056
					 errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
							   fhdrident_str, sysident_str)));
3057 3058 3059 3060 3061 3062
			return false;
		}
		if (longhdr->xlp_seg_size != XLogSegSize)
		{
			ereport(emode,
					(errmsg("WAL file is from different system"),
3063
					 errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
3064 3065
			return false;
		}
3066 3067 3068 3069 3070 3071 3072
		if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
		{
			ereport(emode,
					(errmsg("WAL file is from different system"),
					 errdetail("Incorrect XLOG_BLCKSZ in page header.")));
			return false;
		}
3073
	}
3074 3075 3076 3077 3078 3079 3080 3081 3082
	else if (readOff == 0)
	{
		/* hmm, first page of file doesn't have a long header? */
		ereport(emode,
				(errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
						hdr->xlp_info, readId, readSeg, readOff)));
		return false;
	}

3083 3084 3085 3086 3087 3088
	recaddr.xlogid = readId;
	recaddr.xrecoff = readSeg * XLogSegSize + readOff;
	if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
	{
		ereport(emode,
				(errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
3089
						hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110
						readId, readSeg, readOff)));
		return false;
	}

	/*
	 * Check page TLI is one of the expected values.
	 */
	if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
	{
		ereport(emode,
				(errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
						hdr->xlp_tli,
						readId, readSeg, readOff)));
		return false;
	}

	/*
	 * Since child timelines are always assigned a TLI greater than their
	 * immediate parent's TLI, we should never see TLI go backwards across
	 * successive pages of a consistent WAL sequence.
	 *
3111 3112 3113
	 * Of course this check should only be applied when advancing sequentially
	 * across pages; therefore ReadRecord resets lastPageTLI to zero when
	 * going to a random page.
3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130
	 */
	if (hdr->xlp_tli < lastPageTLI)
	{
		ereport(emode,
				(errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
						hdr->xlp_tli, lastPageTLI,
						readId, readSeg, readOff)));
		return false;
	}
	lastPageTLI = hdr->xlp_tli;
	return true;
}

/*
 * Try to read a timeline's history file.
 *
 * On success the result is the list of component TLIs: the given TLI first,
 * followed by its ancestor TLIs, newest to oldest.  If the history file does
 * not exist, the timeline is assumed to have no parents and the result is a
 * list containing just the specified timeline ID.
 *
 * Any other failure to open the file, a line that does not begin with a
 * number, or timeline IDs that are not in increasing sequence is a FATAL
 * error.
 */
static List *
readTimeLineHistory(TimeLineID targetTLI)
{
	List	   *tlis = NIL;
	char		path[MAXPGPATH];
	char		histfname[MAXFNAMELEN];
	char		fline[MAXPGPATH];
	FILE	   *fd;

	/* work out where the history file is, fetching it from archive if needed */
	if (!InArchiveRecovery)
		TLHistoryFilePath(path, targetTLI);
	else
	{
		TLHistoryFileName(histfname, targetTLI);
		RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
	}

	fd = AllocateFile(path, "r");
	if (fd == NULL)
	{
		if (errno != ENOENT)
			ereport(FATAL,
					(errcode_for_file_access(),
					 errmsg("could not open file \"%s\": %m", path)));
		/* Not there, so assume no parents */
		return list_make1_int((int) targetTLI);
	}

	/*
	 * Parse the file, one line at a time.
	 */
	while (fgets(fline, sizeof(fline), fd) != NULL)
	{
		char	   *ptr = fline;
		char	   *endptr;
		TimeLineID	tli;

		/* step over leading whitespace */
		while (*ptr != '\0' && isspace((unsigned char) *ptr))
			ptr++;

		/* blank lines and # comments carry no data */
		if (*ptr == '\0' || *ptr == '#')
			continue;

		/* expect a numeric timeline ID as first field of line */
		tli = (TimeLineID) strtoul(ptr, &endptr, 0);
		if (endptr == ptr)
			ereport(FATAL,
					(errmsg("syntax error in history file: %s", fline),
					 errhint("Expected a numeric timeline ID.")));

		/* each entry must exceed the one before it (now at the list head) */
		if (tlis != NIL &&
			tli <= (TimeLineID) linitial_int(tlis))
			ereport(FATAL,
					(errmsg("invalid data in history file: %s", fline),
				   errhint("Timeline IDs must be in increasing sequence.")));

		/* Build list with newest item first */
		tlis = lcons_int((int) tli, tlis);

		/* we ignore the remainder of each line */
	}

	FreeFile(fd);

	/* the timeline itself must be newer than every ancestor listed */
	if (tlis != NIL &&
		targetTLI <= (TimeLineID) linitial_int(tlis))
		ereport(FATAL,
				(errmsg("invalid data in history file \"%s\"", path),
			errhint("Timeline IDs must be less than child timeline's ID.")));

	tlis = lcons_int((int) targetTLI, tlis);

	ereport(DEBUG3,
			(errmsg_internal("history of timeline %u is %s",
							 targetTLI, nodeToString(tlis))));

	return tlis;
}

/*
 * Probe whether a timeline history file exists for the given timeline ID.
 *
 * Returns true if the file can be opened for reading and false if it does
 * not exist; any other open failure is FATAL.  During archive recovery the
 * file is first requested via RestoreArchivedFile.
 */
static bool
existsTimeLineHistory(TimeLineID probeTLI)
{
	char		path[MAXPGPATH];
	char		histfname[MAXFNAMELEN];
	FILE	   *fd;

	if (!InArchiveRecovery)
		TLHistoryFilePath(path, probeTLI);
	else
	{
		TLHistoryFileName(histfname, probeTLI);
		RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
	}

	fd = AllocateFile(path, "r");
	if (fd == NULL)
	{
		/* only "no such file" counts as a clean negative answer */
		if (errno != ENOENT)
			ereport(FATAL,
					(errcode_for_file_access(),
					 errmsg("could not open file \"%s\": %m", path)));
		return false;
	}

	FreeFile(fd);
	return true;
}

/*
 * Find the newest existing timeline, assuming that startTLI exists.
 *
 * Note: while this is somewhat heuristic, it does positively guarantee
 * that (result + 1) is not a known timeline, and therefore it should
 * be safe to assign that ID to a new timeline.
 */
static TimeLineID
findNewestTimeLine(TimeLineID startTLI)
{
	TimeLineID	newestTLI = startTLI;

	/*
	 * Simply keep probing for the history file of the next timeline ID and
	 * stop at the first one that is missing.  XXX is it useful to allow gaps
	 * in the sequence?
	 */
	while (existsTimeLineHistory(newestTLI + 1))
		newestTLI++;

	return newestTLI;
}

/*
 * Create a new timeline history file.
 *
 *	newTLI: ID of the new timeline
 *	parentTLI: ID of its immediate parent
 *	endTLI et al: ID of the last used WAL file, for annotation purposes
 *
 * The file is built under a temporary name (the parent's history, if there
 * is one, followed by one line describing this timeline split), fsync'd,
 * moved to its final name, and then handed to XLogArchiveNotify.
 *
 * Currently this is only used during recovery, and so there are no locking
 * considerations.  But we should be just as tense as XLogFileInit to avoid
 * emplacing a bogus file.
 */
static void
writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
					 TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];
	char		histfname[MAXFNAMELEN];
	char		xlogfname[MAXFNAMELEN];
	char		buffer[BLCKSZ];
	int			parentfd;
	int			tmpfd;
	int			nbytes;
	bool		haveParent;

	Assert(newTLI > parentTLI); /* else bad selection of newTLI */

	/*
	 * Build the file under a temporary name, clearing away any leftover
	 * file of that name first.
	 */
	snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());

	unlink(tmppath);

	/* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
	tmpfd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
						  S_IRUSR | S_IWUSR);
	if (tmpfd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m", tmppath)));

	/*
	 * Locate the parent's history file; if it exists it is copied verbatim.
	 */
	if (!InArchiveRecovery)
		TLHistoryFilePath(path, parentTLI);
	else
	{
		TLHistoryFileName(histfname, parentTLI);
		RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
	}

	parentfd = BasicOpenFile(path, O_RDONLY, 0);
	haveParent = (parentfd >= 0);

	if (!haveParent)
	{
		if (errno != ENOENT)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not open file \"%s\": %m", path)));
		/* Not there, so assume parent has no parents */
	}
	else
	{
		/* copy one buffer-load at a time until end of file */
		for (;;)
		{
			errno = 0;
			nbytes = (int) read(parentfd, buffer, sizeof(buffer));
			if (nbytes < 0 || errno != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not read file \"%s\": %m", path)));
			if (nbytes == 0)
				break;

			errno = 0;
			if ((int) write(tmpfd, buffer, nbytes) != nbytes)
			{
				int			write_errno = errno;

				/*
				 * The file could not be completed, so remove it to give
				 * back the disk space.
				 */
				unlink(tmppath);

				/*
				 * if write didn't set errno, assume problem is no disk space
				 */
				errno = write_errno ? write_errno : ENOSPC;

				ereport(ERROR,
						(errcode_for_file_access(),
					 errmsg("could not write to file \"%s\": %m", tmppath)));
			}
		}
		close(parentfd);
	}

	/*
	 * Append one line with the details of this timeline split.
	 *
	 * If we did have a parent file, insert an extra newline just in case the
	 * parent file failed to end with one.
	 */
	XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);

	snprintf(buffer, sizeof(buffer),
			 "%s%u\t%s\t%s transaction %u at %s\n",
			 haveParent ? "\n" : "",
			 parentTLI,
			 xlogfname,
			 recoveryStopAfter ? "after" : "before",
			 recoveryStopXid,
			 str_time(recoveryStopTime));

	nbytes = strlen(buffer);
	errno = 0;
	if ((int) write(tmpfd, buffer, nbytes) != nbytes)
	{
		int			write_errno = errno;

		/*
		 * The file could not be completed, so remove it to give back the
		 * disk space.
		 */
		unlink(tmppath);
		/* if write didn't set errno, assume problem is no disk space */
		errno = write_errno ? write_errno : ENOSPC;

		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", tmppath)));
	}

	if (pg_fsync(tmpfd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m", tmppath)));

	if (close(tmpfd))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", tmppath)));


	/*
	 * Now move the completed history file into place with its final name.
	 */
	TLHistoryFilePath(path, newTLI);

	/*
	 * Prefer link() to rename() here just to be really sure that we don't
	 * overwrite an existing logfile.  However, there shouldn't be one, so
	 * rename() is an acceptable substitute except for the truly paranoid.
	 */
#if HAVE_WORKING_LINK
	if (link(tmppath, path) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not link file \"%s\" to \"%s\": %m",
						tmppath, path)));
	unlink(tmppath);
#else
	if (rename(tmppath, path) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
#endif

	/* The history file can be archived immediately. */
	TLHistoryFileName(histfname, newTLI);
	XLogArchiveNotify(histfname);
}

/*
 * I/O routines for pg_control
3463 3464
 *
 * *ControlFile is a buffer in shared memory that holds an image of the
3465
 * contents of pg_control.	WriteControlFile() initializes pg_control
3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476 3477 3478
 * given a preloaded buffer, ReadControlFile() loads the buffer from
 * the pg_control file (during postmaster or standalone-backend startup),
 * and UpdateControlFile() rewrites pg_control after we modify xlog state.
 *
 * For simplicity, WriteControlFile() initializes the fields of pg_control
 * that are related to checking backend/database compatibility, and
 * ReadControlFile() verifies they are correct.  We could split out the
 * I/O and compatibility-check functions, but there seems no need currently.
 */
static void
WriteControlFile(void)
{
	int			fd;
3479
	char		buffer[PG_CONTROL_SIZE]; /* need not be aligned */
3480 3481 3482
	char	   *localeptr;

	/*
Tom Lane's avatar
Tom Lane committed
3483
	 * Initialize version and compatibility-check fields
3484
	 */
Tom Lane's avatar
Tom Lane committed
3485 3486
	ControlFile->pg_control_version = PG_CONTROL_VERSION;
	ControlFile->catalog_version_no = CATALOG_VERSION_NO;
3487 3488 3489 3490

	ControlFile->maxAlign = MAXIMUM_ALIGNOF;
	ControlFile->floatFormat = FLOATFORMAT_VALUE;

3491 3492
	ControlFile->blcksz = BLCKSZ;
	ControlFile->relseg_size = RELSEG_SIZE;
3493
	ControlFile->xlog_blcksz = XLOG_BLCKSZ;
3494
	ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
3495 3496

	ControlFile->nameDataLen = NAMEDATALEN;
3497
	ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
3498 3499 3500 3501 3502 3503 3504 3505

#ifdef HAVE_INT64_TIMESTAMP
	ControlFile->enableIntTimes = TRUE;
#else
	ControlFile->enableIntTimes = FALSE;
#endif

	ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
3506 3507
	localeptr = setlocale(LC_COLLATE, NULL);
	if (!localeptr)
3508 3509
		ereport(PANIC,
				(errmsg("invalid LC_COLLATE setting")));
3510 3511 3512
	StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
	localeptr = setlocale(LC_CTYPE, NULL);
	if (!localeptr)
3513 3514
		ereport(PANIC,
				(errmsg("invalid LC_CTYPE setting")));
3515
	StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
3516

Tom Lane's avatar
Tom Lane committed
3517
	/* Contents are protected with a CRC */
3518 3519 3520 3521 3522
	INIT_CRC32(ControlFile->crc);
	COMP_CRC32(ControlFile->crc,
			   (char *) ControlFile,
			   offsetof(ControlFileData, crc));
	FIN_CRC32(ControlFile->crc);
Tom Lane's avatar
Tom Lane committed
3523

3524
	/*
3525 3526 3527 3528 3529
	 * We write out PG_CONTROL_SIZE bytes into pg_control, zero-padding the
	 * excess over sizeof(ControlFileData).  This reduces the odds of
	 * premature-EOF errors when reading pg_control.  We'll still fail when we
	 * check the contents of the file, but hopefully with a more specific
	 * error than "couldn't read pg_control".
3530
	 */
3531 3532
	if (sizeof(ControlFileData) > PG_CONTROL_SIZE)
		elog(PANIC, "sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one");
3533

3534
	memset(buffer, 0, PG_CONTROL_SIZE);
3535 3536
	memcpy(buffer, ControlFile, sizeof(ControlFileData));

3537 3538
	fd = BasicOpenFile(XLOG_CONTROL_FILE,
					   O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
3539
					   S_IRUSR | S_IWUSR);
3540
	if (fd < 0)
3541 3542 3543
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not create control file \"%s\": %m",
3544
						XLOG_CONTROL_FILE)));
3545

3546
	errno = 0;
3547
	if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE)
3548 3549 3550 3551
	{
		/* if write didn't set errno, assume problem is no disk space */
		if (errno == 0)
			errno = ENOSPC;
3552 3553
		ereport(PANIC,
				(errcode_for_file_access(),
3554
				 errmsg("could not write to control file: %m")));
3555
	}
3556

3557
	if (pg_fsync(fd) != 0)
3558 3559
		ereport(PANIC,
				(errcode_for_file_access(),
3560
				 errmsg("could not fsync control file: %m")));
3561

3562 3563 3564 3565
	if (close(fd))
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close control file: %m")));
3566 3567 3568 3569 3570
}

static void
ReadControlFile(void)
{
3571
	pg_crc32	crc;
3572 3573 3574 3575 3576
	int			fd;

	/*
	 * Read data...
	 */
3577 3578 3579
	fd = BasicOpenFile(XLOG_CONTROL_FILE,
					   O_RDWR | PG_BINARY,
					   S_IRUSR | S_IWUSR);
3580
	if (fd < 0)
3581 3582 3583
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open control file \"%s\": %m",
3584
						XLOG_CONTROL_FILE)));
3585 3586

	if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3587 3588
		ereport(PANIC,
				(errcode_for_file_access(),
3589
				 errmsg("could not read from control file: %m")));
3590 3591 3592

	close(fd);

Tom Lane's avatar
Tom Lane committed
3593
	/*
3594 3595 3596 3597
	 * Check for expected pg_control format version.  If this is wrong, the
	 * CRC check will likely fail because we'll be checking the wrong number
	 * of bytes.  Complaining about wrong version will probably be more
	 * enlightening than complaining about wrong CRC.
Tom Lane's avatar
Tom Lane committed
3598 3599
	 */
	if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
3600 3601 3602
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
				 errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
3603 3604
				  " but the server was compiled with PG_CONTROL_VERSION %d.",
						ControlFile->pg_control_version, PG_CONTROL_VERSION),
3605
				 errhint("It looks like you need to initdb.")));
Tom Lane's avatar
Tom Lane committed
3606
	/* Now check the CRC. */
3607 3608 3609 3610 3611
	INIT_CRC32(crc);
	COMP_CRC32(crc,
			   (char *) ControlFile,
			   offsetof(ControlFileData, crc));
	FIN_CRC32(crc);
3612

3613
	if (!EQ_CRC32(crc, ControlFile->crc))
3614
		ereport(FATAL,
3615
				(errmsg("incorrect checksum in control file")));
3616

3617
	/*
3618
	 * Do compatibility checking immediately.  We do this here for 2 reasons:
3619
	 *
3620 3621
	 * (1) if the database isn't compatible with the backend executable, we
	 * want to abort before we can possibly do any damage;
3622 3623
	 *
	 * (2) this code is executed in the postmaster, so the setlocale() will
3624 3625
	 * propagate to forked backends, which aren't going to read this file for
	 * themselves.	(These locale settings are considered critical
3626 3627
	 * compatibility items because they can affect sort order of indexes.)
	 */
Tom Lane's avatar
Tom Lane committed
3628
	if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
3629 3630 3631
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
				 errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
3632 3633
				  " but the server was compiled with CATALOG_VERSION_NO %d.",
						ControlFile->catalog_version_no, CATALOG_VERSION_NO),
3634
				 errhint("It looks like you need to initdb.")));
3635 3636 3637
	if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
3638 3639 3640 3641
		   errdetail("The database cluster was initialized with MAXALIGN %d,"
					 " but the server was compiled with MAXALIGN %d.",
					 ControlFile->maxAlign, MAXIMUM_ALIGNOF),
				 errhint("It looks like you need to initdb.")));
3642 3643 3644
	if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
Peter Eisentraut's avatar
Peter Eisentraut committed
3645
				 errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
3646
				 errhint("It looks like you need to initdb.")));
3647
	if (ControlFile->blcksz != BLCKSZ)
3648 3649
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
3650 3651 3652 3653
			 errdetail("The database cluster was initialized with BLCKSZ %d,"
					   " but the server was compiled with BLCKSZ %d.",
					   ControlFile->blcksz, BLCKSZ),
				 errhint("It looks like you need to recompile or initdb.")));
3654
	if (ControlFile->relseg_size != RELSEG_SIZE)
3655 3656
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
3657 3658 3659 3660
		errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
				  " but the server was compiled with RELSEG_SIZE %d.",
				  ControlFile->relseg_size, RELSEG_SIZE),
				 errhint("It looks like you need to recompile or initdb.")));
3661 3662 3663 3664 3665 3666 3667
	if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
			 errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
					   " but the server was compiled with XLOG_BLCKSZ %d.",
					   ControlFile->xlog_blcksz, XLOG_BLCKSZ),
				 errhint("It looks like you need to recompile or initdb.")));
3668 3669 3670 3671
	if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
				 errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
3672
					   " but the server was compiled with XLOG_SEG_SIZE %d.",
3673
						   ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
3674
				 errhint("It looks like you need to recompile or initdb.")));
3675
	if (ControlFile->nameDataLen != NAMEDATALEN)
3676 3677
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
3678 3679 3680 3681
		errdetail("The database cluster was initialized with NAMEDATALEN %d,"
				  " but the server was compiled with NAMEDATALEN %d.",
				  ControlFile->nameDataLen, NAMEDATALEN),
				 errhint("It looks like you need to recompile or initdb.")));
3682
	if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
3683 3684
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
3685
				 errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
3686
					  " but the server was compiled with INDEX_MAX_KEYS %d.",
3687
						   ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
3688
				 errhint("It looks like you need to recompile or initdb.")));
3689 3690 3691

#ifdef HAVE_INT64_TIMESTAMP
	if (ControlFile->enableIntTimes != TRUE)
3692 3693 3694
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
				 errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
3695 3696
				  " but the server was compiled with HAVE_INT64_TIMESTAMP."),
				 errhint("It looks like you need to recompile or initdb.")));
3697 3698
#else
	if (ControlFile->enableIntTimes != FALSE)
3699 3700 3701
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
				 errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
3702 3703
			   " but the server was compiled without HAVE_INT64_TIMESTAMP."),
				 errhint("It looks like you need to recompile or initdb.")));
3704 3705 3706
#endif

	if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
3707 3708 3709
		ereport(FATAL,
				(errmsg("database files are incompatible with server"),
				 errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
3710
				  " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
3711
						   ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
3712
				 errhint("It looks like you need to recompile or initdb.")));
3713
	if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
3714
		ereport(FATAL,
3715 3716 3717 3718 3719
			(errmsg("database files are incompatible with operating system"),
			 errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
					   " which is not recognized by setlocale().",
					   ControlFile->lc_collate),
			 errhint("It looks like you need to initdb or install locale support.")));
3720
	if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
3721
		ereport(FATAL,
3722 3723 3724 3725 3726
			(errmsg("database files are incompatible with operating system"),
		errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
				  " which is not recognized by setlocale().",
				  ControlFile->lc_ctype),
			 errhint("It looks like you need to initdb or install locale support.")));
3727 3728 3729 3730 3731 3732

	/* Make the fixed locale settings visible as GUC variables, too */
	SetConfigOption("lc_collate", ControlFile->lc_collate,
					PGC_INTERNAL, PGC_S_OVERRIDE);
	SetConfigOption("lc_ctype", ControlFile->lc_ctype,
					PGC_INTERNAL, PGC_S_OVERRIDE);
3733 3734
}

3735
void
3736
UpdateControlFile(void)
3737
{
3738
	int			fd;
3739

3740 3741 3742 3743 3744
	INIT_CRC32(ControlFile->crc);
	COMP_CRC32(ControlFile->crc,
			   (char *) ControlFile,
			   offsetof(ControlFileData, crc));
	FIN_CRC32(ControlFile->crc);
3745

3746 3747 3748
	fd = BasicOpenFile(XLOG_CONTROL_FILE,
					   O_RDWR | PG_BINARY,
					   S_IRUSR | S_IWUSR);
3749
	if (fd < 0)
3750 3751 3752
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open control file \"%s\": %m",
3753
						XLOG_CONTROL_FILE)));
3754

3755
	errno = 0;
3756
	if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3757 3758 3759 3760
	{
		/* if write didn't set errno, assume problem is no disk space */
		if (errno == 0)
			errno = ENOSPC;
3761 3762
		ereport(PANIC,
				(errcode_for_file_access(),
3763
				 errmsg("could not write to control file: %m")));
3764
	}
3765

3766
	if (pg_fsync(fd) != 0)
3767 3768
		ereport(PANIC,
				(errcode_for_file_access(),
3769
				 errmsg("could not fsync control file: %m")));
3770

3771 3772 3773 3774
	if (close(fd))
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close control file: %m")));
3775 3776
}

3777
/*
Tom Lane's avatar
Tom Lane committed
3778
 * Initialization of shared memory for XLOG
3779
 */
3780
Size
3781
XLOGShmemSize(void)
3782
{
3783
	Size		size;
3784

3785 3786 3787 3788 3789 3790 3791
	/* XLogCtl */
	size = sizeof(XLogCtlData);
	/* xlblocks array */
	size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
	/* extra alignment padding for XLOG I/O buffers */
	size = add_size(size, ALIGNOF_XLOG_BUFFER);
	/* and the buffers themselves */
3792
	size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
3793 3794

	/*
3795 3796 3797
	 * Note: we don't count ControlFileData, it comes out of the "slop factor"
	 * added by CreateSharedMemoryAndSemaphores.  This lets us use this
	 * routine again below to compute the actual allocation size.
3798 3799 3800
	 */

	return size;
3801 3802 3803 3804 3805
}

void
XLOGShmemInit(void)
{
3806 3807
	bool		foundCFile,
				foundXLog;
3808
	char	   *allocptr;
3809

3810
	ControlFile = (ControlFileData *)
3811
		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
3812 3813
	XLogCtl = (XLogCtlData *)
		ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
3814

3815
	if (foundCFile || foundXLog)
3816 3817
	{
		/* both should be present or neither */
3818
		Assert(foundCFile && foundXLog);
3819 3820
		return;
	}
3821

Tom Lane's avatar
Tom Lane committed
3822
	memset(XLogCtl, 0, sizeof(XLogCtlData));
3823

Tom Lane's avatar
Tom Lane committed
3824
	/*
3825 3826 3827
	 * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
	 * multiple of the alignment for same, so no extra alignment padding is
	 * needed here.
Tom Lane's avatar
Tom Lane committed
3828
	 */
3829 3830
	allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
	XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
Tom Lane's avatar
Tom Lane committed
3831
	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
3832
	allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
3833

Tom Lane's avatar
Tom Lane committed
3834
	/*
3835
	 * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
Tom Lane's avatar
Tom Lane committed
3836
	 */
3837 3838
	allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
	XLogCtl->pages = allocptr;
3839
	memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
Tom Lane's avatar
Tom Lane committed
3840 3841

	/*
3842 3843
	 * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
	 * in additional info.)
Tom Lane's avatar
Tom Lane committed
3844
	 */
3845
	XLogCtl->XLogCacheByte = (Size) XLOG_BLCKSZ * XLOGbuffers;
3846

Tom Lane's avatar
Tom Lane committed
3847 3848
	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
	XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
3849
	SpinLockInit(&XLogCtl->info_lck);
Tom Lane's avatar
Tom Lane committed
3850

3851
	/*
3852 3853 3854
	 * If we are not in bootstrap mode, pg_control should already exist. Read
	 * and validate it immediately (see comments in ReadControlFile() for the
	 * reasons why).
3855 3856 3857
	 */
	if (!IsBootstrapProcessingMode())
		ReadControlFile();
3858 3859 3860
}

/*
Tom Lane's avatar
Tom Lane committed
3861 3862
 * This func must be called ONCE on system install.  It creates pg_control
 * and the initial XLOG segment.
3863 3864
 */
void
Tom Lane's avatar
Tom Lane committed
3865
BootStrapXLOG(void)
3866
{
3867
	CheckPoint	checkPoint;
Tom Lane's avatar
Tom Lane committed
3868 3869
	char	   *buffer;
	XLogPageHeader page;
3870
	XLogLongPageHeader longpage;
3871
	XLogRecord *record;
3872
	bool		use_existent;
3873 3874
	uint64		sysidentifier;
	struct timeval tv;
3875
	pg_crc32	crc;
3876

3877
	/*
3878 3879 3880 3881 3882 3883 3884 3885 3886 3887
	 * Select a hopefully-unique system identifier code for this installation.
	 * We use the result of gettimeofday(), including the fractional seconds
	 * field, as being about as unique as we can easily get.  (Think not to
	 * use random(), since it hasn't been seeded and there's no portable way
	 * to seed it other than the system clock value...)  The upper half of the
	 * uint64 value is just the tv_sec part, while the lower half is the XOR
	 * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
	 * unnecessarily if "uint64" is really only 32 bits wide.  A person
	 * knowing this encoding can determine the initialization time of the
	 * installation, which could perhaps be useful sometimes.
3888 3889 3890 3891 3892
	 */
	gettimeofday(&tv, NULL);
	sysidentifier = ((uint64) tv.tv_sec) << 32;
	sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);

3893 3894 3895
	/* First timeline ID is always 1 */
	ThisTimeLineID = 1;

3896
	/* page buffer must be aligned suitably for O_DIRECT */
3897
	buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
3898
	page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
3899
	memset(page, 0, XLOG_BLCKSZ);
Tom Lane's avatar
Tom Lane committed
3900

3901
	/* Set up information for the initial checkpoint record */
3902
	checkPoint.redo.xlogid = 0;
3903
	checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
3904
	checkPoint.undo = checkPoint.redo;
3905
	checkPoint.ThisTimeLineID = ThisTimeLineID;
3906
	checkPoint.nextXid = FirstNormalTransactionId;
3907
	checkPoint.nextOid = FirstBootstrapObjectId;
3908
	checkPoint.nextMulti = FirstMultiXactId;
3909
	checkPoint.nextMultiOffset = 0;
Tom Lane's avatar
Tom Lane committed
3910
	checkPoint.time = time(NULL);
3911

3912 3913 3914
	ShmemVariableCache->nextXid = checkPoint.nextXid;
	ShmemVariableCache->nextOid = checkPoint.nextOid;
	ShmemVariableCache->oidCount = 0;
3915
	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
3916

3917
	/* Set up the XLOG page header */
3918
	page->xlp_magic = XLOG_PAGE_MAGIC;
3919 3920
	page->xlp_info = XLP_LONG_HEADER;
	page->xlp_tli = ThisTimeLineID;
3921 3922
	page->xlp_pageaddr.xlogid = 0;
	page->xlp_pageaddr.xrecoff = 0;
3923 3924 3925
	longpage = (XLogLongPageHeader) page;
	longpage->xlp_sysid = sysidentifier;
	longpage->xlp_seg_size = XLogSegSize;
3926
	longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
3927 3928

	/* Insert the initial checkpoint record */
3929
	record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
3930
	record->xl_prev.xlogid = 0;
3931
	record->xl_prev.xrecoff = 0;
3932
	record->xl_xid = InvalidTransactionId;
3933
	record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
3934
	record->xl_len = sizeof(checkPoint);
Tom Lane's avatar
Tom Lane committed
3935
	record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
3936
	record->xl_rmid = RM_XLOG_ID;
Tom Lane's avatar
Tom Lane committed
3937
	memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
3938

3939 3940 3941 3942 3943
	INIT_CRC32(crc);
	COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
	COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
			   SizeOfXLogRecord - sizeof(pg_crc32));
	FIN_CRC32(crc);
3944 3945
	record->xl_crc = crc;

3946
	/* Create first XLOG segment file */
3947 3948
	use_existent = false;
	openLogFile = XLogFileInit(0, 0, &use_existent, false);
3949

3950
	/* Write the first page with the initial record */
3951
	errno = 0;
3952
	if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3953 3954 3955 3956
	{
		/* if write didn't set errno, assume problem is no disk space */
		if (errno == 0)
			errno = ENOSPC;
3957 3958
		ereport(PANIC,
				(errcode_for_file_access(),
3959
			  errmsg("could not write bootstrap transaction log file: %m")));
3960
	}
3961

Tom Lane's avatar
Tom Lane committed
3962
	if (pg_fsync(openLogFile) != 0)
3963 3964
		ereport(PANIC,
				(errcode_for_file_access(),
3965
			  errmsg("could not fsync bootstrap transaction log file: %m")));
3966

3967 3968 3969
	if (close(openLogFile))
		ereport(PANIC,
				(errcode_for_file_access(),
3970
			  errmsg("could not close bootstrap transaction log file: %m")));
3971

Tom Lane's avatar
Tom Lane committed
3972
	openLogFile = -1;
3973

3974 3975
	/* Now create pg_control */

3976
	memset(ControlFile, 0, sizeof(ControlFileData));
Tom Lane's avatar
Tom Lane committed
3977
	/* Initialize pg_control status fields */
3978
	ControlFile->system_identifier = sysidentifier;
Tom Lane's avatar
Tom Lane committed
3979 3980
	ControlFile->state = DB_SHUTDOWNED;
	ControlFile->time = checkPoint.time;
3981 3982 3983
	ControlFile->logId = 0;
	ControlFile->logSeg = 1;
	ControlFile->checkPoint = checkPoint.redo;
Tom Lane's avatar
Tom Lane committed
3984
	ControlFile->checkPointCopy = checkPoint;
3985
	/* some additional ControlFile fields are set in WriteControlFile() */
3986

3987
	WriteControlFile();
3988 3989 3990

	/* Bootstrap the commit log, too */
	BootStrapCLOG();
3991
	BootStrapSUBTRANS();
3992
	BootStrapMultiXact();
3993

3994
	pfree(buffer);
3995 3996
}

3997
/*
 * str_time
 *
 * Format a time_t as local time in the form "YYYY-MM-DD HH:MM:SS TZ".
 *
 * The result points into a static buffer, so it is overwritten by the next
 * call and must not be freed.  If the value cannot be broken down by
 * localtime() or formatted by strftime(), the placeholder string
 * "(invalid time)" is returned instead.
 */
static char *
str_time(time_t tnow)
{
	static char buf[128];
	struct tm  *tm;

	/*
	 * localtime() returns NULL for a time_t it cannot represent, and
	 * strftime() returns 0 (leaving the buffer contents unspecified) when
	 * the result does not fit.  Neither may be passed on unchecked.
	 */
	tm = localtime(&tnow);
	if (tm == NULL ||
		strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S %Z", tm) == 0)
		snprintf(buf, sizeof(buf), "%s", "(invalid time)");

	return buf;
}

4009 4010 4011 4012 4013 4014 4015 4016 4017 4018 4019
/*
 * See if there is a recovery command file (recovery.conf), and if so
 * read in parameters for archive recovery.
 *
 * XXX longer term intention is to expand this to
 * cater for additional parameters and controls
 * possibly use a flex lexer similar to the GUC one
 */
static void
readRecoveryCommandFile(void)
{
Bruce Momjian's avatar
Bruce Momjian committed
4020 4021 4022 4023 4024 4025
	FILE	   *fd;
	char		cmdline[MAXPGPATH];
	TimeLineID	rtli = 0;
	bool		rtliGiven = false;
	bool		syntaxError = false;

4026
	fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
4027 4028 4029 4030 4031
	if (fd == NULL)
	{
		if (errno == ENOENT)
			return;				/* not there, so no archive recovery */
		ereport(FATAL,
Bruce Momjian's avatar
Bruce Momjian committed
4032
				(errcode_for_file_access(),
4033
				 errmsg("could not open recovery command file \"%s\": %m",
4034
						RECOVERY_COMMAND_FILE)));
4035 4036 4037
	}

	ereport(LOG,
Bruce Momjian's avatar
Bruce Momjian committed
4038
			(errmsg("starting archive recovery")));
4039

Bruce Momjian's avatar
Bruce Momjian committed
4040 4041 4042 4043
	/*
	 * Parse the file...
	 */
	while (fgets(cmdline, MAXPGPATH, fd) != NULL)
4044 4045
	{
		/* skip leading whitespace and check for # comment */
Bruce Momjian's avatar
Bruce Momjian committed
4046 4047 4048
		char	   *ptr;
		char	   *tok1;
		char	   *tok2;
4049 4050 4051 4052 4053 4054 4055 4056 4057 4058

		for (ptr = cmdline; *ptr; ptr++)
		{
			if (!isspace((unsigned char) *ptr))
				break;
		}
		if (*ptr == '\0' || *ptr == '#')
			continue;

		/* identify the quoted parameter value */
Bruce Momjian's avatar
Bruce Momjian committed
4059
		tok1 = strtok(ptr, "'");
4060 4061 4062 4063 4064
		if (!tok1)
		{
			syntaxError = true;
			break;
		}
Bruce Momjian's avatar
Bruce Momjian committed
4065
		tok2 = strtok(NULL, "'");
4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077 4078
		if (!tok2)
		{
			syntaxError = true;
			break;
		}
		/* reparse to get just the parameter name */
		tok1 = strtok(ptr, " \t=");
		if (!tok1)
		{
			syntaxError = true;
			break;
		}

Bruce Momjian's avatar
Bruce Momjian committed
4079 4080
		if (strcmp(tok1, "restore_command") == 0)
		{
4081
			recoveryRestoreCommand = pstrdup(tok2);
4082 4083 4084 4085
			ereport(LOG,
					(errmsg("restore_command = \"%s\"",
							recoveryRestoreCommand)));
		}
Bruce Momjian's avatar
Bruce Momjian committed
4086 4087
		else if (strcmp(tok1, "recovery_target_timeline") == 0)
		{
4088 4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106
			rtliGiven = true;
			if (strcmp(tok2, "latest") == 0)
				rtli = 0;
			else
			{
				errno = 0;
				rtli = (TimeLineID) strtoul(tok2, NULL, 0);
				if (errno == EINVAL || errno == ERANGE)
					ereport(FATAL,
							(errmsg("recovery_target_timeline is not a valid number: \"%s\"",
									tok2)));
			}
			if (rtli)
				ereport(LOG,
						(errmsg("recovery_target_timeline = %u", rtli)));
			else
				ereport(LOG,
						(errmsg("recovery_target_timeline = latest")));
		}
Bruce Momjian's avatar
Bruce Momjian committed
4107 4108
		else if (strcmp(tok1, "recovery_target_xid") == 0)
		{
4109 4110 4111 4112
			errno = 0;
			recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
			if (errno == EINVAL || errno == ERANGE)
				ereport(FATAL,
4113 4114
				 (errmsg("recovery_target_xid is not a valid number: \"%s\"",
						 tok2)));
4115 4116 4117 4118 4119 4120
			ereport(LOG,
					(errmsg("recovery_target_xid = %u",
							recoveryTargetXid)));
			recoveryTarget = true;
			recoveryTargetExact = true;
		}
Bruce Momjian's avatar
Bruce Momjian committed
4121 4122
		else if (strcmp(tok1, "recovery_target_time") == 0)
		{
4123 4124 4125 4126 4127 4128 4129 4130
			/*
			 * if recovery_target_xid specified, then this overrides
			 * recovery_target_time
			 */
			if (recoveryTargetExact)
				continue;
			recoveryTarget = true;
			recoveryTargetExact = false;
Bruce Momjian's avatar
Bruce Momjian committed
4131

4132
			/*
4133 4134 4135
			 * Convert the time string given by the user to the time_t format.
			 * We use type abstime's input converter because we know abstime
			 * has the same representation as time_t.
4136
			 */
4137 4138
			recoveryTargetTime = (time_t)
				DatumGetAbsoluteTime(DirectFunctionCall1(abstimein,
4139
													 CStringGetDatum(tok2)));
4140 4141
			ereport(LOG,
					(errmsg("recovery_target_time = %s",
4142 4143
							DatumGetCString(DirectFunctionCall1(abstimeout,
				AbsoluteTimeGetDatum((AbsoluteTime) recoveryTargetTime))))));
4144
		}
Bruce Momjian's avatar
Bruce Momjian committed
4145 4146
		else if (strcmp(tok1, "recovery_target_inclusive") == 0)
		{
4147 4148 4149 4150 4151 4152 4153 4154 4155 4156 4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167
			/*
			 * does nothing if a recovery_target is not also set
			 */
			if (strcmp(tok2, "true") == 0)
				recoveryTargetInclusive = true;
			else
			{
				recoveryTargetInclusive = false;
				tok2 = "false";
			}
			ereport(LOG,
					(errmsg("recovery_target_inclusive = %s", tok2)));
		}
		else
			ereport(FATAL,
					(errmsg("unrecognized recovery parameter \"%s\"",
							tok1)));
	}

	FreeFile(fd);

Bruce Momjian's avatar
Bruce Momjian committed
4168 4169
	if (syntaxError)
		ereport(FATAL,
4170 4171
				(errmsg("syntax error in recovery command file: %s",
						cmdline),
4172
			  errhint("Lines should have the format parameter = 'value'.")));
4173 4174

	/* Check that required parameters were supplied */
4175
	if (recoveryRestoreCommand == NULL)
4176 4177
		ereport(FATAL,
				(errmsg("recovery command file \"%s\" did not specify restore_command",
4178
						RECOVERY_COMMAND_FILE)));
4179

4180 4181 4182
	/* Enable fetching from archive recovery area */
	InArchiveRecovery = true;

4183
	/*
4184 4185 4186 4187
	 * If user specified recovery_target_timeline, validate it or compute the
	 * "latest" value.	We can't do this until after we've gotten the restore
	 * command and set InArchiveRecovery, because we need to fetch timeline
	 * history files from the archive.
4188
	 */
4189 4190 4191 4192 4193 4194 4195
	if (rtliGiven)
	{
		if (rtli)
		{
			/* Timeline 1 does not have a history file, all else should */
			if (rtli != 1 && !existsTimeLineHistory(rtli))
				ereport(FATAL,
4196 4197
						(errmsg("recovery_target_timeline %u does not exist",
								rtli)));
4198 4199 4200 4201 4202 4203 4204 4205
			recoveryTargetTLI = rtli;
		}
		else
		{
			/* We start the "latest" search from pg_control's timeline */
			recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
		}
	}
4206 4207 4208 4209 4210 4211
}

/*
 * Exit archive-recovery state
 */
static void
4212
exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
4213
{
Bruce Momjian's avatar
Bruce Momjian committed
4214 4215
	char		recoveryPath[MAXPGPATH];
	char		xlogpath[MAXPGPATH];
4216 4217

	/*
4218
	 * We are no longer in archive recovery state.
4219 4220 4221 4222
	 */
	InArchiveRecovery = false;

	/*
4223 4224 4225
	 * We should have the ending log segment currently open.  Verify, and then
	 * close it (to avoid problems on Windows with trying to rename or delete
	 * an open file).
4226 4227 4228 4229 4230 4231 4232 4233 4234
	 */
	Assert(readFile >= 0);
	Assert(readId == endLogId);
	Assert(readSeg == endLogSeg);

	close(readFile);
	readFile = -1;

	/*
4235 4236 4237 4238 4239 4240 4241
	 * If the segment was fetched from archival storage, we want to replace
	 * the existing xlog segment (if any) with the archival version.  This is
	 * because whatever is in XLOGDIR is very possibly older than what we have
	 * from the archives, since it could have come from restoring a PGDATA
	 * backup.	In any case, the archival version certainly is more
	 * descriptive of what our current database state is, because that is what
	 * we replayed from.
4242
	 *
4243 4244 4245
	 * Note that if we are establishing a new timeline, ThisTimeLineID is
	 * already set to the new value, and so we will create a new file instead
	 * of overwriting any existing file.
4246
	 */
4247
	snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
4248
	XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4249 4250 4251 4252 4253 4254 4255 4256 4257 4258

	if (restoredFromArchive)
	{
		ereport(DEBUG3,
				(errmsg_internal("moving last restored xlog to \"%s\"",
								 xlogpath)));
		unlink(xlogpath);		/* might or might not exist */
		if (rename(recoveryPath, xlogpath) != 0)
			ereport(FATAL,
					(errcode_for_file_access(),
4259
					 errmsg("could not rename file \"%s\" to \"%s\": %m",
4260 4261 4262 4263 4264 4265 4266 4267 4268 4269
							recoveryPath, xlogpath)));
		/* XXX might we need to fix permissions on the file? */
	}
	else
	{
		/*
		 * If the latest segment is not archival, but there's still a
		 * RECOVERYXLOG laying about, get rid of it.
		 */
		unlink(recoveryPath);	/* ignore any error */
Bruce Momjian's avatar
Bruce Momjian committed
4270

4271
		/*
4272 4273 4274
		 * If we are establishing a new timeline, we have to copy data from
		 * the last WAL segment of the old timeline to create a starting WAL
		 * segment for the new timeline.
4275 4276 4277 4278
		 */
		if (endTLI != ThisTimeLineID)
			XLogFileCopy(endLogId, endLogSeg,
						 endTLI, endLogId, endLogSeg);
4279 4280 4281
	}

	/*
4282 4283
	 * Let's just make real sure there are not .ready or .done flags posted
	 * for the new segment.
4284
	 */
4285 4286
	XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
	XLogArchiveCleanup(xlogpath);
4287

4288
	/* Get rid of any remaining recovered timeline-history file, too */
4289
	snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
Bruce Momjian's avatar
Bruce Momjian committed
4290
	unlink(recoveryPath);		/* ignore any error */
4291 4292

	/*
4293 4294
	 * Rename the config file out of the way, so that we don't accidentally
	 * re-enter archive recovery mode in a subsequent crash.
4295
	 */
4296 4297
	unlink(RECOVERY_COMMAND_DONE);
	if (rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE) != 0)
4298 4299
		ereport(FATAL,
				(errcode_for_file_access(),
4300
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
4301
						RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE)));
4302 4303 4304 4305 4306 4307 4308 4309 4310 4311 4312

	ereport(LOG,
			(errmsg("archive recovery complete")));
}

/*
 * For point-in-time recovery, this function decides whether we want to
 * stop applying the XLOG at or after the current record.
 *
 * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
 * *includeThis is set TRUE if we should apply this record before stopping.
 * On FALSE return, *includeThis is left untouched.
 *
 * Only transaction COMMIT and ABORT records are considered as stop points;
 * every other record returns FALSE immediately.
 *
 * Also, some information is saved in recoveryStopXid et al for use in
 * annotating the new timeline's history file.
 */
static bool
recoveryStopsHere(XLogRecord *record, bool *includeThis)
{
	bool		stopsHere;
	uint8		record_info;
	time_t		recordXtime;

	/* Do we have a PITR target at all? */
	if (!recoveryTarget)
		return false;

	/* We only consider stopping at COMMIT or ABORT records */
	if (record->xl_rmid != RM_XACT_ID)
		return false;
	record_info = record->xl_info & ~XLR_INFO_MASK;
	if (record_info == XLOG_XACT_COMMIT)
	{
		xl_xact_commit *recordXactCommitData;

		recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
		recordXtime = recordXactCommitData->xtime;
	}
	else if (record_info == XLOG_XACT_ABORT)
	{
		xl_xact_abort *recordXactAbortData;

		recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
		recordXtime = recordXactAbortData->xtime;
	}
	else
		return false;

	if (recoveryTargetExact)
	{
		/*
		 * there can be only one transaction end record with this exact
		 * transactionid
		 *
		 * when testing for an xid, we MUST test for equality only, since
		 * transactions are numbered in the order they start, not the order
		 * they complete. A higher numbered xid will complete before you about
		 * 50% of the time...
		 */
		stopsHere = (record->xl_xid == recoveryTargetXid);
		if (stopsHere)
			*includeThis = recoveryTargetInclusive;
	}
	else
	{
		/*
		 * there can be many transactions that share the same commit time, so
		 * we stop after the last one, if we are inclusive, or stop at the
		 * first one if we are exclusive
		 *
		 * Either way the record that trips the test is itself past the
		 * point we want to include, so it is never applied.
		 */
		if (recoveryTargetInclusive)
			stopsHere = (recordXtime > recoveryTargetTime);
		else
			stopsHere = (recordXtime >= recoveryTargetTime);
		if (stopsHere)
			*includeThis = false;
	}

	if (stopsHere)
	{
		/* Remember where and why we stopped, then tell the log about it */
		recoveryStopXid = record->xl_xid;
		recoveryStopTime = recordXtime;
		recoveryStopAfter = *includeThis;

		if (record_info == XLOG_XACT_COMMIT)
		{
			if (recoveryStopAfter)
				ereport(LOG,
						(errmsg("recovery stopping after commit of transaction %u, time %s",
								recoveryStopXid, str_time(recoveryStopTime))));
			else
				ereport(LOG,
						(errmsg("recovery stopping before commit of transaction %u, time %s",
								recoveryStopXid, str_time(recoveryStopTime))));
		}
		else
		{
			if (recoveryStopAfter)
				ereport(LOG,
						(errmsg("recovery stopping after abort of transaction %u, time %s",
								recoveryStopXid, str_time(recoveryStopTime))));
			else
				ereport(LOG,
						(errmsg("recovery stopping before abort of transaction %u, time %s",
								recoveryStopXid, str_time(recoveryStopTime))));
		}
	}

	return stopsHere;
}

4411
/*
Tom Lane's avatar
Tom Lane committed
4412
 * This must be called ONCE during postmaster or standalone-backend startup
4413 4414
 */
void
Tom Lane's avatar
Tom Lane committed
4415
StartupXLOG(void)
4416
{
4417 4418
	XLogCtlInsert *Insert;
	CheckPoint	checkPoint;
Tom Lane's avatar
Tom Lane committed
4419
	bool		wasShutdown;
4420
	bool		needNewTimeLine = false;
4421
	XLogRecPtr	RecPtr,
Tom Lane's avatar
Tom Lane committed
4422 4423 4424
				LastRec,
				checkPointLoc,
				EndOfLog;
4425 4426
	uint32		endLogId;
	uint32		endLogSeg;
4427
	XLogRecord *record;
4428
	uint32		freespace;
4429
	TransactionId oldestActiveXID;
4430

Tom Lane's avatar
Tom Lane committed
4431
	CritSectionCount++;
4432 4433

	/*
4434 4435
	 * Read control file and check XLOG status looks valid.
	 *
4436 4437
	 * Note: in most control paths, *ControlFile is already valid and we need
	 * not do ReadControlFile() here, but might as well do it to be sure.
4438
	 */
4439
	ReadControlFile();
4440

4441 4442 4443
	if (ControlFile->logSeg == 0 ||
		ControlFile->state < DB_SHUTDOWNED ||
		ControlFile->state > DB_IN_PRODUCTION ||
4444
		!XRecOffIsValid(ControlFile->checkPoint.xrecoff))
4445 4446
		ereport(FATAL,
				(errmsg("control file contains invalid data")));
4447 4448

	if (ControlFile->state == DB_SHUTDOWNED)
4449 4450 4451
		ereport(LOG,
				(errmsg("database system was shut down at %s",
						str_time(ControlFile->time))));
4452
	else if (ControlFile->state == DB_SHUTDOWNING)
4453 4454 4455
		ereport(LOG,
				(errmsg("database system shutdown was interrupted at %s",
						str_time(ControlFile->time))));
4456
	else if (ControlFile->state == DB_IN_RECOVERY)
4457
		ereport(LOG,
4458 4459 4460 4461
		   (errmsg("database system was interrupted while in recovery at %s",
				   str_time(ControlFile->time)),
			errhint("This probably means that some data is corrupted and"
					" you will have to use the last backup for recovery.")));
4462
	else if (ControlFile->state == DB_IN_PRODUCTION)
4463 4464 4465
		ereport(LOG,
				(errmsg("database system was interrupted at %s",
						str_time(ControlFile->time))));
4466

4467 4468
	/* This is just to allow attaching to startup process with a debugger */
#ifdef XLOG_REPLAY_DELAY
4469
	if (ControlFile->state != DB_SHUTDOWNED)
4470
		pg_usleep(60000000L);
4471 4472
#endif

4473
	/*
4474 4475
	 * Initialize on the assumption we want to recover to the same timeline
	 * that's active according to pg_control.
4476 4477 4478
	 */
	recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;

4479
	/*
Bruce Momjian's avatar
Bruce Momjian committed
4480 4481
	 * Check for recovery control file, and if so set up state for offline
	 * recovery
4482 4483 4484
	 */
	readRecoveryCommandFile();

4485 4486 4487
	/* Now we can determine the list of expected TLIs */
	expectedTLIs = readTimeLineHistory(recoveryTargetTLI);

4488 4489 4490 4491 4492 4493
	/*
	 * If pg_control's timeline is not in expectedTLIs, then we cannot
	 * proceed: the backup is not part of the history of the requested
	 * timeline.
	 */
	if (!list_member_int(expectedTLIs,
4494
						 (int) ControlFile->checkPointCopy.ThisTimeLineID))
4495 4496 4497 4498 4499
		ereport(FATAL,
				(errmsg("requested timeline %u is not a child of database system timeline %u",
						recoveryTargetTLI,
						ControlFile->checkPointCopy.ThisTimeLineID)));

4500
	if (read_backup_label(&checkPointLoc))
Tom Lane's avatar
Tom Lane committed
4501
	{
4502
		/*
4503 4504
		 * When a backup_label file is present, we want to roll forward from
		 * the checkpoint it identifies, rather than using pg_control.
4505
		 */
4506
		record = ReadCheckpointRecord(checkPointLoc, 0);
4507 4508 4509 4510
		if (record != NULL)
		{
			ereport(LOG,
					(errmsg("checkpoint record is at %X/%X",
4511
							checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4512 4513 4514 4515 4516
			InRecovery = true;	/* force recovery even if SHUTDOWNED */
		}
		else
		{
			ereport(PANIC,
4517 4518
					(errmsg("could not locate required checkpoint record"),
					 errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
4519
		}
Tom Lane's avatar
Tom Lane committed
4520 4521 4522
	}
	else
	{
4523
		/*
4524 4525
		 * Get the last valid checkpoint record.  If the latest one according
		 * to pg_control is broken, try the next-to-last one.
4526 4527
		 */
		checkPointLoc = ControlFile->checkPoint;
4528
		record = ReadCheckpointRecord(checkPointLoc, 1);
Tom Lane's avatar
Tom Lane committed
4529 4530
		if (record != NULL)
		{
4531
			ereport(LOG,
4532
					(errmsg("checkpoint record is at %X/%X",
4533
							checkPointLoc.xlogid, checkPointLoc.xrecoff)));
Tom Lane's avatar
Tom Lane committed
4534 4535
		}
		else
4536 4537
		{
			checkPointLoc = ControlFile->prevCheckPoint;
4538
			record = ReadCheckpointRecord(checkPointLoc, 2);
4539 4540 4541
			if (record != NULL)
			{
				ereport(LOG,
4542 4543 4544
						(errmsg("using previous checkpoint record at %X/%X",
							  checkPointLoc.xlogid, checkPointLoc.xrecoff)));
				InRecovery = true;		/* force recovery even if SHUTDOWNED */
4545 4546 4547
			}
			else
				ereport(PANIC,
4548
					 (errmsg("could not locate a valid checkpoint record")));
4549
		}
Tom Lane's avatar
Tom Lane committed
4550
	}
4551

Tom Lane's avatar
Tom Lane committed
4552 4553 4554
	LastRec = RecPtr = checkPointLoc;
	memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
	wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
4555

4556
	ereport(LOG,
4557 4558 4559 4560
	 (errmsg("redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
			 checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
			 checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
			 wasShutdown ? "TRUE" : "FALSE")));
4561
	ereport(LOG,
4562 4563 4564 4565 4566
			(errmsg("next transaction ID: %u; next OID: %u",
					checkPoint.nextXid, checkPoint.nextOid)));
	ereport(LOG,
			(errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
					checkPoint.nextMulti, checkPoint.nextMultiOffset)));
4567
	if (!TransactionIdIsNormal(checkPoint.nextXid))
4568
		ereport(PANIC,
4569
				(errmsg("invalid next transaction ID")));
4570 4571 4572

	ShmemVariableCache->nextXid = checkPoint.nextXid;
	ShmemVariableCache->nextOid = checkPoint.nextOid;
4573
	ShmemVariableCache->oidCount = 0;
4574
	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4575

4576
	/*
4577 4578 4579
	 * We must replay WAL entries using the same TimeLineID they were created
	 * under, so temporarily adopt the TLI indicated by the checkpoint (see
	 * also xlog_redo()).
4580
	 */
4581
	ThisTimeLineID = checkPoint.ThisTimeLineID;
4582

4583
	RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4584

4585
	if (XLByteLT(RecPtr, checkPoint.redo))
4586 4587
		ereport(PANIC,
				(errmsg("invalid redo in checkpoint record")));
4588 4589 4590
	if (checkPoint.undo.xrecoff == 0)
		checkPoint.undo = RecPtr;

4591
	/*
Bruce Momjian's avatar
Bruce Momjian committed
4592
	 * Check whether we need to force recovery from WAL.  If it appears to
4593 4594
	 * have been a clean shutdown and we did not have a recovery.conf file,
	 * then assume no recovery needed.
4595
	 */
4596
	if (XLByteLT(checkPoint.undo, RecPtr) ||
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
4597
		XLByteLT(checkPoint.redo, RecPtr))
4598
	{
Tom Lane's avatar
Tom Lane committed
4599
		if (wasShutdown)
4600
			ereport(PANIC,
4601
				(errmsg("invalid redo/undo record in shutdown checkpoint")));
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4602
		InRecovery = true;
4603 4604
	}
	else if (ControlFile->state != DB_SHUTDOWNED)
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4605
		InRecovery = true;
4606 4607 4608 4609 4610
	else if (InArchiveRecovery)
	{
		/* force recovery due to presence of recovery.conf */
		InRecovery = true;
	}
4611

Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4612
	/* REDO */
4613
	if (InRecovery)
4614
	{
Bruce Momjian's avatar
Bruce Momjian committed
4615
		int			rmid;
4616

4617
		if (InArchiveRecovery)
4618
			ereport(LOG,
4619
					(errmsg("automatic recovery in progress")));
4620 4621
		else
			ereport(LOG,
4622 4623
					(errmsg("database system was not properly shut down; "
							"automatic recovery in progress")));
4624 4625 4626 4627
		ControlFile->state = DB_IN_RECOVERY;
		ControlFile->time = time(NULL);
		UpdateControlFile();

4628
		/* Start up the recovery environment */
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4629
		XLogInitRelationCache();
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
4630

4631 4632 4633 4634 4635 4636
		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
		{
			if (RmgrTable[rmid].rm_startup != NULL)
				RmgrTable[rmid].rm_startup();
		}

4637
		/*
4638 4639
		 * Find the first record that logically follows the checkpoint --- it
		 * might physically precede it, though.
4640
		 */
4641
		if (XLByteLT(checkPoint.redo, RecPtr))
4642 4643
		{
			/* back up to find the record */
4644
			record = ReadRecord(&(checkPoint.redo), PANIC);
4645
		}
4646
		else
4647
		{
4648
			/* just have to read next record after CheckPoint */
4649
			record = ReadRecord(NULL, LOG);
4650
		}
4651

Tom Lane's avatar
Tom Lane committed
4652
		if (record != NULL)
4653
		{
4654 4655
			bool		recoveryContinue = true;
			bool		recoveryApply = true;
4656
			ErrorContextCallback	errcontext;
4657

Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4658
			InRedo = true;
4659 4660 4661
			ereport(LOG,
					(errmsg("redo starts at %X/%X",
							ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4662 4663 4664 4665

			/*
			 * main redo apply loop
			 */
4666 4667
			do
			{
4668
#ifdef WAL_DEBUG
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4669 4670
				if (XLOG_DEBUG)
				{
4671
					StringInfoData	buf;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4672

4673 4674
					initStringInfo(&buf);
					appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
4675 4676
							ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
							EndRecPtr.xlogid, EndRecPtr.xrecoff);
4677 4678 4679 4680 4681 4682 4683
					xlog_outrec(&buf, record);
					appendStringInfo(&buf, " - ");
					RmgrTable[record->xl_rmid].rm_desc(&buf,
													   record->xl_info,
													   XLogRecGetData(record));
					elog(LOG, "%s", buf.data);
					pfree(buf.data);
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4684
				}
4685
#endif
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4686

4687 4688 4689 4690 4691
				/*
				 * Have we reached our recovery target?
				 */
				if (recoveryStopsHere(record, &recoveryApply))
				{
Bruce Momjian's avatar
Bruce Momjian committed
4692
					needNewTimeLine = true;		/* see below */
4693 4694 4695 4696 4697
					recoveryContinue = false;
					if (!recoveryApply)
						break;
				}

4698 4699 4700 4701 4702 4703
				/* Setup error traceback support for ereport() */
				errcontext.callback = rm_redo_error_callback;
				errcontext.arg = (void *) record;
				errcontext.previous = error_context_stack;
				error_context_stack = &errcontext;

4704 4705
				/* nextXid must be beyond record's xid */
				if (TransactionIdFollowsOrEquals(record->xl_xid,
4706
												 ShmemVariableCache->nextXid))
4707 4708 4709 4710 4711
				{
					ShmemVariableCache->nextXid = record->xl_xid;
					TransactionIdAdvance(ShmemVariableCache->nextXid);
				}

Tom Lane's avatar
Tom Lane committed
4712
				if (record->xl_info & XLR_BKP_BLOCK_MASK)
4713 4714
					RestoreBkpBlocks(record, EndRecPtr);

4715
				RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
4716

4717 4718 4719
				/* Pop the error context stack */
				error_context_stack = errcontext.previous;

4720 4721
				LastRec = ReadRecPtr;

4722
				record = ReadRecord(NULL, LOG);
4723
			} while (record != NULL && recoveryContinue);
Bruce Momjian's avatar
Bruce Momjian committed
4724

4725 4726 4727 4728
			/*
			 * end of main redo apply loop
			 */

4729 4730 4731
			ereport(LOG,
					(errmsg("redo done at %X/%X",
							ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4732
			InRedo = false;
4733 4734
		}
		else
4735 4736
		{
			/* there are no WAL records following the checkpoint */
4737 4738
			ereport(LOG,
					(errmsg("redo is not required")));
4739
		}
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4740 4741
	}

Tom Lane's avatar
Tom Lane committed
4742
	/*
4743 4744
	 * Re-fetch the last valid or last applied record, so we can identify the
	 * exact endpoint of what we consider the valid portion of WAL.
Tom Lane's avatar
Tom Lane committed
4745
	 */
4746
	record = ReadRecord(&LastRec, PANIC);
Tom Lane's avatar
Tom Lane committed
4747
	EndOfLog = EndRecPtr;
4748 4749
	XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);

4750 4751 4752 4753 4754 4755 4756 4757 4758
	/*
	 * Complain if we did not roll forward far enough to render the backup
	 * dump consistent.
	 */
	if (XLByteLT(EndOfLog, recoveryMinXlogOffset))
	{
		if (needNewTimeLine)	/* stopped because of stop request */
			ereport(FATAL,
					(errmsg("requested recovery stop point is before end time of backup dump")));
Bruce Momjian's avatar
Bruce Momjian committed
4759
		else
4760
			/* ran off end of WAL */
4761 4762 4763 4764
			ereport(FATAL,
					(errmsg("WAL ends before end time of backup dump")));
	}

4765 4766 4767
	/*
	 * Consider whether we need to assign a new timeline ID.
	 *
Bruce Momjian's avatar
Bruce Momjian committed
4768 4769
	 * If we stopped short of the end of WAL during recovery, then we are
	 * generating a new timeline and must assign it a unique new ID.
4770 4771
	 * Otherwise, we can just extend the timeline we were in when we ran out
	 * of WAL.
4772 4773 4774 4775 4776 4777 4778 4779 4780 4781 4782 4783 4784
	 */
	if (needNewTimeLine)
	{
		ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
		ereport(LOG,
				(errmsg("selected new timeline ID: %u", ThisTimeLineID)));
		writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
							 curFileTLI, endLogId, endLogSeg);
	}

	/* Save the selected TimeLineID in shared memory, too */
	XLogCtl->ThisTimeLineID = ThisTimeLineID;

4785
	/*
4786 4787 4788 4789
	 * We are now done reading the old WAL.  Turn off archive fetching if it
	 * was active, and make a writable copy of the last WAL segment. (Note
	 * that we also have a copy of the last block of the old WAL in readBuf;
	 * we will use that below.)
4790 4791
	 */
	if (InArchiveRecovery)
4792
		exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
4793 4794 4795 4796 4797 4798 4799 4800

	/*
	 * Prepare to write WAL starting at EndOfLog position, and init xlog
	 * buffer cache using the block containing the last record from the
	 * previous incarnation.
	 */
	openLogId = endLogId;
	openLogSeg = endLogSeg;
4801
	openLogFile = XLogFileOpen(openLogId, openLogSeg);
Tom Lane's avatar
Tom Lane committed
4802 4803 4804
	openLogOff = 0;
	ControlFile->logId = openLogId;
	ControlFile->logSeg = openLogSeg + 1;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4805
	Insert = &XLogCtl->Insert;
4806
	Insert->PrevRecord = LastRec;
4807 4808
	XLogCtl->xlblocks[0].xlogid = openLogId;
	XLogCtl->xlblocks[0].xrecoff =
4809
		((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
4810 4811

	/*
4812 4813 4814
	 * Tricky point here: readBuf contains the *last* block that the LastRec
	 * record spans, not the one it starts in.	The last block is indeed the
	 * one we want to use.
Tom Lane's avatar
Tom Lane committed
4815
	 */
4816 4817
	Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
	memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
4818
	Insert->currpos = (char *) Insert->currpage +
4819
		(EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4820

Tom Lane's avatar
Tom Lane committed
4821
	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4822

Tom Lane's avatar
Tom Lane committed
4823 4824 4825
	XLogCtl->Write.LogwrtResult = LogwrtResult;
	Insert->LogwrtResult = LogwrtResult;
	XLogCtl->LogwrtResult = LogwrtResult;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4826

Tom Lane's avatar
Tom Lane committed
4827 4828
	XLogCtl->LogwrtRqst.Write = EndOfLog;
	XLogCtl->LogwrtRqst.Flush = EndOfLog;
4829

4830 4831 4832 4833 4834 4835 4836 4837 4838 4839
	freespace = INSERT_FREESPACE(Insert);
	if (freespace > 0)
	{
		/* Make sure rest of page is zero */
		MemSet(Insert->currpos, 0, freespace);
		XLogCtl->Write.curridx = 0;
	}
	else
	{
		/*
4840 4841
		 * Whenever Write.LogwrtResult points to exactly the end of a page,
		 * Write.curridx must point to the *next* page (see XLogWrite()).
4842
		 *
Bruce Momjian's avatar
Bruce Momjian committed
4843
		 * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
Bruce Momjian's avatar
Bruce Momjian committed
4844
		 * this is sufficient.	The first actual attempt to insert a log
4845
		 * record will advance the insert state.
4846 4847 4848 4849
		 */
		XLogCtl->Write.curridx = NextBufIdx(0);
	}

4850 4851
	/* Pre-scan prepared transactions to find out the range of XIDs present */
	oldestActiveXID = PrescanPreparedTransactions();
4852

Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4853
	if (InRecovery)
4854
	{
Bruce Momjian's avatar
Bruce Momjian committed
4855
		int			rmid;
4856 4857 4858 4859 4860 4861 4862 4863 4864 4865

		/*
		 * Allow resource managers to do any required cleanup.
		 */
		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
		{
			if (RmgrTable[rmid].rm_cleanup != NULL)
				RmgrTable[rmid].rm_cleanup();
		}

4866 4867 4868 4869 4870 4871
		/*
		 * Check to see if the XLOG sequence contained any unresolved
		 * references to uninitialized pages.
		 */
		XLogCheckInvalidPages();

4872 4873 4874 4875 4876
		/*
		 * Reset pgstat data, because it may be invalid after recovery.
		 */
		pgstat_reset_all();

Tom Lane's avatar
Tom Lane committed
4877
		/*
4878
		 * Perform a new checkpoint to update our recovery activity to disk.
4879
		 *
4880 4881 4882 4883 4884
		 * Note that we write a shutdown checkpoint rather than an on-line
		 * one. This is not particularly critical, but since we may be
		 * assigning a new TLI, using a shutdown checkpoint allows us to have
		 * the rule that TLI only changes in shutdown checkpoints, which
		 * allows some extra error checking in xlog_redo.
4885
		 *
4886 4887
		 * In case we had to use the secondary checkpoint, make sure that it
		 * will still be shown as the secondary checkpoint after this
Tom Lane's avatar
Tom Lane committed
4888 4889 4890
		 * CreateCheckPoint operation; we don't want the broken primary
		 * checkpoint to become prevCheckPoint...
		 */
4891 4892 4893
		if (XLByteEQ(checkPointLoc, ControlFile->prevCheckPoint))
			ControlFile->checkPoint = checkPointLoc;

4894
		CreateCheckPoint(true, true);
4895 4896 4897 4898

		/*
		 * Close down recovery environment
		 */
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4899
		XLogCloseRelationCache();
4900 4901

		/*
4902 4903
		 * Now that we've checkpointed the recovery, it's safe to flush old
		 * backup_label, if present.
4904 4905
		 */
		remove_backup_label();
4906
	}
4907

Tom Lane's avatar
Tom Lane committed
4908 4909 4910
	/*
	 * Preallocate additional log files, if wanted.
	 */
4911
	(void) PreallocXlogFiles(EndOfLog);
4912

4913 4914 4915
	/*
	 * Okay, we're officially UP.
	 */
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
4916
	InRecovery = false;
4917 4918 4919 4920 4921

	ControlFile->state = DB_IN_PRODUCTION;
	ControlFile->time = time(NULL);
	UpdateControlFile();

4922
	/* Start up the commit log and related stuff, too */
4923
	StartupCLOG();
4924
	StartupSUBTRANS(oldestActiveXID);
4925
	StartupMultiXact();
4926

4927 4928 4929
	/* Reload shared-memory state for prepared transactions */
	RecoverPreparedTransactions();

4930 4931
	ereport(LOG,
			(errmsg("database system is ready")));
4932
	CritSectionCount--;
4933

Tom Lane's avatar
Tom Lane committed
4934 4935 4936 4937 4938 4939 4940 4941 4942 4943 4944
	/* Shut down readFile facility, free space */
	if (readFile >= 0)
	{
		close(readFile);
		readFile = -1;
	}
	if (readBuf)
	{
		free(readBuf);
		readBuf = NULL;
	}
4945 4946 4947 4948 4949 4950
	if (readRecordBuf)
	{
		free(readRecordBuf);
		readRecordBuf = NULL;
		readRecordBufSize = 0;
	}
Tom Lane's avatar
Tom Lane committed
4951 4952
}

4953 4954
/*
 * Subroutine to try to fetch and validate a prior checkpoint record.
4955 4956 4957
 *
 * whichChkpt identifies the checkpoint (merely for reporting purposes).
 * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
4958
 */
Tom Lane's avatar
Tom Lane committed
4959
static XLogRecord *
4960
ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
Tom Lane's avatar
Tom Lane committed
4961 4962 4963 4964 4965
{
	XLogRecord *record;

	if (!XRecOffIsValid(RecPtr.xrecoff))
	{
4966 4967 4968 4969
		switch (whichChkpt)
		{
			case 1:
				ereport(LOG,
4970
				(errmsg("invalid primary checkpoint link in control file")));
4971 4972 4973 4974 4975 4976 4977
				break;
			case 2:
				ereport(LOG,
						(errmsg("invalid secondary checkpoint link in control file")));
				break;
			default:
				ereport(LOG,
4978
				   (errmsg("invalid checkpoint link in backup_label file")));
4979 4980
				break;
		}
Tom Lane's avatar
Tom Lane committed
4981 4982 4983
		return NULL;
	}

4984
	record = ReadRecord(&RecPtr, LOG);
Tom Lane's avatar
Tom Lane committed
4985 4986 4987

	if (record == NULL)
	{
4988 4989 4990 4991 4992 4993 4994 4995 4996 4997 4998 4999 5000 5001 5002
		switch (whichChkpt)
		{
			case 1:
				ereport(LOG,
						(errmsg("invalid primary checkpoint record")));
				break;
			case 2:
				ereport(LOG,
						(errmsg("invalid secondary checkpoint record")));
				break;
			default:
				ereport(LOG,
						(errmsg("invalid checkpoint record")));
				break;
		}
Tom Lane's avatar
Tom Lane committed
5003 5004 5005 5006
		return NULL;
	}
	if (record->xl_rmid != RM_XLOG_ID)
	{
5007 5008 5009 5010 5011 5012 5013 5014 5015 5016 5017 5018
		switch (whichChkpt)
		{
			case 1:
				ereport(LOG,
						(errmsg("invalid resource manager ID in primary checkpoint record")));
				break;
			case 2:
				ereport(LOG,
						(errmsg("invalid resource manager ID in secondary checkpoint record")));
				break;
			default:
				ereport(LOG,
5019
				(errmsg("invalid resource manager ID in checkpoint record")));
5020 5021
				break;
		}
Tom Lane's avatar
Tom Lane committed
5022 5023 5024 5025 5026
		return NULL;
	}
	if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
		record->xl_info != XLOG_CHECKPOINT_ONLINE)
	{
5027 5028 5029 5030
		switch (whichChkpt)
		{
			case 1:
				ereport(LOG,
5031
				   (errmsg("invalid xl_info in primary checkpoint record")));
5032 5033 5034
				break;
			case 2:
				ereport(LOG,
5035
				 (errmsg("invalid xl_info in secondary checkpoint record")));
5036 5037 5038 5039 5040 5041
				break;
			default:
				ereport(LOG,
						(errmsg("invalid xl_info in checkpoint record")));
				break;
		}
Tom Lane's avatar
Tom Lane committed
5042 5043
		return NULL;
	}
5044 5045
	if (record->xl_len != sizeof(CheckPoint) ||
		record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
Tom Lane's avatar
Tom Lane committed
5046
	{
5047 5048 5049 5050
		switch (whichChkpt)
		{
			case 1:
				ereport(LOG,
5051
					(errmsg("invalid length of primary checkpoint record")));
5052 5053 5054
				break;
			case 2:
				ereport(LOG,
5055
				  (errmsg("invalid length of secondary checkpoint record")));
5056 5057 5058 5059 5060 5061
				break;
			default:
				ereport(LOG,
						(errmsg("invalid length of checkpoint record")));
				break;
		}
Tom Lane's avatar
Tom Lane committed
5062 5063 5064
		return NULL;
	}
	return record;
5065 5066
}

Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5067
/*
5068 5069
 * This must be called during startup of a backend process, except that
 * it need not be called in a standalone backend (which does StartupXLOG
5070
 * instead).  We need to initialize the local copies of ThisTimeLineID and
5071 5072
 * RedoRecPtr.
 *
5073
 * Note: before Postgres 8.0, we went to some effort to keep the postmaster
5074
 * process's copies of ThisTimeLineID and RedoRecPtr valid too.  This was
5075
 * unnecessary however, since the postmaster itself never touches XLOG anyway.
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5076 5077
 */
void
5078
InitXLOGAccess(void)
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5079
{
5080 5081
	/* ThisTimeLineID doesn't change so we need no lock to copy it */
	ThisTimeLineID = XLogCtl->ThisTimeLineID;
5082 5083
	/* Use GetRedoRecPtr to copy the RedoRecPtr safely */
	(void) GetRedoRecPtr();
5084 5085 5086 5087 5088 5089 5090 5091
}

/*
 * Once spawned, a backend may update its local RedoRecPtr from
 * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
 * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
 */
XLogRecPtr
5092 5093
GetRedoRecPtr(void)
{
5094 5095 5096
	/* use volatile pointer to prevent code rearrangement */
	volatile XLogCtlData *xlogctl = XLogCtl;

5097
	SpinLockAcquire(&xlogctl->info_lck);
5098 5099
	Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
	RedoRecPtr = xlogctl->Insert.RedoRecPtr;
5100
	SpinLockRelease(&xlogctl->info_lck);
5101 5102

	return RedoRecPtr;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5103 5104
}

5105 5106 5107 5108 5109 5110 5111 5112 5113 5114 5115 5116 5117 5118 5119 5120 5121 5122 5123 5124 5125 5126 5127 5128 5129 5130 5131 5132 5133 5134
/*
 * GetRecentNextXid - get the nextXid value saved by the most recent checkpoint
 *
 * This is currently used only by the autovacuum daemon.  To check for
 * impending XID wraparound, autovac needs an approximate idea of the current
 * XID counter, and it needs it before choosing which DB to attach to, hence
 * before it sets up a PGPROC, hence before it can take any LWLocks.  But it
 * has attached to shared memory, and so we can let it reach into the shared
 * ControlFile structure and pull out the last checkpoint nextXID.
 *
 * No lock of any sort is taken, so we have to assume that reading a
 * TransactionId is atomic ... but that assumption is made elsewhere, too,
 * and in any case the worst possible consequence of a bogus result is that
 * autovac issues an unnecessary database-wide VACUUM.
 *
 * Note: we could also choose to read ShmemVariableCache->nextXid in an
 * unlocked fashion, thus getting a more up-to-date result; but since that
 * changes far more frequently than the controlfile checkpoint copy, it would
 * pose a far higher risk of bogus result if we did have a nonatomic-read
 * problem.
 *
 * A (theoretically) completely safe answer is to read the actual pg_control
 * file into local process memory, but that certainly seems like overkill.
 */
TransactionId
GetRecentNextXid(void)
{
	TransactionId ckptNextXid;

	/* deliberately unlocked read; see header comment */
	ckptNextXid = ControlFile->checkPointCopy.nextXid;

	return ckptNextXid;
}

5135
/*
Tom Lane's avatar
Tom Lane committed
5136
 * This must be called ONCE during postmaster or standalone-backend shutdown
5137 5138
 */
void
5139
ShutdownXLOG(int code, Datum arg)
5140
{
5141 5142
	ereport(LOG,
			(errmsg("shutting down")));
5143

5144
	CritSectionCount++;
5145
	CreateCheckPoint(true, true);
5146
	ShutdownCLOG();
5147
	ShutdownSUBTRANS();
5148
	ShutdownMultiXact();
5149
	CritSectionCount--;
5150

5151 5152
	ereport(LOG,
			(errmsg("database system is shut down")));
5153 5154
}

Tom Lane's avatar
Tom Lane committed
5155 5156
/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
5157 5158 5159
 *
 * If force is true, we force a checkpoint regardless of whether any XLOG
 * activity has occurred since the last one.
Tom Lane's avatar
Tom Lane committed
5160
 */
5161
void
5162
CreateCheckPoint(bool shutdown, bool force)
5163
{
5164 5165 5166
	CheckPoint	checkPoint;
	XLogRecPtr	recptr;
	XLogCtlInsert *Insert = &XLogCtl->Insert;
5167
	XLogRecData rdata;
5168
	uint32		freespace;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
5169 5170
	uint32		_logId;
	uint32		_logSeg;
5171 5172 5173
	int			nsegsadded = 0;
	int			nsegsremoved = 0;
	int			nsegsrecycled = 0;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
5174

5175
	/*
5176 5177 5178 5179
	 * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
	 * (This is just pro forma, since in the present system structure there is
	 * only one process that is allowed to issue checkpoints at any given
	 * time.)
5180
	 */
5181
	LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
5182

5183 5184 5185
	/*
	 * Use a critical section to force system panic if we have trouble.
	 */
5186 5187
	START_CRIT_SECTION();

5188 5189 5190 5191 5192 5193
	if (shutdown)
	{
		ControlFile->state = DB_SHUTDOWNING;
		ControlFile->time = time(NULL);
		UpdateControlFile();
	}
Tom Lane's avatar
Tom Lane committed
5194

5195
	MemSet(&checkPoint, 0, sizeof(checkPoint));
5196
	checkPoint.ThisTimeLineID = ThisTimeLineID;
Tom Lane's avatar
Tom Lane committed
5197
	checkPoint.time = time(NULL);
5198

5199
	/*
5200 5201 5202 5203
	 * We must hold CheckpointStartLock while determining the checkpoint REDO
	 * pointer.  This ensures that any concurrent transaction commits will be
	 * either not yet logged, or logged and recorded in pg_clog. See notes in
	 * RecordTransactionCommit().
5204 5205 5206 5207
	 */
	LWLockAcquire(CheckpointStartLock, LW_EXCLUSIVE);

	/* And we need WALInsertLock too */
5208
	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
Tom Lane's avatar
Tom Lane committed
5209 5210

	/*
5211 5212 5213 5214 5215 5216 5217 5218
	 * If this isn't a shutdown or forced checkpoint, and we have not inserted
	 * any XLOG records since the start of the last checkpoint, skip the
	 * checkpoint.	The idea here is to avoid inserting duplicate checkpoints
	 * when the system is idle. That wastes log space, and more importantly it
	 * exposes us to possible loss of both current and previous checkpoint
	 * records if the machine crashes just as we're writing the update.
	 * (Perhaps it'd make even more sense to checkpoint only when the previous
	 * checkpoint record is in a different xlog page?)
Tom Lane's avatar
Tom Lane committed
5219
	 *
5220 5221 5222 5223
	 * We have to make two tests to determine that nothing has happened since
	 * the start of the last checkpoint: current insertion point must match
	 * the end of the last checkpoint record, and its redo pointer must point
	 * to itself.
Tom Lane's avatar
Tom Lane committed
5224
	 */
5225
	if (!shutdown && !force)
Tom Lane's avatar
Tom Lane committed
5226 5227 5228 5229 5230 5231 5232 5233 5234 5235 5236 5237
	{
		XLogRecPtr	curInsert;

		INSERT_RECPTR(curInsert, Insert, Insert->curridx);
		if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
			curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
			MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
			ControlFile->checkPoint.xlogid ==
			ControlFile->checkPointCopy.redo.xlogid &&
			ControlFile->checkPoint.xrecoff ==
			ControlFile->checkPointCopy.redo.xrecoff)
		{
5238
			LWLockRelease(WALInsertLock);
5239
			LWLockRelease(CheckpointStartLock);
5240
			LWLockRelease(CheckpointLock);
Tom Lane's avatar
Tom Lane committed
5241 5242 5243 5244 5245 5246 5247 5248
			END_CRIT_SECTION();
			return;
		}
	}

	/*
	 * Compute new REDO record ptr = location of next XLOG record.
	 *
5249 5250 5251 5252
	 * NB: this is NOT necessarily where the checkpoint record itself will be,
	 * since other backends may insert more XLOG records while we're off doing
	 * the buffer flush work.  Those XLOG records are logically after the
	 * checkpoint, even though physically before it.  Got that?
Tom Lane's avatar
Tom Lane committed
5253 5254
	 */
	freespace = INSERT_FREESPACE(Insert);
5255 5256
	if (freespace < SizeOfXLogRecord)
	{
Tom Lane's avatar
Tom Lane committed
5257 5258
		(void) AdvanceXLInsertBuffer();
		/* OK to ignore update return flag, since we will do flush anyway */
5259
		freespace = INSERT_FREESPACE(Insert);
5260
	}
Tom Lane's avatar
Tom Lane committed
5261
	INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
5262

Tom Lane's avatar
Tom Lane committed
5263
	/*
5264 5265
	 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
	 * must be done while holding the insert lock AND the info_lck.
5266
	 *
Bruce Momjian's avatar
Bruce Momjian committed
5267
	 * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
5268 5269 5270 5271 5272
	 * pointing past where it really needs to point.  This is okay; the only
	 * consequence is that XLogInsert might back up whole buffers that it
	 * didn't really need to.  We can't postpone advancing RedoRecPtr because
	 * XLogInserts that happen while we are dumping buffers must assume that
	 * their buffer changes are not included in the checkpoint.
Tom Lane's avatar
Tom Lane committed
5273
	 */
5274 5275 5276 5277
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile XLogCtlData *xlogctl = XLogCtl;

5278
		SpinLockAcquire(&xlogctl->info_lck);
5279
		RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
5280
		SpinLockRelease(&xlogctl->info_lck);
5281
	}
5282

Tom Lane's avatar
Tom Lane committed
5283
	/*
5284 5285
	 * Now we can release insert lock and checkpoint start lock, allowing
	 * other xacts to proceed even while we are flushing disk buffers.
Tom Lane's avatar
Tom Lane committed
5286
	 */
5287
	LWLockRelease(WALInsertLock);
5288

5289 5290
	LWLockRelease(CheckpointStartLock);

5291 5292 5293
	/*
	 * Get the other info we need for the checkpoint record.
	 */
5294
	LWLockAcquire(XidGenLock, LW_SHARED);
5295
	checkPoint.nextXid = ShmemVariableCache->nextXid;
5296
	LWLockRelease(XidGenLock);
Tom Lane's avatar
Tom Lane committed
5297

5298
	LWLockAcquire(OidGenLock, LW_SHARED);
5299
	checkPoint.nextOid = ShmemVariableCache->nextOid;
5300 5301
	if (!shutdown)
		checkPoint.nextOid += ShmemVariableCache->oidCount;
5302
	LWLockRelease(OidGenLock);
5303

5304 5305 5306
	MultiXactGetCheckptMulti(shutdown,
							 &checkPoint.nextMulti,
							 &checkPoint.nextMultiOffset);
5307

Tom Lane's avatar
Tom Lane committed
5308
	/*
5309 5310
	 * Having constructed the checkpoint record, ensure all shmem disk buffers
	 * and commit-log buffers are flushed to disk.
5311
	 *
5312 5313 5314 5315 5316
	 * This I/O could fail for various reasons.  If so, we will fail to
	 * complete the checkpoint, but there is no reason to force a system
	 * panic. Accordingly, exit critical section while doing it.  (If we are
	 * doing a shutdown checkpoint, we probably *should* panic --- but that
	 * will happen anyway because we'll still be inside the critical section
5317
	 * established by ShutdownXLOG.)
Tom Lane's avatar
Tom Lane committed
5318
	 */
5319 5320
	END_CRIT_SECTION();

5321
	if (!shutdown)
5322
		ereport(DEBUG2,
5323 5324
				(errmsg("checkpoint starting")));

5325
	CheckPointCLOG();
5326
	CheckPointSUBTRANS();
5327
	CheckPointMultiXact();
5328
	FlushBufferPool();
5329 5330
	/* We deliberately delay 2PC checkpointing as long as possible */
	CheckPointTwoPhase(checkPoint.redo);
5331

5332 5333
	START_CRIT_SECTION();

Tom Lane's avatar
Tom Lane committed
5334 5335 5336
	/*
	 * Now insert the checkpoint record into XLOG.
	 */
5337
	rdata.data = (char *) (&checkPoint);
5338
	rdata.len = sizeof(checkPoint);
5339
	rdata.buffer = InvalidBuffer;
5340 5341
	rdata.next = NULL;

Tom Lane's avatar
Tom Lane committed
5342 5343 5344 5345 5346 5347
	recptr = XLogInsert(RM_XLOG_ID,
						shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
						XLOG_CHECKPOINT_ONLINE,
						&rdata);

	XLogFlush(recptr);
5348

Tom Lane's avatar
Tom Lane committed
5349
	/*
5350 5351
	 * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
	 * = end of actual checkpoint record.
Tom Lane's avatar
Tom Lane committed
5352 5353
	 */
	if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
5354 5355
		ereport(PANIC,
				(errmsg("concurrent transaction log activity while database system is shutting down")));
5356

Tom Lane's avatar
Tom Lane committed
5357
	/*
5358 5359
	 * Select point at which we can truncate the log, which we base on the
	 * prior checkpoint's earliest info.
Tom Lane's avatar
Tom Lane committed
5360
	 */
5361
	XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
5362

Tom Lane's avatar
Tom Lane committed
5363 5364 5365
	/*
	 * Update the control file.
	 */
5366
	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5367 5368
	if (shutdown)
		ControlFile->state = DB_SHUTDOWNED;
Tom Lane's avatar
Tom Lane committed
5369 5370 5371
	ControlFile->prevCheckPoint = ControlFile->checkPoint;
	ControlFile->checkPoint = ProcLastRecPtr;
	ControlFile->checkPointCopy = checkPoint;
5372 5373
	ControlFile->time = time(NULL);
	UpdateControlFile();
5374
	LWLockRelease(ControlFileLock);
5375

5376
	/*
5377 5378
	 * We are now done with critical updates; no need for system panic if we
	 * have trouble while fooling with offline log segments.
5379 5380 5381
	 */
	END_CRIT_SECTION();

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
5382
	/*
Tom Lane's avatar
Tom Lane committed
5383 5384
	 * Delete offline log files (those no longer needed even for previous
	 * checkpoint).
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
5385 5386 5387
	 */
	if (_logId || _logSeg)
	{
Tom Lane's avatar
Tom Lane committed
5388
		PrevLogSeg(_logId, _logSeg);
5389 5390
		MoveOfflineLogs(_logId, _logSeg, recptr,
						&nsegsremoved, &nsegsrecycled);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
5391 5392
	}

Tom Lane's avatar
Tom Lane committed
5393
	/*
5394 5395
	 * Make more log segments if needed.  (Do this after deleting offline log
	 * segments, to avoid having peak disk space usage higher than necessary.)
Tom Lane's avatar
Tom Lane committed
5396 5397
	 */
	if (!shutdown)
5398
		nsegsadded = PreallocXlogFiles(recptr);
Tom Lane's avatar
Tom Lane committed
5399

5400
	/*
5401 5402 5403 5404 5405
	 * Truncate pg_subtrans if possible.  We can throw away all data before
	 * the oldest XMIN of any running transaction.	No future transaction will
	 * attempt to reference any pg_subtrans entry older than that (see Asserts
	 * in subtrans.c).	During recovery, though, we mustn't do this because
	 * StartupSUBTRANS hasn't been called yet.
5406
	 */
5407 5408
	if (!InRecovery)
		TruncateSUBTRANS(GetOldestXmin(true));
5409

5410
	if (!shutdown)
5411
		ereport(DEBUG2,
5412 5413 5414
				(errmsg("checkpoint complete; %d transaction log file(s) added, %d removed, %d recycled",
						nsegsadded, nsegsremoved, nsegsrecycled)));

5415
	LWLockRelease(CheckpointLock);
5416
}
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5417

Tom Lane's avatar
Tom Lane committed
5418 5419 5420
/*
 * Write a NEXTOID log record
 */
5421 5422 5423
void
XLogPutNextOid(Oid nextOid)
{
5424
	XLogRecData rdata;
5425

5426
	rdata.data = (char *) (&nextOid);
5427
	rdata.len = sizeof(Oid);
5428
	rdata.buffer = InvalidBuffer;
5429 5430
	rdata.next = NULL;
	(void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
5431

5432 5433
	/*
	 * We need not flush the NEXTOID record immediately, because any of the
5434 5435 5436 5437 5438
	 * just-allocated OIDs could only reach disk as part of a tuple insert or
	 * update that would have its own XLOG record that must follow the NEXTOID
	 * record.	Therefore, the standard buffer LSN interlock applied to those
	 * records will ensure no such OID reaches disk before the NEXTOID record
	 * does.
5439 5440 5441
	 */
}

Tom Lane's avatar
Tom Lane committed
5442 5443 5444
/*
 * XLOG resource manager's routines
 */
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5445 5446 5447
/*
 * Replay one XLOG-resource-manager record.
 *
 * Handles NEXTOID and the two checkpoint record types, updating the
 * shared XID/OID/MultiXact counters from the record contents; any other
 * info code is ignored.  The lsn argument is not used.
 */
void
xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8		info = record->xl_info & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_NEXTOID:
			{
				Oid			loggedOid;

				memcpy(&loggedOid, XLogRecGetData(record), sizeof(Oid));
				/* only ever move the OID counter forward */
				if (ShmemVariableCache->nextOid < loggedOid)
				{
					ShmemVariableCache->nextOid = loggedOid;
					ShmemVariableCache->oidCount = 0;
				}
				break;
			}

		case XLOG_CHECKPOINT_SHUTDOWN:
			{
				CheckPoint	ckpt;

				memcpy(&ckpt, XLogRecGetData(record), sizeof(CheckPoint));

				/* In a SHUTDOWN checkpoint, believe the counters exactly */
				ShmemVariableCache->nextXid = ckpt.nextXid;
				ShmemVariableCache->nextOid = ckpt.nextOid;
				ShmemVariableCache->oidCount = 0;
				MultiXactSetNextMXact(ckpt.nextMulti,
									  ckpt.nextMultiOffset);

				/*
				 * TLI may change in a shutdown checkpoint, but it shouldn't
				 * decrease, and the new one must be among the expected TLIs
				 */
				if (ckpt.ThisTimeLineID != ThisTimeLineID)
				{
					if (ckpt.ThisTimeLineID < ThisTimeLineID ||
						!list_member_int(expectedTLIs,
										 (int) ckpt.ThisTimeLineID))
						ereport(PANIC,
								(errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
										ckpt.ThisTimeLineID, ThisTimeLineID)));
					/* Following WAL records should be run with new TLI */
					ThisTimeLineID = ckpt.ThisTimeLineID;
				}
				break;
			}

		case XLOG_CHECKPOINT_ONLINE:
			{
				CheckPoint	ckpt;

				memcpy(&ckpt, XLogRecGetData(record), sizeof(CheckPoint));

				/* In an ONLINE checkpoint, treat the counters like NEXTOID */
				if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
										  ckpt.nextXid))
					ShmemVariableCache->nextXid = ckpt.nextXid;
				if (ShmemVariableCache->nextOid < ckpt.nextOid)
				{
					ShmemVariableCache->nextOid = ckpt.nextOid;
					ShmemVariableCache->oidCount = 0;
				}
				MultiXactAdvanceNextMXact(ckpt.nextMulti,
										  ckpt.nextMultiOffset);

				/* TLI should not change in an on-line checkpoint */
				if (ckpt.ThisTimeLineID != ThisTimeLineID)
					ereport(PANIC,
							(errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
									ckpt.ThisTimeLineID, ThisTimeLineID)));
				break;
			}

		default:
			/* nothing to replay for any other info code */
			break;
	}
}
5511

Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5512
void
5513
xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5514
{
5515
	uint8			info = xl_info & ~XLR_INFO_MASK;
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5516

Tom Lane's avatar
Tom Lane committed
5517 5518
	if (info == XLOG_CHECKPOINT_SHUTDOWN ||
		info == XLOG_CHECKPOINT_ONLINE)
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5519
	{
5520 5521
		CheckPoint *checkpoint = (CheckPoint *) rec;

5522
		appendStringInfo(buf, "checkpoint: redo %X/%X; undo %X/%X; "
5523
				"tli %u; xid %u; oid %u; multi %u; offset %u; %s",
5524 5525
				checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
				checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
5526
				checkpoint->ThisTimeLineID, checkpoint->nextXid,
5527
				checkpoint->nextOid,
5528
				checkpoint->nextMulti,
5529
				checkpoint->nextMultiOffset,
5530
				(info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
Tom Lane's avatar
Tom Lane committed
5531
	}
5532 5533
	else if (info == XLOG_NEXTOID)
	{
5534
		Oid			nextOid;
5535 5536

		memcpy(&nextOid, rec, sizeof(Oid));
5537
		appendStringInfo(buf, "nextOid: %u", nextOid);
5538
	}
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5539
	else
5540
		appendStringInfo(buf, "UNKNOWN");
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
5541 5542
}

5543
#ifdef WAL_DEBUG

/*
 * Append a short summary of a WAL record header to buf: the previous-record
 * pointer, the xid, a "bkpbN" tag for each backup-block flag set in xl_info
 * (numbered from 1), and the name of the owning resource manager.
 */
static void
xlog_outrec(StringInfo buf, XLogRecord *record)
{
	int			blk;

	appendStringInfo(buf, "prev %X/%X; xid %u",
					 record->xl_prev.xlogid, record->xl_prev.xrecoff,
					 record->xl_xid);

	for (blk = 0; blk < XLR_MAX_BKP_BLOCKS; blk++)
	{
		/* skip backup-block slots whose flag is not set */
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(blk)))
			continue;
		appendStringInfo(buf, "; bkpb%d", blk + 1);
	}

	appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
}
#endif   /* WAL_DEBUG */
5563 5564 5565


/*
5566
 * GUC support
5567
 */
5568
const char *
5569
assign_xlog_sync_method(const char *method, bool doit, GucSource source)
5570
{
5571 5572
	int			new_sync_method;
	int			new_sync_bit;
5573

5574
	if (pg_strcasecmp(method, "fsync") == 0)
5575 5576 5577 5578
	{
		new_sync_method = SYNC_METHOD_FSYNC;
		new_sync_bit = 0;
	}
5579 5580 5581 5582 5583 5584 5585
#ifdef HAVE_FSYNC_WRITETHROUGH
	else if (pg_strcasecmp(method, "fsync_writethrough") == 0)
	{
		new_sync_method = SYNC_METHOD_FSYNC_WRITETHROUGH;
		new_sync_bit = 0;
	}
#endif
5586
#ifdef HAVE_FDATASYNC
5587
	else if (pg_strcasecmp(method, "fdatasync") == 0)
5588 5589 5590 5591 5592 5593
	{
		new_sync_method = SYNC_METHOD_FDATASYNC;
		new_sync_bit = 0;
	}
#endif
#ifdef OPEN_SYNC_FLAG
5594
	else if (pg_strcasecmp(method, "open_sync") == 0)
5595 5596 5597 5598 5599 5600
	{
		new_sync_method = SYNC_METHOD_OPEN;
		new_sync_bit = OPEN_SYNC_FLAG;
	}
#endif
#ifdef OPEN_DATASYNC_FLAG
5601
	else if (pg_strcasecmp(method, "open_datasync") == 0)
5602 5603 5604 5605 5606 5607
	{
		new_sync_method = SYNC_METHOD_OPEN;
		new_sync_bit = OPEN_DATASYNC_FLAG;
	}
#endif
	else
5608
		return NULL;
5609

5610 5611 5612
	if (!doit)
		return method;

5613 5614 5615
	if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
	{
		/*
5616 5617
		 * To ensure that no blocks escape unsynced, force an fsync on the
		 * currently open log segment (if any).  Also, if the open flag is
5618 5619
		 * changing, close the log file so it will be reopened (with new flag
		 * bit) at next use.
5620 5621 5622 5623
		 */
		if (openLogFile >= 0)
		{
			if (pg_fsync(openLogFile) != 0)
5624 5625
				ereport(PANIC,
						(errcode_for_file_access(),
5626 5627
						 errmsg("could not fsync log file %u, segment %u: %m",
								openLogId, openLogSeg)));
5628
			if (open_sync_bit != new_sync_bit)
5629
				XLogFileClose();
5630 5631 5632 5633
		}
		sync_method = new_sync_method;
		open_sync_bit = new_sync_bit;
	}
5634 5635

	return method;
5636 5637 5638 5639 5640 5641 5642 5643 5644 5645 5646
}


/*
 * Issue appropriate kind of fsync (if any) on the current XLOG output file
 */
static void
issue_xlog_fsync(void)
{
	switch (sync_method)
	{
5647
		case SYNC_METHOD_FSYNC:
5648
			if (pg_fsync_no_writethrough(openLogFile) != 0)
5649 5650
				ereport(PANIC,
						(errcode_for_file_access(),
5651 5652
						 errmsg("could not fsync log file %u, segment %u: %m",
								openLogId, openLogSeg)));
5653
			break;
5654 5655 5656 5657 5658
#ifdef HAVE_FSYNC_WRITETHROUGH
		case SYNC_METHOD_FSYNC_WRITETHROUGH:
			if (pg_fsync_writethrough(openLogFile) != 0)
				ereport(PANIC,
						(errcode_for_file_access(),
5659 5660
						 errmsg("could not fsync write-through log file %u, segment %u: %m",
								openLogId, openLogSeg)));
5661 5662
			break;
#endif
5663 5664 5665
#ifdef HAVE_FDATASYNC
		case SYNC_METHOD_FDATASYNC:
			if (pg_fdatasync(openLogFile) != 0)
5666 5667
				ereport(PANIC,
						(errcode_for_file_access(),
5668 5669
					errmsg("could not fdatasync log file %u, segment %u: %m",
						   openLogId, openLogSeg)));
5670 5671 5672 5673 5674 5675
			break;
#endif
		case SYNC_METHOD_OPEN:
			/* write synced it already */
			break;
		default:
5676
			elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
5677 5678 5679
			break;
	}
}
5680 5681 5682 5683 5684 5685 5686 5687 5688 5689 5690 5691 5692 5693 5694 5695 5696


/*
 * pg_start_backup: set up for taking an on-line backup dump
 *
 * Essentially what this does is to create a backup label file in $PGDATA,
 * where it will be archived as part of the backup dump.  The label file
 * contains the user-supplied label string (typically this would be used
 * to tell where the backup dump will be stored) and the starting time and
 * starting WAL offset for the dump.
 */
Datum
pg_start_backup(PG_FUNCTION_ARGS)
{
	text	   *backupid = PG_GETARG_TEXT_P(0);
	text	   *result;
	char	   *backupidstr;
5697
	XLogRecPtr	checkpointloc;
5698
	XLogRecPtr	startpoint;
Bruce Momjian's avatar
Bruce Momjian committed
5699
	time_t		stamp_time;
5700 5701 5702 5703 5704 5705 5706
	char		strfbuf[128];
	char		xlogfilename[MAXFNAMELEN];
	uint32		_logId;
	uint32		_logSeg;
	struct stat stat_buf;
	FILE	   *fp;

Bruce Momjian's avatar
Bruce Momjian committed
5707
	if (!superuser())
5708 5709 5710
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 (errmsg("must be superuser to run a backup"))));
5711 5712 5713 5714

	if (!XLogArchivingActive())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5715
				 (errmsg("WAL archiving is not active"),
5716 5717
				  (errhint("archive_command must be defined before "
						   "online backups can be made safely.")))));
5718

5719
	backupidstr = DatumGetCString(DirectFunctionCall1(textout,
5720
												 PointerGetDatum(backupid)));
Bruce Momjian's avatar
Bruce Momjian committed
5721

5722
	/*
5723 5724 5725 5726 5727 5728 5729 5730 5731 5732 5733 5734 5735 5736 5737
	 * Mark backup active in shared memory.  We must do full-page WAL writes
	 * during an on-line backup even if not doing so at other times, because
	 * it's quite possible for the backup dump to obtain a "torn" (partially
	 * written) copy of a database page if it reads the page concurrently
	 * with our write to the same page.  This can be fixed as long as the
	 * first write to the page in the WAL sequence is a full-page write.
	 * Hence, we turn on forcePageWrites and then force a CHECKPOINT, to
	 * ensure there are no dirty pages in shared memory that might get
	 * dumped while the backup is in progress without having a corresponding
	 * WAL record.  (Once the backup is complete, we need not force full-page
	 * writes anymore, since we expect that any pages not modified during
	 * the backup interval must have been correctly captured by the backup.)
	 *
	 * We must hold WALInsertLock to change the value of forcePageWrites,
	 * to ensure adequate interlocking against XLogInsert().
5738
	 */
5739 5740 5741 5742 5743 5744 5745 5746 5747 5748 5749
	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
	if (XLogCtl->Insert.forcePageWrites)
	{
		LWLockRelease(WALInsertLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("a backup is already in progress"),
				 errhint("Run pg_stop_backup() and try again.")));
	}
	XLogCtl->Insert.forcePageWrites = true;
	LWLockRelease(WALInsertLock);
Bruce Momjian's avatar
Bruce Momjian committed
5750

5751 5752 5753 5754 5755 5756 5757 5758 5759 5760
	/* Use a TRY block to ensure we release forcePageWrites if fail below */
	PG_TRY();
	{
		/*
		 * Force a CHECKPOINT.  Aside from being necessary to prevent torn
		 * page problems, this guarantees that two successive backup runs will
		 * have different checkpoint positions and hence different history
		 * file names, even if nothing happened in between.
		 */
		RequestCheckpoint(true, false);
5761

5762 5763 5764 5765 5766 5767 5768 5769 5770
		/*
		 * Now we need to fetch the checkpoint record location, and also its
		 * REDO pointer.  The oldest point in WAL that would be needed to
		 * restore starting from the checkpoint is precisely the REDO pointer.
		 */
		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
		checkpointloc = ControlFile->checkPoint;
		startpoint = ControlFile->checkPointCopy.redo;
		LWLockRelease(ControlFileLock);
Bruce Momjian's avatar
Bruce Momjian committed
5771

5772 5773
		XLByteToSeg(startpoint, _logId, _logSeg);
		XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
Bruce Momjian's avatar
Bruce Momjian committed
5774

5775 5776 5777 5778 5779 5780 5781 5782 5783 5784 5785 5786 5787 5788 5789 5790 5791 5792 5793 5794 5795 5796 5797 5798 5799 5800 5801 5802 5803 5804 5805 5806 5807 5808 5809 5810
		/*
		 * We deliberately use strftime/localtime not the src/timezone
		 * functions, so that backup labels will consistently be recorded in
		 * the same timezone regardless of TimeZone setting.  This matches
		 * elog.c's practice.
		 */
		stamp_time = time(NULL);
		strftime(strfbuf, sizeof(strfbuf),
				 "%Y-%m-%d %H:%M:%S %Z",
				 localtime(&stamp_time));

		/*
		 * Check for existing backup label --- implies a backup is already
		 * running.  (XXX given that we checked forcePageWrites above, maybe
		 * it would be OK to just unlink any such label file?)
		 */
		if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
		{
			if (errno != ENOENT)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not stat file \"%s\": %m",
								BACKUP_LABEL_FILE)));
		}
		else
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("a backup is already in progress"),
					 errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
							 BACKUP_LABEL_FILE)));

		/*
		 * Okay, write the file
		 */
		fp = AllocateFile(BACKUP_LABEL_FILE, "w");
		if (!fp)
5811 5812
			ereport(ERROR,
					(errcode_for_file_access(),
5813 5814 5815 5816 5817 5818 5819 5820 5821 5822 5823 5824
					 errmsg("could not create file \"%s\": %m",
							BACKUP_LABEL_FILE)));
		fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
				startpoint.xlogid, startpoint.xrecoff, xlogfilename);
		fprintf(fp, "CHECKPOINT LOCATION: %X/%X\n",
				checkpointloc.xlogid, checkpointloc.xrecoff);
		fprintf(fp, "START TIME: %s\n", strfbuf);
		fprintf(fp, "LABEL: %s\n", backupidstr);
		if (fflush(fp) || ferror(fp) || FreeFile(fp))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not write file \"%s\": %m",
5825
							BACKUP_LABEL_FILE)));
5826
	}
5827 5828 5829 5830 5831 5832
	PG_CATCH();
	{
		/* Turn off forcePageWrites on failure */
		LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
		XLogCtl->Insert.forcePageWrites = false;
		LWLockRelease(WALInsertLock);
Bruce Momjian's avatar
Bruce Momjian committed
5833

5834 5835 5836
		PG_RE_THROW();
	}
	PG_END_TRY();
Bruce Momjian's avatar
Bruce Momjian committed
5837

5838 5839 5840 5841 5842 5843
	/*
	 * We're done.  As a convenience, return the starting WAL offset.
	 */
	snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
			 startpoint.xlogid, startpoint.xrecoff);
	result = DatumGetTextP(DirectFunctionCall1(textin,
5844
											 CStringGetDatum(xlogfilename)));
5845 5846 5847 5848 5849 5850 5851 5852 5853 5854 5855 5856 5857 5858 5859 5860 5861 5862
	PG_RETURN_TEXT_P(result);
}

/*
 * pg_stop_backup: finish taking an on-line backup dump
 *
 * We remove the backup label file created by pg_start_backup, and instead
 * create a backup history file in pg_xlog (whence it will immediately be
 * archived).  The backup history file contains the same info found in
 * the label file, plus the backup-end time and WAL offset.
 */
Datum
pg_stop_backup(PG_FUNCTION_ARGS)
{
	text	   *result;
	XLogCtlInsert *Insert = &XLogCtl->Insert;
	XLogRecPtr	startpoint;
	XLogRecPtr	stoppoint;
Bruce Momjian's avatar
Bruce Momjian committed
5863
	time_t		stamp_time;
5864
	char		strfbuf[128];
5865
	char		histfilepath[MAXPGPATH];
5866 5867 5868 5869 5870 5871 5872 5873 5874
	char		startxlogfilename[MAXFNAMELEN];
	char		stopxlogfilename[MAXFNAMELEN];
	uint32		_logId;
	uint32		_logSeg;
	FILE	   *lfp;
	FILE	   *fp;
	char		ch;
	int			ich;

Bruce Momjian's avatar
Bruce Momjian committed
5875
	if (!superuser())
5876 5877 5878
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 (errmsg("must be superuser to run a backup"))));
Bruce Momjian's avatar
Bruce Momjian committed
5879

5880
	/*
5881
	 * Get the current end-of-WAL position; it will be unsafe to use this dump
5882 5883
	 * to restore to a point in advance of this time.  We can also clear
	 * forcePageWrites here.
5884 5885 5886
	 */
	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
	INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
5887
	XLogCtl->Insert.forcePageWrites = false;
5888 5889 5890 5891
	LWLockRelease(WALInsertLock);

	XLByteToSeg(stoppoint, _logId, _logSeg);
	XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
Bruce Momjian's avatar
Bruce Momjian committed
5892

5893
	/*
5894 5895 5896 5897
	 * We deliberately use strftime/localtime not the src/timezone functions,
	 * so that backup labels will consistently be recorded in the same
	 * timezone regardless of TimeZone setting.  This matches elog.c's
	 * practice.
5898 5899 5900 5901 5902
	 */
	stamp_time = time(NULL);
	strftime(strfbuf, sizeof(strfbuf),
			 "%Y-%m-%d %H:%M:%S %Z",
			 localtime(&stamp_time));
Bruce Momjian's avatar
Bruce Momjian committed
5903

5904 5905 5906
	/*
	 * Open the existing label file
	 */
5907
	lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
5908 5909 5910 5911 5912 5913
	if (!lfp)
	{
		if (errno != ENOENT)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m",
5914
							BACKUP_LABEL_FILE)));
5915 5916 5917 5918
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("a backup is not in progress")));
	}
Bruce Momjian's avatar
Bruce Momjian committed
5919

5920
	/*
5921 5922
	 * Read and parse the START WAL LOCATION line (this code is pretty crude,
	 * but we are not expecting any variability in the file format).
5923 5924 5925 5926 5927 5928
	 */
	if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %24s)%c",
			   &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
			   &ch) != 4 || ch != '\n')
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5929
				 errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
Bruce Momjian's avatar
Bruce Momjian committed
5930

5931 5932 5933 5934
	/*
	 * Write the backup history file
	 */
	XLByteToSeg(startpoint, _logId, _logSeg);
5935
	BackupHistoryFilePath(histfilepath, ThisTimeLineID, _logId, _logSeg,
5936
						  startpoint.xrecoff % XLogSegSize);
5937
	fp = AllocateFile(histfilepath, "w");
5938 5939 5940 5941
	if (!fp)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
5942
						histfilepath)));
5943 5944 5945 5946
	fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
			startpoint.xlogid, startpoint.xrecoff, startxlogfilename);
	fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
			stoppoint.xlogid, stoppoint.xrecoff, stopxlogfilename);
5947
	/* transfer remaining lines from label to history file */
5948 5949 5950 5951 5952 5953 5954
	while ((ich = fgetc(lfp)) != EOF)
		fputc(ich, fp);
	fprintf(fp, "STOP TIME: %s\n", strfbuf);
	if (fflush(fp) || ferror(fp) || FreeFile(fp))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write file \"%s\": %m",
5955
						histfilepath)));
Bruce Momjian's avatar
Bruce Momjian committed
5956

5957 5958 5959 5960 5961 5962 5963
	/*
	 * Close and remove the backup label file
	 */
	if (ferror(lfp) || FreeFile(lfp))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\": %m",
5964 5965
						BACKUP_LABEL_FILE)));
	if (unlink(BACKUP_LABEL_FILE) != 0)
5966 5967 5968
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m",
5969
						BACKUP_LABEL_FILE)));
Bruce Momjian's avatar
Bruce Momjian committed
5970

5971
	/*
5972 5973 5974
	 * Clean out any no-longer-needed history files.  As a side effect,
	 * this will post a .ready file for the newly created history file,
	 * notifying the archiver that history file may be archived immediately.
5975
	 */
5976
	CleanupBackupHistory();
Bruce Momjian's avatar
Bruce Momjian committed
5977

5978 5979 5980 5981 5982 5983
	/*
	 * We're done.  As a convenience, return the ending WAL offset.
	 */
	snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
			 stoppoint.xlogid, stoppoint.xrecoff);
	result = DatumGetTextP(DirectFunctionCall1(textin,
5984
										 CStringGetDatum(stopxlogfilename)));
5985 5986
	PG_RETURN_TEXT_P(result);
}
5987 5988 5989 5990 5991 5992

/*
 * read_backup_label: check to see if a backup_label file is present
 *
 * If we see a backup_label during recovery, we assume that we are recovering
 * from a backup dump file, and we therefore roll forward from the checkpoint
 * identified by the label file, NOT what pg_control says.  This avoids the
 * problem that pg_control might have been archived one or more checkpoints
 * later than the start of the dump, and so if we rely on it as the start
 * point, we will fail to restore a consistent database state.
 *
 * We also attempt to retrieve the corresponding backup history file.
 * If successful, set recoveryMinXlogOffset to constrain valid PITR stopping
 * points.
 *
 * Returns TRUE if a backup_label was found (and fills the checkpoint
 * location into *checkPointLoc); returns FALSE if not.
 *
 * A label file that exists but cannot be read or parsed, or a history file
 * that can be opened but not parsed or read, is reported at FATAL level.
 */
static bool
read_backup_label(XLogRecPtr *checkPointLoc)
{
	XLogRecPtr	startpoint;
	XLogRecPtr	stoppoint;
	char		histfilename[MAXFNAMELEN];
	char		histfilepath[MAXPGPATH];
	char		startxlogfilename[MAXFNAMELEN];
	char		stopxlogfilename[MAXFNAMELEN];
	TimeLineID	tli;
	uint32		_logId;
	uint32		_logSeg;
	FILE	   *lfp;
	FILE	   *fp;
	char		ch;

	/*
	 * See if label file is present
	 */
	lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
	if (!lfp)
	{
		if (errno != ENOENT)
			ereport(FATAL,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m",
							BACKUP_LABEL_FILE)));
		return false;			/* it's not there, all is fine */
	}

	/*
	 * Read and parse the START WAL LOCATION and CHECKPOINT lines (this code
	 * is pretty crude, but we are not expecting any variability in the file
	 * format).
	 *
	 * The first 8 hex digits of the recorded file name are scanned into tli;
	 * the rest of the name (up to 16 characters) lands in startxlogfilename.
	 * Each trailing %c must consume the newline that ends its line.
	 */
	if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
			   &startpoint.xlogid, &startpoint.xrecoff, &tli,
			   startxlogfilename, &ch) != 5 || ch != '\n')
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
	if (fscanf(lfp, "CHECKPOINT LOCATION: %X/%X%c",
			   &checkPointLoc->xlogid, &checkPointLoc->xrecoff,
			   &ch) != 3 || ch != '\n')
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
	if (ferror(lfp) || FreeFile(lfp))
		ereport(FATAL,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\": %m",
						BACKUP_LABEL_FILE)));

	/*
	 * Try to retrieve the backup history file (no error if we can't)
	 */
	XLByteToSeg(startpoint, _logId, _logSeg);
	BackupHistoryFileName(histfilename, tli, _logId, _logSeg,
						  startpoint.xrecoff % XLogSegSize);

	/*
	 * NOTE(review): the result of RestoreArchivedFile is not checked here;
	 * presumably it fills histfilepath either way --- confirm against its
	 * definition.  A failure to open the file below is silently tolerated.
	 */
	if (InArchiveRecovery)
		RestoreArchivedFile(histfilepath, histfilename, "RECOVERYHISTORY", 0);
	else
		BackupHistoryFilePath(histfilepath, tli, _logId, _logSeg,
							  startpoint.xrecoff % XLogSegSize);

	fp = AllocateFile(histfilepath, "r");
	if (fp)
	{
		/*
		 * Parse history file to identify stop point.
		 */
		if (fscanf(fp, "START WAL LOCATION: %X/%X (file %24s)%c",
				   &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
				   &ch) != 4 || ch != '\n')
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("invalid data in file \"%s\"", histfilename)));
		if (fscanf(fp, "STOP WAL LOCATION: %X/%X (file %24s)%c",
				   &stoppoint.xlogid, &stoppoint.xrecoff, stopxlogfilename,
				   &ch) != 4 || ch != '\n')
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("invalid data in file \"%s\"", histfilename)));
		recoveryMinXlogOffset = stoppoint;
		if (ferror(fp) || FreeFile(fp))
			ereport(FATAL,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m",
							histfilepath)));
	}

	return true;
}

/*
 * remove_backup_label: remove any extant backup_label after successful
 * recovery.  Once we have completed the end-of-recovery checkpoint there
 * is no reason to have to replay from the start point indicated by the
 * label (and indeed we'll probably have removed/recycled the needed WAL
 * segments), so remove the label to prevent trouble in later crash recoveries.
 */
static void
remove_backup_label(void)
{
6110
	if (unlink(BACKUP_LABEL_FILE) != 0)
6111 6112 6113 6114
		if (errno != ENOENT)
			ereport(FATAL,
					(errcode_for_file_access(),
					 errmsg("could not remove file \"%s\": %m",
6115
							BACKUP_LABEL_FILE)));
6116
}
6117 6118 6119 6120 6121 6122 6123 6124 6125 6126 6127 6128 6129 6130 6131 6132 6133 6134 6135 6136 6137

/*
 * Error context callback for errors occurring during rm_redo().
 *
 * arg points to the XLogRecord being replayed.  The record's resource
 * manager (looked up in RmgrTable by xl_rmid) is asked to describe the
 * record into a temporary string buffer; if that description is non-empty
 * it is attached to the error report as an "xlog redo ..." context line.
 */
static void
rm_redo_error_callback(void *arg)
{
	XLogRecord *record = (XLogRecord *) arg;
	StringInfoData buf;

	initStringInfo(&buf);
	RmgrTable[record->xl_rmid].rm_desc(&buf,
									   record->xl_info,
									   XLogRecGetData(record));

	/* don't bother emitting empty description */
	if (buf.len > 0)
		errcontext("xlog redo %s", buf.data);

	/* release the buffer allocated by initStringInfo */
	pfree(buf.data);
}