Commit 01747692 authored by Tom Lane's avatar Tom Lane

Repair two problems with WAL logging of sequence nextvalI() ops, as

per recent pghackers discussion: force a new WAL record at first nextval
after a checkpoint, and ensure that xlog is flushed to disk if a nextval
record is the only thing emitted by a transaction.
parent 0b73fe14
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.117 2002/03/06 06:09:21 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.118 2002/03/15 19:20:29 tgl Exp $
*
* NOTES
* Transaction aborts can now occur two ways:
......@@ -546,32 +546,48 @@ RecordTransactionCommit(void)
xid = GetCurrentTransactionId();
/*
* We needn't write anything in xlog or clog if the transaction was
* read-only, which we check by testing if it made any xlog entries.
* We only need to log the commit in xlog and clog if the transaction made
* any transaction-controlled XLOG entries. (Otherwise, its XID appears
* nowhere in permanent storage, so no one will ever care if it
* committed.) However, we must flush XLOG to disk if we made any XLOG
* entries, whether in or out of transaction control. For example, if we
* reported a nextval() result to the client, this ensures that any XLOG
* record generated by nextval will hit the disk before we report the
* transaction committed.
*/
if (MyLastRecPtr.xrecoff != 0)
if (MyXactMadeXLogEntry)
{
XLogRecData rdata;
xl_xact_commit xlrec;
XLogRecPtr recptr;
BufmgrCommit();
START_CRIT_SECTION();
if (MyLastRecPtr.xrecoff != 0)
{
/* Need to emit a commit record */
XLogRecData rdata;
xl_xact_commit xlrec;
xlrec.xtime = time(NULL);
rdata.buffer = InvalidBuffer;
rdata.data = (char *) (&xlrec);
rdata.len = SizeOfXactCommit;
rdata.next = NULL;
START_CRIT_SECTION();
/*
* SHOULD SAVE ARRAY OF RELFILENODE-s TO DROP
* XXX SHOULD SAVE ARRAY OF RELFILENODE-s TO DROP
*/
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, &rdata);
}
else
{
/* Just flush through last record written by me */
recptr = ProcLastRecEnd;
}
/*
* Sleep before commit! So we can flush more than one commit
* Sleep before flush! So we can flush more than one commit
* records per single fsync. (The idea is some other backend may
* do the XLogFlush while we're sleeping. This needs work still,
* because on most Unixen, the minimum select() delay is 10msec or
......@@ -593,15 +609,17 @@ RecordTransactionCommit(void)
XLogFlush(recptr);
/* Break the chain of back-links in the XLOG records I output */
MyLastRecPtr.xrecoff = 0;
/* Mark the transaction committed in clog */
/* Mark the transaction committed in clog, if needed */
if (MyLastRecPtr.xrecoff != 0)
TransactionIdCommit(xid);
END_CRIT_SECTION();
}
/* Break the chain of back-links in the XLOG records I output */
MyLastRecPtr.xrecoff = 0;
MyXactMadeXLogEntry = false;
/* Show myself as out of the transaction in PROC array */
MyProc->logRec.xrecoff = 0;
......@@ -689,8 +707,11 @@ RecordTransactionAbort(void)
TransactionId xid = GetCurrentTransactionId();
/*
* We needn't write anything in xlog or clog if the transaction was
* read-only, which we check by testing if it made any xlog entries.
* We only need to log the abort in xlog and clog if the transaction made
* any transaction-controlled XLOG entries. (Otherwise, its XID appears
* nowhere in permanent storage, so no one will ever care if it
* committed.) We do not flush XLOG to disk in any case, since the
* default assumption after a crash would be that we aborted, anyway.
*
* Extra check here is to catch case that we aborted partway through
* RecordTransactionCommit ...
......@@ -714,11 +735,6 @@ RecordTransactionAbort(void)
*/
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, &rdata);
/*
* There's no need for XLogFlush here, since the default
* assumption would be that we aborted, anyway.
*/
/* Mark the transaction aborted in clog */
TransactionIdAbort(xid);
......@@ -727,6 +743,8 @@ RecordTransactionAbort(void)
/* Break the chain of back-links in the XLOG records I output */
MyLastRecPtr.xrecoff = 0;
MyXactMadeXLogEntry = false;
/* Show myself as out of the transaction in PROC array */
MyProc->logRec.xrecoff = 0;
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.89 2002/03/06 06:09:22 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.90 2002/03/15 19:20:30 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -131,27 +131,36 @@ bool InRecovery = false;
/*
* MyLastRecPtr points to the start of the last XLOG record inserted by the
* current transaction. If MyLastRecPtr.xrecoff == 0, then we are not in
* a transaction or the transaction has not yet made any loggable changes.
* current transaction. If MyLastRecPtr.xrecoff == 0, then the current
* xact hasn't yet inserted any transaction-controlled XLOG records.
*
* Note that XLOG records inserted outside transaction control are not
* reflected into MyLastRecPtr.
* reflected into MyLastRecPtr. They do, however, cause MyXactMadeXLogEntry
* to be set true. The latter can be used to test whether the current xact
* made any loggable changes (including out-of-xact changes, such as
* sequence updates).
*/
XLogRecPtr MyLastRecPtr = {0, 0};
bool MyXactMadeXLogEntry = false;
/*
* ProcLastRecPtr points to the start of the last XLOG record inserted by the
* current backend. It is updated for all inserts, transaction-controlled
* or not.
* or not. ProcLastRecEnd is similar but points to end+1 of last record.
*/
static XLogRecPtr ProcLastRecPtr = {0, 0};
XLogRecPtr ProcLastRecEnd = {0, 0};
/*
* RedoRecPtr is this backend's local copy of the REDO record pointer
* (which is almost but not quite the same as a pointer to the most recent
* CHECKPOINT record). We update this from the shared-memory copy,
* XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
* hold the Insert lock). See XLogInsert for details.
* hold the Insert lock). See XLogInsert for details. We are also allowed
* to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
* see GetRedoRecPtr.
*/
static XLogRecPtr RedoRecPtr;
......@@ -272,7 +281,8 @@ typedef struct XLogCtlData
StartUpID ThisStartUpID;
/* This value is not protected by *any* lock... */
XLogRecPtr RedoRecPtr; /* see SetRedoRecPtr/GetRedoRecPtr */
/* see SetSavedRedoRecPtr/GetSavedRedoRecPtr */
XLogRecPtr SavedRedoRecPtr;
slock_t info_lck; /* locks shared LogwrtRqst/LogwrtResult */
} XLogCtlData;
......@@ -777,6 +787,7 @@ begin:;
MyLastRecPtr = RecPtr;
ProcLastRecPtr = RecPtr;
Insert->PrevRecord = RecPtr;
MyXactMadeXLogEntry = true;
Insert->currpos += SizeOfXLogRecord;
freespace -= SizeOfXLogRecord;
......@@ -855,6 +866,8 @@ begin:;
SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
}
ProcLastRecEnd = RecPtr;
END_CRIT_SECTION();
return (RecPtr);
......@@ -2538,7 +2551,7 @@ StartupXLOG(void)
ThisStartUpID = checkPoint.ThisStartUpID;
RedoRecPtr = XLogCtl->Insert.RedoRecPtr =
XLogCtl->RedoRecPtr = checkPoint.redo;
XLogCtl->SavedRedoRecPtr = checkPoint.redo;
if (XLByteLT(RecPtr, checkPoint.redo))
elog(PANIC, "invalid redo in checkpoint record");
......@@ -2824,32 +2837,47 @@ void
SetThisStartUpID(void)
{
ThisStartUpID = XLogCtl->ThisStartUpID;
RedoRecPtr = XLogCtl->RedoRecPtr;
RedoRecPtr = XLogCtl->SavedRedoRecPtr;
}
/*
* CheckPoint process called by postmaster saves copy of new RedoRecPtr
* in shmem (using SetRedoRecPtr). When checkpointer completes, postmaster
* calls GetRedoRecPtr to update its own copy of RedoRecPtr, so that
* subsequently-spawned backends will start out with a reasonably up-to-date
* local RedoRecPtr. Since these operations are not protected by any lock
* and copying an XLogRecPtr isn't atomic, it's unsafe to use either of these
* routines at other times!
*
* Note: once spawned, a backend must update its local RedoRecPtr from
* XLogCtl->Insert.RedoRecPtr while holding the insert lock. This is
* done in XLogInsert().
* in shmem (using SetSavedRedoRecPtr). When checkpointer completes,
* postmaster calls GetSavedRedoRecPtr to update its own copy of RedoRecPtr,
* so that subsequently-spawned backends will start out with a reasonably
* up-to-date local RedoRecPtr. Since these operations are not protected by
* any lock and copying an XLogRecPtr isn't atomic, it's unsafe to use either
* of these routines at other times!
*/
void
SetRedoRecPtr(void)
SetSavedRedoRecPtr(void)
{
XLogCtl->RedoRecPtr = RedoRecPtr;
XLogCtl->SavedRedoRecPtr = RedoRecPtr;
}
void
GetSavedRedoRecPtr(void)
{
RedoRecPtr = XLogCtl->SavedRedoRecPtr;
}
/*
* Once spawned, a backend may update its local RedoRecPtr from
* XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
* to do so. This is done in XLogInsert() or GetRedoRecPtr().
*/
XLogRecPtr
GetRedoRecPtr(void)
{
RedoRecPtr = XLogCtl->RedoRecPtr;
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
RedoRecPtr = xlogctl->Insert.RedoRecPtr;
SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
return RedoRecPtr;
}
/*
......@@ -2862,6 +2890,7 @@ ShutdownXLOG(void)
/* suppress in-transaction check in CreateCheckPoint */
MyLastRecPtr.xrecoff = 0;
MyXactMadeXLogEntry = false;
CritSectionCount++;
CreateDummyCaches();
......@@ -2886,7 +2915,7 @@ CreateCheckPoint(bool shutdown)
uint32 _logId;
uint32 _logSeg;
if (MyLastRecPtr.xrecoff != 0)
if (MyXactMadeXLogEntry)
elog(ERROR, "CreateCheckPoint: cannot be called inside transaction block");
/*
......@@ -2972,9 +3001,16 @@ CreateCheckPoint(bool shutdown)
/*
* Here we update the shared RedoRecPtr for future XLogInsert calls;
* this must be done while holding the insert lock.
* this must be done while holding the insert lock AND the info_lck.
*/
RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
}
/*
* Get UNDO record ptr - this is oldest of PROC->logRec values. We do
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.123 2002/03/08 00:42:09 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.124 2002/03/15 19:20:34 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -386,7 +386,7 @@ BootstrapMain(int argc, char *argv[])
InitDummyProcess(); /* needed to get LWLocks */
CreateDummyCaches();
CreateCheckPoint(false);
SetRedoRecPtr();
SetSavedRedoRecPtr(); /* pass redo ptr back to postmaster */
proc_exit(0); /* done */
case BS_XLOG_STARTUP:
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.70 2002/03/06 06:09:35 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.71 2002/03/15 19:20:35 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -286,6 +286,7 @@ nextval(PG_FUNCTION_ARGS)
char *seqname = get_seq_name(seqin);
SeqTable elm;
Buffer buf;
Page page;
Form_pg_sequence seq;
int64 incby,
maxv,
......@@ -316,6 +317,7 @@ nextval(PG_FUNCTION_ARGS)
seq = read_info("nextval", elm, &buf); /* lock page' buffer and
* read tuple */
page = BufferGetPage(buf);
last = next = result = seq->last_value;
incby = seq->increment_by;
......@@ -331,11 +333,33 @@ nextval(PG_FUNCTION_ARGS)
log--;
}
/*
* Decide whether we should emit a WAL log record. If so, force up
* the fetch count to grab SEQ_LOG_VALS more values than we actually
* need to cache. (These will then be usable without logging.)
*
* If this is the first nextval after a checkpoint, we must force
* a new WAL record to be written anyway, else replay starting from the
* checkpoint would fail to advance the sequence past the logged
* values. In this case we may as well fetch extra values.
*/
if (log < fetch)
{
fetch = log = fetch - log + SEQ_LOG_VALS;
/* forced log to satisfy local demand for values */
fetch = log = fetch + SEQ_LOG_VALS;
logit = true;
}
else
{
XLogRecPtr redoptr = GetRedoRecPtr();
if (XLByteLE(PageGetLSN(page), redoptr))
{
/* last update of seq was before checkpoint */
fetch = log = fetch + SEQ_LOG_VALS;
logit = true;
}
}
while (fetch) /* try to fetch cache [+ log ] numbers */
{
......@@ -386,6 +410,9 @@ nextval(PG_FUNCTION_ARGS)
}
}
log -= fetch; /* adjust for any unfetched numbers */
Assert(log >= 0);
/* save info in local cache */
elm->last = result; /* last returned number */
elm->cached = last; /* last fetched number */
......@@ -396,7 +423,6 @@ nextval(PG_FUNCTION_ARGS)
xl_seq_rec xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
Page page = BufferGetPage(buf);
xlrec.node = elm->rel->rd_node;
rdata[0].buffer = InvalidBuffer;
......@@ -417,15 +443,11 @@ nextval(PG_FUNCTION_ARGS)
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
if (fetch) /* not all numbers were fetched */
log -= fetch;
}
/* update on-disk data */
seq->last_value = last; /* last fetched number */
seq->is_called = true;
Assert(log >= 0);
seq->log_cnt = log; /* how much is logged */
END_CRIT_SECTION();
......
......@@ -37,7 +37,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.270 2002/03/04 01:46:03 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.271 2002/03/15 19:20:35 tgl Exp $
*
* NOTES
*
......@@ -1683,7 +1683,7 @@ CleanupProc(int pid,
{
checkpointed = time(NULL);
/* Update RedoRecPtr for future child backends */
GetRedoRecPtr();
GetSavedRedoRecPtr();
}
}
else
......
......@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: xlog.h,v 1.28 2001/11/05 17:46:31 momjian Exp $
* $Id: xlog.h,v 1.29 2002/03/15 19:20:36 tgl Exp $
*/
#ifndef XLOG_H
#define XLOG_H
......@@ -178,6 +178,8 @@ typedef struct XLogRecData
extern StartUpID ThisStartUpID; /* current SUI */
extern bool InRecovery;
extern XLogRecPtr MyLastRecPtr;
extern bool MyXactMadeXLogEntry;
extern XLogRecPtr ProcLastRecEnd;
/* these variables are GUC parameters related to XLOG */
extern int CheckPointSegments;
......@@ -205,8 +207,9 @@ extern void ShutdownXLOG(void);
extern void CreateCheckPoint(bool shutdown);
extern void SetThisStartUpID(void);
extern void XLogPutNextOid(Oid nextOid);
extern void SetRedoRecPtr(void);
extern void GetRedoRecPtr(void);
extern void SetSavedRedoRecPtr(void);
extern void GetSavedRedoRecPtr(void);
extern XLogRecPtr GetRedoRecPtr(void);
/* in storage/ipc/sinval.c, but don't want to declare in sinval.h because
* we'd have to include xlog.h into that ...
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment