Commit aa203e76 authored by Simon Riggs's avatar Simon Riggs

Don’t push nextid too far forwards in recovery

Doing so allows various crash possibilities. Fix by avoiding
having PrescanPreparedTransactions() increment
ShmemVariableCache->nextXid when it has no 2PC files

Bug found by Jeff Janes, diagnosis and patch by Pavan Deolasee,
then patch re-designed for clarity and full accuracy by
Michael Paquier.

Reported-by: Jeff Janes
Author: Pavan Deolasee, Michael Paquier
Discussion: https://postgr.es/m/CAMkU=1zMLnH_i1-PVQ-biZzvNx7VcuatriquEnh7HNk6K8Ss3Q@mail.gmail.com
parent 51175f36
......@@ -222,7 +222,7 @@ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
static char *ProcessTwoPhaseBuffer(TransactionId xid,
XLogRecPtr prepare_start_lsn,
bool fromdisk, bool overwriteOK, bool setParent,
TransactionId *result, TransactionId *maxsubxid);
bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
const char *gid, TimestampTz prepared_at, Oid owner,
Oid databaseid);
......@@ -1744,7 +1744,7 @@ restoreTwoPhaseData(void)
buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
true, false, false,
NULL, NULL);
false);
if (buf == NULL)
continue;
......@@ -1786,7 +1786,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
{
TransactionId origNextXid = ShmemVariableCache->nextXid;
TransactionId result = origNextXid;
TransactionId maxsubxid = origNextXid;
TransactionId *xids = NULL;
int nxids = 0;
int allocsize = 0;
......@@ -1806,11 +1805,18 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
buf = ProcessTwoPhaseBuffer(xid,
gxact->prepare_start_lsn,
gxact->ondisk, false, false,
&result, &maxsubxid);
true);
if (buf == NULL)
continue;
/*
* OK, we think this file is valid. Incorporate xid into the
* running-minimum result.
*/
if (TransactionIdPrecedes(xid, result))
result = xid;
if (xids_p)
{
if (nxids == allocsize)
......@@ -1839,15 +1845,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
*nxids_p = nxids;
}
/* update nextXid if needed */
if (TransactionIdFollowsOrEquals(maxsubxid, ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = maxsubxid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
return result;
}
......@@ -1884,7 +1881,7 @@ StandbyRecoverPreparedTransactions(bool overwriteOK)
buf = ProcessTwoPhaseBuffer(xid,
gxact->prepare_start_lsn,
gxact->ondisk, overwriteOK, true,
NULL, NULL);
false);
if (buf != NULL)
pfree(buf);
}
......@@ -1924,7 +1921,7 @@ RecoverPreparedTransactions(void)
buf = ProcessTwoPhaseBuffer(xid,
gxact->prepare_start_lsn,
gxact->ondisk, false, false,
NULL, NULL);
false);
if (buf == NULL)
continue;
......@@ -2012,20 +2009,16 @@ RecoverPreparedTransactions(void)
* If setParent is true, then use the overwriteOK parameter to set up
* subtransaction parent linkages.
*
* If result and maxsubxid are not NULL, fill them up with smallest
* running transaction id (lesser than ShmemVariableCache->nextXid)
* and largest subtransaction id for this transaction respectively.
* If setNextXid is true, set ShmemVariableCache->nextXid to the newest
* value scanned.
*/
static char *
ProcessTwoPhaseBuffer(TransactionId xid,
XLogRecPtr prepare_start_lsn,
bool fromdisk, bool overwriteOK,
bool setParent, TransactionId *result,
TransactionId *maxsubxid)
bool setParent, bool setNextXid)
{
TransactionId origNextXid = ShmemVariableCache->nextXid;
TransactionId res = InvalidTransactionId;
TransactionId maxsub = InvalidTransactionId;
TransactionId *subxids;
char *buf;
TwoPhaseFileHeader *hdr;
......@@ -2034,11 +2027,6 @@ ProcessTwoPhaseBuffer(TransactionId xid,
if (!fromdisk)
Assert(prepare_start_lsn != InvalidXLogRecPtr);
if (result)
res = *result;
if (maxsubxid)
maxsub = *maxsubxid;
/* Already processed? */
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
{
......@@ -2120,13 +2108,6 @@ ProcessTwoPhaseBuffer(TransactionId xid,
return NULL;
}
/*
* OK, we think this buffer is valid. Incorporate xid into the
* running-minimum result.
*/
if (TransactionIdPrecedes(xid, res))
res = xid;
/*
* Examine subtransaction XIDs ... they should all follow main
* XID, and they may force us to advance nextXid.
......@@ -2139,17 +2120,31 @@ ProcessTwoPhaseBuffer(TransactionId xid,
TransactionId subxid = subxids[i];
Assert(TransactionIdFollows(subxid, xid));
if (TransactionIdFollowsOrEquals(subxid, maxsub))
maxsub = subxid;
/* update nextXid if needed */
if (setNextXid &&
TransactionIdFollowsOrEquals(subxid,
ShmemVariableCache->nextXid))
{
/*
* We don't expect anyone else to modify nextXid, hence we don't
* need to hold a lock while examining it. We still acquire the
* lock to modify it, though, so we recheck.
*/
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
if (TransactionIdFollowsOrEquals(subxid,
ShmemVariableCache->nextXid))
{
ShmemVariableCache->nextXid = subxid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
}
LWLockRelease(XidGenLock);
}
if (setParent)
SubTransSetParent(xid, subxid, overwriteOK);
}
if (result)
*result = res;
if (maxsubxid)
*maxsubxid = maxsub;
return buf;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment