Commit 2fc7af5e authored by Thomas Munro's avatar Thomas Munro

Add basic infrastructure for 64 bit transaction IDs.

Instead of inferring epoch progress from xids and checkpoints,
introduce a 64 bit FullTransactionId type and use it to track xid
generation.  This fixes an unlikely bug where the epoch is reported
incorrectly if the range of active xids wraps around more than once
between checkpoints.

The only user-visible effect of this commit is to correct the epoch
used by txid_current() and txid_status(), also visible with
pg_controldata, in those rare circumstances.  It also creates some
basic infrastructure so that later patches can use 64 bit
transaction IDs in more places.

The new type is a struct that we pass by value, as a form of strong
typedef.  This prevents the sort of accidental confusion between
TransactionId and FullTransactionId that would be possible if we
were to use a plain old uint64.

Author: Thomas Munro
Reported-by: Amit Kapila
Reviewed-by: Andres Freund, Tom Lane, Heikki Linnakangas
Discussion: https://postgr.es/m/CAA4eK1%2BMv%2Bmb0HFfWM9Srtc6MVe160WFurXV68iAFMcagRZ0dQ%40mail.gmail.com
parent 2a96909a
......@@ -14,6 +14,7 @@
*/
#include "postgres.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "catalog/pg_control.h"
......@@ -52,7 +53,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
checkpoint->ThisTimeLineID,
checkpoint->PrevTimeLineID,
checkpoint->fullPageWrites ? "true" : "false",
checkpoint->nextXidEpoch, checkpoint->nextXid,
EpochFromFullTransactionId(checkpoint->nextFullXid),
XidFromFullTransactionId(checkpoint->nextFullXid),
checkpoint->nextOid,
checkpoint->nextMulti,
checkpoint->nextMultiOffset,
......
......@@ -749,12 +749,12 @@ ZeroCLOGPage(int pageno, bool writeXlog)
/*
* This must be called ONCE during postmaster or standalone-backend startup,
* after StartupXLOG has initialized ShmemVariableCache->nextXid.
* after StartupXLOG has initialized ShmemVariableCache->nextFullXid.
*/
void
StartupCLOG(void)
{
TransactionId xid = ShmemVariableCache->nextXid;
TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
int pageno = TransactionIdToPage(xid);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
......@@ -773,7 +773,7 @@ StartupCLOG(void)
void
TrimCLOG(void)
{
TransactionId xid = ShmemVariableCache->nextXid;
TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
int pageno = TransactionIdToPage(xid);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
......@@ -792,7 +792,7 @@ TrimCLOG(void)
* but makes no WAL entry). Let's just be safe. (We need not worry about
* pages beyond the current one, since those will be zeroed when first
* used. For the same reason, there is no need to do anything when
* nextXid is exactly at a page boundary; and it's likely that the
* nextFullXid is exactly at a page boundary; and it's likely that the
* "current" page doesn't exist yet in that case.)
*/
if (TransactionIdToPgIndex(xid) != 0)
......
......@@ -553,7 +553,7 @@ ZeroCommitTsPage(int pageno, bool writeXlog)
/*
* This must be called ONCE during postmaster or standalone-backend startup,
* after StartupXLOG has initialized ShmemVariableCache->nextXid.
* after StartupXLOG has initialized ShmemVariableCache->nextFullXid.
*/
void
StartupCommitTs(void)
......@@ -643,7 +643,7 @@ ActivateCommitTs(void)
}
LWLockRelease(CommitTsLock);
xid = ShmemVariableCache->nextXid;
xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
pageno = TransactionIdToCTsPage(xid);
/*
......
......@@ -3267,9 +3267,9 @@ multixact_redo(XLogReaderState *record)
xlrec->moff + xlrec->nmembers);
/*
* Make sure nextXid is beyond any XID mentioned in the record. This
* should be unnecessary, since any XID found here ought to have other
* evidence in the XLOG, but let's be safe.
* Make sure nextFullXid is beyond any XID mentioned in the record.
* This should be unnecessary, since any XID found here ought to have
* other evidence in the XLOG, but let's be safe.
*/
max_xid = XLogRecGetXid(record);
for (i = 0; i < xlrec->nmembers; i++)
......@@ -3278,19 +3278,7 @@ multixact_redo(XLogReaderState *record)
max_xid = xlrec->members[i].xid;
}
/*
* We don't expect anyone else to modify nextXid, hence startup
* process doesn't need to hold a lock while checking this. We still
* acquire the lock to modify it, though.
*/
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = max_xid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
AdvanceNextFullTransactionIdPastXid(max_xid);
}
else if (info == XLOG_MULTIXACT_TRUNCATE_ID)
{
......
......@@ -241,14 +241,15 @@ ZeroSUBTRANSPage(int pageno)
/*
* This must be called ONCE during postmaster or standalone-backend startup,
* after StartupXLOG has initialized ShmemVariableCache->nextXid.
* after StartupXLOG has initialized ShmemVariableCache->nextFullXid.
*
* oldestActiveXID is the oldest XID of any prepared transaction, or nextXid
* oldestActiveXID is the oldest XID of any prepared transaction, or nextFullXid
* if there are none.
*/
void
StartupSUBTRANS(TransactionId oldestActiveXID)
{
FullTransactionId nextFullXid;
int startPage;
int endPage;
......@@ -261,7 +262,8 @@ StartupSUBTRANS(TransactionId oldestActiveXID)
LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
startPage = TransactionIdToPage(oldestActiveXID);
endPage = TransactionIdToPage(ShmemVariableCache->nextXid);
nextFullXid = ShmemVariableCache->nextFullXid;
endPage = TransactionIdToPage(XidFromFullTransactionId(nextFullXid));
while (startPage != endPage)
{
......
......@@ -1878,16 +1878,16 @@ restoreTwoPhaseData(void)
*
* Scan the shared memory entries of TwoPhaseState and determine the range
* of valid XIDs present. This is run during database startup, after we
* have completed reading WAL. ShmemVariableCache->nextXid has been set to
* have completed reading WAL. ShmemVariableCache->nextFullXid has been set to
* one more than the highest XID for which evidence exists in WAL.
*
* We throw away any prepared xacts with main XID beyond nextXid --- if any
* We throw away any prepared xacts with main XID beyond nextFullXid --- if any
* are present, it suggests that the DBA has done a PITR recovery to an
* earlier point in time without cleaning out pg_twophase. We dare not
* try to recover such prepared xacts since they likely depend on database
* state that doesn't exist now.
*
* However, we will advance nextXid beyond any subxact XIDs belonging to
* However, we will advance nextFullXid beyond any subxact XIDs belonging to
* valid prepared xacts. We need to do this since subxact commit doesn't
* write a WAL entry, and so there might be no evidence in WAL of those
* subxact XIDs.
......@@ -1897,7 +1897,7 @@ restoreTwoPhaseData(void)
* backup should be rolled in.
*
* Our other responsibility is to determine and return the oldest valid XID
* among the prepared xacts (if none, return ShmemVariableCache->nextXid).
* among the prepared xacts (if none, return ShmemVariableCache->nextFullXid).
* This is needed to synchronize pg_subtrans startup properly.
*
* If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
......@@ -1907,7 +1907,8 @@ restoreTwoPhaseData(void)
TransactionId
PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
{
TransactionId origNextXid = ShmemVariableCache->nextXid;
FullTransactionId nextFullXid = ShmemVariableCache->nextFullXid;
TransactionId origNextXid = XidFromFullTransactionId(nextFullXid);
TransactionId result = origNextXid;
TransactionId *xids = NULL;
int nxids = 0;
......@@ -2123,7 +2124,7 @@ RecoverPreparedTransactions(void)
*
* If setParent is true, set up subtransaction parent linkages.
*
* If setNextXid is true, set ShmemVariableCache->nextXid to the newest
* If setNextXid is true, set ShmemVariableCache->nextFullXid to the newest
* value scanned.
*/
static char *
......@@ -2132,7 +2133,8 @@ ProcessTwoPhaseBuffer(TransactionId xid,
bool fromdisk,
bool setParent, bool setNextXid)
{
TransactionId origNextXid = ShmemVariableCache->nextXid;
FullTransactionId nextFullXid = ShmemVariableCache->nextFullXid;
TransactionId origNextXid = XidFromFullTransactionId(nextFullXid);
TransactionId *subxids;
char *buf;
TwoPhaseFileHeader *hdr;
......@@ -2212,7 +2214,7 @@ ProcessTwoPhaseBuffer(TransactionId xid,
/*
* Examine subtransaction XIDs ... they should all follow main XID, and
* they may force us to advance nextXid.
* they may force us to advance nextFullXid.
*/
subxids = (TransactionId *) (buf +
MAXALIGN(sizeof(TwoPhaseFileHeader)) +
......@@ -2223,25 +2225,9 @@ ProcessTwoPhaseBuffer(TransactionId xid,
Assert(TransactionIdFollows(subxid, xid));
/* update nextXid if needed */
if (setNextXid &&
TransactionIdFollowsOrEquals(subxid,
ShmemVariableCache->nextXid))
{
/*
* We don't expect anyone else to modify nextXid, hence we don't
* need to hold a lock while examining it. We still acquire the
* lock to modify it, though, so we recheck.
*/
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
if (TransactionIdFollowsOrEquals(subxid,
ShmemVariableCache->nextXid))
{
ShmemVariableCache->nextXid = subxid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
}
LWLockRelease(XidGenLock);
}
/* update nextFullXid if needed */
if (setNextXid)
AdvanceNextFullTransactionIdPastXid(subxid);
if (setParent)
SubTransSetParent(subxid, xid);
......
......@@ -73,7 +73,7 @@ GetNewTransactionId(bool isSubXact)
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
xid = ShmemVariableCache->nextXid;
xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
/*----------
* Check to see if it's safe to assign another XID. This protects against
......@@ -156,7 +156,7 @@ GetNewTransactionId(bool isSubXact)
/* Re-acquire lock and start over */
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
xid = ShmemVariableCache->nextXid;
xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
}
/*
......@@ -173,12 +173,12 @@ GetNewTransactionId(bool isSubXact)
ExtendSUBTRANS(xid);
/*
* Now advance the nextXid counter. This must not happen until after we
* have successfully completed ExtendCLOG() --- if that routine fails, we
* want the next incoming transaction to try it again. We cannot assign
* more XIDs until there is CLOG space for them.
* Now advance the nextFullXid counter. This must not happen until after
* we have successfully completed ExtendCLOG() --- if that routine fails,
* we want the next incoming transaction to try it again. We cannot
* assign more XIDs until there is CLOG space for them.
*/
TransactionIdAdvance(ShmemVariableCache->nextXid);
FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid);
/*
* We must store the new XID into the shared ProcArray before releasing
......@@ -236,18 +236,64 @@ GetNewTransactionId(bool isSubXact)
}
/*
* Read nextXid but don't allocate it.
* Read nextFullXid but don't allocate it.
*/
TransactionId
ReadNewTransactionId(void)
FullTransactionId
ReadNextFullTransactionId(void)
{
TransactionId xid;
FullTransactionId fullXid;
LWLockAcquire(XidGenLock, LW_SHARED);
xid = ShmemVariableCache->nextXid;
fullXid = ShmemVariableCache->nextFullXid;
LWLockRelease(XidGenLock);
return xid;
return fullXid;
}
/*
* Advance nextFullXid to the value after a given xid. The epoch is inferred.
* This must only be called during recovery or from two-phase start-up code.
*/
void
AdvanceNextFullTransactionIdPastXid(TransactionId xid)
{
FullTransactionId newNextFullXid;
TransactionId next_xid;
uint32 epoch;
/*
* It is safe to read nextFullXid without a lock, because this is only
* called from the startup process or single-process mode, meaning that no
* other process can modify it.
*/
Assert(AmStartupProcess() || !IsUnderPostmaster);
/* Fast return if this isn't an xid high enough to move the needle. */
next_xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
if (!TransactionIdFollowsOrEquals(xid, next_xid))
return;
/*
* Compute the FullTransactionId that comes after the given xid. To do
* this, we preserve the existing epoch, but detect when we've wrapped
* into a new epoch. This is necessary because WAL records and 2PC state
* currently contain 32 bit xids. The wrap logic is safe in those cases
* because the span of active xids cannot exceed one epoch at any given
* point in the WAL stream.
*/
TransactionIdAdvance(xid);
epoch = EpochFromFullTransactionId(ShmemVariableCache->nextFullXid);
if (unlikely(xid < next_xid))
++epoch;
newNextFullXid = FullTransactionIdFromEpochAndXid(epoch, xid);
/*
* We still need to take a lock to modify the value when there are
* concurrent readers.
*/
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextFullXid = newNextFullXid;
LWLockRelease(XidGenLock);
}
/*
......@@ -351,7 +397,7 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
ShmemVariableCache->xidStopLimit = xidStopLimit;
ShmemVariableCache->xidWrapLimit = xidWrapLimit;
ShmemVariableCache->oldestXidDB = oldest_datoid;
curXid = ShmemVariableCache->nextXid;
curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
LWLockRelease(XidGenLock);
/* Log the info */
......@@ -427,7 +473,7 @@ ForceTransactionIdLimitUpdate(void)
/* Locking is probably not really necessary, but let's be careful */
LWLockAcquire(XidGenLock, LW_SHARED);
nextXid = ShmemVariableCache->nextXid;
nextXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
xidVacLimit = ShmemVariableCache->xidVacLimit;
oldestXid = ShmemVariableCache->oldestXid;
oldestXidDB = ShmemVariableCache->oldestXidDB;
......
......@@ -5636,21 +5636,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
/*
* Make sure nextXid is beyond any XID mentioned in the record.
*
* We don't expect anyone else to modify nextXid, hence we don't need to
* hold a lock while checking this. We still acquire the lock to modify
* it, though.
*/
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = max_xid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
/* Make sure nextFullXid is beyond any XID mentioned in the record. */
AdvanceNextFullTransactionIdPastXid(max_xid);
Assert(((parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == 0) ==
(origin_id == InvalidRepOriginId));
......@@ -5792,25 +5779,11 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
Assert(TransactionIdIsValid(xid));
/*
* Make sure nextXid is beyond any XID mentioned in the record.
*
* We don't expect anyone else to modify nextXid, hence we don't need to
* hold a lock while checking this. We still acquire the lock to modify
* it, though.
*/
/* Make sure nextFullXid is beyond any XID mentioned in the record. */
max_xid = TransactionIdLatest(xid,
parsed->nsubxacts,
parsed->subxacts);
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = max_xid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
AdvanceNextFullTransactionIdPastXid(max_xid);
if (standbyState == STANDBY_DISABLED)
{
......
......@@ -590,8 +590,7 @@ typedef struct XLogCtlData
/* Protected by info_lck: */
XLogwrtRqst LogwrtRqst;
XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */
uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */
TransactionId ckptXid;
FullTransactionId ckptFullXid; /* nextFullXid of latest checkpoint */
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */
......@@ -5115,8 +5114,8 @@ BootStrapXLOG(void)
checkPoint.ThisTimeLineID = ThisTimeLineID;
checkPoint.PrevTimeLineID = ThisTimeLineID;
checkPoint.fullPageWrites = fullPageWrites;
checkPoint.nextXidEpoch = 0;
checkPoint.nextXid = FirstNormalTransactionId;
checkPoint.nextFullXid =
FullTransactionIdFromEpochAndXid(0, FirstNormalTransactionId);
checkPoint.nextOid = FirstBootstrapObjectId;
checkPoint.nextMulti = FirstMultiXactId;
checkPoint.nextMultiOffset = 0;
......@@ -5129,7 +5128,7 @@ BootStrapXLOG(void)
checkPoint.time = (pg_time_t) time(NULL);
checkPoint.oldestActiveXid = InvalidTransactionId;
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextFullXid = checkPoint.nextFullXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
......@@ -6557,8 +6556,8 @@ StartupXLOG(void)
(uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo,
wasShutdown ? "true" : "false")));
ereport(DEBUG1,
(errmsg_internal("next transaction ID: %u:%u; next OID: %u",
checkPoint.nextXidEpoch, checkPoint.nextXid,
(errmsg_internal("next transaction ID: " UINT64_FORMAT "; next OID: %u",
U64FromFullTransactionId(checkPoint.nextFullXid),
checkPoint.nextOid)));
ereport(DEBUG1,
(errmsg_internal("next MultiXactId: %u; next MultiXactOffset: %u",
......@@ -6573,12 +6572,12 @@ StartupXLOG(void)
(errmsg_internal("commit timestamp Xid oldest/newest: %u/%u",
checkPoint.oldestCommitTsXid,
checkPoint.newestCommitTsXid)));
if (!TransactionIdIsNormal(checkPoint.nextXid))
if (!TransactionIdIsNormal(XidFromFullTransactionId(checkPoint.nextFullXid)))
ereport(PANIC,
(errmsg("invalid next transaction ID")));
/* initialize shared memory variables from the checkpoint record */
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextFullXid = checkPoint.nextFullXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
......@@ -6587,8 +6586,7 @@ StartupXLOG(void)
SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
SetCommitTsLimit(checkPoint.oldestCommitTsXid,
checkPoint.newestCommitTsXid);
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid;
XLogCtl->ckptFullXid = checkPoint.nextFullXid;
/*
* Initialize replication slots, before there's a chance to remove
......@@ -6859,7 +6857,7 @@ StartupXLOG(void)
Assert(TransactionIdIsValid(oldestActiveXID));
/* Tell procarray about the range of xids it has to deal with */
ProcArrayInitRecovery(ShmemVariableCache->nextXid);
ProcArrayInitRecovery(XidFromFullTransactionId(ShmemVariableCache->nextFullXid));
/*
* Startup commit log and subtrans only. MultiXact and commit
......@@ -6889,9 +6887,9 @@ StartupXLOG(void)
running.xcnt = nxids;
running.subxcnt = 0;
running.subxid_overflow = false;
running.nextXid = checkPoint.nextXid;
running.nextXid = XidFromFullTransactionId(checkPoint.nextFullXid);
running.oldestRunningXid = oldestActiveXID;
latestCompletedXid = checkPoint.nextXid;
latestCompletedXid = XidFromFullTransactionId(checkPoint.nextFullXid);
TransactionIdRetreat(latestCompletedXid);
Assert(TransactionIdIsNormal(latestCompletedXid));
running.latestCompletedXid = latestCompletedXid;
......@@ -7061,20 +7059,10 @@ StartupXLOG(void)
error_context_stack = &errcallback;
/*
* ShmemVariableCache->nextXid must be beyond record's xid.
*
* We don't expect anyone else to modify nextXid, hence we
* don't need to hold a lock while examining it. We still
* acquire the lock to modify it, though.
* ShmemVariableCache->nextFullXid must be beyond record's
* xid.
*/
if (TransactionIdFollowsOrEquals(record->xl_xid,
ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = record->xl_xid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
AdvanceNextFullTransactionIdPastXid(record->xl_xid);
/*
* Before replaying this record, check if this record causes
......@@ -7654,7 +7642,7 @@ StartupXLOG(void)
/* also initialize latestCompletedXid, to nextXid - 1 */
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
ShmemVariableCache->latestCompletedXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
LWLockRelease(ProcArrayLock);
......@@ -8247,41 +8235,6 @@ GetLastSegSwitchData(XLogRecPtr *lastSwitchLSN)
return result;
}
/*
* GetNextXidAndEpoch - get the current nextXid value and associated epoch
*
* This is exported for use by code that would like to have 64-bit XIDs.
* We don't really support such things, but all XIDs within the system
* can be presumed "close to" the result, and thus the epoch associated
* with them can be determined.
*/
void
GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
{
uint32 ckptXidEpoch;
TransactionId ckptXid;
TransactionId nextXid;
/* Must read checkpoint info first, else have race condition */
SpinLockAcquire(&XLogCtl->info_lck);
ckptXidEpoch = XLogCtl->ckptXidEpoch;
ckptXid = XLogCtl->ckptXid;
SpinLockRelease(&XLogCtl->info_lck);
/* Now fetch current nextXid */
nextXid = ReadNewTransactionId();
/*
* nextXid is certainly logically later than ckptXid. So if it's
* numerically less, it must have wrapped into the next epoch.
*/
if (nextXid < ckptXid)
ckptXidEpoch++;
*xid = nextXid;
*epoch = ckptXidEpoch;
}
/*
* This must be called ONCE during postmaster or standalone-backend shutdown
*/
......@@ -8701,7 +8654,7 @@ CreateCheckPoint(int flags)
* there.
*/
LWLockAcquire(XidGenLock, LW_SHARED);
checkPoint.nextXid = ShmemVariableCache->nextXid;
checkPoint.nextFullXid = ShmemVariableCache->nextFullXid;
checkPoint.oldestXid = ShmemVariableCache->oldestXid;
checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
LWLockRelease(XidGenLock);
......@@ -8711,11 +8664,6 @@ CreateCheckPoint(int flags)
checkPoint.newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
LWLockRelease(CommitTsLock);
/* Increase XID epoch if we've wrapped around since last checkpoint */
checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
checkPoint.nextXidEpoch++;
LWLockAcquire(OidGenLock, LW_SHARED);
checkPoint.nextOid = ShmemVariableCache->nextOid;
if (!shutdown)
......@@ -8859,8 +8807,7 @@ CreateCheckPoint(int flags)
/* Update shared-memory copy of checkpoint XID/epoch */
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid;
XLogCtl->ckptFullXid = checkPoint.nextFullXid;
SpinLockRelease(&XLogCtl->info_lck);
/*
......@@ -9622,7 +9569,7 @@ xlog_redo(XLogReaderState *record)
memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
/* In a SHUTDOWN checkpoint, believe the counters exactly */
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextFullXid = checkPoint.nextFullXid;
LWLockRelease(XidGenLock);
LWLockAcquire(OidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextOid = checkPoint.nextOid;
......@@ -9676,9 +9623,9 @@ xlog_redo(XLogReaderState *record)
running.xcnt = nxids;
running.subxcnt = 0;
running.subxid_overflow = false;
running.nextXid = checkPoint.nextXid;
running.nextXid = XidFromFullTransactionId(checkPoint.nextFullXid);
running.oldestRunningXid = oldestActiveXID;
latestCompletedXid = checkPoint.nextXid;
latestCompletedXid = XidFromFullTransactionId(checkPoint.nextFullXid);
TransactionIdRetreat(latestCompletedXid);
Assert(TransactionIdIsNormal(latestCompletedXid));
running.latestCompletedXid = latestCompletedXid;
......@@ -9690,13 +9637,11 @@ xlog_redo(XLogReaderState *record)
}
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
ControlFile->checkPointCopy.nextFullXid = checkPoint.nextFullXid;
/* Update shared-memory copy of checkpoint XID/epoch */
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid;
XLogCtl->ckptFullXid = checkPoint.nextFullXid;
SpinLockRelease(&XLogCtl->info_lck);
/*
......@@ -9717,9 +9662,9 @@ xlog_redo(XLogReaderState *record)
memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
/* In an ONLINE checkpoint, treat the XID counter as a minimum */
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
checkPoint.nextXid))
ShmemVariableCache->nextXid = checkPoint.nextXid;
if (FullTransactionIdPrecedes(ShmemVariableCache->nextFullXid,
checkPoint.nextFullXid))
ShmemVariableCache->nextFullXid = checkPoint.nextFullXid;
LWLockRelease(XidGenLock);
/*
......@@ -9749,13 +9694,11 @@ xlog_redo(XLogReaderState *record)
SetTransactionIdLimit(checkPoint.oldestXid,
checkPoint.oldestXidDB);
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
ControlFile->checkPointCopy.nextFullXid = checkPoint.nextFullXid;
/* Update shared-memory copy of checkpoint XID/epoch */
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid;
XLogCtl->ckptFullXid = checkPoint.nextFullXid;
SpinLockRelease(&XLogCtl->info_lck);
/* TLI should not change in an on-line checkpoint */
......
......@@ -1160,6 +1160,7 @@ static void
XLogWalRcvSendHSFeedback(bool immed)
{
TimestampTz now;
FullTransactionId nextFullXid;
TransactionId nextXid;
uint32 xmin_epoch,
catalog_xmin_epoch;
......@@ -1238,7 +1239,9 @@ XLogWalRcvSendHSFeedback(bool immed)
* Get epoch and adjust if nextXid and oldestXmin are different sides of
* the epoch boundary.
*/
GetNextXidAndEpoch(&nextXid, &xmin_epoch);
nextFullXid = ReadNextFullTransactionId();
nextXid = XidFromFullTransactionId(nextFullXid);
xmin_epoch = EpochFromFullTransactionId(nextFullXid);
catalog_xmin_epoch = xmin_epoch;
if (nextXid < xmin)
xmin_epoch--;
......
......@@ -1912,10 +1912,13 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac
static bool
TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
{
FullTransactionId nextFullXid;
TransactionId nextXid;
uint32 nextEpoch;
GetNextXidAndEpoch(&nextXid, &nextEpoch);
nextFullXid = ReadNextFullTransactionId();
nextXid = XidFromFullTransactionId(nextFullXid);
nextEpoch = EpochFromFullTransactionId(nextFullXid);
if (xid <= nextXid)
{
......
......@@ -664,7 +664,6 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
{
TransactionId *xids;
int nxids;
TransactionId nextXid;
int i;
Assert(standbyState >= STANDBY_INITIALIZED);
......@@ -881,23 +880,10 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
LWLockRelease(ProcArrayLock);
/*
* ShmemVariableCache->nextXid must be beyond any observed xid.
*
* We don't expect anyone else to modify nextXid, hence we don't need to
* hold a lock while examining it. We still acquire the lock to modify
* it, though.
*/
nextXid = latestObservedXid;
TransactionIdAdvance(nextXid);
if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = nextXid;
LWLockRelease(XidGenLock);
}
/* ShmemVariableCache->nextFullXid must be beyond any observed xid. */
AdvanceNextFullTransactionIdPastXid(latestObservedXid);
Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));
Assert(FullTransactionIdIsValid(ShmemVariableCache->nextFullXid));
KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
if (standbyState == STANDBY_SNAPSHOT_READY)
......@@ -2001,7 +1987,7 @@ GetRunningTransactionData(void)
latestCompletedXid = ShmemVariableCache->latestCompletedXid;
oldestRunningXid = ShmemVariableCache->nextXid;
oldestRunningXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
/*
* Spin over procArray collecting all xids
......@@ -2093,7 +2079,7 @@ GetRunningTransactionData(void)
CurrentRunningXacts->xcnt = count - subcount;
CurrentRunningXacts->subxcnt = subcount;
CurrentRunningXacts->subxid_overflow = suboverflowed;
CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
CurrentRunningXacts->nextXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
......@@ -2138,7 +2124,7 @@ GetOldestActiveTransactionId(void)
* have already completed), when we spin over it.
*/
LWLockAcquire(XidGenLock, LW_SHARED);
oldestRunningXid = ShmemVariableCache->nextXid;
oldestRunningXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
LWLockRelease(XidGenLock);
/*
......@@ -2206,7 +2192,7 @@ GetOldestSafeDecodingTransactionId(bool catalogOnly)
* a safe, albeit pessimal, value.
*/
LWLockAcquire(XidGenLock, LW_SHARED);
oldestSafeXid = ShmemVariableCache->nextXid;
oldestSafeXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
/*
* If there's already a slot pegging the xmin horizon, we can start with
......@@ -3266,12 +3252,10 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
*/
latestObservedXid = xid;
/* ShmemVariableCache->nextXid must be beyond any observed xid */
/* ShmemVariableCache->nextFullXid must be beyond any observed xid */
AdvanceNextFullTransactionIdPastXid(latestObservedXid);
next_expected_xid = latestObservedXid;
TransactionIdAdvance(next_expected_xid);
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = next_expected_xid;
LWLockRelease(XidGenLock);
}
}
......
......@@ -867,7 +867,7 @@ standby_redo(XLogReaderState *record)
* up from a checkpoint and are immediately at our starting point, we
* unconditionally move to STANDBY_INITIALIZED. After this point we
* must do 4 things:
* * move shared nextXid forwards as we see new xids
* * move shared nextFullXid forwards as we see new xids
* * extend the clog and subtrans with each new xid
* * keep track of uncommitted known assigned xids
* * keep track of uncommitted AccessExclusiveLocks
......
......@@ -3410,7 +3410,7 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
* transaction to complete before freeing some RAM; correctness of visible
* behavior is not affected.
*/
MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
MySerializableXact->finishedBefore = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
/*
* If it's not a commit it's either a rollback or a read-only transaction
......
......@@ -91,7 +91,10 @@ typedef struct
static void
load_xid_epoch(TxidEpoch *state)
{
GetNextXidAndEpoch(&state->last_xid, &state->epoch);
FullTransactionId fullXid = ReadNextFullTransactionId();
state->last_xid = XidFromFullTransactionId(fullXid);
state->epoch = EpochFromFullTransactionId(fullXid);
}
/*
......@@ -114,8 +117,11 @@ TransactionIdInRecentPast(uint64 xid_with_epoch, TransactionId *extracted_xid)
TransactionId xid = (TransactionId) xid_with_epoch;
uint32 now_epoch;
TransactionId now_epoch_next_xid;
FullTransactionId now_fullxid;
GetNextXidAndEpoch(&now_epoch_next_xid, &now_epoch);
now_fullxid = ReadNextFullTransactionId();
now_epoch_next_xid = XidFromFullTransactionId(now_fullxid);
now_epoch = EpochFromFullTransactionId(now_fullxid);
if (extracted_xid != NULL)
*extracted_xid = xid;
......@@ -128,8 +134,7 @@ TransactionIdInRecentPast(uint64 xid_with_epoch, TransactionId *extracted_xid)
return true;
/* If the transaction ID is in the future, throw an error. */
if (xid_epoch > now_epoch
|| (xid_epoch == now_epoch && xid >= now_epoch_next_xid))
if (xid_with_epoch >= U64FromFullTransactionId(now_fullxid))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("transaction ID %s is in the future",
......
......@@ -16,6 +16,7 @@
#include "postgres.h"
#include "access/htup_details.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlog.h"
#include "catalog/pg_control.h"
......@@ -164,8 +165,8 @@ pg_control_checkpoint(PG_FUNCTION_ARGS)
nulls[5] = false;
values[6] = CStringGetTextDatum(psprintf("%u:%u",
ControlFile->checkPointCopy.nextXidEpoch,
ControlFile->checkPointCopy.nextXid));
EpochFromFullTransactionId(ControlFile->checkPointCopy.nextFullXid),
XidFromFullTransactionId(ControlFile->checkPointCopy.nextFullXid)));
nulls[6] = false;
values[7] = ObjectIdGetDatum(ControlFile->checkPointCopy.nextOid);
......
......@@ -20,6 +20,7 @@
#include <time.h>
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "catalog/pg_control.h"
......@@ -256,8 +257,8 @@ main(int argc, char *argv[])
printf(_("Latest checkpoint's full_page_writes: %s\n"),
ControlFile->checkPointCopy.fullPageWrites ? _("on") : _("off"));
printf(_("Latest checkpoint's NextXID: %u:%u\n"),
ControlFile->checkPointCopy.nextXidEpoch,
ControlFile->checkPointCopy.nextXid);
EpochFromFullTransactionId(ControlFile->checkPointCopy.nextFullXid),
XidFromFullTransactionId(ControlFile->checkPointCopy.nextFullXid));
printf(_("Latest checkpoint's NextOID: %u\n"),
ControlFile->checkPointCopy.nextOid);
printf(_("Latest checkpoint's NextMultiXactId: %u\n"),
......
......@@ -430,11 +430,15 @@ main(int argc, char *argv[])
* if any, includes these values.)
*/
if (set_xid_epoch != -1)
ControlFile.checkPointCopy.nextXidEpoch = set_xid_epoch;
ControlFile.checkPointCopy.nextFullXid =
FullTransactionIdFromEpochAndXid(set_xid_epoch,
XidFromFullTransactionId(ControlFile.checkPointCopy.nextFullXid));
if (set_xid != 0)
{
ControlFile.checkPointCopy.nextXid = set_xid;
ControlFile.checkPointCopy.nextFullXid =
FullTransactionIdFromEpochAndXid(EpochFromFullTransactionId(ControlFile.checkPointCopy.nextFullXid),
set_xid);
/*
* For the moment, just set oldestXid to a value that will force
......@@ -704,8 +708,8 @@ GuessControlValues(void)
ControlFile.checkPointCopy.ThisTimeLineID = 1;
ControlFile.checkPointCopy.PrevTimeLineID = 1;
ControlFile.checkPointCopy.fullPageWrites = false;
ControlFile.checkPointCopy.nextXidEpoch = 0;
ControlFile.checkPointCopy.nextXid = FirstNormalTransactionId;
ControlFile.checkPointCopy.nextFullXid =
FullTransactionIdFromEpochAndXid(0, FirstNormalTransactionId);
ControlFile.checkPointCopy.nextOid = FirstBootstrapObjectId;
ControlFile.checkPointCopy.nextMulti = FirstMultiXactId;
ControlFile.checkPointCopy.nextMultiOffset = 0;
......@@ -786,8 +790,8 @@ PrintControlValues(bool guessed)
printf(_("Latest checkpoint's full_page_writes: %s\n"),
ControlFile.checkPointCopy.fullPageWrites ? _("on") : _("off"));
printf(_("Latest checkpoint's NextXID: %u:%u\n"),
ControlFile.checkPointCopy.nextXidEpoch,
ControlFile.checkPointCopy.nextXid);
EpochFromFullTransactionId(ControlFile.checkPointCopy.nextFullXid),
XidFromFullTransactionId(ControlFile.checkPointCopy.nextFullXid));
printf(_("Latest checkpoint's NextOID: %u\n"),
ControlFile.checkPointCopy.nextOid);
printf(_("Latest checkpoint's NextMultiXactId: %u\n"),
......@@ -879,7 +883,7 @@ PrintNewControlValues(void)
if (set_xid != 0)
{
printf(_("NextXID: %u\n"),
ControlFile.checkPointCopy.nextXid);
XidFromFullTransactionId(ControlFile.checkPointCopy.nextFullXid));
printf(_("OldestXID: %u\n"),
ControlFile.checkPointCopy.oldestXid);
printf(_("OldestXID's DB: %u\n"),
......@@ -889,7 +893,7 @@ PrintNewControlValues(void)
if (set_xid_epoch != -1)
{
printf(_("NextXID epoch: %u\n"),
ControlFile.checkPointCopy.nextXidEpoch);
EpochFromFullTransactionId(ControlFile.checkPointCopy.nextFullXid));
}
if (set_oldest_commit_ts_xid != 0)
......
......@@ -44,6 +44,32 @@
#define TransactionIdStore(xid, dest) (*(dest) = (xid))
#define StoreInvalidTransactionId(dest) (*(dest) = InvalidTransactionId)
#define EpochFromFullTransactionId(x) ((uint32) ((x).value >> 32))
#define XidFromFullTransactionId(x) ((uint32) (x).value)
#define U64FromFullTransactionId(x) ((x).value)
#define FullTransactionIdPrecedes(a, b) ((a).value < (b).value)
#define FullTransactionIdIsValid(x) TransactionIdIsValid(XidFromFullTransactionId(x))
/*
* A 64 bit value that contains an epoch and a TransactionId. This is
* wrapped in a struct to prevent implicit conversion to/from TransactionId.
* Not all values represent valid normal XIDs.
*/
typedef struct FullTransactionId
{
uint64 value;
} FullTransactionId;
static inline FullTransactionId
FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid)
{
FullTransactionId result;
result.value = ((uint64) epoch) << 32 | xid;
return result;
}
/* advance a transaction ID variable, handling wraparound correctly */
#define TransactionIdAdvance(dest) \
do { \
......@@ -52,6 +78,15 @@
(dest) = FirstNormalTransactionId; \
} while(0)
/* advance a FullTransactionId variable, stepping over special XIDs */
static inline void
FullTransactionIdAdvance(FullTransactionId *dest)
{
dest->value++;
while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId)
dest->value++;
}
/* back up a transaction ID variable, handling wraparound correctly */
#define TransactionIdRetreat(dest) \
do { \
......@@ -125,12 +160,12 @@ typedef struct VariableCacheData
/*
* These fields are protected by XidGenLock.
*/
TransactionId nextXid; /* next XID to assign */
FullTransactionId nextFullXid; /* next full XID to assign */
TransactionId oldestXid; /* cluster-wide minimum datfrozenxid */
TransactionId xidVacLimit; /* start forcing autovacuums here */
TransactionId xidWarnLimit; /* start complaining here */
TransactionId xidStopLimit; /* refuse to advance nextXid beyond here */
TransactionId xidStopLimit; /* refuse to advance nextFullXid beyond here */
TransactionId xidWrapLimit; /* where the world ends */
Oid oldestXidDB; /* database with minimum datfrozenxid */
......@@ -187,11 +222,21 @@ extern XLogRecPtr TransactionIdGetCommitLSN(TransactionId xid);
/* in transam/varsup.c */
extern TransactionId GetNewTransactionId(bool isSubXact);
extern TransactionId ReadNewTransactionId(void);
extern void AdvanceNextFullTransactionIdPastXid(TransactionId xid);
extern FullTransactionId ReadNextFullTransactionId(void);
extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
Oid oldest_datoid);
extern void AdvanceOldestClogXid(TransactionId oldest_datfrozenxid);
extern bool ForceTransactionIdLimitUpdate(void);
extern Oid GetNewObjectId(void);
/*
* For callers that just need the XID part of the next transaction ID.
*/
static inline TransactionId
ReadNewTransactionId(void)
{
return XidFromFullTransactionId(ReadNextFullTransactionId());
}
#endif /* TRAMSAM_H */
......@@ -310,7 +310,6 @@ extern XLogRecPtr GetRedoRecPtr(void);
extern XLogRecPtr GetInsertRecPtr(void);
extern XLogRecPtr GetFlushRecPtr(void);
extern XLogRecPtr GetLastImportantRecPtr(void);
extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
extern void RemovePromoteSignalFiles(void);
extern bool CheckPromoteSignal(void);
......
......@@ -15,13 +15,14 @@
#ifndef PG_CONTROL_H
#define PG_CONTROL_H
#include "access/transam.h"
#include "access/xlogdefs.h"
#include "pgtime.h" /* for pg_time_t */
#include "port/pg_crc32c.h"
/* Version identifier for this pg_control format */
#define PG_CONTROL_VERSION 1200
#define PG_CONTROL_VERSION 1201
/* Nonce key length, see below */
#define MOCK_AUTH_NONCE_LEN 32
......@@ -39,8 +40,7 @@ typedef struct CheckPoint
TimeLineID PrevTimeLineID; /* previous TLI, if this record begins a new
* timeline (equals ThisTimeLineID otherwise) */
bool fullPageWrites; /* current full_page_writes */
uint32 nextXidEpoch; /* higher-order bits of nextXid */
TransactionId nextXid; /* next free XID */
FullTransactionId nextFullXid; /* next free full transaction ID */
Oid nextOid; /* next free OID */
MultiXactId nextMulti; /* next free MultiXactId */
MultiXactOffset nextMultiOffset; /* next free MultiXact offset */
......
......@@ -72,7 +72,7 @@ typedef struct RunningTransactionsData
int xcnt; /* # of xact ids in xids[] */
int subxcnt; /* # of subxact ids in xids[] */
bool subxid_overflow; /* snapshot overflowed, subxids missing */
TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */
TransactionId nextXid; /* xid from ShmemVariableCache->nextFullXid */
TransactionId oldestRunningXid; /* *not* oldestXmin */
TransactionId latestCompletedXid; /* so we can set xmax */
......
......@@ -49,7 +49,7 @@ typedef struct xl_running_xacts
int xcnt; /* # of xact ids in xids[] */
int subxcnt; /* # of subxact ids in xids[] */
bool subxid_overflow; /* snapshot overflowed, subxids missing */
TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */
TransactionId nextXid; /* xid from ShmemVariableCache->nextFullXid */
TransactionId oldestRunningXid; /* *not* oldestXmin */
TransactionId latestCompletedXid; /* so we can set xmax */
......
......@@ -796,6 +796,7 @@ FreePageManager
FreePageSpanLeader
FromCharDateMode
FromExpr
FullTransactionId
FuncCall
FuncCallContext
FuncCandidateList
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment