Commit 2871b461 authored by Tom Lane

Replace the KnownAssignedXids hash table with a sorted-array data structure,

and be more tense about the locking requirements for it, to improve performance
in Hot Standby mode.  In passing fix a few bugs and improve a number of
comments in the existing HS code.

Simon Riggs, with some editorialization by Tom
parent 871e73bb
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.60 2010/04/13 14:17:46 heikki Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.61 2010/04/28 00:09:05 tgl Exp $
*
* NOTES
* Each global transaction is associated with a global transaction
......@@ -1200,6 +1200,9 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
Assert(TransactionIdIsValid(xid));
if (max_prepared_xacts <= 0)
return false; /* nothing to do */
/* Read and validate file */
buf = ReadTwoPhaseFile(xid, false);
if (buf == NULL)
......
......@@ -17,27 +17,27 @@
* as are the myProcLocks lists. They can be distinguished from regular
* backend PGPROCs at need by checking for pid == 0.
*
* During recovery, we also keep a list of XIDs representing transactions
* that are known to be running at current point in WAL recovery. This
* list is kept in the KnownAssignedXids array, and updated by watching
* the sequence of arriving xids. This is very important because if we leave
* those xids out of the snapshot then they will appear to be already complete.
* Later, when they have actually completed this could lead to confusion as to
* whether those xids are visible or not, blowing a huge hole in MVCC.
* We need 'em.
*
* It is theoretically possible for a FATAL error to explode before writing
* an abort record. This could tie up KnownAssignedXids indefinitely, so
* we prune the array when a valid list of running xids arrives. These quirks,
* if they do ever exist in reality will not effect the correctness of
* snapshots.
* During hot standby, we also keep a list of XIDs representing transactions
* that are known to be running in the master (or more precisely, were running
* as of the current point in the WAL stream). This list is kept in the
* KnownAssignedXids array, and is updated by watching the sequence of
* arriving XIDs. This is necessary because if we leave those XIDs out of
* snapshots taken for standby queries, then they will appear to be already
* complete, leading to MVCC failures. Note that in hot standby, the PGPROC
* array represents standby processes, which by definition are not running
* transactions that have XIDs.
*
* It is perhaps possible for a backend on the master to terminate without
* writing an abort record for its transaction. While that shouldn't really
* happen, it would tie up KnownAssignedXids indefinitely, so we protect
* ourselves by pruning the array when a valid list of running XIDs arrives.
*
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.66 2010/04/22 08:04:25 sriggs Exp $
* $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.67 2010/04/28 00:09:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -52,11 +52,11 @@
#include "access/twophase.h"
#include "miscadmin.h"
#include "storage/procarray.h"
#include "storage/spin.h"
#include "storage/standby.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
static RunningTransactionsData CurrentRunningXactsData;
/* Our shared memory area */
typedef struct ProcArrayStruct
......@@ -64,14 +64,21 @@ typedef struct ProcArrayStruct
int numProcs; /* number of valid procs entries */
int maxProcs; /* allocated size of procs array */
int numKnownAssignedXids; /* current number of known assigned
* xids */
int maxKnownAssignedXids; /* allocated size of known assigned
* xids */
/*
* Known assigned XIDs handling
*/
int maxKnownAssignedXids; /* allocated size of array */
int numKnownAssignedXids; /* current # of valid entries */
int tailKnownAssignedXids; /* index of oldest valid element */
int headKnownAssignedXids; /* index of newest element, + 1 */
slock_t known_assigned_xids_lck; /* protects head/tail pointers */
/*
* Highest subxid that overflowed KnownAssignedXids array. Similar to
* overflowing cached subxids in PGPROC entries.
* Highest subxid that has been removed from KnownAssignedXids array to
* prevent overflow; or InvalidTransactionId if none. We track this for
* similar reasons to tracking overflowing cached subxids in PGPROC
* entries. Must hold exclusive ProcArrayLock to change this, and shared
* lock to read it.
*/
TransactionId lastOverflowedXid;
......@@ -87,7 +94,8 @@ static ProcArrayStruct *procArray;
/*
* Bookkeeping for tracking emulated transactions in recovery
*/
static HTAB *KnownAssignedXidsHash;
static TransactionId *KnownAssignedXids;
static bool *KnownAssignedXidsValid;
static TransactionId latestObservedXid = InvalidTransactionId;
/*
......@@ -112,6 +120,7 @@ static long xc_by_my_xact = 0;
static long xc_by_latest_xid = 0;
static long xc_by_main_xid = 0;
static long xc_by_child_xid = 0;
static long xc_by_known_assigned = 0;
static long xc_no_overflow = 0;
static long xc_slow_answer = 0;
......@@ -121,6 +130,7 @@ static long xc_slow_answer = 0;
#define xc_by_latest_xid_inc() (xc_by_latest_xid++)
#define xc_by_main_xid_inc() (xc_by_main_xid++)
#define xc_by_child_xid_inc() (xc_by_child_xid++)
#define xc_by_known_assigned_inc() (xc_by_known_assigned++)
#define xc_no_overflow_inc() (xc_no_overflow++)
#define xc_slow_answer_inc() (xc_slow_answer++)
......@@ -133,18 +143,25 @@ static void DisplayXidCache(void);
#define xc_by_latest_xid_inc() ((void) 0)
#define xc_by_main_xid_inc() ((void) 0)
#define xc_by_child_xid_inc() ((void) 0)
#define xc_by_known_assigned_inc() ((void) 0)
#define xc_no_overflow_inc() ((void) 0)
#define xc_slow_answer_inc() ((void) 0)
#endif /* XIDCACHE_DEBUG */
/* Primitives for KnownAssignedXids array handling for standby */
static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
TransactionId xmax);
static bool KnownAssignedXidsExist(TransactionId xid);
static void KnownAssignedXidsAdd(TransactionId *xids, int nxids);
static void KnownAssignedXidsCompress(bool force);
static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
bool exclusive_lock);
static bool KnownAssignedXidsSearch(TransactionId xid, bool remove);
static bool KnownAssignedXidExists(TransactionId xid);
static void KnownAssignedXidsRemove(TransactionId xid);
static void KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts);
static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
TransactionId *subxids);
static void KnownAssignedXidsRemovePreceding(TransactionId xid);
static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
TransactionId *xmin,
TransactionId xmax);
static void KnownAssignedXidsDisplay(int trace_level);
/*
......@@ -155,10 +172,10 @@ ProcArrayShmemSize(void)
{
Size size;
size = offsetof(ProcArrayStruct, procs);
/* Size of the ProcArray structure itself */
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
/* Normal processing - MyProc slots */
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
size = offsetof(ProcArrayStruct, procs);
size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS));
/*
......@@ -170,11 +187,17 @@ ProcArrayShmemSize(void)
* since we may at times copy the whole of the data structures around. We
* refer to this size as TOTAL_MAX_CACHED_SUBXIDS.
*/
#define TOTAL_MAX_CACHED_SUBXIDS ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
#define TOTAL_MAX_CACHED_SUBXIDS \
((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
if (XLogRequestRecoveryConnections)
{
size = add_size(size,
mul_size(sizeof(TransactionId),
TOTAL_MAX_CACHED_SUBXIDS));
size = add_size(size,
hash_estimate_size(TOTAL_MAX_CACHED_SUBXIDS,
sizeof(TransactionId)));
mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
}
return size;
}
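For illustration only (not part of the patch): the sizing arithmetic above, reduced to a standalone C program with hypothetical values for MaxBackends, max_prepared_xacts and PGPROC_MAX_CACHED_SUBXIDS, and with add_size()/mul_size() replaced by plain size_t arithmetic. The stand-in struct is invented for the sketch.

#include <stdio.h>
#include <stddef.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical settings; in the server these come from GUCs and pg_config_manual.h */
enum { MaxBackends = 100, max_prepared_xacts = 5, PGPROC_MAX_CACHED_SUBXIDS = 64 };

#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
#define TOTAL_MAX_CACHED_SUBXIDS \
	((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)

/* Invented stand-in for ProcArrayStruct, just so offsetof() has something to measure */
typedef struct
{
	int			numProcs;
	int			maxProcs;
	void	   *procs[1];		/* variable-length array of PGPROC pointers */
} FakeProcArrayStruct;

int
main(void)
{
	size_t		size;

	/* fixed header plus one PGPROC pointer per possible transaction slot */
	size = offsetof(FakeProcArrayStruct, procs);
	size += sizeof(void *) * PROCARRAY_MAXPROCS;

	/* the two parallel KnownAssignedXids arrays, sized for the worst case */
	size += sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS;
	size += sizeof(bool) * TOTAL_MAX_CACHED_SUBXIDS;

	printf("KnownAssignedXids slots: %d\n", TOTAL_MAX_CACHED_SUBXIDS);
	printf("approximate shared size: %zu bytes\n", size);
	return 0;
}

With these made-up settings the arrays hold up to 65 * 105 = 6825 XIDs, matching the worst case in which every possible transaction carries a full set of cached subtransactions.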
......@@ -190,7 +213,9 @@ CreateSharedProcArray(void)
/* Create or attach to the ProcArray shared structure */
procArray = (ProcArrayStruct *)
ShmemInitStruct("Proc Array",
mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS),
add_size(offsetof(ProcArrayStruct, procs),
mul_size(sizeof(PGPROC *),
PROCARRAY_MAXPROCS)),
&found);
if (!found)
......@@ -198,31 +223,28 @@ CreateSharedProcArray(void)
/*
* We're the first - initialize.
*/
/* Normal processing */
procArray->numProcs = 0;
procArray->maxProcs = PROCARRAY_MAXPROCS;
procArray->numKnownAssignedXids = 0;
procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
procArray->numKnownAssignedXids = 0;
procArray->tailKnownAssignedXids = 0;
procArray->headKnownAssignedXids = 0;
SpinLockInit(&procArray->known_assigned_xids_lck);
procArray->lastOverflowedXid = InvalidTransactionId;
}
/* Create or attach to the KnownAssignedXids arrays too, if needed */
if (XLogRequestRecoveryConnections)
{
/* Create or attach to the KnownAssignedXids hash table */
HASHCTL info;
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(TransactionId);
info.entrysize = sizeof(TransactionId);
info.hash = tag_hash;
KnownAssignedXidsHash = ShmemInitHash("KnownAssignedXids Hash",
TOTAL_MAX_CACHED_SUBXIDS,
TOTAL_MAX_CACHED_SUBXIDS,
&info,
HASH_ELEM | HASH_FUNCTION);
if (!KnownAssignedXidsHash)
elog(FATAL, "could not initialize known assigned xids hash table");
KnownAssignedXids = (TransactionId *)
ShmemInitStruct("KnownAssignedXids",
mul_size(sizeof(TransactionId),
TOTAL_MAX_CACHED_SUBXIDS),
&found);
KnownAssignedXidsValid = (bool *)
ShmemInitStruct("KnownAssignedXidsValid",
mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
&found);
}
}
......@@ -427,9 +449,9 @@ ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
* are atypical cases where we need to take it in steps.
*
* Use the data about running transactions on master to create the initial
* state of KnownAssignedXids. We also these records to regularly prune
* state of KnownAssignedXids. We also use these records to regularly prune
* KnownAssignedXids because we know it is possible that some transactions
* with FATAL errors do not write abort records, which could cause eventual
* with FATAL errors fail to write abort records, which could cause eventual
* overflow.
*
* Only used during recovery. Notice the signature is very similar to a
......@@ -544,35 +566,42 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
continue;
KnownAssignedXidsAdd(&xid, 1);
KnownAssignedXidsAdd(xid, xid, true);
}
KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
/*
* Update lastOverflowedXid if the snapshot had overflown. We don't know
* the exact value for this, so conservatively assume that it's nextXid-1
* Update lastOverflowedXid if the snapshot has any missing subxids.
* We don't know the specific subxids that are missing, so conservatively
* assume the last one is latestObservedXid. If no missing subxids,
* try to clear lastOverflowedXid.
*/
if (running->subxid_overflow &&
TransactionIdFollows(latestObservedXid, procArray->lastOverflowedXid))
procArray->lastOverflowedXid = latestObservedXid;
if (running->subxid_overflow)
{
if (TransactionIdFollows(latestObservedXid,
procArray->lastOverflowedXid))
procArray->lastOverflowedXid = latestObservedXid;
}
else if (TransactionIdFollows(running->oldestRunningXid,
procArray->lastOverflowedXid))
procArray->lastOverflowedXid = InvalidTransactionId;
LWLockRelease(ProcArrayLock);
/* nextXid must be beyond any observed xid */
if (TransactionIdFollows(running->nextXid, ShmemVariableCache->nextXid))
ShmemVariableCache->nextXid = running->nextXid;
elog(trace_recovery(DEBUG2),
"running transaction data initialized");
LWLockRelease(ProcArrayLock);
elog(trace_recovery(DEBUG2), "running transaction data initialized");
if (standbyState == STANDBY_SNAPSHOT_READY)
elog(trace_recovery(DEBUG2),
"recovery snapshots are now enabled");
elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled");
}
/*
* ProcArrayApplyXidAssignment
* Process an XLOG_XACT_ASSIGNMENT WAL record
*/
void
ProcArrayApplyXidAssignment(TransactionId topxid,
int nsubxids, TransactionId *subxids)
......@@ -615,13 +644,12 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
/*
* Remove from known-assigned-xacts.
* Remove subxids from known-assigned-xacts.
*/
for (i = 0; i < nsubxids; i++)
KnownAssignedXidsRemove(subxids[i]);
KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
/*
* Advance lastOverflowedXid when required.
* Advance lastOverflowedXid to be at least the last of these subxids.
*/
if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
procArray->lastOverflowedXid = max_xid;
......@@ -633,23 +661,27 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
* TransactionIdIsInProgress -- is given transaction running in some backend
*
* Aside from some shortcuts such as checking RecentXmin and our own Xid,
* there are three possibilities for finding a running transaction:
* there are four possibilities for finding a running transaction:
*
* 1. the given Xid is a main transaction Id. We will find this out cheaply
* 1. The given Xid is a main transaction Id. We will find this out cheaply
* by looking at the PGPROC struct for each backend.
*
* 2. the given Xid is one of the cached subxact Xids in the PGPROC array.
* 2. The given Xid is one of the cached subxact Xids in the PGPROC array.
* We can find this out cheaply too.
*
* 3. Search the SubTrans tree to find the Xid's topmost parent, and then
* see if that is running according to PGPROC. This is the slowest, but
* sadly it has to be done always if the other two failed, unless we see
* that the cached subxact sets are complete (none have overflowed).
* 3. In Hot Standby mode, we must search the KnownAssignedXids list to see
* if the Xid is running on the master.
*
* 4. Search the SubTrans tree to find the Xid's topmost parent, and then
* see if that is running according to PGPROC or KnownAssignedXids. This is
* the slowest way, but sadly it has to be done always if the others failed,
* unless we see that the cached subxact sets are complete (none have
* overflowed).
*
* ProcArrayLock has to be held while we do 1 and 2. If we save the top Xids
* while doing 1, we can release the ProcArrayLock while we do 3. This buys
* back some concurrency (we can't retrieve the main Xids from PGPROC again
* anyway; see GetNewTransactionId).
* ProcArrayLock has to be held while we do 1, 2, 3. If we save the top Xids
* while doing 1 and 3, we can release the ProcArrayLock while we do 4.
* This buys back some concurrency (and we can't retrieve the main Xids from
* PGPROC again anyway; see GetNewTransactionId).
*/
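For illustration only (not part of the patch): a rough, single-threaded model of the four-step search described above. Every name below (proc_xids, cached_subxids, known_assigned, subtrans_parent) is an invented stand-in; there is no locking, no fast paths, and pg_subtrans is reduced to one hard-coded parent lookup.

#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/* Toy stand-ins for PGPROC entries, KnownAssignedXids, and pg_subtrans */
static const TransactionId proc_xids[] = {100, 105};			/* step 1 data */
static const TransactionId cached_subxids[] = {101, 106};		/* step 2 data */
static const TransactionId known_assigned[] = {200, 201, 203};	/* step 3 data (hot standby) */
static bool in_recovery = false;

/* toy pg_subtrans lookup: parent of a subxid, or InvalidTransactionId */
static TransactionId
subtrans_parent(TransactionId xid)
{
	return (xid == 107) ? 105 : InvalidTransactionId;	/* pretend 107 is an uncached child of 105 */
}

static bool
in_list(TransactionId xid, const TransactionId *a, size_t n)
{
	for (size_t i = 0; i < n; i++)
		if (a[i] == xid)
			return true;
	return false;
}

/* Very rough shape of TransactionIdIsInProgress: steps 1-4, in order */
static bool
xid_is_in_progress(TransactionId xid)
{
	/* Step 1: top-level XIDs advertised in PGPROC entries */
	if (in_list(xid, proc_xids, 2))
		return true;
	/* Step 2: cached subtransaction XIDs in PGPROC entries */
	if (in_list(xid, cached_subxids, 2))
		return true;
	/* Step 3: hot standby only -- XIDs known to be running on the master */
	if (in_recovery && in_list(xid, known_assigned, 3))
		return true;
	/* Step 4: map to the topmost parent via pg_subtrans, then recheck step 1.
	 * The real code does this only when a subxid cache has overflowed (or,
	 * in recovery, when xid <= lastOverflowedXid); here it is unconditional. */
	TransactionId parent = subtrans_parent(xid);
	return (parent != InvalidTransactionId) && in_list(parent, proc_xids, 2);
}

int
main(void)
{
	printf("105 running? %d\n", xid_is_in_progress(105));	/* 1, via step 1 */
	printf("106 running? %d\n", xid_is_in_progress(106));	/* 1, via step 2 */
	printf("107 running? %d\n", xid_is_in_progress(107));	/* 1, via step 4 */
	printf("999 running? %d\n", xid_is_in_progress(999));	/* 0 */
	return 0;
}

in_recovery is left false here, so step 3 is shown only for ordering; on a standby it is the step that keeps master transactions visible as still in progress.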
bool
TransactionIdIsInProgress(TransactionId xid)
......@@ -705,8 +737,7 @@ TransactionIdIsInProgress(TransactionId xid)
* known-assigned list. If we later finish recovery, we no longer need
* the bigger array, but we don't bother to shrink it.
*/
int maxxids = RecoveryInProgress() ?
arrayP->maxProcs : TOTAL_MAX_CACHED_SUBXIDS;
int maxxids = RecoveryInProgress() ? TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
if (xids == NULL)
......@@ -778,7 +809,7 @@ TransactionIdIsInProgress(TransactionId xid)
}
/*
* Save the main Xid for step 3. We only need to remember main Xids
* Save the main Xid for step 4. We only need to remember main Xids
* that have uncached children. (Note: there is no race condition
* here because the overflowed flag cannot be cleared, only set, while
* we hold ProcArrayLock. So we can't miss an Xid that we need to
......@@ -788,25 +819,28 @@ TransactionIdIsInProgress(TransactionId xid)
xids[nxids++] = pxid;
}
/* In hot standby mode, check the known-assigned-xids list. */
/*
* Step 3: in hot standby mode, check the known-assigned-xids list. XIDs
* in the list must be treated as running.
*/
if (RecoveryInProgress())
{
/* none of the PGPROC entries should have XIDs in hot standby mode */
Assert(nxids == 0);
if (KnownAssignedXidsExist(xid))
if (KnownAssignedXidExists(xid))
{
LWLockRelease(ProcArrayLock);
/* XXX: should we have a separate counter for this? */
/* xc_by_main_xid_inc(); */
xc_by_known_assigned_inc();
return true;
}
/*
* If the KnownAssignedXids overflowed, we have to check pg_subtrans
* too. Copy all xids from KnownAssignedXids that are lower than xid,
* too. Fetch all xids from KnownAssignedXids that are lower than xid,
* since if xid is a subtransaction its parent will always have a
* lower value.
* lower value. Note we will collect both main and subXIDs here,
* but there's no help for it.
*/
if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
nxids = KnownAssignedXidsGet(xids, xid);
......@@ -825,7 +859,7 @@ TransactionIdIsInProgress(TransactionId xid)
}
/*
* Step 3: have to check pg_subtrans.
* Step 4: have to check pg_subtrans.
*
* At this point, we know it's either a subtransaction of one of the Xids
* in xids[], or it's not running. If it's an already-failed
......@@ -860,7 +894,8 @@ TransactionIdIsInProgress(TransactionId xid)
* TransactionIdIsActive -- is xid the top-level XID of an active backend?
*
* This differs from TransactionIdIsInProgress in that it ignores prepared
* transactions. Also, we ignore subtransactions since that's not needed
* transactions, as well as transactions running on the master if we're in
* hot standby. Also, we ignore subtransactions since that's not needed
* for current uses.
*/
bool
......@@ -1181,6 +1216,8 @@ GetSnapshotData(Snapshot snapshot)
else
{
/*
* We're in hot standby, so get XIDs from KnownAssignedXids.
*
* We store all xids directly into subxip[]. Here's why:
*
* In recovery we don't know which xids are top-level and which are
......@@ -1201,9 +1238,10 @@ GetSnapshotData(Snapshot snapshot)
* depending upon when the snapshot was taken, or change normal
* snapshot processing so it matches.
*/
subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin, xmax);
subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
xmax);
if (TransactionIdPrecedes(xmin, procArray->lastOverflowedXid))
if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
suboverflowed = true;
}
......@@ -1248,9 +1286,12 @@ GetSnapshotData(Snapshot snapshot)
/*
* GetRunningTransactionData -- returns information about running transactions.
*
* Similar to GetSnapshotData but returning more information. We include
* Similar to GetSnapshotData but returns more information. We include
* all PGPROCs with an assigned TransactionId, even VACUUM processes.
*
* The returned data structure is statically allocated; caller should not
* modify it, and must not assume it is valid past the next call.
*
* This is never executed during recovery so there is no need to look at
* KnownAssignedXids.
*
......@@ -1261,8 +1302,11 @@ GetSnapshotData(Snapshot snapshot)
RunningTransactions
GetRunningTransactionData(void)
{
/* result workspace */
static RunningTransactionsData CurrentRunningXactsData;
ProcArrayStruct *arrayP = procArray;
RunningTransactions CurrentRunningXacts = (RunningTransactions) &CurrentRunningXactsData;
RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
TransactionId latestCompletedXid;
TransactionId oldestRunningXid;
TransactionId *xids;
......@@ -1279,7 +1323,7 @@ GetRunningTransactionData(void)
* the lock, so we can't look at numProcs. Likewise, we allocate much
* more subxip storage than is probably needed.
*
* Should only be allocated for bgwriter, since only ever executed during
* Should only be allocated in bgwriter, since only ever executed during
* checkpoints.
*/
if (CurrentRunningXacts->xids == NULL)
......@@ -2095,18 +2139,20 @@ static void
DisplayXidCache(void)
{
fprintf(stderr,
"XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, nooflo: %ld, slow: %ld\n",
"XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, knownassigned: %ld, nooflo: %ld, slow: %ld\n",
xc_by_recent_xmin,
xc_by_known_xact,
xc_by_my_xact,
xc_by_latest_xid,
xc_by_main_xid,
xc_by_child_xid,
xc_by_known_assigned,
xc_no_overflow,
xc_slow_answer);
}
#endif /* XIDCACHE_DEBUG */
/* ----------------------------------------------
* KnownAssignedTransactions sub-module
* ----------------------------------------------
......@@ -2114,51 +2160,64 @@ DisplayXidCache(void)
/*
* In Hot Standby mode, we maintain a list of transactions that are (or were)
* running in the master at the current point in WAL.
* running in the master at the current point in WAL. These XIDs must be
* treated as running by standby transactions, even though they are not in
* the standby server's PGPROC array.
*
* We record all XIDs that we know have been assigned. That includes all the
* XIDs seen in WAL records, plus all unobserved XIDs that we can deduce have
* been assigned. We can deduce the existence of unobserved XIDs because we
* know XIDs are assigned in sequence, with no gaps. The KnownAssignedXids
* list expands as new XIDs are observed or inferred, and contracts when
* transaction completion records arrive.
*
* During hot standby we do not fret too much about the distinction between
* top-level XIDs and subtransaction XIDs. We store both together in the
* KnownAssignedXids list. In backends, this is copied into snapshots in
* GetSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
* doesn't care about the distinction either. Subtransaction XIDs are
* effectively treated as top-level XIDs and in the typical case pg_subtrans
* links are *not* maintained (which does not affect visibility).
*
* We have room in KnownAssignedXids and in snapshots to hold maxProcs *
* (1 + PGPROC_MAX_CACHED_SUBXIDS) XIDs, so every master transaction must
* report its subtransaction XIDs in a WAL XLOG_XACT_ASSIGNMENT record at
* least every PGPROC_MAX_CACHED_SUBXIDS. When we receive one of these
* records, we mark the subXIDs as children of the top XID in pg_subtrans,
* and then remove them from KnownAssignedXids. This prevents overflow of
* KnownAssignedXids and snapshots, at the cost that status checks for these
* subXIDs will take a slower path through TransactionIdIsInProgress().
* This means that KnownAssignedXids is not necessarily complete for subXIDs,
* though it should be complete for top-level XIDs; this is the same situation
* that holds with respect to the PGPROC entries in normal running.
*
* When we throw away subXIDs from KnownAssignedXids, we need to keep track of
* that, similarly to tracking overflow of a PGPROC's subxids array. We do
* that by remembering the lastOverflowedXID, ie the last thrown-away subXID.
* As long as that is within the range of interesting XIDs, we have to assume
* that subXIDs are missing from snapshots. (Note that subXID overflow occurs
* on primary when 65th subXID arrives, whereas on standby it occurs when 64th
* subXID arrives - that is not an error.)
*
* Should a backend on primary somehow disappear before it can write an abort
* record, then we just leave those XIDs in KnownAssignedXids. They actually
* aborted but we think they were running; the distinction is irrelevant
* because either way any changes done by the transaction are not visible to
* backends in the standby. We prune KnownAssignedXids when
* XLOG_XACT_RUNNING_XACTS arrives, to forestall possible overflow of the
* array due to such dead XIDs.
*/
/*
* RecordKnownAssignedTransactionIds
* Record the given XID in KnownAssignedXids, as well as any preceding
* unobserved XIDs.
*
* RecordKnownAssignedTransactionIds() should be run for *every* WAL record
* type apart from XLOG_XACT_RUNNING_XACTS, since that initialises the first
* snapshot so that RecordKnownAssignedTransactionIds() can be callsed. Uses
* local variables, so should only be called by Startup process.
*
* We record all xids that we know have been assigned. That includes
* all the xids on the WAL record, plus all unobserved xids that
* we can deduce have been assigned. We can deduce the existence of
* unobserved xids because we know xids are in sequence, with no gaps.
*
* During recovery we do not fret too much about the distinction between
* top-level xids and subtransaction xids. We hold both together in
* a hash table called KnownAssignedXids. In backends, this is copied into
* snapshots in GetSnapshotData(), taking advantage
* of the fact that XidInMVCCSnapshot() doesn't care about the distinction
* either. Subtransaction xids are effectively treated as top-level xids
* and in the typical case pg_subtrans is *not* maintained (and that
* does not effect visibility).
*
* KnownAssignedXids expands as new xids are observed or inferred, and
* contracts when transaction completion records arrive. We have room in a
* snapshot to hold maxProcs * (1 + PGPROC_MAX_CACHED_SUBXIDS) xids, so
* every transaction must report their subtransaction xids in a special
* WAL assignment record every PGPROC_MAX_CACHED_SUBXIDS. This allows us
* to remove the subtransaction xids and update pg_subtrans instead. Snapshots
* are still correct yet we don't overflow SnapshotData structure. When we do
* this we need
* to keep track of which xids caused the snapshot to overflow. We do that
* by simply tracking the lastOverflowedXid - if it is within the bounds of
* the KnownAssignedXids then we know the snapshot overflowed. (Note that
* subxid overflow occurs on primary when 65th subxid arrives, whereas on
* standby it occurs when 64th subxid arrives - that is not an error).
*
* Should FATAL errors result in a backend on primary disappearing before
* it can write an abort record then we just leave those xids in
* KnownAssignedXids. They actually aborted but we think they were running;
* the distinction is irrelevant because either way any changes done by the
* transaction are not visible to backends in the standby.
* We prune KnownAssignedXids when XLOG_XACT_RUNNING_XACTS arrives, to
* ensure we do not overflow.
*
* If we are in STANDBY_SNAPSHOT_PENDING state, then we may try to remove
* xids that are not present.
* type apart from XLOG_XACT_RUNNING_XACTS (since that initialises the first
* snapshot so that RecordKnownAssignedTransactionIds() can be called).
*
* Must only be called in Startup process.
*/
void
RecordKnownAssignedTransactionIds(TransactionId xid)
......@@ -2196,45 +2255,33 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
*/
if (TransactionIdFollows(xid, latestObservedXid))
{
TransactionId next_expected_xid = latestObservedXid;
TransactionIdAdvance(next_expected_xid);
TransactionId next_expected_xid;
/*
* Locking requirement is currently higher than for xid assignment in
* normal running. However, we only get called here for new high xids
* - so on a multi-processor where it is common that xids arrive out
* of order the average number of locks per assignment will actually
* reduce. So not too worried about this locking.
*
* XXX It does seem possible that we could add a whole range of
* numbers atomically to KnownAssignedXids, if we use a sorted list
* for KnownAssignedXids. But that design also increases the length of
* time we hold lock when we process commits/aborts, so on balance
* don't worry about this.
* Extend clog and subtrans like we do in GetNewTransactionId()
* during normal operation using individual extend steps.
* Typical case requires almost no activity.
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
next_expected_xid = latestObservedXid;
TransactionIdAdvance(next_expected_xid);
while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
{
if (TransactionIdPrecedes(next_expected_xid, xid))
elog(trace_recovery(DEBUG4),
"recording unobserved xid %u (latestObservedXid %u)",
next_expected_xid, latestObservedXid);
KnownAssignedXidsAdd(&next_expected_xid, 1);
/*
* Extend clog and subtrans like we do in GetNewTransactionId()
* during normal operation
*/
ExtendCLOG(next_expected_xid);
ExtendSUBTRANS(next_expected_xid);
TransactionIdAdvance(next_expected_xid);
}
LWLockRelease(ProcArrayLock);
/*
* Add the new xids onto the KnownAssignedXids array.
*/
next_expected_xid = latestObservedXid;
TransactionIdAdvance(next_expected_xid);
KnownAssignedXidsAdd(next_expected_xid, xid, false);
/*
* Now we can advance latestObservedXid
*/
latestObservedXid = xid;
}
......@@ -2247,15 +2294,18 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
}
}
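For illustration only (not part of the patch): a minimal sketch of the gap-filling step above -- every XID between latestObservedXid and the newly observed xid must already have been assigned on the master, so the whole range is handed to KnownAssignedXidsAdd at once. xid_advance() and xid_precedes_or_equals() below mimic TransactionIdAdvance and TransactionIdPrecedesOrEquals (wraparound arithmetic, skipping the special XIDs below FirstNormalTransactionId = 3); the helper names are invented for the sketch.

#include <stdio.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define FirstNormalTransactionId ((TransactionId) 3)

/* Same idea as TransactionIdAdvance: wrap around, skipping the special XIDs 0-2 */
static TransactionId
xid_advance(TransactionId xid)
{
	xid++;
	if (xid < FirstNormalTransactionId)
		xid = FirstNormalTransactionId;
	return xid;
}

/* Modulo-2^32 ordering, as TransactionIdPrecedesOrEquals does for normal XIDs */
static int
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
	return (int32_t) (a - b) <= 0;
}

int
main(void)
{
	TransactionId latestObservedXid = 100;
	TransactionId xid = 104;	/* XID carried by an arriving WAL record */
	TransactionId next = xid_advance(latestObservedXid);

	/* Everything in (latestObservedXid, xid] is known to have been assigned */
	printf("KnownAssignedXidsAdd(%u, %u):", next, xid);
	for (TransactionId x = next; xid_precedes_or_equals(x, xid); x = xid_advance(x))
		printf(" %u", x);
	printf("\n");				/* prints: KnownAssignedXidsAdd(101, 104): 101 102 103 104 */
	return 0;
}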
/*
* ExpireTreeKnownAssignedTransactionIds
* Remove the given XIDs from KnownAssignedXids.
*/
void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
TransactionId *subxids)
{
int i;
TransactionId max_xid;
if (standbyState == STANDBY_DISABLED)
return;
return; /* nothing to do */
max_xid = TransactionIdLatest(xid, nsubxids, subxids);
......@@ -2264,268 +2314,655 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
if (TransactionIdIsValid(xid))
KnownAssignedXidsRemove(xid);
for (i = 0; i < nsubxids; i++)
KnownAssignedXidsRemove(subxids[i]);
KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
/* Like in ProcArrayRemove, advance latestCompletedXid */
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->latestCompletedXid))
/* As in ProcArrayEndTransaction, advance latestCompletedXid */
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
max_xid))
ShmemVariableCache->latestCompletedXid = max_xid;
LWLockRelease(ProcArrayLock);
}
/*
* ExpireAllKnownAssignedTransactionIds
* Remove all entries in KnownAssignedXids
*/
void
ExpireAllKnownAssignedTransactionIds(void)
{
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
KnownAssignedXidsRemoveMany(InvalidTransactionId, false);
KnownAssignedXidsRemovePreceding(InvalidTransactionId);
LWLockRelease(ProcArrayLock);
}
/*
* ExpireOldKnownAssignedTransactionIds
* Remove KnownAssignedXids entries preceding the given XID
*/
void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)
{
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
KnownAssignedXidsRemoveMany(xid, true);
KnownAssignedXidsRemovePreceding(xid);
LWLockRelease(ProcArrayLock);
}
/*
* Private module functions to manipulate KnownAssignedXids
*
* There are 3 main users of the KnownAssignedXids data structure:
*
* * backends taking snapshots
* * startup process adding new knownassigned xids
* * startup process removing xids as transactions end
*
* If we make KnownAssignedXids a simple sorted array then the first two
* operations are fast, but the last one is at least O(N). If we make
* KnownAssignedXids a hash table then the last two operations are fast,
* though we have to do more work at snapshot time. Doing more work at
* commit could slow down taking snapshots anyway because of lwlock
* contention. Scanning the hash table is O(N) on the max size of the array,
* so performs poorly in comparison when we have very low numbers of
* write transactions to process. But at least it is constant overhead
* and a sequential memory scan will utilise hardware memory readahead
* to give much improved performance. In any case the emphasis must be on
* having the standby process changes quickly so that it can provide
* high availability. So we choose to implement as a hash table.
* There are 5 main uses of the KnownAssignedXids data structure:
*
* * backends taking snapshots - all valid XIDs need to be copied out
* * backends seeking to determine presence of a specific XID
* * startup process adding new known-assigned XIDs
* * startup process removing specific XIDs as transactions end
* * startup process pruning array when special WAL records arrive
*
* This data structure is known to be a hot spot during Hot Standby, so we
* go to some lengths to make these operations as efficient and as concurrent
* as possible.
*
* The XIDs are stored in an array in sorted order --- TransactionIdPrecedes
* order, to be exact --- to allow binary search for specific XIDs. Note:
* in general TransactionIdPrecedes would not provide a total order, but
* we know that the entries present at any instant should not extend across
* a large enough fraction of XID space to wrap around (the master would
* shut down for fear of XID wrap long before that happens). So it's OK to
* use TransactionIdPrecedes as a binary-search comparator.
*
* It's cheap to maintain the sortedness during insertions, since new known
* XIDs are always reported in XID order; we just append them at the right.
*
* To keep individual deletions cheap, we need to allow gaps in the array.
* This is implemented by marking array elements as valid or invalid using
* the parallel boolean array KnownAssignedXidsValid[]. A deletion is done
* by setting KnownAssignedXidsValid[i] to false, *without* clearing the
* XID entry itself. This preserves the property that the XID entries are
* sorted, so we can do binary searches easily. Periodically we compress
* out the unused entries; that's much cheaper than having to compress the
* array immediately on every deletion.
*
* The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
* are those with indexes tail <= i < head; items outside this subscript range
* have unspecified contents. When head reaches the end of the array, we
* force compression of unused entries rather than wrapping around, since
* allowing wraparound would greatly complicate the search logic. We maintain
* an explicit tail pointer so that pruning of old XIDs can be done without
* immediately moving the array contents. In most cases only a small fraction
* of the array contains valid entries at any instant.
*
* Although only the startup process can ever change the KnownAssignedXids
* data structure, we still need interlocking so that standby backends will
* not observe invalid intermediate states. The convention is that backends
* must hold shared ProcArrayLock to examine the array. To remove XIDs from
* the array, the startup process must hold ProcArrayLock exclusively, for
* the usual transactional reasons (compare commit/abort of a transaction
* during normal running). Compressing unused entries out of the array
* likewise requires exclusive lock. To add XIDs to the array, we just insert
* them into slots to the right of the head pointer and then advance the head
* pointer. This wouldn't require any lock at all, except that on machines
* with weak memory ordering we need to be careful that other processors
* see the array element changes before they see the head pointer change.
* We handle this by using a spinlock to protect reads and writes of the
* head/tail pointers. (We could dispense with the spinlock if we were to
* create suitable memory access barrier primitives and use those instead.)
* The spinlock must be taken to read or write the head/tail pointers unless
* the caller holds ProcArrayLock exclusively.
*
* Algorithmic analysis:
*
* If we have a maximum of M slots, with N XIDs currently spread across
* S elements then we have N <= S <= M always.
*
* * Adding a new XID is O(1) and needs little locking (unless compression
* must happen)
* * Compressing the array is O(S) and requires exclusive lock
* * Removing an XID is O(logS) and requires exclusive lock
* * Taking a snapshot is O(S) and requires shared lock
* * Checking for an XID is O(logS) and requires shared lock
*
* In comparison, using a hash table for KnownAssignedXids would mean that
* taking snapshots would be O(M). If we can maintain S << M then the
* sorted array technique will deliver significantly faster snapshots.
* If we try to keep S too small then we will spend too much time compressing,
* so there is an optimal point for any workload mix. We use a heuristic to
* decide when to compress the array, though trimming also helps reduce
* frequency of compressing. The heuristic requires us to track the number of
* currently valid XIDs in the array.
*/
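For illustration only (not part of the patch): a small standalone check of the claim above that the modulo-2^32 comparison behind TransactionIdPrecedes works as a binary-search comparator so long as the live window of XIDs does not span half the XID space. The array below deliberately straddles the 2^32 wrap point, so it is not numerically ascending, yet the search still finds its target.

#include <stdio.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Same comparison rule as TransactionIdPrecedes uses for normal XIDs: modulo 2^32 */
static int
xid_precedes(TransactionId a, TransactionId b)
{
	return (int32_t) (a - b) < 0;
}

int
main(void)
{
	/* A KnownAssignedXids-style window that crosses the wrap point.
	 * Numerically it is not ascending, but it is ascending under xid_precedes. */
	TransactionId xids[] = {4294967293u, 4294967294u, 4294967295u, 3, 4, 5};
	int			n = 6;
	TransactionId target = 4;	/* numerically smaller than the first element */
	int			first = 0, last = n - 1, found = -1;

	while (first <= last)
	{
		int			mid = (first + last) / 2;

		if (xids[mid] == target)
		{
			found = mid;
			break;
		}
		else if (xid_precedes(target, xids[mid]))
			last = mid - 1;
		else
			first = mid + 1;
	}
	printf("found %u at index %d\n", target, found);	/* index 4 */
	return 0;
}

The half-space restriction is what the comment relies on: as long as all live entries lie within 2^31 of each other, (int32) (a - b) yields a consistent total order over the window.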
/*
* Add xids into KnownAssignedXids.
* Compress KnownAssignedXids by shifting valid data down to the start of the
* array, removing any gaps.
*
* A compression step is forced if "force" is true, otherwise we do it
* only if a heuristic indicates it's a good time to do it.
*
* Must be called while holding ProcArrayLock in Exclusive mode
* Caller must hold ProcArrayLock in exclusive mode.
*/
static void
KnownAssignedXidsAdd(TransactionId *xids, int nxids)
KnownAssignedXidsCompress(bool force)
{
TransactionId *result;
bool found;
int i;
/* use volatile pointer to prevent code rearrangement */
volatile ProcArrayStruct *pArray = procArray;
int head, tail;
int compress_index;
int i;
for (i = 0; i < nxids; i++)
/* no spinlock required since we hold ProcArrayLock exclusively */
head = pArray->headKnownAssignedXids;
tail = pArray->tailKnownAssignedXids;
if (!force)
{
Assert(TransactionIdIsValid(xids[i]));
/*
* If we can choose how much to compress, use a heuristic to
* avoid compressing too often or not often enough.
*
* Heuristic is if we have a large enough current spread and
* less than 50% of the elements are currently in use, then
* compress. This should ensure we compress fairly infrequently.
* We could compress less often though the virtual array would
* spread out more and snapshots would become more expensive.
*/
int nelements = head - tail;
elog(trace_recovery(DEBUG4), "adding KnownAssignedXid %u", xids[i]);
if (nelements < 4 * PROCARRAY_MAXPROCS ||
nelements < 2 * pArray->numKnownAssignedXids)
return;
}
procArray->numKnownAssignedXids++;
if (procArray->numKnownAssignedXids > procArray->maxKnownAssignedXids)
/*
* We compress the array by reading the valid values from tail
* to head, re-aligning data to 0th element.
*/
compress_index = 0;
for (i = tail; i < head; i++)
{
if (KnownAssignedXidsValid[i])
{
KnownAssignedXidsDisplay(LOG);
LWLockRelease(ProcArrayLock);
elog(ERROR, "too many KnownAssignedXids (%u)", procArray->maxKnownAssignedXids);
KnownAssignedXids[compress_index] = KnownAssignedXids[i];
KnownAssignedXidsValid[compress_index] = true;
compress_index++;
}
}
result = (TransactionId *) hash_search(KnownAssignedXidsHash, &xids[i], HASH_ENTER,
&found);
pArray->tailKnownAssignedXids = 0;
pArray->headKnownAssignedXids = compress_index;
}
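For illustration only (not part of the patch): the non-forced heuristic above, pulled out as a standalone predicate with a hypothetical PROCARRAY_MAXPROCS of 105 (100 backends plus 5 prepared transactions). Compression is skipped unless the occupied span is at least 4 * PROCARRAY_MAXPROCS slots and at most half of those slots still hold valid XIDs.

#include <stdio.h>
#include <stdbool.h>

/* Hypothetical sizing: 100 backends + 5 prepared transactions */
#define PROCARRAY_MAXPROCS 105

/* Mirrors the non-forced branch of the heuristic in KnownAssignedXidsCompress */
static bool
should_compress(int head, int tail, int num_valid)
{
	int			nelements = head - tail;

	if (nelements < 4 * PROCARRAY_MAXPROCS ||
		nelements < 2 * num_valid)
		return false;			/* span too small, or still more than half full */
	return true;
}

int
main(void)
{
	/* span of 500 slots, 300 still valid: more than half full, leave it alone */
	printf("%d\n", should_compress(500, 0, 300));	/* 0 */
	/* span of 500 slots, only 100 valid: large and mostly gaps, compress */
	printf("%d\n", should_compress(500, 0, 100));	/* 1 */
	/* span of 300 slots: below 4 * PROCARRAY_MAXPROCS = 420, too small to bother */
	printf("%d\n", should_compress(300, 0, 50));	/* 0 */
	return 0;
}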
if (!result)
/*
* Add xids into KnownAssignedXids at the head of the array.
*
* xids from from_xid to to_xid, inclusive, are added to the array.
*
* If exclusive_lock is true then caller already holds ProcArrayLock in
* exclusive mode, so we need no extra locking here. Else caller holds no
* lock, so we need to be sure we maintain sufficient interlocks against
* concurrent readers. (Only the startup process ever calls this, so no need
* to worry about concurrent writers.)
*/
static void
KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
bool exclusive_lock)
{
/* use volatile pointer to prevent code rearrangement */
volatile ProcArrayStruct *pArray = procArray;
TransactionId next_xid;
int head, tail;
int nxids;
int i;
Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));
/*
* Calculate how many array slots we'll need. Normally this is cheap;
* in the unusual case where the XIDs cross the wrap point, we do it the
* hard way.
*/
if (to_xid >= from_xid)
nxids = to_xid - from_xid + 1;
else
{
nxids = 1;
next_xid = from_xid;
while (TransactionIdPrecedes(next_xid, to_xid))
{
nxids++;
TransactionIdAdvance(next_xid);
}
}
/*
* Since only the startup process modifies the head/tail pointers,
* we don't need a lock to read them here.
*/
head = pArray->headKnownAssignedXids;
tail = pArray->tailKnownAssignedXids;
Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);
/*
* Verify that insertions occur in TransactionId sequence. Note that
* even if the last existing element is marked invalid, it must still
* have a correctly sequenced XID value.
*/
if (head > tail &&
TransactionIdFollowsOrEquals(KnownAssignedXids[head - 1], from_xid))
{
KnownAssignedXidsDisplay(LOG);
elog(ERROR, "out-of-order XID insertion in KnownAssignedXids");
}
/*
* If our xids won't fit in the remaining space, compress out free space
*/
if (head + nxids > pArray->maxKnownAssignedXids)
{
/* must hold lock to compress */
if (!exclusive_lock)
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
KnownAssignedXidsCompress(true);
head = pArray->headKnownAssignedXids;
/* note: we no longer care about the tail pointer */
if (!exclusive_lock)
LWLockRelease(ProcArrayLock);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
/*
* If it still won't fit then we're out of memory
*/
if (head + nxids > pArray->maxKnownAssignedXids)
elog(ERROR, "too many KnownAssignedXids");
}
/* Now we can insert the xids into the space starting at head */
next_xid = from_xid;
for (i = 0; i < nxids; i++)
{
KnownAssignedXids[head] = next_xid;
KnownAssignedXidsValid[head] = true;
TransactionIdAdvance(next_xid);
head++;
}
/* Adjust count of number of valid entries */
pArray->numKnownAssignedXids += nxids;
/*
* Now update the head pointer. We use a spinlock to protect this
* pointer, not because the update is likely to be non-atomic, but to
* ensure that other processors see the above array updates before they
* see the head pointer change.
*
* If we're holding ProcArrayLock exclusively, there's no need to take
* the spinlock.
*/
if (exclusive_lock)
pArray->headKnownAssignedXids = head;
else
{
SpinLockAcquire(&pArray->known_assigned_xids_lck);
pArray->headKnownAssignedXids = head;
SpinLockRelease(&pArray->known_assigned_xids_lck);
}
}
/*
* KnownAssignedXidsSearch
*
* Searches KnownAssignedXids for a specific xid and optionally removes it.
* Returns true if it was found, false if not.
*
* Caller must hold ProcArrayLock in shared or exclusive mode.
* Exclusive lock must be held for remove = true.
*/
static bool
KnownAssignedXidsSearch(TransactionId xid, bool remove)
{
/* use volatile pointer to prevent code rearrangement */
volatile ProcArrayStruct *pArray = procArray;
int first, last;
int head;
int tail;
int result_index = -1;
if (remove)
{
/* we hold ProcArrayLock exclusively, so no need for spinlock */
tail = pArray->tailKnownAssignedXids;
head = pArray->headKnownAssignedXids;
}
else
{
/* take spinlock to ensure we see up-to-date array contents */
SpinLockAcquire(&pArray->known_assigned_xids_lck);
tail = pArray->tailKnownAssignedXids;
head = pArray->headKnownAssignedXids;
SpinLockRelease(&pArray->known_assigned_xids_lck);
}
/*
* Standard binary search. Note we can ignore the KnownAssignedXidsValid
* array here, since even invalid entries will contain sorted XIDs.
*/
first = tail;
last = head - 1;
while (first <= last)
{
int mid_index;
TransactionId mid_xid;
mid_index = (first + last) / 2;
mid_xid = KnownAssignedXids[mid_index];
if (xid == mid_xid)
{
result_index = mid_index;
break;
}
else if (TransactionIdPrecedes(xid, mid_xid))
last = mid_index - 1;
else
first = mid_index + 1;
}
if (result_index < 0)
return false; /* not in array */
if (!KnownAssignedXidsValid[result_index])
return false; /* in array, but invalid */
if (remove)
{
KnownAssignedXidsValid[result_index] = false;
if (found)
pArray->numKnownAssignedXids--;
Assert(pArray->numKnownAssignedXids >= 0);
/*
* If we're removing the tail element then advance tail pointer over
* any invalid elements. This will speed future searches.
*/
if (result_index == tail)
{
KnownAssignedXidsDisplay(LOG);
LWLockRelease(ProcArrayLock);
elog(ERROR, "found duplicate KnownAssignedXid %u", xids[i]);
tail++;
while (tail < head && !KnownAssignedXidsValid[tail])
tail++;
if (tail >= head)
{
/* Array is empty, so we can reset both pointers */
pArray->headKnownAssignedXids = 0;
pArray->tailKnownAssignedXids = 0;
}
else
{
pArray->tailKnownAssignedXids = tail;
}
}
}
return true;
}
/*
* Is an xid present in KnownAssignedXids?
* Is the specified XID present in KnownAssignedXids[]?
*
* Must be called while holding ProcArrayLock in shared mode
* Caller must hold ProcArrayLock in shared or exclusive mode.
*/
static bool
KnownAssignedXidsExist(TransactionId xid)
KnownAssignedXidExists(TransactionId xid)
{
bool found;
Assert(TransactionIdIsValid(xid));
(void) hash_search(KnownAssignedXidsHash, &xid, HASH_FIND, &found);
return found;
return KnownAssignedXidsSearch(xid, false);
}
/*
* Remove one xid from anywhere in KnownAssignedXids.
* Remove the specified XID from KnownAssignedXids[].
*
* Must be called while holding ProcArrayLock in Exclusive mode
* Caller must hold ProcArrayLock in exclusive mode.
*/
static void
KnownAssignedXidsRemove(TransactionId xid)
{
bool found;
Assert(TransactionIdIsValid(xid));
elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
(void) hash_search(KnownAssignedXidsHash, &xid, HASH_REMOVE, &found);
if (found)
procArray->numKnownAssignedXids--;
Assert(procArray->numKnownAssignedXids >= 0);
/*
* We can fail to find an xid if the xid came from a subtransaction that
* aborts, though the xid hadn't yet been reported and no WAL records have
* been written using the subxid. In that case the abort record will
* contain that subxid and we haven't seen it before.
* Note: we cannot consider it an error to remove an XID that's not
* present. We intentionally remove subxact IDs while processing
* XLOG_XACT_ASSIGNMENT, to avoid array overflow. Then those XIDs
* will be removed again when the top-level xact commits or aborts.
*
* If we fail to find it for other reasons it might be a problem, but it
* isn't much use to log that it happened, since we can't divine much from
* just an isolated xid value.
* It might be possible to track such XIDs to distinguish this case
* from actual errors, but it would be complicated and probably not
* worth it. So, just ignore the search result.
*/
(void) KnownAssignedXidsSearch(xid, true);
}
/*
* KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
* We filter out anything higher than xmax.
* KnownAssignedXidsRemoveTree
* Remove xid (if it's not InvalidTransactionId) and all the subxids.
*
* Must be called while holding ProcArrayLock (in shared mode)
* Caller must hold ProcArrayLock in exclusive mode.
*/
static int
KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
static void
KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
TransactionId *subxids)
{
TransactionId xtmp = InvalidTransactionId;
int i;
return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
if (TransactionIdIsValid(xid))
KnownAssignedXidsRemove(xid);
for (i = 0; i < nsubxids; i++)
KnownAssignedXidsRemove(subxids[i]);
/* Opportunistically compress the array */
KnownAssignedXidsCompress(false);
}
/*
* KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin
* to the lowest xid value seen if not already lower.
* Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
* then clear the whole table.
*
* Must be called while holding ProcArrayLock (in shared mode)
* Caller must hold ProcArrayLock in exclusive mode.
*/
static int
KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
TransactionId xmax)
static void
KnownAssignedXidsRemovePreceding(TransactionId removeXid)
{
HASH_SEQ_STATUS status;
TransactionId *knownXid;
int count = 0;
/* use volatile pointer to prevent code rearrangement */
volatile ProcArrayStruct *pArray = procArray;
int count = 0;
int head, tail, i;
hash_seq_init(&status, KnownAssignedXidsHash);
while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
if (!TransactionIdIsValid(removeXid))
{
/*
* Filter out anything higher than xmax
*/
if (TransactionIdPrecedes(xmax, *knownXid))
continue;
elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
pArray->numKnownAssignedXids = 0;
pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
return;
}
*xarray = *knownXid;
xarray++;
count++;
elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
/* update xmin if required */
if (TransactionIdPrecedes(*knownXid, *xmin))
*xmin = *knownXid;
/*
* Mark entries invalid starting at the tail. Since array is sorted,
* we can stop as soon as we reach an entry >= removeXid.
*/
tail = pArray->tailKnownAssignedXids;
head = pArray->headKnownAssignedXids;
for (i = tail; i < head; i++)
{
if (KnownAssignedXidsValid[i])
{
TransactionId knownXid = KnownAssignedXids[i];
if (TransactionIdFollowsOrEquals(knownXid, removeXid))
break;
if (!StandbyTransactionIdIsPrepared(knownXid))
{
KnownAssignedXidsValid[i] = false;
count++;
}
}
}
return count;
pArray->numKnownAssignedXids -= count;
Assert(pArray->numKnownAssignedXids >= 0);
/*
* Advance the tail pointer if we've marked the tail item invalid.
*/
for (i = tail; i < head; i++)
{
if (KnownAssignedXidsValid[i])
break;
}
if (i >= head)
{
/* Array is empty, so we can reset both pointers */
pArray->headKnownAssignedXids = 0;
pArray->tailKnownAssignedXids = 0;
}
else
{
pArray->tailKnownAssignedXids = i;
}
/* Opportunistically compress the array */
KnownAssignedXidsCompress(false);
}
/*
* Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
* then clear the whole table.
* KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
* We filter out anything >= xmax.
*
* Returns the number of XIDs stored into xarray[]. Caller is responsible
* that array is large enough.
*
* Must be called while holding ProcArrayLock in Exclusive mode.
* Caller must hold ProcArrayLock in (at least) shared mode.
*/
static void
KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts)
static int
KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
{
TransactionId *knownXid;
HASH_SEQ_STATUS status;
TransactionId xtmp = InvalidTransactionId;
if (TransactionIdIsValid(xid))
elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", xid);
else
elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
}
hash_seq_init(&status, KnownAssignedXidsHash);
while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
{
TransactionId removeXid = *knownXid;
bool found;
/*
* KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus
* we reduce *xmin to the lowest xid value seen if not already lower.
*
* Caller must hold ProcArrayLock in (at least) shared mode.
*/
static int
KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
TransactionId xmax)
{
/* use volatile pointer to prevent code rearrangement */
volatile ProcArrayStruct *pArray = procArray;
int count = 0;
int head, tail;
int i;
Assert(TransactionIdIsValid(xmax));
if (!TransactionIdIsValid(xid) || TransactionIdPrecedes(removeXid, xid))
/*
* Fetch head just once, since it may change while we loop.
* We can stop once we reach the initially seen head, since
* we are certain that an xid cannot enter and then leave the
* array while we hold ProcArrayLock. We might miss newly-added
* xids, but they should be >= xmax so irrelevant anyway.
*
* Must take spinlock to ensure we see up-to-date array contents.
*/
SpinLockAcquire(&pArray->known_assigned_xids_lck);
tail = pArray->tailKnownAssignedXids;
head = pArray->headKnownAssignedXids;
SpinLockRelease(&pArray->known_assigned_xids_lck);
for (i = tail; i < head; i++)
{
/* Skip any gaps in the array */
if (KnownAssignedXidsValid[i])
{
if (keepPreparedXacts && StandbyTransactionIdIsPrepared(removeXid))
continue;
else
{
(void) hash_search(KnownAssignedXidsHash, &removeXid,
HASH_REMOVE, &found);
if (found)
procArray->numKnownAssignedXids--;
Assert(procArray->numKnownAssignedXids >= 0);
}
TransactionId knownXid = KnownAssignedXids[i];
/*
* Update xmin if required. Only the first XID need be checked,
* since the array is sorted.
*/
if (count == 0 &&
TransactionIdPrecedes(knownXid, *xmin))
*xmin = knownXid;
/*
* Filter out anything >= xmax, again relying on sorted property
* of array.
*/
if (TransactionIdPrecedesOrEquals(xmax, knownXid))
break;
/* Add knownXid into output array */
xarray[count++] = knownXid;
}
}
return count;
}
/*
* Display KnownAssignedXids to provide debug trail
*
* Must be called while holding ProcArrayLock (in shared mode)
* Currently this is only called within startup process, so we need no
* special locking.
*
* Note this is pretty expensive, and much of the expense will be incurred
* even if the elog message will get discarded. It's not currently called
* in any performance-critical places, however, so no need to be tenser.
*/
static void
KnownAssignedXidsDisplay(int trace_level)
{
HASH_SEQ_STATUS status;
TransactionId *knownXid;
StringInfoData buf;
TransactionId *xids;
int nxids;
int i;
/* use volatile pointer to prevent code rearrangement */
volatile ProcArrayStruct *pArray = procArray;
StringInfoData buf;
int head, tail, i;
int nxids = 0;
xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS);
nxids = 0;
hash_seq_init(&status, KnownAssignedXidsHash);
while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
xids[nxids++] = *knownXid;
qsort(xids, nxids, sizeof(TransactionId), xidComparator);
tail = pArray->tailKnownAssignedXids;
head = pArray->headKnownAssignedXids;
initStringInfo(&buf);
for (i = 0; i < nxids; i++)
appendStringInfo(&buf, "%u ", xids[i]);
for (i = tail; i < head; i++)
{
if (KnownAssignedXidsValid[i])
{
nxids++;
appendStringInfo(&buf, "[%u]=%u ", i, KnownAssignedXids[i]);
}
}
elog(trace_level, "%d KnownAssignedXids %s", nxids, buf.data);
elog(trace_level, "%d KnownAssignedXids (num=%u tail=%u head=%u) %s",
nxids,
pArray->numKnownAssignedXids,
pArray->tailKnownAssignedXids,
pArray->headKnownAssignedXids,
buf.data);
pfree(buf.data);
}