Commit 8431e296 authored by Simon Riggs

Cleanup initialization of Hot Standby. Clarify working with reanalysis
of requirements and documentation on LogStandbySnapshot(). Fixes
two minor bugs reported by Tom Lane that would lead to an incorrect
snapshot after transaction wraparound. Also fix two other problems
discovered that would give incorrect snapshots in certain cases.
ProcArrayApplyRecoveryInfo() substantially rewritten. Some minor
refactoring of xact_redo_apply() and ExpireTreeKnownAssignedTransactionIds().
parent c2e7f78a
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.289 2010/02/26 02:00:34 momjian Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.290 2010/05/13 11:15:38 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -4378,7 +4378,7 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) ...@@ -4378,7 +4378,7 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
LWLockRelease(XidGenLock); LWLockRelease(XidGenLock);
} }
if (!InHotStandby) if (standbyState == STANDBY_DISABLED)
{ {
/* /*
* Mark the transaction committed in pg_clog. * Mark the transaction committed in pg_clog.
...@@ -4412,12 +4412,12 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) ...@@ -4412,12 +4412,12 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
/* /*
* We must mark clog before we update the ProcArray. * We must mark clog before we update the ProcArray.
*/ */
ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids); ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
/* /*
* Send any cache invalidations attached to the commit. We must * Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as * maintain the same order of invalidation then release locks as
* occurs in . * occurs in CommitTransaction().
*/ */
ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs, ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs,
XactCompletionRelcacheInitFileInval(xlrec), XactCompletionRelcacheInitFileInval(xlrec),
...@@ -4499,7 +4499,12 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) ...@@ -4499,7 +4499,12 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
LWLockRelease(XidGenLock); LWLockRelease(XidGenLock);
} }
if (InHotStandby) if (standbyState == STANDBY_DISABLED)
{
/* Mark the transaction aborted in pg_clog, no need for async stuff */
TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
}
else
{ {
/* /*
* If a transaction completion record arrives that has as-yet * If a transaction completion record arrives that has as-yet
...@@ -4511,17 +4516,14 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) ...@@ -4511,17 +4516,14 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
* already. Leave it in. * already. Leave it in.
*/ */
RecordKnownAssignedTransactionIds(max_xid); RecordKnownAssignedTransactionIds(max_xid);
}
/* Mark the transaction aborted in pg_clog, no need for async stuff */ /* Mark the transaction aborted in pg_clog, no need for async stuff */
TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids); TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
if (InHotStandby)
{
/* /*
* We must mark clog before we update the ProcArray. * We must update the ProcArray after we have marked clog.
*/ */
ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids); ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
/* /*
* There are no flat files that need updating, nor invalidation * There are no flat files that need updating, nor invalidation
...@@ -4596,7 +4598,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -4596,7 +4598,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
{ {
xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record); xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
if (InHotStandby) if (standbyState >= STANDBY_INITIALIZED)
ProcArrayApplyXidAssignment(xlrec->xtop, ProcArrayApplyXidAssignment(xlrec->xtop,
xlrec->nsubxacts, xlrec->xsub); xlrec->nsubxacts, xlrec->xsub);
} }
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.409 2010/05/03 11:17:52 heikki Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.410 2010/05/13 11:15:38 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -5995,6 +5995,7 @@ StartupXLOG(void) ...@@ -5995,6 +5995,7 @@ StartupXLOG(void)
if (wasShutdown) if (wasShutdown)
{ {
RunningTransactionsData running; RunningTransactionsData running;
TransactionId latestCompletedXid;
/* /*
* Construct a RunningTransactions snapshot representing a shut * Construct a RunningTransactions snapshot representing a shut
...@@ -6006,6 +6007,9 @@ StartupXLOG(void) ...@@ -6006,6 +6007,9 @@ StartupXLOG(void)
running.subxid_overflow = false; running.subxid_overflow = false;
running.nextXid = checkPoint.nextXid; running.nextXid = checkPoint.nextXid;
running.oldestRunningXid = oldestActiveXID; running.oldestRunningXid = oldestActiveXID;
latestCompletedXid = checkPoint.nextXid;
TransactionIdRetreat(latestCompletedXid);
running.latestCompletedXid = latestCompletedXid;
running.xids = xids; running.xids = xids;
ProcArrayApplyRecoveryInfo(&running); ProcArrayApplyRecoveryInfo(&running);
...@@ -6154,8 +6158,9 @@ StartupXLOG(void) ...@@ -6154,8 +6158,9 @@ StartupXLOG(void)
xlogctl->recoveryLastXTime = recoveryLastXTime; xlogctl->recoveryLastXTime = recoveryLastXTime;
SpinLockRelease(&xlogctl->info_lck); SpinLockRelease(&xlogctl->info_lck);
/* In Hot Standby mode, keep track of XIDs we've seen */ /* If we are attempting to enter Hot Standby mode, process XIDs we see */
if (InHotStandby && TransactionIdIsValid(record->xl_xid)) if (standbyState >= STANDBY_INITIALIZED &&
TransactionIdIsValid(record->xl_xid))
RecordKnownAssignedTransactionIds(record->xl_xid); RecordKnownAssignedTransactionIds(record->xl_xid);
RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record); RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
...@@ -7803,6 +7808,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -7803,6 +7808,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
TransactionId *xids; TransactionId *xids;
int nxids; int nxids;
TransactionId oldestActiveXID; TransactionId oldestActiveXID;
TransactionId latestCompletedXid;
RunningTransactionsData running; RunningTransactionsData running;
oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids); oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
...@@ -7817,6 +7823,9 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -7817,6 +7823,9 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
running.subxid_overflow = false; running.subxid_overflow = false;
running.nextXid = checkPoint.nextXid; running.nextXid = checkPoint.nextXid;
running.oldestRunningXid = oldestActiveXID; running.oldestRunningXid = oldestActiveXID;
latestCompletedXid = checkPoint.nextXid;
TransactionIdRetreat(latestCompletedXid);
running.latestCompletedXid = latestCompletedXid;
running.xids = xids; running.xids = xids;
ProcArrayApplyRecoveryInfo(&running); ProcArrayApplyRecoveryInfo(&running);
......
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.68 2010/04/29 21:36:19 tgl Exp $ * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.69 2010/05/13 11:15:38 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -105,12 +105,6 @@ static TransactionId latestObservedXid = InvalidTransactionId; ...@@ -105,12 +105,6 @@ static TransactionId latestObservedXid = InvalidTransactionId;
*/ */
static TransactionId standbySnapshotPendingXmin; static TransactionId standbySnapshotPendingXmin;
/*
* Oldest transaction still running according to the running-xacts snapshot
* we initialized standby mode from.
*/
static TransactionId snapshotOldestActiveXid;
#ifdef XIDCACHE_DEBUG #ifdef XIDCACHE_DEBUG
/* counters for XidCache measurement */ /* counters for XidCache measurement */
...@@ -158,7 +152,7 @@ static void KnownAssignedXidsRemove(TransactionId xid); ...@@ -158,7 +152,7 @@ static void KnownAssignedXidsRemove(TransactionId xid);
static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids, static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
TransactionId *subxids); TransactionId *subxids);
static void KnownAssignedXidsRemovePreceding(TransactionId xid); static void KnownAssignedXidsRemovePreceding(TransactionId xid);
static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax); static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
TransactionId *xmin, TransactionId *xmin,
TransactionId xmax); TransactionId xmax);
...@@ -439,10 +433,17 @@ ProcArrayClearTransaction(PGPROC *proc) ...@@ -439,10 +433,17 @@ ProcArrayClearTransaction(PGPROC *proc)
proc->subxids.overflowed = false; proc->subxids.overflowed = false;
} }
/*
* ProcArrayInitRecoveryInfo
*
* When trying to assemble our snapshot we only care about xids after this value.
* See comments for LogStandbySnapshot().
*/
void void
ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid) ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
{ {
snapshotOldestActiveXid = oldestActiveXid; latestObservedXid = oldestActiveXid;
TransactionIdRetreat(latestObservedXid);
} }
/* /*
...@@ -458,16 +459,15 @@ ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid) ...@@ -458,16 +459,15 @@ ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
* with FATAL errors fail to write abort records, which could cause eventual * with FATAL errors fail to write abort records, which could cause eventual
* overflow. * overflow.
* *
* Only used during recovery. Notice the signature is very similar to a * See comments for LogStandbySnapshot().
* _redo function and its difficult to decide exactly where this code should
* reside.
*/ */
void void
ProcArrayApplyRecoveryInfo(RunningTransactions running) ProcArrayApplyRecoveryInfo(RunningTransactions running)
{ {
int xid_index; /* main loop */
TransactionId *xids; TransactionId *xids;
int nxids; int nxids;
TransactionId nextXid;
int i;
Assert(standbyState >= STANDBY_INITIALIZED); Assert(standbyState >= STANDBY_INITIALIZED);
...@@ -505,41 +505,40 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) ...@@ -505,41 +505,40 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
elog(trace_recovery(DEBUG2), elog(trace_recovery(DEBUG2),
"recovery snapshots are now enabled"); "recovery snapshots are now enabled");
} }
else
elog(trace_recovery(DEBUG2),
"recovery snapshot waiting for %u oldest active xid on standby is %u",
standbySnapshotPendingXmin,
running->oldestRunningXid);
return; return;
} }
Assert(standbyState == STANDBY_INITIALIZED);
/* /*
* OK, we need to initialise from the RunningXactData record * OK, we need to initialise from the RunningXactData record
*/ */
latestObservedXid = running->nextXid;
TransactionIdRetreat(latestObservedXid);
/* /*
* If the snapshot overflowed, then we still initialise with what we know, * Remove all xids except xids later than the snapshot. We don't know
* but the recovery snapshot isn't fully valid yet because we know there * exactly which ones that is until precisely now, so that is why we
* are some subxids missing (ergo we don't know which ones) * allow xids to be added only to remove most of them again here.
*/ */
if (!running->subxid_overflow) ExpireOldKnownAssignedTransactionIds(running->nextXid);
{ StandbyReleaseOldLocks(running->nextXid);
standbyState = STANDBY_SNAPSHOT_READY;
standbySnapshotPendingXmin = InvalidTransactionId;
}
else
{
standbyState = STANDBY_SNAPSHOT_PENDING;
standbySnapshotPendingXmin = latestObservedXid;
ereport(LOG,
(errmsg("consistent state delayed because recovery snapshot incomplete")));
}
nxids = running->xcnt; /*
xids = running->xids; * Nobody else is running yet, but take locks anyhow
*/
KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
/* /*
* Scan through the incoming array of RunningXacts and collect xids. We * Combine the running xact data with already known xids, if any exist.
* don't use SubtransSetParent because it doesn't matter yet. If we aren't * KnownAssignedXids is sorted so we cannot just add new xids, we have
* to combine them first, sort them and then re-add to KnownAssignedXids.
*
* Some of the new xids are top-level xids and some are subtransactions. We
* don't call SubtransSetParent because it doesn't matter yet. If we aren't
* overflowed then all xids will fit in snapshot and so we don't need * overflowed then all xids will fit in snapshot and so we don't need
* subtrans. If we later overflow, an xid assignment record will add xids * subtrans. If we later overflow, an xid assignment record will add xids
* to subtrans. If RunningXacts is overflowed then we don't have enough * to subtrans. If RunningXacts is overflowed then we don't have enough
...@@ -547,59 +546,148 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) ...@@ -547,59 +546,148 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
*/ */
/* /*
* Nobody else is running yet, but take locks anyhow * Allocate a temporary array so we can combine xids. The total
* of both arrays should never normally exceed TOTAL_MAX_CACHED_SUBXIDS.
*/ */
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS);
/*
* Get the remaining KnownAssignedXids. In most cases there won't
* be any at all since this exists only to catch a theoretical
* race condition.
*/
nxids = KnownAssignedXidsGet(xids, InvalidTransactionId);
if (nxids > 0)
KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
/* Reset latestCompletedXid */ /*
ShmemVariableCache->latestCompletedXid = running->nextXid; * Now we have a copy of any KnownAssignedXids we can zero the
TransactionIdRetreat(ShmemVariableCache->latestCompletedXid); * array before re-insertion of the combined snapshot.
*/
KnownAssignedXidsRemovePreceding(InvalidTransactionId);
/* /*
* Add our new xids into the array * Add to the temp array any xids which have not already completed,
* taking care not to overflow in extreme cases.
*/ */
for (xid_index = 0; xid_index < running->xcnt; xid_index++) for (i = 0; i < running->xcnt; i++)
{ {
TransactionId xid = running->xids[xid_index]; TransactionId xid = running->xids[i];
/* /*
* The running-xacts snapshot can contain xids that did finish between * The running-xacts snapshot can contain xids that were running at
* when the snapshot was taken and when it was written to WAL. Such * the time of the snapshot, yet complete before the snapshot was
* transactions are not running anymore, so ignore them. * written to WAL. They're not running anymore, so ignore them.
*/ */
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
continue; continue;
KnownAssignedXidsAdd(xid, xid, true); xids[nxids++] = xid;
/*
* Test for overflow only after we have filtered out already complete
* transactions.
*/
if (nxids > TOTAL_MAX_CACHED_SUBXIDS)
elog(ERROR, "too many xids to add into KnownAssignedXids");
} }
KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); if (nxids > 0)
{
/*
* Sort the array so that we can add them safely into KnownAssignedXids.
*/
qsort(xids, nxids, sizeof(TransactionId), xidComparator);
/*
* Re-initialise latestObservedXid to the highest xid we've seen.
*/
latestObservedXid = xids[nxids - 1];
/*
* Add the sorted snapshot into KnownAssignedXids
*/
for (i = 0; i < nxids; i++)
{
TransactionId xid = xids[i];
KnownAssignedXidsAdd(xid, xid, true);
}
KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
}
pfree(xids);
/* /*
* Update lastOverflowedXid if the snapshot has any missing subxids. * Now we've got the running xids we need to set the global values
* that are used to track snapshots as they evolve further
*
* * latestCompletedXid which will be the xmax for snapshots
* * lastOverflowedXid which shows whether snapshots overflow
* * nextXid
*
* If the snapshot overflowed, then we still initialise with what we know,
* but the recovery snapshot isn't fully valid yet because we know there
* are some subxids missing.
* We don't know the specific subxids that are missing, so conservatively * We don't know the specific subxids that are missing, so conservatively
* assume the last one is latestObservedXid. If no missing subxids, * assume the last one is latestObservedXid. If no missing subxids,
* try to clear lastOverflowedXid. * try to clear lastOverflowedXid.
*
* If the snapshot didn't overflow it's still possible that an overflow
* occurred in the gap between taking snapshot and logging record, so
* we also need to check if lastOverflowedXid is already ahead of us.
*/ */
if (running->subxid_overflow) if (running->subxid_overflow)
{ {
standbyState = STANDBY_SNAPSHOT_PENDING;
standbySnapshotPendingXmin = latestObservedXid;
if (TransactionIdFollows(latestObservedXid, if (TransactionIdFollows(latestObservedXid,
procArray->lastOverflowedXid)) procArray->lastOverflowedXid))
procArray->lastOverflowedXid = latestObservedXid; procArray->lastOverflowedXid = latestObservedXid;
} }
else if (TransactionIdFollows(running->oldestRunningXid, else if (TransactionIdFollows(procArray->lastOverflowedXid,
latestObservedXid))
{
standbyState = STANDBY_SNAPSHOT_PENDING;
standbySnapshotPendingXmin = procArray->lastOverflowedXid;
}
else
{
standbyState = STANDBY_SNAPSHOT_READY;
standbySnapshotPendingXmin = InvalidTransactionId;
if (TransactionIdFollows(running->oldestRunningXid,
procArray->lastOverflowedXid)) procArray->lastOverflowedXid))
procArray->lastOverflowedXid = InvalidTransactionId; procArray->lastOverflowedXid = InvalidTransactionId;
}
/*
* If a transaction completed in the gap between taking and logging the
* snapshot then latestCompletedXid may already be higher than the value
* from the snapshot, so check before we use the incoming value.
*/
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
running->latestCompletedXid))
ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;
/* nextXid must be beyond any observed xid */ /* nextXid must be beyond any observed xid */
if (TransactionIdFollows(running->nextXid, ShmemVariableCache->nextXid)) nextXid = latestObservedXid;
ShmemVariableCache->nextXid = running->nextXid; TransactionIdAdvance(nextXid);
if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
ShmemVariableCache->nextXid = nextXid;
LWLockRelease(ProcArrayLock); LWLockRelease(ProcArrayLock);
elog(trace_recovery(DEBUG2), "running transaction data initialized"); elog(trace_recovery(DEBUG2), "running transaction data initialized");
KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
if (standbyState == STANDBY_SNAPSHOT_READY) if (standbyState == STANDBY_SNAPSHOT_READY)
elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled"); elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled");
else
ereport(LOG,
(errmsg("consistent state delayed because recovery snapshot incomplete")));
} }
/* /*
...@@ -613,8 +701,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid, ...@@ -613,8 +701,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
TransactionId max_xid; TransactionId max_xid;
int i; int i;
if (standbyState < STANDBY_SNAPSHOT_PENDING) Assert(standbyState >= STANDBY_INITIALIZED);
return;
max_xid = TransactionIdLatest(topxid, nsubxids, subxids); max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
...@@ -1410,6 +1497,7 @@ GetRunningTransactionData(void) ...@@ -1410,6 +1497,7 @@ GetRunningTransactionData(void)
CurrentRunningXacts->subxid_overflow = suboverflowed; CurrentRunningXacts->subxid_overflow = suboverflowed;
CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid; CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
CurrentRunningXacts->oldestRunningXid = oldestRunningXid; CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
LWLockRelease(XidGenLock); LWLockRelease(XidGenLock);
LWLockRelease(ProcArrayLock); LWLockRelease(ProcArrayLock);
...@@ -2219,35 +2307,16 @@ DisplayXidCache(void) ...@@ -2219,35 +2307,16 @@ DisplayXidCache(void)
* *
* RecordKnownAssignedTransactionIds() should be run for *every* WAL record * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
* type apart from XLOG_XACT_RUNNING_XACTS (since that initialises the first * type apart from XLOG_XACT_RUNNING_XACTS (since that initialises the first
* snapshot so that RecordKnownAssignedTransactionIds() can be called). * snapshot so that RecordKnownAssignedTransactionIds() can be called). Must
* be called for each record after we have executed StartupCLog() et al,
* since we must ExtendCLOG() etc..
* *
* Must only be called in Startup process. * Called during recovery in analogy with and in place of GetNewTransactionId()
*/ */
void void
RecordKnownAssignedTransactionIds(TransactionId xid) RecordKnownAssignedTransactionIds(TransactionId xid)
{ {
/* Assert(standbyState >= STANDBY_INITIALIZED);
* Skip processing if the current snapshot is not initialized.
*/
if (standbyState < STANDBY_SNAPSHOT_PENDING)
return;
/*
* We can see WAL records before the running-xacts snapshot that contain
* XIDs that are not in the running-xacts snapshot, but that we know to
* have finished before the running-xacts snapshot was taken. Don't waste
* precious shared memory by keeping them in the hash table.
*
* We can also see WAL records before the running-xacts snapshot that
* contain XIDs that are not in the running-xacts snapshot for a different
* reason: the transaction started *after* the running-xacts snapshot was
* taken, but before it was written to WAL. We must be careful to not
* ignore such XIDs. Because such a transaction started after the
* running-xacts snapshot was taken, it must have an XID larger than the
* oldest XID according to the running-xacts snapshot.
*/
if (TransactionIdPrecedes(xid, snapshotOldestActiveXid))
return;
elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u", elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
xid, latestObservedXid); xid, latestObservedXid);
...@@ -2287,31 +2356,25 @@ RecordKnownAssignedTransactionIds(TransactionId xid) ...@@ -2287,31 +2356,25 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
* Now we can advance latestObservedXid * Now we can advance latestObservedXid
*/ */
latestObservedXid = xid; latestObservedXid = xid;
}
/* nextXid must be beyond any observed xid */ /* ShmemVariableCache->nextXid must be beyond any observed xid */
if (TransactionIdFollowsOrEquals(latestObservedXid, next_expected_xid = latestObservedXid;
ShmemVariableCache->nextXid)) TransactionIdAdvance(next_expected_xid);
{ ShmemVariableCache->nextXid = next_expected_xid;
ShmemVariableCache->nextXid = latestObservedXid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
} }
} }
/* /*
* ExpireTreeKnownAssignedTransactionIds * ExpireTreeKnownAssignedTransactionIds
* Remove the given XIDs from KnownAssignedXids. * Remove the given XIDs from KnownAssignedXids.
*
* Called during recovery in analogy with and in place of ProcArrayEndTransaction()
*/ */
void void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids, ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
TransactionId *subxids) TransactionId *subxids, TransactionId max_xid)
{ {
TransactionId max_xid; Assert(standbyState >= STANDBY_INITIALIZED);
if (standbyState == STANDBY_DISABLED)
return; /* nothing to do */
max_xid = TransactionIdLatest(xid, nsubxids, subxids);
/* /*
* Uses same locking as transaction commit * Uses same locking as transaction commit
...@@ -2882,8 +2945,6 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, ...@@ -2882,8 +2945,6 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
int head, tail; int head, tail;
int i; int i;
Assert(TransactionIdIsValid(xmax));
/* /*
* Fetch head just once, since it may change while we loop. * Fetch head just once, since it may change while we loop.
* We can stop once we reach the initially seen head, since * We can stop once we reach the initially seen head, since
...@@ -2894,8 +2955,8 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, ...@@ -2894,8 +2955,8 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
* Must take spinlock to ensure we see up-to-date array contents. * Must take spinlock to ensure we see up-to-date array contents.
*/ */
SpinLockAcquire(&pArray->known_assigned_xids_lck); SpinLockAcquire(&pArray->known_assigned_xids_lck);
head = pArray->tailKnownAssignedXids; tail = pArray->tailKnownAssignedXids;
tail = pArray->headKnownAssignedXids; head = pArray->headKnownAssignedXids;
SpinLockRelease(&pArray->known_assigned_xids_lck); SpinLockRelease(&pArray->known_assigned_xids_lck);
for (i = tail; i < head; i++) for (i = tail; i < head; i++)
...@@ -2917,7 +2978,8 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, ...@@ -2917,7 +2978,8 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
* Filter out anything >= xmax, again relying on sorted property * Filter out anything >= xmax, again relying on sorted property
* of array. * of array.
*/ */
if (TransactionIdPrecedesOrEquals(xmax, knownXid)) if (TransactionIdIsValid(xmax) &&
TransactionIdFollowsOrEquals(knownXid, xmax))
break; break;
/* Add knownXid into output array */ /* Add knownXid into output array */
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/standby.c,v 1.21 2010/05/02 02:10:33 tgl Exp $ * $PostgreSQL: pgsql/src/backend/storage/ipc/standby.c,v 1.22 2010/05/13 11:15:38 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -776,6 +776,51 @@ standby_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -776,6 +776,51 @@ standby_desc(StringInfo buf, uint8 xl_info, char *rec)
/* /*
* Log details of the current snapshot to WAL. This allows the snapshot state * Log details of the current snapshot to WAL. This allows the snapshot state
* to be reconstructed on the standby. * to be reconstructed on the standby.
*
* We can move directly to STANDBY_SNAPSHOT_READY at startup if we
* start from a shutdown checkpoint because we know nothing was running
* at that time and our recovery snapshot is known empty. In the more
* typical case of an online checkpoint we need to jump through a few
* hoops to get a correct recovery snapshot and this requires a two or
* sometimes a three stage process.
*
* The initial snapshot must contain all running xids and all current
* AccessExclusiveLocks at a point in time on the standby. Assembling
* that information while the server is running requires many and
* various LWLocks, so we choose to derive that information piece by
* piece and then re-assemble that info on the standby. When that
* information is fully assembled we move to STANDBY_SNAPSHOT_READY.
*
* Since locking on the primary when we derive the information is not
* strict, we note that there is a time window between the derivation and
* writing to WAL of the derived information. That allows race conditions
* that we must resolve, since xids and locks may enter or leave the
* snapshot during that window. This creates the issue that an xid or
* lock may start *after* the snapshot has been derived yet *before* the
* snapshot is logged in the running xacts WAL record. We resolve this by
* starting to accumulate changes at a point just prior to when we derive
* the snapshot on the primary, then ignore duplicates when we later apply
* the snapshot from the running xacts record. This is implemented during
* CreateCheckpoint() where we use the logical checkpoint location as
* our starting point and then write the running xacts record immediately
* before writing the main checkpoint WAL record. Since we always start
* up from a checkpoint and are immediately at our starting point, we
* unconditionally move to STANDBY_INITIALIZED. After this point we
* must do 4 things:
* * move shared nextXid forwards as we see new xids
* * extend the clog and subtrans with each new xid
* * keep track of uncommitted known assigned xids
* * keep track of uncommitted AccessExclusiveLocks
*
* When we see a commit/abort we must remove known assigned xids and locks
* from the completing transaction. Attempted removals that cannot locate
* an entry are expected and must not cause an error when we are in state
* STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
* KnownAssignedXidsRemove().
*
* Later, when we apply the running xact data we must be careful to ignore
* transactions already committed, since those commits raced ahead when
* making WAL entries.
*/ */
void void
LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid) LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid)
...@@ -788,6 +833,12 @@ LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid) ...@@ -788,6 +833,12 @@ LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid)
/* /*
* Get details of any AccessExclusiveLocks being held at the moment. * Get details of any AccessExclusiveLocks being held at the moment.
*
* XXX GetRunningTransactionLocks() currently holds a lock on all partitions
* though it is possible to further optimise the locking. By reference
* counting locks and storing the value on the ProcArray entry for each backend
* we can easily tell if any locks need recording without trying to acquire
* the partition locks and scanning the lock table.
*/ */
locks = GetRunningTransactionLocks(&nlocks); locks = GetRunningTransactionLocks(&nlocks);
if (nlocks > 0) if (nlocks > 0)
...@@ -798,6 +849,11 @@ LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid) ...@@ -798,6 +849,11 @@ LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid)
* record we write, because standby will open up when it sees this. * record we write, because standby will open up when it sees this.
*/ */
running = GetRunningTransactionData(); running = GetRunningTransactionData();
/*
* The gap between GetRunningTransactionData() and LogCurrentRunningXacts()
* is what most of the fuss is about here, so artificially extending this
* interval is a great way to test the little used parts of the code.
*/
LogCurrentRunningXacts(running); LogCurrentRunningXacts(running);
*oldestActiveXid = running->oldestRunningXid; *oldestActiveXid = running->oldestRunningXid;
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/storage/procarray.h,v 1.31 2010/01/23 16:37:12 sriggs Exp $ * $PostgreSQL: pgsql/src/include/storage/procarray.h,v 1.32 2010/05/13 11:15:38 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -35,7 +35,8 @@ extern void ProcArrayApplyXidAssignment(TransactionId topxid, ...@@ -35,7 +35,8 @@ extern void ProcArrayApplyXidAssignment(TransactionId topxid,
extern void RecordKnownAssignedTransactionIds(TransactionId xid); extern void RecordKnownAssignedTransactionIds(TransactionId xid);
extern void ExpireTreeKnownAssignedTransactionIds(TransactionId xid, extern void ExpireTreeKnownAssignedTransactionIds(TransactionId xid,
int nsubxids, TransactionId *subxids); int nsubxids, TransactionId *subxids,
TransactionId max_xid);
extern void ExpireAllKnownAssignedTransactionIds(void); extern void ExpireAllKnownAssignedTransactionIds(void);
extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid); extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/storage/standby.h,v 1.9 2010/02/26 02:01:28 momjian Exp $ * $PostgreSQL: pgsql/src/include/storage/standby.h,v 1.10 2010/05/13 11:15:38 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -68,6 +68,7 @@ typedef struct xl_running_xacts ...@@ -68,6 +68,7 @@ typedef struct xl_running_xacts
bool subxid_overflow; /* snapshot overflowed, subxids missing */ bool subxid_overflow; /* snapshot overflowed, subxids missing */
TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */ TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */
TransactionId oldestRunningXid; /* *not* oldestXmin */ TransactionId oldestRunningXid; /* *not* oldestXmin */
TransactionId latestCompletedXid; /* so we can set xmax */
TransactionId xids[1]; /* VARIABLE LENGTH ARRAY */ TransactionId xids[1]; /* VARIABLE LENGTH ARRAY */
} xl_running_xacts; } xl_running_xacts;
...@@ -97,6 +98,7 @@ typedef struct RunningTransactionsData ...@@ -97,6 +98,7 @@ typedef struct RunningTransactionsData
bool subxid_overflow; /* snapshot overflowed, subxids missing */ bool subxid_overflow; /* snapshot overflowed, subxids missing */
TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */ TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */
TransactionId oldestRunningXid; /* *not* oldestXmin */ TransactionId oldestRunningXid; /* *not* oldestXmin */
TransactionId latestCompletedXid; /* so we can set xmax */
TransactionId *xids; /* array of (sub)xids still running */ TransactionId *xids; /* array of (sub)xids still running */
} RunningTransactionsData; } RunningTransactionsData;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment