Commit ea42cc18 authored by Robert Haas

Track the oldest XID that can be safely looked up in CLOG.

This provides infrastructure for looking up arbitrary, user-supplied
XIDs without a risk of scary-looking failures from within the clog
module.  Normally, the oldest XID that can be safely looked up in CLOG
is the same as the oldest XID that can be reused without causing
wraparound, and the latter is already tracked.  However, while
truncation is in progress, the values are different, so we must
keep track of them separately.

Craig Ringer, reviewed by Simon Riggs and by me.

Discussion: http://postgr.es/m/CAMsr+YHQiWNEi0daCTboS40T+V5s_+dst3PYv_8v2wNVH+Xx4g@mail.gmail.com
parent 50c956ad
...@@ -1017,6 +1017,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ...@@ -1017,6 +1017,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><literal>OldSnapshotTimeMapLock</></entry> <entry><literal>OldSnapshotTimeMapLock</></entry>
<entry>Waiting to read or update old snapshot control information.</entry> <entry>Waiting to read or update old snapshot control information.</entry>
</row> </row>
<row>
<entry><literal>CLogTruncationLock</></entry>
<entry>Waiting to truncate the transaction log or waiting for transaction log truncation to finish.</entry>
</row>
<row> <row>
<entry><literal>clog</></entry> <entry><literal>clog</></entry>
<entry>Waiting for I/O on a clog (transaction status) buffer.</entry> <entry>Waiting for I/O on a clog (transaction status) buffer.</entry>
......
...@@ -23,12 +23,20 @@ clog_desc(StringInfo buf, XLogReaderState *record) ...@@ -23,12 +23,20 @@ clog_desc(StringInfo buf, XLogReaderState *record)
char *rec = XLogRecGetData(record); char *rec = XLogRecGetData(record);
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
if (info == CLOG_ZEROPAGE || info == CLOG_TRUNCATE) if (info == CLOG_ZEROPAGE)
{ {
int pageno; int pageno;
memcpy(&pageno, rec, sizeof(int)); memcpy(&pageno, rec, sizeof(int));
appendStringInfo(buf, "%d", pageno); appendStringInfo(buf, "page %d", pageno);
}
else if (info == CLOG_TRUNCATE)
{
xl_clog_truncate xlrec;
memcpy(&xlrec, rec, sizeof(xl_clog_truncate));
appendStringInfo(buf, "page %d; oldestXact %u",
xlrec.pageno, xlrec.oldestXact);
} }
} }
......
...@@ -83,7 +83,8 @@ static SlruCtlData ClogCtlData; ...@@ -83,7 +83,8 @@ static SlruCtlData ClogCtlData;
static int ZeroCLOGPage(int pageno, bool writeXlog); static int ZeroCLOGPage(int pageno, bool writeXlog);
static bool CLOGPagePrecedes(int page1, int page2); static bool CLOGPagePrecedes(int page1, int page2);
static void WriteZeroPageXlogRec(int pageno); static void WriteZeroPageXlogRec(int pageno);
static void WriteTruncateXlogRec(int pageno); static void WriteTruncateXlogRec(int pageno, TransactionId oldestXact,
Oid oldestXidDb);
static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids, static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, TransactionId *subxids, XidStatus status,
XLogRecPtr lsn, int pageno); XLogRecPtr lsn, int pageno);
...@@ -640,7 +641,7 @@ ExtendCLOG(TransactionId newestXact) ...@@ -640,7 +641,7 @@ ExtendCLOG(TransactionId newestXact)
* the XLOG flush unless we have confirmed that there is a removable segment. * the XLOG flush unless we have confirmed that there is a removable segment.
*/ */
void void
TruncateCLOG(TransactionId oldestXact) TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
{ {
int cutoffPage; int cutoffPage;
...@@ -654,8 +655,26 @@ TruncateCLOG(TransactionId oldestXact) ...@@ -654,8 +655,26 @@ TruncateCLOG(TransactionId oldestXact)
if (!SlruScanDirectory(ClogCtl, SlruScanDirCbReportPresence, &cutoffPage)) if (!SlruScanDirectory(ClogCtl, SlruScanDirCbReportPresence, &cutoffPage))
return; /* nothing to remove */ return; /* nothing to remove */
/* Write XLOG record and flush XLOG to disk */ /*
WriteTruncateXlogRec(cutoffPage); * Advance oldestClogXid before truncating clog, so concurrent xact status
* lookups can ensure they don't attempt to access truncated-away clog.
*
* It's only necessary to do this if we will actually truncate away clog
* pages.
*/
AdvanceOldestClogXid(oldestXact);
/* vac_truncate_clog already advanced oldestXid */
Assert(TransactionIdPrecedesOrEquals(oldestXact,
ShmemVariableCache->oldestXid));
/*
* Write XLOG record and flush XLOG to disk. We record the oldest xid we're
* keeping information about here so we can ensure that it's always ahead
* of clog truncation in case we crash, and so a standby finds out the new
* valid xid before the next checkpoint.
*/
WriteTruncateXlogRec(cutoffPage, oldestXact, oldestxid_datoid);
/* Now we can remove the old CLOG segment(s) */ /* Now we can remove the old CLOG segment(s) */
SimpleLruTruncate(ClogCtl, cutoffPage); SimpleLruTruncate(ClogCtl, cutoffPage);
...@@ -704,12 +723,17 @@ WriteZeroPageXlogRec(int pageno) ...@@ -704,12 +723,17 @@ WriteZeroPageXlogRec(int pageno)
* in TruncateCLOG(). * in TruncateCLOG().
*/ */
static void static void
WriteTruncateXlogRec(int pageno) WriteTruncateXlogRec(int pageno, TransactionId oldestXact, Oid oldestXactDb)
{ {
XLogRecPtr recptr; XLogRecPtr recptr;
xl_clog_truncate xlrec;
xlrec.pageno = pageno;
xlrec.oldestXact = oldestXact;
xlrec.oldestXactDb = oldestXactDb;
XLogBeginInsert(); XLogBeginInsert();
XLogRegisterData((char *) (&pageno), sizeof(int)); XLogRegisterData((char *) (&xlrec), sizeof(xl_clog_truncate));
recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE); recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE);
XLogFlush(recptr); XLogFlush(recptr);
} }
...@@ -742,17 +766,19 @@ clog_redo(XLogReaderState *record) ...@@ -742,17 +766,19 @@ clog_redo(XLogReaderState *record)
} }
else if (info == CLOG_TRUNCATE) else if (info == CLOG_TRUNCATE)
{ {
int pageno; xl_clog_truncate xlrec;
memcpy(&pageno, XLogRecGetData(record), sizeof(int)); memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_clog_truncate));
/* /*
* During XLOG replay, latest_page_number isn't set up yet; insert a * During XLOG replay, latest_page_number isn't set up yet; insert a
* suitable value to bypass the sanity test in SimpleLruTruncate. * suitable value to bypass the sanity test in SimpleLruTruncate.
*/ */
ClogCtl->shared->latest_page_number = pageno; ClogCtl->shared->latest_page_number = xlrec.pageno;
AdvanceOldestClogXid(xlrec.oldestXact);
SimpleLruTruncate(ClogCtl, pageno); SimpleLruTruncate(ClogCtl, xlrec.pageno);
} }
else else
elog(PANIC, "clog_redo: unknown op code %u", info); elog(PANIC, "clog_redo: unknown op code %u", info);
......
...@@ -119,7 +119,7 @@ TransactionLogFetch(TransactionId transactionId) ...@@ -119,7 +119,7 @@ TransactionLogFetch(TransactionId transactionId)
* True iff transaction associated with the identifier did commit. * True iff transaction associated with the identifier did commit.
* *
* Note: * Note:
* Assumes transaction identifier is valid. * Assumes transaction identifier is valid and exists in clog.
*/ */
bool /* true if given transaction committed */ bool /* true if given transaction committed */
TransactionIdDidCommit(TransactionId transactionId) TransactionIdDidCommit(TransactionId transactionId)
...@@ -175,7 +175,7 @@ TransactionIdDidCommit(TransactionId transactionId) ...@@ -175,7 +175,7 @@ TransactionIdDidCommit(TransactionId transactionId)
* True iff transaction associated with the identifier did abort. * True iff transaction associated with the identifier did abort.
* *
* Note: * Note:
* Assumes transaction identifier is valid. * Assumes transaction identifier is valid and exists in clog.
*/ */
bool /* true if given transaction aborted */ bool /* true if given transaction aborted */
TransactionIdDidAbort(TransactionId transactionId) TransactionIdDidAbort(TransactionId transactionId)
......
...@@ -259,7 +259,28 @@ ReadNewTransactionId(void) ...@@ -259,7 +259,28 @@ ReadNewTransactionId(void)
} }
/* /*
* Determine the last safe XID to allocate given the currently oldest * Advance the cluster-wide value for the oldest valid clog entry.
*
* We must acquire CLogTruncationLock to advance the oldestClogXid. It's not
* necessary to hold the lock during the actual clog truncation, only when we
* advance the limit, as code looking up arbitrary xids is required to hold
* CLogTruncationLock from when it tests oldestClogXid through to when it
* completes the clog lookup.
*/
void
AdvanceOldestClogXid(TransactionId oldest_datfrozenxid)
{
	/* Exclusive mode: we may be about to overwrite oldestClogXid. */
	LWLockAcquire(CLogTruncationLock, LW_EXCLUSIVE);

	/*
	 * Store the new value only when the current limit precedes it, so the
	 * limit is left untouched if it is already at or past the caller's xid.
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->oldestClogXid,
							  oldest_datfrozenxid))
		ShmemVariableCache->oldestClogXid = oldest_datfrozenxid;

	LWLockRelease(CLogTruncationLock);
}
/*
* Determine the last safe XID to allocate using the currently oldest
* datfrozenxid (ie, the oldest XID that might exist in any database * datfrozenxid (ie, the oldest XID that might exist in any database
* of our cluster), and the OID of the (or a) database with that value. * of our cluster), and the OID of the (or a) database with that value.
*/ */
......
...@@ -5016,6 +5016,7 @@ BootStrapXLOG(void) ...@@ -5016,6 +5016,7 @@ BootStrapXLOG(void)
ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0; ShmemVariableCache->oidCount = 0;
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
AdvanceOldestClogXid(checkPoint.oldestXid);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true); SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId); SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
...@@ -6622,6 +6623,7 @@ StartupXLOG(void) ...@@ -6622,6 +6623,7 @@ StartupXLOG(void)
ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0; ShmemVariableCache->oidCount = 0;
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
AdvanceOldestClogXid(checkPoint.oldestXid);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true); SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
SetCommitTsLimit(checkPoint.oldestCommitTsXid, SetCommitTsLimit(checkPoint.oldestCommitTsXid,
...@@ -8687,6 +8689,11 @@ CreateCheckPoint(int flags) ...@@ -8687,6 +8689,11 @@ CreateCheckPoint(int flags)
/* /*
* Get the other info we need for the checkpoint record. * Get the other info we need for the checkpoint record.
*
* We don't need to save oldestClogXid in the checkpoint; it only matters
* for the short period in which clog is being truncated, and if we crash
* during that we'll redo the clog truncation and fix up oldestClogXid
* there.
*/ */
LWLockAcquire(XidGenLock, LW_SHARED); LWLockAcquire(XidGenLock, LW_SHARED);
checkPoint.nextXid = ShmemVariableCache->nextXid; checkPoint.nextXid = ShmemVariableCache->nextXid;
...@@ -9616,6 +9623,10 @@ xlog_redo(XLogReaderState *record) ...@@ -9616,6 +9623,10 @@ xlog_redo(XLogReaderState *record)
MultiXactAdvanceOldest(checkPoint.oldestMulti, MultiXactAdvanceOldest(checkPoint.oldestMulti,
checkPoint.oldestMultiDB); checkPoint.oldestMultiDB);
/*
* No need to set oldestClogXid here as well; it'll be set when we
* redo an xl_clog_truncate if it changed since initialization.
*/
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
/* /*
......
...@@ -1194,7 +1194,7 @@ vac_truncate_clog(TransactionId frozenXID, ...@@ -1194,7 +1194,7 @@ vac_truncate_clog(TransactionId frozenXID,
/* /*
* Truncate CLOG, multixact and CommitTs to the oldest computed value. * Truncate CLOG, multixact and CommitTs to the oldest computed value.
*/ */
TruncateCLOG(frozenXID); TruncateCLOG(frozenXID, oldestxid_datoid);
TruncateCommitTs(frozenXID); TruncateCommitTs(frozenXID);
TruncateMultiXact(minMulti, minmulti_datoid); TruncateMultiXact(minMulti, minmulti_datoid);
......
...@@ -49,3 +49,4 @@ MultiXactTruncationLock 41 ...@@ -49,3 +49,4 @@ MultiXactTruncationLock 41
OldSnapshotTimeMapLock 42 OldSnapshotTimeMapLock 42
BackendRandomLock 43 BackendRandomLock 43
LogicalRepWorkerLock 44 LogicalRepWorkerLock 44
CLogTruncationLock 45
...@@ -28,6 +28,12 @@ typedef int XidStatus; ...@@ -28,6 +28,12 @@ typedef int XidStatus;
#define TRANSACTION_STATUS_ABORTED 0x02 #define TRANSACTION_STATUS_ABORTED 0x02
#define TRANSACTION_STATUS_SUB_COMMITTED 0x03 #define TRANSACTION_STATUS_SUB_COMMITTED 0x03
/*
 * Payload of a CLOG_TRUNCATE WAL record.
 *
 * WriteTruncateXlogRec() fills one of these in and registers it as the
 * record data; clog_redo() and clog_desc() copy it back out of the record
 * with memcpy().
 */
typedef struct xl_clog_truncate
{
/* cutoff page; on redo it is handed to SimpleLruTruncate() */
int pageno;
/* xid passed to AdvanceOldestClogXid() on redo, before truncating */
TransactionId oldestXact;
/* database OID the caller supplied together with oldestXact */
Oid oldestXactDb;
} xl_clog_truncate;
extern void TransactionIdSetTreeStatus(TransactionId xid, int nsubxids, extern void TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, XLogRecPtr lsn); TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
...@@ -42,7 +48,7 @@ extern void TrimCLOG(void); ...@@ -42,7 +48,7 @@ extern void TrimCLOG(void);
extern void ShutdownCLOG(void); extern void ShutdownCLOG(void);
extern void CheckPointCLOG(void); extern void CheckPointCLOG(void);
extern void ExtendCLOG(TransactionId newestXact); extern void ExtendCLOG(TransactionId newestXact);
extern void TruncateCLOG(TransactionId oldestXact); extern void TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid);
/* XLOG stuff */ /* XLOG stuff */
#define CLOG_ZEROPAGE 0x00 #define CLOG_ZEROPAGE 0x00
......
...@@ -134,6 +134,12 @@ typedef struct VariableCacheData ...@@ -134,6 +134,12 @@ typedef struct VariableCacheData
*/ */
TransactionId latestCompletedXid; /* newest XID that has committed or TransactionId latestCompletedXid; /* newest XID that has committed or
* aborted */ * aborted */
/*
* These fields are protected by CLogTruncationLock
*/
TransactionId oldestClogXid; /* oldest it's safe to look up in clog */
} VariableCacheData; } VariableCacheData;
typedef VariableCacheData *VariableCache; typedef VariableCacheData *VariableCache;
...@@ -173,6 +179,7 @@ extern TransactionId GetNewTransactionId(bool isSubXact); ...@@ -173,6 +179,7 @@ extern TransactionId GetNewTransactionId(bool isSubXact);
extern TransactionId ReadNewTransactionId(void); extern TransactionId ReadNewTransactionId(void);
extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid, extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
Oid oldest_datoid); Oid oldest_datoid);
extern void AdvanceOldestClogXid(TransactionId oldest_datfrozenxid);
extern bool ForceTransactionIdLimitUpdate(void); extern bool ForceTransactionIdLimitUpdate(void);
extern Oid GetNewObjectId(void); extern Oid GetNewObjectId(void);
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
/* /*
* Each page of XLOG file has a header like this: * Each page of XLOG file has a header like this:
*/ */
#define XLOG_PAGE_MAGIC 0xD096 /* can be used as WAL version indicator */ #define XLOG_PAGE_MAGIC 0xD097 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData typedef struct XLogPageHeaderData
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment