Commit 623a9ba7 authored by Andres Freund

snapshot scalability: cache snapshots using a xact completion counter.

Previous commits made it faster/more scalable to compute snapshots. But not
building a snapshot is still faster. Now that GetSnapshotData() does not
maintain RecentGlobal* anymore, that is actually not too hard:

This commit introduces xactCompletionCount, which tracks the number of
top-level transactions with xids (i.e. which may have modified the database)
that completed in some form since the start of the server.

We can avoid rebuilding the snapshot's contents whenever the current
xactCompletionCount is the same as it was when the snapshot was
originally built.  Currently this check happens while holding
ProcArrayLock. While it's likely possible to perform the check without
acquiring ProcArrayLock, it seems better to do that separately / later,
as that requires some careful analysis. Even with the lock this is a
significant win on its own.

On a smaller two socket machine this gains another ~1.03x, on a larger
machine the effect is roughly double (although that was measured with an
earlier version of the patch).  If we were able to safely avoid the lock there'd be another
significant gain on top of that.

Author: Andres Freund <andres@anarazel.de>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>
Reviewed-By: Thomas Munro <thomas.munro@gmail.com>
Reviewed-By: David Rowley <dgrowleyml@gmail.com>
Discussion: https://postgr.es/m/20200301083601.ews6hz5dduc3w2se@alap3.anarazel.de
parent 51300b45
...@@ -524,6 +524,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder) ...@@ -524,6 +524,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder)
snapshot->curcid = FirstCommandId; snapshot->curcid = FirstCommandId;
snapshot->active_count = 0; snapshot->active_count = 0;
snapshot->regd_count = 0; snapshot->regd_count = 0;
snapshot->snapXactCompletionCount = 0;
return snapshot; return snapshot;
} }
......
...@@ -407,6 +407,7 @@ CreateSharedProcArray(void) ...@@ -407,6 +407,7 @@ CreateSharedProcArray(void)
procArray->lastOverflowedXid = InvalidTransactionId; procArray->lastOverflowedXid = InvalidTransactionId;
procArray->replication_slot_xmin = InvalidTransactionId; procArray->replication_slot_xmin = InvalidTransactionId;
procArray->replication_slot_catalog_xmin = InvalidTransactionId; procArray->replication_slot_catalog_xmin = InvalidTransactionId;
ShmemVariableCache->xactCompletionCount = 1;
} }
allProcs = ProcGlobal->allProcs; allProcs = ProcGlobal->allProcs;
...@@ -534,6 +535,9 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid) ...@@ -534,6 +535,9 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
/* Advance global latestCompletedXid while holding the lock */ /* Advance global latestCompletedXid while holding the lock */
MaintainLatestCompletedXid(latestXid); MaintainLatestCompletedXid(latestXid);
/* Same with xactCompletionCount */
ShmemVariableCache->xactCompletionCount++;
ProcGlobal->xids[proc->pgxactoff] = 0; ProcGlobal->xids[proc->pgxactoff] = 0;
ProcGlobal->subxidStates[proc->pgxactoff].overflowed = false; ProcGlobal->subxidStates[proc->pgxactoff].overflowed = false;
ProcGlobal->subxidStates[proc->pgxactoff].count = 0; ProcGlobal->subxidStates[proc->pgxactoff].count = 0;
...@@ -667,6 +671,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid) ...@@ -667,6 +671,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
{ {
size_t pgxactoff = proc->pgxactoff; size_t pgxactoff = proc->pgxactoff;
Assert(LWLockHeldByMe(ProcArrayLock));
Assert(TransactionIdIsValid(ProcGlobal->xids[pgxactoff])); Assert(TransactionIdIsValid(ProcGlobal->xids[pgxactoff]));
Assert(ProcGlobal->xids[pgxactoff] == proc->xid); Assert(ProcGlobal->xids[pgxactoff] == proc->xid);
...@@ -698,6 +703,9 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid) ...@@ -698,6 +703,9 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
/* Also advance global latestCompletedXid while holding the lock */ /* Also advance global latestCompletedXid while holding the lock */
MaintainLatestCompletedXid(latestXid); MaintainLatestCompletedXid(latestXid);
/* Same with xactCompletionCount */
ShmemVariableCache->xactCompletionCount++;
} }
/* /*
...@@ -1916,6 +1924,93 @@ GetMaxSnapshotSubxidCount(void) ...@@ -1916,6 +1924,93 @@ GetMaxSnapshotSubxidCount(void)
return TOTAL_MAX_CACHED_SUBXIDS; return TOTAL_MAX_CACHED_SUBXIDS;
} }
/*
 * Set the "snapshot too old" (old_snapshot_threshold) related fields, lsn
 * and whenTaken, of a snapshot that GetSnapshotData() is about to return.
 */
static void
GetSnapshotDataInitOldSnapshot(Snapshot snapshot)
{
	if (OldSnapshotThresholdActive())
	{
		/*
		 * The feature is in use: remember the current WAL insert position
		 * and the current time, so that the special "old snapshot" logic has
		 * something to work with should this snapshot grow old enough, and
		 * feed the timestamp/xmin pair into the time mapping.
		 */
		snapshot->lsn = GetXLogInsertRecPtr();
		snapshot->whenTaken = GetSnapshotCurrentTimestamp();
		MaintainOldSnapshotTimeMapping(snapshot->whenTaken, snapshot->xmin);
		return;
	}

	/*
	 * The feature is not in use: store dummy values, which can be done
	 * without taking any lock.
	 */
	snapshot->whenTaken = 0;
	snapshot->lsn = InvalidXLogRecPtr;
}
/*
* Helper function for GetSnapshotData() that checks if the bulk of the
* visibility information in the snapshot is still valid. If so, it updates
* the fields that need to change and returns true. Otherwise it returns
* false and leaves the snapshot untouched.
*
* The caller must hold ProcArrayLock (this is asserted below).
*
* This very likely can be evolved to not need ProcArrayLock held (at very
* least in the case we already hold a snapshot), but that's for another day.
*/
static bool
GetSnapshotDataReuse(Snapshot snapshot)
{
uint64 curXactCompletionCount;
Assert(LWLockHeldByMe(ProcArrayLock));
/*
* A stored count of 0 marks a snapshot whose contents must not be reused;
* the shared counter starts at 1, so it can never match such a snapshot.
*/
if (unlikely(snapshot->snapXactCompletionCount == 0))
return false;
/* Any change in the shared counter means the snapshot must be rebuilt. */
curXactCompletionCount = ShmemVariableCache->xactCompletionCount;
if (curXactCompletionCount != snapshot->snapXactCompletionCount)
return false;
/*
* If the current xactCompletionCount is still the same as it was at the
* time the snapshot was built, we can be sure that rebuilding the
* contents of the snapshot the hard way would result in the same snapshot
* contents:
*
* As explained in transam/README, the set of xids considered running by
* GetSnapshotData() cannot change while ProcArrayLock is held. Snapshot
* contents only depend on transactions with xids, and xactCompletionCount
* is incremented whenever a transaction with an xid finishes (while
* holding ProcArrayLock exclusively). Thus the xactCompletionCount check
* ensures we would detect if the snapshot would have changed.
*
* As the snapshot contents are the same as they were before, it is safe
* to re-enter the snapshot's xmin into the PGPROC array. None of the rows
* visible under the snapshot could already have been removed (that'd
* require the set of running transactions to change) and it fulfills the
* requirement that concurrent GetSnapshotData() calls yield the same
* xmin.
*/
/* Only set MyProc->xmin and TransactionXmin if MyProc->xmin is not yet valid. */
if (!TransactionIdIsValid(MyProc->xmin))
MyProc->xmin = TransactionXmin = snapshot->xmin;
RecentXmin = snapshot->xmin;
Assert(TransactionIdPrecedesOrEquals(TransactionXmin, RecentXmin));
/*
* Reset the per-use fields, mirroring what GetSnapshotData() does at the
* end of a full rebuild; the other snapshot contents are kept as they are.
*/
snapshot->curcid = GetCurrentCommandId(false);
snapshot->active_count = 0;
snapshot->regd_count = 0;
snapshot->copied = false;
/* Refresh lsn and whenTaken for the reused snapshot. */
GetSnapshotDataInitOldSnapshot(snapshot);
return true;
}
/* /*
* GetSnapshotData -- returns information about running transactions. * GetSnapshotData -- returns information about running transactions.
* *
...@@ -1963,6 +2058,7 @@ GetSnapshotData(Snapshot snapshot) ...@@ -1963,6 +2058,7 @@ GetSnapshotData(Snapshot snapshot)
TransactionId oldestxid; TransactionId oldestxid;
int mypgxactoff; int mypgxactoff;
TransactionId myxid; TransactionId myxid;
uint64 curXactCompletionCount;
TransactionId replication_slot_xmin = InvalidTransactionId; TransactionId replication_slot_xmin = InvalidTransactionId;
TransactionId replication_slot_catalog_xmin = InvalidTransactionId; TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
...@@ -2007,12 +2103,19 @@ GetSnapshotData(Snapshot snapshot) ...@@ -2007,12 +2103,19 @@ GetSnapshotData(Snapshot snapshot)
*/ */
LWLockAcquire(ProcArrayLock, LW_SHARED); LWLockAcquire(ProcArrayLock, LW_SHARED);
if (GetSnapshotDataReuse(snapshot))
{
LWLockRelease(ProcArrayLock);
return snapshot;
}
latest_completed = ShmemVariableCache->latestCompletedXid; latest_completed = ShmemVariableCache->latestCompletedXid;
mypgxactoff = MyProc->pgxactoff; mypgxactoff = MyProc->pgxactoff;
myxid = other_xids[mypgxactoff]; myxid = other_xids[mypgxactoff];
Assert(myxid == MyProc->xid); Assert(myxid == MyProc->xid);
oldestxid = ShmemVariableCache->oldestXid; oldestxid = ShmemVariableCache->oldestXid;
curXactCompletionCount = ShmemVariableCache->xactCompletionCount;
/* xmax is always latestCompletedXid + 1 */ /* xmax is always latestCompletedXid + 1 */
xmax = XidFromFullTransactionId(latest_completed); xmax = XidFromFullTransactionId(latest_completed);
...@@ -2266,6 +2369,7 @@ GetSnapshotData(Snapshot snapshot) ...@@ -2266,6 +2369,7 @@ GetSnapshotData(Snapshot snapshot)
snapshot->xcnt = count; snapshot->xcnt = count;
snapshot->subxcnt = subcount; snapshot->subxcnt = subcount;
snapshot->suboverflowed = suboverflowed; snapshot->suboverflowed = suboverflowed;
snapshot->snapXactCompletionCount = curXactCompletionCount;
snapshot->curcid = GetCurrentCommandId(false); snapshot->curcid = GetCurrentCommandId(false);
...@@ -2277,26 +2381,7 @@ GetSnapshotData(Snapshot snapshot) ...@@ -2277,26 +2381,7 @@ GetSnapshotData(Snapshot snapshot)
snapshot->regd_count = 0; snapshot->regd_count = 0;
snapshot->copied = false; snapshot->copied = false;
if (old_snapshot_threshold < 0) GetSnapshotDataInitOldSnapshot(snapshot);
{
/*
* If not using "snapshot too old" feature, fill related fields with
* dummy values that don't require any locking.
*/
snapshot->lsn = InvalidXLogRecPtr;
snapshot->whenTaken = 0;
}
else
{
/*
* Capture the current time and WAL stream location in case this
* snapshot becomes old enough to need to fall back on the special
* "old snapshot" logic.
*/
snapshot->lsn = GetXLogInsertRecPtr();
snapshot->whenTaken = GetSnapshotCurrentTimestamp();
MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
}
return snapshot; return snapshot;
} }
......
...@@ -597,6 +597,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, ...@@ -597,6 +597,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery; CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
/* NB: curcid should NOT be copied, it's a local matter */ /* NB: curcid should NOT be copied, it's a local matter */
CurrentSnapshot->snapXactCompletionCount = 0;
/* /*
* Now we have to fix what GetSnapshotData did with MyProc->xmin and * Now we have to fix what GetSnapshotData did with MyProc->xmin and
* TransactionXmin. There is a race condition: to make sure we are not * TransactionXmin. There is a race condition: to make sure we are not
...@@ -672,6 +674,7 @@ CopySnapshot(Snapshot snapshot) ...@@ -672,6 +674,7 @@ CopySnapshot(Snapshot snapshot)
newsnap->regd_count = 0; newsnap->regd_count = 0;
newsnap->active_count = 0; newsnap->active_count = 0;
newsnap->copied = true; newsnap->copied = true;
newsnap->snapXactCompletionCount = 0;
/* setup XID array */ /* setup XID array */
if (snapshot->xcnt > 0) if (snapshot->xcnt > 0)
...@@ -2209,6 +2212,7 @@ RestoreSnapshot(char *start_address) ...@@ -2209,6 +2212,7 @@ RestoreSnapshot(char *start_address)
snapshot->curcid = serialized_snapshot.curcid; snapshot->curcid = serialized_snapshot.curcid;
snapshot->whenTaken = serialized_snapshot.whenTaken; snapshot->whenTaken = serialized_snapshot.whenTaken;
snapshot->lsn = serialized_snapshot.lsn; snapshot->lsn = serialized_snapshot.lsn;
snapshot->snapXactCompletionCount = 0;
/* Copy XIDs, if present. */ /* Copy XIDs, if present. */
if (serialized_snapshot.xcnt > 0) if (serialized_snapshot.xcnt > 0)
......
...@@ -231,6 +231,15 @@ typedef struct VariableCacheData ...@@ -231,6 +231,15 @@ typedef struct VariableCacheData
FullTransactionId latestCompletedXid; /* newest full XID that has FullTransactionId latestCompletedXid; /* newest full XID that has
* committed or aborted */ * committed or aborted */
/*
* Number of top-level transactions with xids (i.e. which may have
* modified the database) that completed in some form since the start of
* the server. This currently is solely used to check whether
* GetSnapshotData() needs to recompute the contents of the snapshot, or
* not. There are likely other users of this. Always at least 1: it starts
* at 1 and is only ever incremented.
*/
uint64 xactCompletionCount;
/* /*
* These fields are protected by XactTruncationLock * These fields are protected by XactTruncationLock
*/ */
......
...@@ -207,6 +207,13 @@ typedef struct SnapshotData ...@@ -207,6 +207,13 @@ typedef struct SnapshotData
TimestampTz whenTaken; /* timestamp when snapshot was taken */ TimestampTz whenTaken; /* timestamp when snapshot was taken */
XLogRecPtr lsn; /* position in the WAL stream when taken */ XLogRecPtr lsn; /* position in the WAL stream when taken */
/*
* The transaction completion count at the time GetSnapshotData() built
* this snapshot. Allows us to avoid re-computing static snapshots when no
* transactions have completed since the last GetSnapshotData().
*/
uint64 snapXactCompletionCount;
} SnapshotData; } SnapshotData;
#endif /* SNAPSHOT_H */ #endif /* SNAPSHOT_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment