Commit f21bb9cf authored by Simon Riggs's avatar Simon Riggs

Refactor inCommit flag into generic delayChkpt flag.

Rename PGXACT->inCommit flag into delayChkpt flag,
and generalise comments to allow use in other situations,
such as the forthcoming potential use in checksum patch.
Replace wait loop to look for VXIDs with delayChkpt set.
No user visible changes, not behaviour changes at present.

Simon Riggs, reviewed and rebased by Jeff Davis
parent 7a764990
...@@ -318,7 +318,7 @@ MarkAsPreparing(TransactionId xid, const char *gid, ...@@ -318,7 +318,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
proc->lxid = (LocalTransactionId) xid; proc->lxid = (LocalTransactionId) xid;
pgxact->xid = xid; pgxact->xid = xid;
pgxact->xmin = InvalidTransactionId; pgxact->xmin = InvalidTransactionId;
pgxact->inCommit = false; pgxact->delayChkpt = false;
pgxact->vacuumFlags = 0; pgxact->vacuumFlags = 0;
proc->pid = 0; proc->pid = 0;
proc->backendId = InvalidBackendId; proc->backendId = InvalidBackendId;
...@@ -1034,18 +1034,18 @@ EndPrepare(GlobalTransaction gxact) ...@@ -1034,18 +1034,18 @@ EndPrepare(GlobalTransaction gxact)
* odds of a PANIC actually occurring should be very tiny given that we * odds of a PANIC actually occurring should be very tiny given that we
* were able to write the bogus CRC above. * were able to write the bogus CRC above.
* *
* We have to set inCommit here, too; otherwise a checkpoint starting * We have to set delayChkpt here, too; otherwise a checkpoint starting
* immediately after the WAL record is inserted could complete without * immediately after the WAL record is inserted could complete without
* fsync'ing our state file. (This is essentially the same kind of race * fsync'ing our state file. (This is essentially the same kind of race
* condition as the COMMIT-to-clog-write case that RecordTransactionCommit * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
* uses inCommit for; see notes there.) * uses delayChkpt for; see notes there.)
* *
* We save the PREPARE record's location in the gxact for later use by * We save the PREPARE record's location in the gxact for later use by
* CheckPointTwoPhase. * CheckPointTwoPhase.
*/ */
START_CRIT_SECTION(); START_CRIT_SECTION();
MyPgXact->inCommit = true; MyPgXact->delayChkpt = true;
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
records.head); records.head);
...@@ -1086,7 +1086,7 @@ EndPrepare(GlobalTransaction gxact) ...@@ -1086,7 +1086,7 @@ EndPrepare(GlobalTransaction gxact)
* checkpoint starting after this will certainly see the gxact as a * checkpoint starting after this will certainly see the gxact as a
* candidate for fsyncing. * candidate for fsyncing.
*/ */
MyPgXact->inCommit = false; MyPgXact->delayChkpt = false;
END_CRIT_SECTION(); END_CRIT_SECTION();
...@@ -1972,7 +1972,7 @@ RecoverPreparedTransactions(void) ...@@ -1972,7 +1972,7 @@ RecoverPreparedTransactions(void)
* RecordTransactionCommitPrepared * RecordTransactionCommitPrepared
* *
* This is basically the same as RecordTransactionCommit: in particular, * This is basically the same as RecordTransactionCommit: in particular,
* we must set the inCommit flag to avoid a race condition. * we must set the delayChkpt flag to avoid a race condition.
* *
* We know the transaction made at least one XLOG entry (its PREPARE), * We know the transaction made at least one XLOG entry (its PREPARE),
* so it is never possible to optimize out the commit record. * so it is never possible to optimize out the commit record.
...@@ -1995,7 +1995,7 @@ RecordTransactionCommitPrepared(TransactionId xid, ...@@ -1995,7 +1995,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
START_CRIT_SECTION(); START_CRIT_SECTION();
/* See notes in RecordTransactionCommit */ /* See notes in RecordTransactionCommit */
MyPgXact->inCommit = true; MyPgXact->delayChkpt = true;
/* Emit the XLOG commit record */ /* Emit the XLOG commit record */
xlrec.xid = xid; xlrec.xid = xid;
...@@ -2053,7 +2053,7 @@ RecordTransactionCommitPrepared(TransactionId xid, ...@@ -2053,7 +2053,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
TransactionIdCommitTree(xid, nchildren, children); TransactionIdCommitTree(xid, nchildren, children);
/* Checkpoint can proceed now */ /* Checkpoint can proceed now */
MyPgXact->inCommit = false; MyPgXact->delayChkpt = false;
END_CRIT_SECTION(); END_CRIT_SECTION();
......
...@@ -1001,13 +1001,13 @@ RecordTransactionCommit(void) ...@@ -1001,13 +1001,13 @@ RecordTransactionCommit(void)
* RecordTransactionAbort. That's because loss of a transaction abort * RecordTransactionAbort. That's because loss of a transaction abort
* is noncritical; the presumption would be that it aborted, anyway. * is noncritical; the presumption would be that it aborted, anyway.
* *
* It's safe to change the inCommit flag of our own backend without * It's safe to change the delayChkpt flag of our own backend without
* holding the ProcArrayLock, since we're the only one modifying it. * holding the ProcArrayLock, since we're the only one modifying it.
* This makes checkpoint's determination of which xacts are inCommit a * This makes checkpoint's determination of which xacts are delayChkpt a
* bit fuzzy, but it doesn't matter. * bit fuzzy, but it doesn't matter.
*/ */
START_CRIT_SECTION(); START_CRIT_SECTION();
MyPgXact->inCommit = true; MyPgXact->delayChkpt = true;
SetCurrentTransactionStopTimestamp(); SetCurrentTransactionStopTimestamp();
...@@ -1160,7 +1160,7 @@ RecordTransactionCommit(void) ...@@ -1160,7 +1160,7 @@ RecordTransactionCommit(void)
*/ */
if (markXidCommitted) if (markXidCommitted)
{ {
MyPgXact->inCommit = false; MyPgXact->delayChkpt = false;
END_CRIT_SECTION(); END_CRIT_SECTION();
} }
......
...@@ -6884,8 +6884,8 @@ CreateCheckPoint(int flags) ...@@ -6884,8 +6884,8 @@ CreateCheckPoint(int flags)
XLogRecData rdata; XLogRecData rdata;
uint32 freespace; uint32 freespace;
XLogSegNo _logSegNo; XLogSegNo _logSegNo;
TransactionId *inCommitXids; VirtualTransactionId *vxids;
int nInCommit; int nvxids;
/* /*
* An end-of-recovery checkpoint is really a shutdown checkpoint, just * An end-of-recovery checkpoint is really a shutdown checkpoint, just
...@@ -7056,9 +7056,14 @@ CreateCheckPoint(int flags) ...@@ -7056,9 +7056,14 @@ CreateCheckPoint(int flags)
TRACE_POSTGRESQL_CHECKPOINT_START(flags); TRACE_POSTGRESQL_CHECKPOINT_START(flags);
/* /*
* Before flushing data, we must wait for any transactions that are * In some cases there are groups of actions that must all occur on
* currently in their commit critical sections. If an xact inserted its * one side or the other of a checkpoint record. Before flushing the
* commit record into XLOG just before the REDO point, then a crash * checkpoint record we must explicitly wait for any backend currently
* performing those groups of actions.
*
* One example is end of transaction, so we must wait for any transactions
* that are currently in commit critical sections. If an xact inserted
* its commit record into XLOG just before the REDO point, then a crash
* restart from the REDO point would not replay that record, which means * restart from the REDO point would not replay that record, which means
* that our flushing had better include the xact's update of pg_clog. So * that our flushing had better include the xact's update of pg_clog. So
* we wait till he's out of his commit critical section before proceeding. * we wait till he's out of his commit critical section before proceeding.
...@@ -7073,21 +7078,24 @@ CreateCheckPoint(int flags) ...@@ -7073,21 +7078,24 @@ CreateCheckPoint(int flags)
* protected by different locks, but again that seems best on grounds of * protected by different locks, but again that seems best on grounds of
* minimizing lock contention.) * minimizing lock contention.)
* *
* A transaction that has not yet set inCommit when we look cannot be at * A transaction that has not yet set delayChkpt when we look cannot be at
* risk, since he's not inserted his commit record yet; and one that's * risk, since he's not inserted his commit record yet; and one that's
* already cleared it is not at risk either, since he's done fixing clog * already cleared it is not at risk either, since he's done fixing clog
* and we will correctly flush the update below. So we cannot miss any * and we will correctly flush the update below. So we cannot miss any
* xacts we need to wait for. * xacts we need to wait for.
*/ */
nInCommit = GetTransactionsInCommit(&inCommitXids); vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
if (nInCommit > 0) if (nvxids > 0)
{ {
uint nwaits = 0;
do do
{ {
pg_usleep(10000L); /* wait for 10 msec */ pg_usleep(10000L); /* wait for 10 msec */
} while (HaveTransactionsInCommit(inCommitXids, nInCommit)); nwaits++;
} while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
} }
pfree(inCommitXids); pfree(vxids);
/* /*
* Get the other info we need for the checkpoint record. * Get the other info we need for the checkpoint record.
......
...@@ -400,7 +400,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) ...@@ -400,7 +400,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
pgxact->xmin = InvalidTransactionId; pgxact->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */ /* must be cleared with xid/xmin: */
pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
pgxact->inCommit = false; /* be sure this is cleared in abort */ pgxact->delayChkpt = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false; proc->recoveryConflictPending = false;
/* Clear the subtransaction-XID cache too while holding the lock */ /* Clear the subtransaction-XID cache too while holding the lock */
...@@ -427,7 +427,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) ...@@ -427,7 +427,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
pgxact->xmin = InvalidTransactionId; pgxact->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */ /* must be cleared with xid/xmin: */
pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
pgxact->inCommit = false; /* be sure this is cleared in abort */ pgxact->delayChkpt = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false; proc->recoveryConflictPending = false;
Assert(pgxact->nxids == 0); Assert(pgxact->nxids == 0);
...@@ -462,7 +462,7 @@ ProcArrayClearTransaction(PGPROC *proc) ...@@ -462,7 +462,7 @@ ProcArrayClearTransaction(PGPROC *proc)
/* redundant, but just in case */ /* redundant, but just in case */
pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
pgxact->inCommit = false; pgxact->delayChkpt = false;
/* Clear the subtransaction-XID cache too */ /* Clear the subtransaction-XID cache too */
pgxact->nxids = 0; pgxact->nxids = 0;
...@@ -1778,65 +1778,70 @@ GetOldestActiveTransactionId(void) ...@@ -1778,65 +1778,70 @@ GetOldestActiveTransactionId(void)
} }
/* /*
* GetTransactionsInCommit -- Get the XIDs of transactions that are committing * GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are
* delaying checkpoint because they have critical actions in progress.
* *
* Constructs an array of XIDs of transactions that are currently in commit * Constructs an array of VXIDs of transactions that are currently in commit
* critical sections, as shown by having inCommit set in their PGXACT entries. * critical sections, as shown by having delayChkpt set in their PGXACT.
* *
* *xids_p is set to a palloc'd array that should be freed by the caller. * Returns a palloc'd array that should be freed by the caller.
* The return value is the number of valid entries. * *nvxids is the number of valid entries.
* *
* Note that because backends set or clear inCommit without holding any lock, * Note that because backends set or clear delayChkpt without holding any lock,
* the result is somewhat indeterminate, but we don't really care. Even in * the result is somewhat indeterminate, but we don't really care. Even in
* a multiprocessor with delayed writes to shared memory, it should be certain * a multiprocessor with delayed writes to shared memory, it should be certain
* that setting of inCommit will propagate to shared memory when the backend * that setting of delayChkpt will propagate to shared memory when the backend
* takes the WALInsertLock, so we cannot fail to see an xact as inCommit if * takes a lock, so we cannot fail to see an virtual xact as delayChkpt if
* it's already inserted its commit record. Whether it takes a little while * it's already inserted its commit record. Whether it takes a little while
* for clearing of inCommit to propagate is unimportant for correctness. * for clearing of delayChkpt to propagate is unimportant for correctness.
*/ */
int VirtualTransactionId *
GetTransactionsInCommit(TransactionId **xids_p) GetVirtualXIDsDelayingChkpt(int *nvxids)
{ {
VirtualTransactionId *vxids;
ProcArrayStruct *arrayP = procArray; ProcArrayStruct *arrayP = procArray;
TransactionId *xids; int count = 0;
int nxids;
int index; int index;
xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId)); /* allocate what's certainly enough result space */
nxids = 0; vxids = (VirtualTransactionId *)
palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
LWLockAcquire(ProcArrayLock, LW_SHARED); LWLockAcquire(ProcArrayLock, LW_SHARED);
for (index = 0; index < arrayP->numProcs; index++) for (index = 0; index < arrayP->numProcs; index++)
{ {
int pgprocno = arrayP->pgprocnos[index]; int pgprocno = arrayP->pgprocnos[index];
volatile PGXACT *pgxact = &allPgXact[pgprocno]; volatile PGPROC *proc = &allProcs[pgprocno];
TransactionId pxid; volatile PGXACT *pgxact = &allPgXact[pgprocno];
/* Fetch xid just once - see GetNewTransactionId */ if (pgxact->delayChkpt)
pxid = pgxact->xid; {
VirtualTransactionId vxid;
if (pgxact->inCommit && TransactionIdIsValid(pxid)) GET_VXID_FROM_PGPROC(vxid, *proc);
xids[nxids++] = pxid; if (VirtualTransactionIdIsValid(vxid))
vxids[count++] = vxid;
}
} }
LWLockRelease(ProcArrayLock); LWLockRelease(ProcArrayLock);
*xids_p = xids; *nvxids = count;
return nxids; return vxids;
} }
/* /*
* HaveTransactionsInCommit -- Are any of the specified XIDs in commit? * HaveVirtualXIDsDelayingChkpt -- Are any of the specified VXIDs delaying?
* *
* This is used with the results of GetTransactionsInCommit to see if any * This is used with the results of GetVirtualXIDsDelayingChkpt to see if any
* of the specified XIDs are still in their commit critical sections. * of the specified VXIDs are still in critical sections of code.
* *
* Note: this is O(N^2) in the number of xacts that are/were in commit, but * Note: this is O(N^2) in the number of vxacts that are/were delaying, but
* those numbers should be small enough for it not to be a problem. * those numbers should be small enough for it not to be a problem.
*/ */
bool bool
HaveTransactionsInCommit(TransactionId *xids, int nxids) HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids)
{ {
bool result = false; bool result = false;
ProcArrayStruct *arrayP = procArray; ProcArrayStruct *arrayP = procArray;
...@@ -1844,30 +1849,32 @@ HaveTransactionsInCommit(TransactionId *xids, int nxids) ...@@ -1844,30 +1849,32 @@ HaveTransactionsInCommit(TransactionId *xids, int nxids)
LWLockAcquire(ProcArrayLock, LW_SHARED); LWLockAcquire(ProcArrayLock, LW_SHARED);
for (index = 0; index < arrayP->numProcs; index++) while (VirtualTransactionIdIsValid(*vxids))
{ {
int pgprocno = arrayP->pgprocnos[index]; for (index = 0; index < arrayP->numProcs; index++)
volatile PGXACT *pgxact = &allPgXact[pgprocno];
TransactionId pxid;
/* Fetch xid just once - see GetNewTransactionId */
pxid = pgxact->xid;
if (pgxact->inCommit && TransactionIdIsValid(pxid))
{ {
int i; int pgprocno = arrayP->pgprocnos[index];
volatile PGPROC *proc = &allProcs[pgprocno];
volatile PGXACT *pgxact = &allPgXact[pgprocno];
VirtualTransactionId vxid;
for (i = 0; i < nxids; i++) GET_VXID_FROM_PGPROC(vxid, *proc);
if (VirtualTransactionIdIsValid(vxid))
{ {
if (xids[i] == pxid) if (VirtualTransactionIdEquals(vxid, *vxids) &&
pgxact->delayChkpt)
{ {
result = true; result = true;
break; break;
} }
} }
if (result)
break;
} }
if (result)
break;
/* The virtual transaction is gone now, wait for the next one */
vxids++;
} }
LWLockRelease(ProcArrayLock); LWLockRelease(ProcArrayLock);
......
...@@ -350,7 +350,7 @@ InitProcess(void) ...@@ -350,7 +350,7 @@ InitProcess(void)
MyProc->backendId = InvalidBackendId; MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid; MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid; MyProc->roleId = InvalidOid;
MyPgXact->inCommit = false; MyPgXact->delayChkpt = false;
MyPgXact->vacuumFlags = 0; MyPgXact->vacuumFlags = 0;
/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */ /* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
if (IsAutoVacuumWorkerProcess()) if (IsAutoVacuumWorkerProcess())
...@@ -516,7 +516,7 @@ InitAuxiliaryProcess(void) ...@@ -516,7 +516,7 @@ InitAuxiliaryProcess(void)
MyProc->backendId = InvalidBackendId; MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid; MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid; MyProc->roleId = InvalidOid;
MyPgXact->inCommit = false; MyPgXact->delayChkpt = false;
MyPgXact->vacuumFlags = 0; MyPgXact->vacuumFlags = 0;
MyProc->lwWaiting = false; MyProc->lwWaiting = false;
MyProc->lwWaitMode = 0; MyProc->lwWaitMode = 0;
......
...@@ -168,7 +168,8 @@ typedef struct PGXACT ...@@ -168,7 +168,8 @@ typedef struct PGXACT
uint8 vacuumFlags; /* vacuum-related flags, see above */ uint8 vacuumFlags; /* vacuum-related flags, see above */
bool overflowed; bool overflowed;
bool inCommit; /* true if within commit critical section */ bool delayChkpt; /* true if this proc delays checkpoint start */
/* previously called InCommit */
uint8 nxids; uint8 nxids;
} PGXACT; } PGXACT;
......
...@@ -52,8 +52,8 @@ extern bool TransactionIdIsActive(TransactionId xid); ...@@ -52,8 +52,8 @@ extern bool TransactionIdIsActive(TransactionId xid);
extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum); extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
extern TransactionId GetOldestActiveTransactionId(void); extern TransactionId GetOldestActiveTransactionId(void);
extern int GetTransactionsInCommit(TransactionId **xids_p); extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
extern bool HaveTransactionsInCommit(TransactionId *xids, int nxids); extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);
extern PGPROC *BackendPidGetProc(int pid); extern PGPROC *BackendPidGetProc(int pid);
extern int BackendXidGetPid(TransactionId xid); extern int BackendXidGetPid(TransactionId xid);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment