Commit 5c117258 authored by Simon Riggs's avatar Simon Riggs

Rearrange storage of data in xl_running_xacts.

Previously we stored all xids mixed together.
Now we store top-level xids first, followed
by all subxids. Also skip logging any subxids
if the snapshot is suboverflowed, since there
are potentially large numbers of them and they
are not useful in that case anyway. Has value
in the envisaged design for decoding of WAL.
No planned effect on Hot Standby.

Andres Freund, reviewed by me
parent c1113069
...@@ -5631,6 +5631,7 @@ StartupXLOG(void) ...@@ -5631,6 +5631,7 @@ StartupXLOG(void)
* subxids are listed with their parent prepared transactions. * subxids are listed with their parent prepared transactions.
*/ */
running.xcnt = nxids; running.xcnt = nxids;
running.subxcnt = 0;
running.subxid_overflow = false; running.subxid_overflow = false;
running.nextXid = checkPoint.nextXid; running.nextXid = checkPoint.nextXid;
running.oldestRunningXid = oldestActiveXID; running.oldestRunningXid = oldestActiveXID;
...@@ -7834,6 +7835,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -7834,6 +7835,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
* with their parent prepared transactions. * with their parent prepared transactions.
*/ */
running.xcnt = nxids; running.xcnt = nxids;
running.subxcnt = 0;
running.subxid_overflow = false; running.subxid_overflow = false;
running.nextXid = checkPoint.nextXid; running.nextXid = checkPoint.nextXid;
running.oldestRunningXid = oldestActiveXID; running.oldestRunningXid = oldestActiveXID;
......
...@@ -501,6 +501,13 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) ...@@ -501,6 +501,13 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
* Remove stale transactions, if any. * Remove stale transactions, if any.
*/ */
ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid); ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
/*
* Remove stale locks, if any.
*
* Locks are always assigned to the toplevel xid so we don't need to care
* about subxcnt/subxids (and by extension not about ->suboverflowed).
*/
StandbyReleaseOldLocks(running->xcnt, running->xids); StandbyReleaseOldLocks(running->xcnt, running->xids);
/* /*
...@@ -581,13 +588,13 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) ...@@ -581,13 +588,13 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
* Allocate a temporary array to avoid modifying the array passed as * Allocate a temporary array to avoid modifying the array passed as
* argument. * argument.
*/ */
xids = palloc(sizeof(TransactionId) * running->xcnt); xids = palloc(sizeof(TransactionId) * (running->xcnt + running->subxcnt));
/* /*
* Add to the temp array any xids which have not already completed. * Add to the temp array any xids which have not already completed.
*/ */
nxids = 0; nxids = 0;
for (i = 0; i < running->xcnt; i++) for (i = 0; i < running->xcnt + running->subxcnt; i++)
{ {
TransactionId xid = running->xids[i]; TransactionId xid = running->xids[i];
...@@ -1627,15 +1634,13 @@ GetRunningTransactionData(void) ...@@ -1627,15 +1634,13 @@ GetRunningTransactionData(void)
oldestRunningXid = ShmemVariableCache->nextXid; oldestRunningXid = ShmemVariableCache->nextXid;
/* /*
* Spin over procArray collecting all xids and subxids. * Spin over procArray collecting all xids
*/ */
for (index = 0; index < arrayP->numProcs; index++) for (index = 0; index < arrayP->numProcs; index++)
{ {
int pgprocno = arrayP->pgprocnos[index]; int pgprocno = arrayP->pgprocnos[index];
volatile PGPROC *proc = &allProcs[pgprocno];
volatile PGXACT *pgxact = &allPgXact[pgprocno]; volatile PGXACT *pgxact = &allPgXact[pgprocno];
TransactionId xid; TransactionId xid;
int nxids;
/* Fetch xid just once - see GetNewTransactionId */ /* Fetch xid just once - see GetNewTransactionId */
xid = pgxact->xid; xid = pgxact->xid;
...@@ -1652,30 +1657,46 @@ GetRunningTransactionData(void) ...@@ -1652,30 +1657,46 @@ GetRunningTransactionData(void)
if (TransactionIdPrecedes(xid, oldestRunningXid)) if (TransactionIdPrecedes(xid, oldestRunningXid))
oldestRunningXid = xid; oldestRunningXid = xid;
/* if (pgxact->overflowed)
* Save subtransaction XIDs. Other backends can't add or remove suboverflowed = true;
* entries while we're holding XidGenLock. }
*/
nxids = pgxact->nxids;
if (nxids > 0)
{
memcpy(&xids[count], (void *) proc->subxids.xids,
nxids * sizeof(TransactionId));
count += nxids;
subcount += nxids;
if (pgxact->overflowed) /*
suboverflowed = true; * Spin over procArray collecting all subxids, but only if there hasn't
* been a suboverflow.
*/
if (!suboverflowed)
{
for (index = 0; index < arrayP->numProcs; index++)
{
int pgprocno = arrayP->pgprocnos[index];
volatile PGPROC *proc = &allProcs[pgprocno];
volatile PGXACT *pgxact = &allPgXact[pgprocno];
int nxids;
/* /*
* Top-level XID of a transaction is always less than any of its * Save subtransaction XIDs. Other backends can't add or remove
* subxids, so we don't need to check if any of the subxids are * entries while we're holding XidGenLock.
* smaller than oldestRunningXid
*/ */
nxids = pgxact->nxids;
if (nxids > 0)
{
memcpy(&xids[count], (void *) proc->subxids.xids,
nxids * sizeof(TransactionId));
count += nxids;
subcount += nxids;
/*
* Top-level XID of a transaction is always less than any of
* its subxids, so we don't need to check if any of the subxids
* are smaller than oldestRunningXid
*/
}
} }
} }
CurrentRunningXacts->xcnt = count; CurrentRunningXacts->xcnt = count - subcount;
CurrentRunningXacts->subxcnt = subcount;
CurrentRunningXacts->subxid_overflow = suboverflowed; CurrentRunningXacts->subxid_overflow = suboverflowed;
CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid; CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
CurrentRunningXacts->oldestRunningXid = oldestRunningXid; CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
......
...@@ -778,6 +778,7 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -778,6 +778,7 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record)
RunningTransactionsData running; RunningTransactionsData running;
running.xcnt = xlrec->xcnt; running.xcnt = xlrec->xcnt;
running.subxcnt = xlrec->subxcnt;
running.subxid_overflow = xlrec->subxid_overflow; running.subxid_overflow = xlrec->subxid_overflow;
running.nextXid = xlrec->nextXid; running.nextXid = xlrec->nextXid;
running.latestCompletedXid = xlrec->latestCompletedXid; running.latestCompletedXid = xlrec->latestCompletedXid;
...@@ -897,6 +898,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts) ...@@ -897,6 +898,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
XLogRecPtr recptr; XLogRecPtr recptr;
xlrec.xcnt = CurrRunningXacts->xcnt; xlrec.xcnt = CurrRunningXacts->xcnt;
xlrec.subxcnt = CurrRunningXacts->subxcnt;
xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow; xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
xlrec.nextXid = CurrRunningXacts->nextXid; xlrec.nextXid = CurrRunningXacts->nextXid;
xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid; xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
...@@ -912,7 +914,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts) ...@@ -912,7 +914,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
{ {
rdata[0].next = &(rdata[1]); rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) CurrRunningXacts->xids; rdata[1].data = (char *) CurrRunningXacts->xids;
rdata[1].len = xlrec.xcnt * sizeof(TransactionId); rdata[1].len = (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId);
rdata[1].buffer = InvalidBuffer; rdata[1].buffer = InvalidBuffer;
lastrdata = 1; lastrdata = 1;
} }
...@@ -931,8 +933,8 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts) ...@@ -931,8 +933,8 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
CurrRunningXacts->nextXid); CurrRunningXacts->nextXid);
else else
elog(trace_recovery(DEBUG2), elog(trace_recovery(DEBUG2),
"snapshot of %u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)", "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
CurrRunningXacts->xcnt, CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
(uint32) (recptr >> 32), (uint32) recptr, (uint32) (recptr >> 32), (uint32) recptr,
CurrRunningXacts->oldestRunningXid, CurrRunningXacts->oldestRunningXid,
CurrRunningXacts->latestCompletedXid, CurrRunningXacts->latestCompletedXid,
......
...@@ -68,6 +68,7 @@ typedef struct xl_standby_locks ...@@ -68,6 +68,7 @@ typedef struct xl_standby_locks
typedef struct xl_running_xacts typedef struct xl_running_xacts
{ {
int xcnt; /* # of xact ids in xids[] */ int xcnt; /* # of xact ids in xids[] */
int subxcnt; /* # of subxact ids in xids[] */
bool subxid_overflow; /* snapshot overflowed, subxids missing */ bool subxid_overflow; /* snapshot overflowed, subxids missing */
TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */ TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */
TransactionId oldestRunningXid; /* *not* oldestXmin */ TransactionId oldestRunningXid; /* *not* oldestXmin */
...@@ -98,6 +99,7 @@ extern void standby_desc(StringInfo buf, uint8 xl_info, char *rec); ...@@ -98,6 +99,7 @@ extern void standby_desc(StringInfo buf, uint8 xl_info, char *rec);
typedef struct RunningTransactionsData typedef struct RunningTransactionsData
{ {
int xcnt; /* # of xact ids in xids[] */ int xcnt; /* # of xact ids in xids[] */
int subxcnt; /* # of subxact ids in xids[] */
bool subxid_overflow; /* snapshot overflowed, subxids missing */ bool subxid_overflow; /* snapshot overflowed, subxids missing */
TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */ TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */
TransactionId oldestRunningXid; /* *not* oldestXmin */ TransactionId oldestRunningXid; /* *not* oldestXmin */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment