Commit 4f1b890b authored by Andres Freund's avatar Andres Freund

Merge the various forms of transaction commit & abort records.

Since 465883b0 two versions of commit records have existed. A compact
version that was used when no cache invalidations, smgr unlinks and
similar were needed, and a full version that could deal with all
that. Additionally the full version was embedded into twophase commit
records.

That resulted in a measurable reduction in the size of the logged WAL in
some workloads. But more recently additions like logical decoding, which
e.g. needs information about the database something was executed on,
made it applicable in fewer situations. The static split generally made
it hard to expand the commit record, because concerns over the size made
it hard to add anything to the compact version.

Additionally it's not particularly pretty to have twophase.c insert
RM_XACT records.

Rejigger things so that the commit and abort records only have one form
each, including the twophase equivalents. The presence of the various
optional (in the sense of not being in every record) pieces is indicated
by bits in the 'xinfo' flag.  That flag previously was not included in
compact commit records. To prevent an increase in size due to its
presence, it's only included if necessary; signalled by a bit in the
xl_info bits available for xact.c, similar to heapam.c's
XLOG_HEAP_OPMASK/XLOG_HEAP_INIT_PAGE.

Twophase commit/aborts are now the same as their normal
counterparts. The original transaction's xid is included in an optional
data field.

This means that commit records generally are smaller, except in the case
of a transaction with subtransactions, but no other special cases; the
increase there is four bytes, which seems acceptable given that the more
common case of not having subtransactions shrank.  The savings are
especially measurable for twophase commits, which previously always used
the full version; but will in practice only infrequently have required
that.

The motivation for this work is not the space savings and
deduplication though; it's that it makes it easier to extend commit
records with additional information. That's just a few lines of code
now; without impacting the common case where that information is not
needed.

Discussion: 20150220152150.GD4149@awork2.anarazel.de,
    235610.92468.qm%40web29004.mail.ird.yahoo.com

Reviewed-By: Heikki Linnakangas, Simon Riggs
parent a0f5954a
......@@ -14,53 +14,188 @@
*/
#include "postgres.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "storage/sinval.h"
#include "utils/timestamp.h"
/*
 * Decode the WAL representation of xact commit and abort records into an
 * expanded form that is easier to work with.
 *
 * These routines live in xactdesc.c because they are used both by the backend
 * (when replaying WAL) and by frontend code (pg_xlogdump), and this is the
 * only xact specific file shared between the two. They are complicated
 * enough that duplicating them would be bothersome.
 */
void
ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
{
	/* read position; starts right behind the fixed-size part of the record */
	char	   *pos = (char *) xlrec + MinSizeOfXactCommit;

	/*
	 * Start from an all-zero result. That also leaves xinfo at 0, which is
	 * the default if no XLOG_XACT_HAS_INFO is present.
	 */
	memset(parsed, 0, sizeof(*parsed));
	parsed->xact_time = xlrec->xact_time;

	/* the xinfo word itself is optional, signalled via the xl_info bits */
	if (info & XLOG_XACT_HAS_INFO)
	{
		parsed->xinfo = ((xl_xact_xinfo *) pos)->xinfo;
		pos += sizeof(xl_xact_xinfo);
	}

	/*
	 * The optional chunks are consumed in the fixed order below, each one
	 * only if its bit is set in xinfo.
	 */
	if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
	{
		xl_xact_dbinfo *dbinfo = (xl_xact_dbinfo *) pos;

		parsed->dbId = dbinfo->dbId;
		parsed->tsId = dbinfo->tsId;
		pos += sizeof(xl_xact_dbinfo);
	}

	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
	{
		xl_xact_subxacts *sub = (xl_xact_subxacts *) pos;

		/* point into the record rather than copying the array */
		parsed->nsubxacts = sub->nsubxacts;
		parsed->subxacts = sub->subxacts;
		pos += MinSizeOfXactSubxacts;
		pos += parsed->nsubxacts * sizeof(TransactionId);
	}

	if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
	{
		xl_xact_relfilenodes *rels = (xl_xact_relfilenodes *) pos;

		parsed->nrels = rels->nrels;
		parsed->xnodes = rels->xnodes;
		pos += MinSizeOfXactRelfilenodes;
		pos += rels->nrels * sizeof(RelFileNode);
	}

	if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
	{
		xl_xact_invals *invals = (xl_xact_invals *) pos;

		parsed->nmsgs = invals->nmsgs;
		parsed->msgs = invals->msgs;
		pos += MinSizeOfXactInvals;
		pos += invals->nmsgs * sizeof(SharedInvalidationMessage);
	}

	if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
	{
		parsed->twophase_xid = ((xl_xact_twophase *) pos)->xid;
		pos += sizeof(xl_xact_twophase);
	}
}
/*
 * Decode an xact abort record into its expanded form.
 *
 * 'info' is the record's xl_info byte, 'xlrec' points at the start of the
 * abort record's data and 'parsed' receives the result. The array pointers
 * stored in 'parsed' point into the record itself.
 */
void
ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
{
	/* read position; starts right behind the fixed-size part of the record */
	char	   *pos = (char *) xlrec + MinSizeOfXactAbort;

	/*
	 * Start from an all-zero result. That also leaves xinfo at 0, which is
	 * the default if no XLOG_XACT_HAS_INFO is present.
	 */
	memset(parsed, 0, sizeof(*parsed));
	parsed->xact_time = xlrec->xact_time;

	/* the xinfo word itself is optional, signalled via the xl_info bits */
	if (info & XLOG_XACT_HAS_INFO)
	{
		parsed->xinfo = ((xl_xact_xinfo *) pos)->xinfo;
		pos += sizeof(xl_xact_xinfo);
	}

	/* optional chunks follow in this fixed order, each gated by its xinfo bit */
	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
	{
		xl_xact_subxacts *sub = (xl_xact_subxacts *) pos;

		/* point into the record rather than copying the array */
		parsed->nsubxacts = sub->nsubxacts;
		parsed->subxacts = sub->subxacts;
		pos += MinSizeOfXactSubxacts;
		pos += parsed->nsubxacts * sizeof(TransactionId);
	}

	if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
	{
		xl_xact_relfilenodes *rels = (xl_xact_relfilenodes *) pos;

		parsed->nrels = rels->nrels;
		parsed->xnodes = rels->xnodes;
		pos += MinSizeOfXactRelfilenodes;
		pos += rels->nrels * sizeof(RelFileNode);
	}

	if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
	{
		parsed->twophase_xid = ((xl_xact_twophase *) pos)->xid;
		pos += sizeof(xl_xact_twophase);
	}
}
static void
xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
{
xl_xact_parsed_commit parsed;
int i;
TransactionId *subxacts;
subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
ParseCommitRecord(info, xlrec, &parsed);
/* If this is a prepared xact, show the xid of the original xact */
if (TransactionIdIsValid(parsed.twophase_xid))
appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
if (xlrec->nrels > 0)
if (parsed.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
for (i = 0; i < xlrec->nrels; i++)
for (i = 0; i < parsed.nrels; i++)
{
char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
if (xlrec->nsubxacts > 0)
if (parsed.nsubxacts > 0)
{
appendStringInfoString(buf, "; subxacts:");
for (i = 0; i < xlrec->nsubxacts; i++)
appendStringInfo(buf, " %u", subxacts[i]);
for (i = 0; i < parsed.nsubxacts; i++)
appendStringInfo(buf, " %u", parsed.subxacts[i]);
}
if (xlrec->nmsgs > 0)
if (parsed.nmsgs > 0)
{
SharedInvalidationMessage *msgs;
msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
xlrec->dbId, xlrec->tsId);
parsed.dbId, parsed.tsId);
appendStringInfoString(buf, "; inval msgs:");
for (i = 0; i < xlrec->nmsgs; i++)
for (i = 0; i < parsed.nmsgs; i++)
{
SharedInvalidationMessage *msg = &msgs[i];
SharedInvalidationMessage *msg = &parsed.msgs[i];
if (msg->id >= 0)
appendStringInfo(buf, " catcache %d", msg->id);
......@@ -80,48 +215,41 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
appendStringInfo(buf, " unknown id %d", msg->id);
}
}
if (XactCompletionForceSyncCommit(parsed.xinfo))
appendStringInfo(buf, "; sync");
}
static void
xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec)
{
xl_xact_parsed_abort parsed;
int i;
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
ParseAbortRecord(info, xlrec, &parsed);
if (xlrec->nsubxacts > 0)
{
appendStringInfoString(buf, "; subxacts:");
for (i = 0; i < xlrec->nsubxacts; i++)
appendStringInfo(buf, " %u", xlrec->subxacts[i]);
}
}
static void
xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
{
int i;
/* If this is a prepared xact, show the xid of the original xact */
if (TransactionIdIsValid(parsed.twophase_xid))
appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
if (xlrec->nrels > 0)
if (parsed.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
for (i = 0; i < xlrec->nrels; i++)
for (i = 0; i < parsed.nrels; i++)
{
char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
if (xlrec->nsubxacts > 0)
{
TransactionId *xacts = (TransactionId *)
&xlrec->xnodes[xlrec->nrels];
if (parsed.nsubxacts > 0)
{
appendStringInfoString(buf, "; subxacts:");
for (i = 0; i < xlrec->nsubxacts; i++)
appendStringInfo(buf, " %u", xacts[i]);
for (i = 0; i < parsed.nsubxacts; i++)
appendStringInfo(buf, " %u", parsed.subxacts[i]);
}
}
......@@ -140,39 +268,19 @@ void
xact_desc(StringInfo buf, XLogReaderState *record)
{
char *rec = XLogRecGetData(record);
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
if (info == XLOG_XACT_COMMIT_COMPACT)
{
xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
xact_desc_commit_compact(buf, xlrec);
}
else if (info == XLOG_XACT_COMMIT)
if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
xact_desc_commit(buf, xlrec);
xact_desc_commit(buf, XLogRecGetInfo(record), xlrec);
}
else if (info == XLOG_XACT_ABORT)
else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *) rec;
xact_desc_abort(buf, xlrec);
}
else if (info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
appendStringInfo(buf, "%u: ", xlrec->xid);
xact_desc_commit(buf, &xlrec->crec);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
appendStringInfo(buf, "%u: ", xlrec->xid);
xact_desc_abort(buf, &xlrec->arec);
xact_desc_abort(buf, XLogRecGetInfo(record), xlrec);
}
else if (info == XLOG_XACT_ASSIGNMENT)
{
......@@ -193,7 +301,7 @@ xact_identify(uint8 info)
{
const char *id = NULL;
switch (info & ~XLR_INFO_MASK)
switch (info & XLOG_XACT_OPMASK)
{
case XLOG_XACT_COMMIT:
id = "COMMIT";
......@@ -213,9 +321,6 @@ xact_identify(uint8 info)
case XLOG_XACT_ASSIGNMENT:
id = "ASSIGNMENT";
break;
case XLOG_XACT_COMMIT_COMPACT:
id = "COMMIT_COMPACT";
break;
}
return id;
......
......@@ -2079,7 +2079,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
SharedInvalidationMessage *invalmsgs,
bool initfileinval)
{
xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
START_CRIT_SECTION();
......@@ -2088,36 +2087,11 @@ RecordTransactionCommitPrepared(TransactionId xid,
MyPgXact->delayChkpt = true;
/* Emit the XLOG commit record */
xlrec.xid = xid;
xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
xlrec.crec.dbId = MyDatabaseId;
xlrec.crec.tsId = MyDatabaseTableSpace;
xlrec.crec.xact_time = GetCurrentTimestamp();
xlrec.crec.nrels = nrels;
xlrec.crec.nsubxacts = nchildren;
xlrec.crec.nmsgs = ninvalmsgs;
XLogBeginInsert();
XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
/* dump rels to delete */
if (nrels > 0)
XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
/* dump committed child Xids */
if (nchildren > 0)
XLogRegisterData((char *) children,
nchildren * sizeof(TransactionId));
/* dump cache invalidation messages */
if (ninvalmsgs > 0)
XLogRegisterData((char *) invalmsgs,
ninvalmsgs * sizeof(SharedInvalidationMessage));
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
recptr = XactLogCommitRecord(GetCurrentTimestamp(),
nchildren, children, nrels, rels,
ninvalmsgs, invalmsgs,
initfileinval, false,
xid);
/*
* We don't currently try to sleep before flush here ... nor is there any
......@@ -2160,7 +2134,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nrels,
RelFileNode *rels)
{
xl_xact_abort_prepared xlrec;
XLogRecPtr recptr;
/*
......@@ -2174,24 +2147,10 @@ RecordTransactionAbortPrepared(TransactionId xid,
START_CRIT_SECTION();
/* Emit the XLOG abort record */
xlrec.xid = xid;
xlrec.arec.xact_time = GetCurrentTimestamp();
xlrec.arec.nrels = nrels;
xlrec.arec.nsubxacts = nchildren;
XLogBeginInsert();
XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
/* dump rels to delete */
if (nrels > 0)
XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
/* dump committed child Xids */
if (nchildren > 0)
XLogRegisterData((char *) children,
nchildren * sizeof(TransactionId));
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
recptr = XactLogAbortRecord(GetCurrentTimestamp(),
nchildren, children,
nrels, rels,
xid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
......
This diff is collapsed.
......@@ -5168,39 +5168,27 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
static bool
getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
{
uint8 record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
uint8 xact_info = info & XLOG_XACT_OPMASK;
uint8 rmid = XLogRecGetRmid(record);
if (rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
return true;
}
if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
{
*recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
return true;
}
if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_COMMIT ||
xact_info == XLOG_XACT_COMMIT_PREPARED))
{
*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
return true;
}
if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED)
{
*recordXtime = ((xl_xact_commit_prepared *) XLogRecGetData(record))->crec.xact_time;
return true;
}
if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_ABORT ||
xact_info == XLOG_XACT_ABORT_PREPARED))
{
*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
return true;
}
if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED)
{
*recordXtime = ((xl_xact_abort_prepared *) XLogRecGetData(record))->arec.xact_time;
return true;
}
return false;
}
......@@ -5216,7 +5204,7 @@ static bool
recoveryStopsBefore(XLogReaderState *record)
{
bool stopsHere = false;
uint8 record_info;
uint8 xact_info;
bool isCommit;
TimestampTz recordXtime = 0;
TransactionId recordXid;
......@@ -5237,27 +5225,40 @@ recoveryStopsBefore(XLogReaderState *record)
/* Otherwise we only consider stopping before COMMIT or ABORT records. */
if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
if (xact_info == XLOG_XACT_COMMIT)
{
isCommit = true;
recordXid = XLogRecGetXid(record);
}
else if (record_info == XLOG_XACT_COMMIT_PREPARED)
else if (xact_info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
xl_xact_parsed_commit parsed;
isCommit = true;
recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
ParseCommitRecord(XLogRecGetInfo(record),
xlrec,
&parsed);
recordXid = parsed.twophase_xid;
}
else if (record_info == XLOG_XACT_ABORT)
else if (xact_info == XLOG_XACT_ABORT)
{
isCommit = false;
recordXid = XLogRecGetXid(record);
}
else if (record_info == XLOG_XACT_ABORT_PREPARED)
else if (xact_info == XLOG_XACT_ABORT_PREPARED)
{
isCommit = false;
recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
xl_xact_parsed_abort parsed;
isCommit = true;
ParseAbortRecord(XLogRecGetInfo(record),
xlrec,
&parsed);
recordXid = parsed.twophase_xid;
}
else
return false;
......@@ -5325,11 +5326,12 @@ recoveryStopsBefore(XLogReaderState *record)
static bool
recoveryStopsAfter(XLogReaderState *record)
{
uint8 record_info;
uint8 info;
uint8 xact_info;
uint8 rmid;
TimestampTz recordXtime;
record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
rmid = XLogRecGetRmid(record);
/*
......@@ -5337,7 +5339,7 @@ recoveryStopsAfter(XLogReaderState *record)
* the first one.
*/
if (recoveryTarget == RECOVERY_TARGET_NAME &&
rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
xl_restore_point *recordRestorePointData;
......@@ -5358,12 +5360,15 @@ recoveryStopsAfter(XLogReaderState *record)
}
}
if (rmid == RM_XACT_ID &&
(record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED ||
record_info == XLOG_XACT_ABORT ||
record_info == XLOG_XACT_ABORT_PREPARED))
if (rmid != RM_XACT_ID)
return false;
xact_info = info & XLOG_XACT_OPMASK;
if (xact_info == XLOG_XACT_COMMIT ||
xact_info == XLOG_XACT_COMMIT_PREPARED ||
xact_info == XLOG_XACT_ABORT ||
xact_info == XLOG_XACT_ABORT_PREPARED)
{
TransactionId recordXid;
......@@ -5372,10 +5377,26 @@ recoveryStopsAfter(XLogReaderState *record)
SetLatestXTime(recordXtime);
/* Extract the XID of the committed/aborted transaction */
if (record_info == XLOG_XACT_COMMIT_PREPARED)
recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
else if (record_info == XLOG_XACT_ABORT_PREPARED)
recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
if (xact_info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
xl_xact_parsed_commit parsed;
ParseCommitRecord(XLogRecGetInfo(record),
xlrec,
&parsed);
recordXid = parsed.twophase_xid;
}
else if (xact_info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
xl_xact_parsed_abort parsed;
ParseAbortRecord(XLogRecGetInfo(record),
xlrec,
&parsed);
recordXid = parsed.twophase_xid;
}
else
recordXid = XLogRecGetXid(record);
......@@ -5396,17 +5417,16 @@ recoveryStopsAfter(XLogReaderState *record)
recoveryStopTime = recordXtime;
recoveryStopName[0] = '\0';
if (record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED)
if (xact_info == XLOG_XACT_COMMIT ||
xact_info == XLOG_XACT_COMMIT_PREPARED)
{
ereport(LOG,
(errmsg("recovery stopping after commit of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
else if (record_info == XLOG_XACT_ABORT ||
record_info == XLOG_XACT_ABORT_PREPARED)
else if (xact_info == XLOG_XACT_ABORT ||
xact_info == XLOG_XACT_ABORT_PREPARED)
{
ereport(LOG,
(errmsg("recovery stopping after abort of transaction %u, time %s",
......@@ -5494,7 +5514,7 @@ SetRecoveryPause(bool recoveryPause)
static bool
recoveryApplyDelay(XLogReaderState *record)
{
uint8 record_info;
uint8 xact_info;
TimestampTz xtime;
long secs;
int microsecs;
......@@ -5511,11 +5531,13 @@ recoveryApplyDelay(XLogReaderState *record)
* so there is already opportunity for issues caused by early conflicts on
* standbys.
*/
record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
if (!(XLogRecGetRmid(record) == RM_XACT_ID &&
(record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED)))
if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
xact_info = XLogRecGetInfo(record) & XLOG_XACT_COMMIT;
if (xact_info != XLOG_XACT_COMMIT &&
xact_info != XLOG_XACT_COMMIT_PREPARED)
return false;
if (!getRecordTimestamp(record, &xtime))
......
......@@ -64,12 +64,9 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
TransactionId xid, Oid dboid,
TimestampTz commit_time,
int nsubxacts, TransactionId *sub_xids,
int ninval_msgs, SharedInvalidationMessage *msg);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
TransactionId xid, TransactionId *sub_xids, int nsubxacts);
xl_xact_parsed_commit *parsed, TransactionId xid);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_abort *parsed, TransactionId xid);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
......@@ -188,7 +185,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuild *builder = ctx->snapshot_builder;
ReorderBuffer *reorder = ctx->reorder;
XLogReaderState *r = buf->record;
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
/* no point in doing anything yet, data could not be decoded anyway */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
......@@ -197,87 +194,41 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
switch (info)
{
case XLOG_XACT_COMMIT:
{
xl_xact_commit *xlrec;
TransactionId *subxacts = NULL;
SharedInvalidationMessage *invals = NULL;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
xlrec->xact_time,
xlrec->nsubxacts, subxacts,
xlrec->nmsgs, invals);
break;
}
case XLOG_XACT_COMMIT_PREPARED:
{
xl_xact_commit_prepared *prec;
xl_xact_commit *xlrec;
TransactionId *subxacts;
SharedInvalidationMessage *invals = NULL;
/* Prepared commits contain a normal commit record... */
prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
xlrec = &prec->crec;
xl_xact_parsed_commit parsed;
TransactionId xid;
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
DecodeCommit(ctx, buf, prec->xid, xlrec->dbId,
xlrec->xact_time,
xlrec->nsubxacts, subxacts,
xlrec->nmsgs, invals);
break;
}
case XLOG_XACT_COMMIT_COMPACT:
{
xl_xact_commit_compact *xlrec;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
xlrec = (xl_xact_commit_compact *) XLogRecGetData(r);
if (!TransactionIdIsValid(parsed.twophase_xid))
xid = XLogRecGetXid(r);
else
xid = parsed.twophase_xid;
DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
xlrec->xact_time,
xlrec->nsubxacts, xlrec->subxacts,
0, NULL);
DecodeCommit(ctx, buf, &parsed, xid);
break;
}
case XLOG_XACT_ABORT:
{
xl_xact_abort *xlrec;
TransactionId *sub_xids;
xlrec = (xl_xact_abort *) XLogRecGetData(r);
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r),
sub_xids, xlrec->nsubxacts);
break;
}
case XLOG_XACT_ABORT_PREPARED:
{
xl_xact_abort_prepared *prec;
xl_xact_abort *xlrec;
TransactionId *sub_xids;
xl_xact_parsed_abort parsed;
TransactionId xid;
/* prepared abort contain a normal commit abort... */
prec = (xl_xact_abort_prepared *) XLogRecGetData(r);
xlrec = &prec->arec;
xlrec = (xl_xact_abort *) XLogRecGetData(r);
ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
if (!TransactionIdIsValid(parsed.twophase_xid))
xid = XLogRecGetXid(r);
else
xid = parsed.twophase_xid;
/* r->xl_xid is committed in a separate record */
DecodeAbort(ctx, buf->origptr, prec->xid,
sub_xids, xlrec->nsubxacts);
DecodeAbort(ctx, buf, &parsed, xid);
break;
}
case XLOG_XACT_ASSIGNMENT:
{
xl_xact_assignment *xlrec;
......@@ -477,10 +428,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
TransactionId xid, Oid dboid,
TimestampTz commit_time,
int nsubxacts, TransactionId *sub_xids,
int ninval_msgs, SharedInvalidationMessage *msgs)
xl_xact_parsed_commit *parsed, TransactionId xid)
{
int i;
......@@ -489,15 +437,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* transaction's contents, since the various caches need to always be
* consistent.
*/
if (ninval_msgs > 0)
if (parsed->nmsgs > 0)
{
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
ninval_msgs, msgs);
parsed->nmsgs, parsed->msgs);
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
}
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
nsubxacts, sub_xids);
parsed->nsubxacts, parsed->subxacts);
/* ----
* Check whether we are interested in this specific transaction, and tell
......@@ -524,12 +472,11 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* ---
*/
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
(dboid != InvalidOid && dboid != ctx->slot->data.database))
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database))
{
for (i = 0; i < nsubxacts; i++)
for (i = 0; i < parsed->nsubxacts; i++)
{
ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr);
sub_xids++;
ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
}
ReorderBufferForget(ctx->reorder, xid, buf->origptr);
......@@ -537,16 +484,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
/* tell the reorderbuffer about the surviving subtransactions */
for (i = 0; i < nsubxacts; i++)
for (i = 0; i < parsed->nsubxacts; i++)
{
ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids,
ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
buf->origptr, buf->endptr);
sub_xids++;
}
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
commit_time);
parsed->xact_time);
}
/*
......@@ -554,20 +500,21 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* snapbuild.c and reorderbuffer.c
*/
static void
DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
TransactionId *sub_xids, int nsubxacts)
DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_abort *parsed, TransactionId xid)
{
int i;
SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids);
SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
parsed->nsubxacts, parsed->subxacts);
for (i = 0; i < nsubxacts; i++)
for (i = 0; i < parsed->nsubxacts; i++)
{
ReorderBufferAbort(ctx->reorder, *sub_xids, lsn);
sub_xids++;
ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
buf->record->EndRecPtr);
}
ReorderBufferAbort(ctx->reorder, xid, lsn);
ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
}
/*
......
......@@ -18,6 +18,7 @@
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "storage/relfilenode.h"
#include "storage/sinval.h"
#include "utils/datetime.h"
......@@ -103,8 +104,8 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
*/
/*
* XLOG allows to store some information in high 4 bits of log
* record xl_info field
* XLOG allows to store some information in high 4 bits of log record xl_info
* field. We use 3 for the opcode, and one about an optional flag variable.
*/
#define XLOG_XACT_COMMIT 0x00
#define XLOG_XACT_PREPARE 0x10
......@@ -112,7 +113,41 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XLOG_XACT_COMMIT_PREPARED 0x30
#define XLOG_XACT_ABORT_PREPARED 0x40
#define XLOG_XACT_ASSIGNMENT 0x50
#define XLOG_XACT_COMMIT_COMPACT 0x60
/* free opcode 0x60 */
/* free opcode 0x70 */
/* mask for filtering opcodes out of xl_info */
#define XLOG_XACT_OPMASK 0x70
/* does this record have a 'xinfo' field or not */
#define XLOG_XACT_HAS_INFO 0x80
/*
* The following flags, stored in xinfo, determine which information is
* contained in commit/abort records.
*/
#define XACT_XINFO_HAS_DBINFO (1U << 0)
#define XACT_XINFO_HAS_SUBXACTS (1U << 1)
#define XACT_XINFO_HAS_RELFILENODES (1U << 2)
#define XACT_XINFO_HAS_INVALS (1U << 3)
#define XACT_XINFO_HAS_TWOPHASE (1U << 4)
/*
* Also stored in xinfo, these indicate a variety of additional actions that
* need to occur when emulating transaction effects during recovery.
*
* They are named XactCompletion... to differentiate them from
* EOXact... routines which run at the end of the original transaction
* completion.
*/
#define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30)
#define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31)
/*
 * Access macros for above flags. Each evaluates to 1 if the flag is set in
 * xinfo and 0 otherwise. The argument is parenthesized so that expressions
 * using lower-precedence operators (e.g. "a | b") are tested as a whole.
 */
#define XactCompletionRelcacheInitFileInval(xinfo) \
	(!!((xinfo) & XACT_COMPLETION_UPDATE_RELCACHE_FILE))
#define XactCompletionForceSyncCommit(xinfo) \
	(!!((xinfo) & XACT_COMPLETION_FORCE_SYNC_COMMIT))
typedef struct xl_xact_assignment
{
......@@ -123,85 +158,130 @@ typedef struct xl_xact_assignment
#define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
typedef struct xl_xact_commit_compact
/*
* Commit and abort records can contain a lot of information. But a large
* portion of the records won't need all possible pieces of information. So we
* only include what's needed.
*
* A minimal commit/abort record only consists out of a xl_xact_commit/abort
* struct. The presence of additional information is indicated by bits set in
* 'xl_xact_xinfo->xinfo'. The presence of the xinfo field itself is signalled
* by a set XLOG_XACT_HAS_INFO bit in the xl_info field.
*
* NB: All the individual data chunks should be sized to multiples of
* sizeof(int) and only require int32 alignment.
*/
/* sub-records for commit/abort */
typedef struct xl_xact_xinfo
{
/*
* Even though we right now only require 1 byte of space in xinfo we use
* four so following records don't have to care about alignment. Commit
* records can be large, so copying large portions isn't attractive.
*/
uint32 xinfo;
} xl_xact_xinfo;
typedef struct xl_xact_dbinfo
{
Oid dbId; /* MyDatabaseId */
Oid tsId; /* MyDatabaseTableSpace */
} xl_xact_dbinfo;
typedef struct xl_xact_subxacts
{
TimestampTz xact_time; /* time of commit */
int nsubxacts; /* number of subtransaction XIDs */
/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
TransactionId subxacts[FLEXIBLE_ARRAY_MEMBER];
} xl_xact_commit_compact;
} xl_xact_subxacts;
#define MinSizeOfXactSubxacts offsetof(xl_xact_subxacts, subxacts)
#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts)
/*
 * Optional commit/abort sub-record carrying an array of RelFileNodes; parsed
 * when XACT_XINFO_HAS_RELFILENODES is set in xinfo.
 */
typedef struct xl_xact_relfilenodes
{
int nrels; /* number of RelFileNodes */
RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
} xl_xact_relfilenodes;
/* size of the fixed part, i.e. everything before the xnodes array */
#define MinSizeOfXactRelfilenodes offsetof(xl_xact_relfilenodes, xnodes)
typedef struct xl_xact_commit
typedef struct xl_xact_invals
{
TimestampTz xact_time; /* time of commit */
uint32 xinfo; /* info flags */
int nrels; /* number of RelFileNodes */
int nsubxacts; /* number of subtransaction XIDs */
int nmsgs; /* number of shared inval msgs */
Oid dbId; /* MyDatabaseId */
Oid tsId; /* MyDatabaseTableSpace */
/* Array of RelFileNode(s) to drop at commit */
RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
} xl_xact_commit;
SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
} xl_xact_invals;
#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
typedef struct xl_xact_twophase
{
TransactionId xid;
} xl_xact_twophase;
#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
/*
* These flags are set in the xinfo fields of WAL commit records,
* indicating a variety of additional actions that need to occur
* when emulating transaction effects during recovery.
* They are named XactCompletion... to differentiate them from
* EOXact... routines which run at the end of the original
* transaction completion.
*/
#define XACT_COMPLETION_UPDATE_RELCACHE_FILE 0x01
#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
typedef struct xl_xact_commit
{
TimestampTz xact_time; /* time of commit */
/*
 * Access macros for above flags.  Each yields the masked flag bit (non-zero
 * when set), not a normalized 0/1 value.  The argument is parenthesized so
 * that a compound expression such as (a | b) is masked as a whole instead of
 * being regrouped by operator precedence.
 */
#define XactCompletionRelcacheInitFileInval(xinfo) ((xinfo) & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
#define XactCompletionForceSyncCommit(xinfo) ((xinfo) & XACT_COMPLETION_FORCE_SYNC_COMMIT)
/* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */
/* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */
/* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */
/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
/* xl_xact_invals follows if XINFO_HAS_INVALS */
/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
} xl_xact_commit;
#define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
typedef struct xl_xact_abort
{
TimestampTz xact_time; /* time of abort */
int nrels; /* number of RelFileNodes */
int nsubxacts; /* number of subtransaction XIDs */
/* Array of RelFileNode(s) to drop at abort */
RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
/* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
} xl_xact_abort;
/* Note the intentional lack of an invalidation message array c.f. commit */
#define MinSizeOfXactAbort offsetof(xl_xact_abort, xnodes)
/* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */
/* No db_info required */
/* xl_xact_subxacts follows if HAS_SUBXACT */
/* xl_xact_relfilenodes follows if HAS_RELFILENODES */
/* No invalidation messages needed. */
/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
} xl_xact_abort;
#define MinSizeOfXactAbort sizeof(xl_xact_abort)
/*
* COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records
* except that we have to store the XID of the prepared transaction explicitly
* --- the XID in the record header will be invalid.
* Commit/Abort records in the above form are a bit verbose to parse, so
* there are deconstructed versions generated by ParseCommit/AbortRecord() for
* easier consumption.
*/
typedef struct xl_xact_commit_prepared
typedef struct xl_xact_parsed_commit
{
TransactionId xid; /* XID of prepared xact */
xl_xact_commit crec; /* COMMIT record */
/* MORE DATA FOLLOWS AT END OF STRUCT */
} xl_xact_commit_prepared;
TimestampTz xact_time;
uint32 xinfo;
Oid dbId; /* MyDatabaseId */
Oid tsId; /* MyDatabaseTableSpace */
#define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.xnodes)
int nsubxacts;
TransactionId *subxacts;
typedef struct xl_xact_abort_prepared
int nrels;
RelFileNode *xnodes;
int nmsgs;
SharedInvalidationMessage *msgs;
TransactionId twophase_xid; /* only for 2PC */
} xl_xact_parsed_commit;
typedef struct xl_xact_parsed_abort
{
TransactionId xid; /* XID of prepared xact */
xl_xact_abort arec; /* ABORT record */
/* MORE DATA FOLLOWS AT END OF STRUCT */
} xl_xact_abort_prepared;
TimestampTz xact_time;
uint32 xinfo;
int nsubxacts;
TransactionId *subxacts;
#define MinSizeOfXactAbortPrepared offsetof(xl_xact_abort_prepared, arec.xnodes)
int nrels;
RelFileNode *xnodes;
TransactionId twophase_xid; /* only for 2PC */
} xl_xact_parsed_abort;
/* ----------------
......@@ -256,8 +336,25 @@ extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
extern int xactGetCommittedChildren(TransactionId **ptr);
/*
 * Entry points for logging commit and abort records; each returns an
 * XLogRecPtr.  Arguments are the completion timestamp followed by
 * (count, array) pairs for subtransaction XIDs and RelFileNodes; the commit
 * variant additionally takes shared invalidation messages plus the
 * relcacheInval and forceSync flags.  twophase_xid is the prepared
 * transaction's XID for two-phase records -- presumably an invalid XID is
 * passed for ordinary commits/aborts; confirm against the callers.
 */
extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
TransactionId twophase_xid);
extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
TransactionId twophase_xid);
extern void xact_redo(XLogReaderState *record);
/* xactdesc.c */
extern void xact_desc(StringInfo buf, XLogReaderState *record);
extern const char *xact_identify(uint8 info);
/* also in xactdesc.c, so they can be shared between front/backend code */
/*
 * Deconstruct a raw commit/abort record (xlrec) into the flat parsed struct.
 * NOTE(review): info is presumably the record's xl_info byte, needed to tell
 * whether the optional xl_xact_xinfo chunk is present -- confirm in
 * xactdesc.c.
 */
extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed);
extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed);
#endif /* XACT_H */
......@@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
#define XLOG_PAGE_MAGIC 0xD082 /* can be used as WAL version indicator */
#define XLOG_PAGE_MAGIC 0xD083 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment