Commit 7b8a899b authored by Fujii Masao's avatar Fujii Masao

Make pg_waldump report more detail information about PREPARE TRANSACTION record.

This commit changes xact_desc() so that it reports the detail information about
PREPARE TRANSACTION record, like GID (global transaction identifier),
timestamp at prepare transaction, delete-on-abort/commit relations,
XID of subtransactions, and invalidation messages. These are helpful
when diagnosing 2PC-related troubles.

Author: Fujii Masao
Reviewed-by: Michael Paquier, Andrey Lepikhov, Kyotaro Horiguchi, Julien Rouhaud, Alvaro Herrera
Discussion: https://postgr.es/m/CAHGQGwEvhASad4JJnCv=0dW2TJypZgW_Vpb-oZik2a3utCqcrA@mail.gmail.com
parent 94fec485
......@@ -102,6 +102,10 @@ standby_desc_invalidations(StringInfo buf,
{
int i;
/* Do nothing if there are no invalidation messages */
if (nmsgs <= 0)
return;
if (relcacheInitFileInval)
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
dbId, tsId);
......
......@@ -209,43 +209,95 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
}
}
static void
xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
/*
* ParsePrepareRecord
*/
void
ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parsed_prepare *parsed)
{
xl_xact_parsed_commit parsed;
int i;
char *bufptr;
ParseCommitRecord(info, xlrec, &parsed);
bufptr = ((char *) xlrec) + MAXALIGN(sizeof(xl_xact_prepare));
/* If this is a prepared xact, show the xid of the original xact */
if (TransactionIdIsValid(parsed.twophase_xid))
appendStringInfo(buf, "%u: ", parsed.twophase_xid);
memset(parsed, 0, sizeof(*parsed));
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
parsed->xact_time = xlrec->prepared_at;
parsed->origin_lsn = xlrec->origin_lsn;
parsed->origin_timestamp = xlrec->origin_timestamp;
parsed->twophase_xid = xlrec->xid;
parsed->dbId = xlrec->database;
parsed->nsubxacts = xlrec->nsubxacts;
parsed->nrels = xlrec->ncommitrels;
parsed->nabortrels = xlrec->nabortrels;
parsed->nmsgs = xlrec->ninvalmsgs;
strncpy(parsed->twophase_gid, bufptr, xlrec->gidlen);
bufptr += MAXALIGN(xlrec->gidlen);
parsed->subxacts = (TransactionId *) bufptr;
bufptr += MAXALIGN(xlrec->nsubxacts * sizeof(TransactionId));
parsed->xnodes = (RelFileNode *) bufptr;
bufptr += MAXALIGN(xlrec->ncommitrels * sizeof(RelFileNode));
parsed->abortnodes = (RelFileNode *) bufptr;
bufptr += MAXALIGN(xlrec->nabortrels * sizeof(RelFileNode));
if (parsed.nrels > 0)
parsed->msgs = (SharedInvalidationMessage *) bufptr;
bufptr += MAXALIGN(xlrec->ninvalmsgs * sizeof(SharedInvalidationMessage));
}
static void
xact_desc_relations(StringInfo buf, char *label, int nrels,
RelFileNode *xnodes)
{
int i;
if (nrels > 0)
{
appendStringInfoString(buf, "; rels:");
for (i = 0; i < parsed.nrels; i++)
appendStringInfo(buf, "; %s:", label);
for (i = 0; i < nrels; i++)
{
char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
char *path = relpathperm(xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
if (parsed.nsubxacts > 0)
}
static void
xact_desc_subxacts(StringInfo buf, int nsubxacts, TransactionId *subxacts)
{
int i;
if (nsubxacts > 0)
{
appendStringInfoString(buf, "; subxacts:");
for (i = 0; i < parsed.nsubxacts; i++)
appendStringInfo(buf, " %u", parsed.subxacts[i]);
}
if (parsed.nmsgs > 0)
{
standby_desc_invalidations(
buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
XactCompletionRelcacheInitFileInval(parsed.xinfo));
for (i = 0; i < nsubxacts; i++)
appendStringInfo(buf, " %u", subxacts[i]);
}
}
static void
xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
{
xl_xact_parsed_commit parsed;
ParseCommitRecord(info, xlrec, &parsed);
/* If this is a prepared xact, show the xid of the original xact */
if (TransactionIdIsValid(parsed.twophase_xid))
appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
xact_desc_relations(buf, "rels", parsed.nrels, parsed.xnodes);
xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
standby_desc_invalidations(
buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
XactCompletionRelcacheInitFileInval(parsed.xinfo));
if (XactCompletionForceSyncCommit(parsed.xinfo))
appendStringInfoString(buf, "; sync");
......@@ -264,7 +316,6 @@ static void
xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec)
{
xl_xact_parsed_abort parsed;
int i;
ParseAbortRecord(info, xlrec, &parsed);
......@@ -273,24 +324,29 @@ xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec)
appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
if (parsed.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
for (i = 0; i < parsed.nrels; i++)
{
char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
xact_desc_relations(buf, "rels", parsed.nrels, parsed.xnodes);
xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
}
if (parsed.nsubxacts > 0)
{
appendStringInfoString(buf, "; subxacts:");
for (i = 0; i < parsed.nsubxacts; i++)
appendStringInfo(buf, " %u", parsed.subxacts[i]);
}
static void
xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec)
{
xl_xact_parsed_prepare parsed;
ParsePrepareRecord(info, xlrec, &parsed);
appendStringInfo(buf, "gid %s: ", parsed.twophase_gid);
appendStringInfoString(buf, timestamptz_to_str(parsed.xact_time));
xact_desc_relations(buf, "rels(commit)", parsed.nrels, parsed.xnodes);
xact_desc_relations(buf, "rels(abort)", parsed.nabortrels,
parsed.abortnodes);
xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
standby_desc_invalidations(
buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
xlrec->initfileinval);
}
static void
......@@ -323,6 +379,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
xact_desc_abort(buf, XLogRecGetInfo(record), xlrec);
}
else if (info == XLOG_XACT_PREPARE)
{
xl_xact_prepare *xlrec = (xl_xact_prepare *) rec;
xact_desc_prepare(buf, XLogRecGetInfo(record), xlrec);
}
else if (info == XLOG_XACT_ASSIGNMENT)
{
xl_xact_assignment *xlrec = (xl_xact_assignment *) rec;
......
......@@ -910,23 +910,7 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
*/
#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
typedef struct TwoPhaseFileHeader
{
uint32 magic; /* format identifier */
uint32 total_len; /* actual file length */
TransactionId xid; /* original transaction XID */
Oid database; /* OID of database it was in */
TimestampTz prepared_at; /* time of preparation */
Oid owner; /* user running the transaction */
int32 nsubxacts; /* number of following subxact XIDs */
int32 ncommitrels; /* number of delete-on-commit rels */
int32 nabortrels; /* number of delete-on-abort rels */
int32 ninvalmsgs; /* number of cache invalidation messages */
bool initfileinval; /* does relcache init file need invalidation? */
uint16 gidlen; /* length of the GID - GID follows the header */
XLogRecPtr origin_lsn; /* lsn of this record at origin node */
TimestampTz origin_timestamp; /* time of prepare at origin node */
} TwoPhaseFileHeader;
typedef xl_xact_prepare TwoPhaseFileHeader;
/*
* Header for each record in a state file
......@@ -1331,44 +1315,6 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
return buf;
}
/*
* ParsePrepareRecord
*/
void
ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
{
TwoPhaseFileHeader *hdr;
char *bufptr;
hdr = (TwoPhaseFileHeader *) xlrec;
bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
parsed->origin_lsn = hdr->origin_lsn;
parsed->origin_timestamp = hdr->origin_timestamp;
parsed->twophase_xid = hdr->xid;
parsed->dbId = hdr->database;
parsed->nsubxacts = hdr->nsubxacts;
parsed->nrels = hdr->ncommitrels;
parsed->nabortrels = hdr->nabortrels;
parsed->nmsgs = hdr->ninvalmsgs;
strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
bufptr += MAXALIGN(hdr->gidlen);
parsed->subxacts = (TransactionId *) bufptr;
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
parsed->xnodes = (RelFileNode *) bufptr;
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
parsed->abortnodes = (RelFileNode *) bufptr;
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
parsed->msgs = (SharedInvalidationMessage *) bufptr;
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
}
/*
* Reads 2PC data from xlog. During checkpoint this data will be moved to
......
......@@ -47,8 +47,6 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
int *nxids_p);
extern void ParsePrepareRecord(uint8 info, char *xlrec,
xl_xact_parsed_prepare *parsed);
extern void StandbyRecoverPreparedTransactions(void);
extern void RecoverPreparedTransactions(void);
......
......@@ -292,6 +292,24 @@ typedef struct xl_xact_abort
} xl_xact_abort;
#define MinSizeOfXactAbort sizeof(xl_xact_abort)
typedef struct xl_xact_prepare
{
uint32 magic; /* format identifier */
uint32 total_len; /* actual file length */
TransactionId xid; /* original transaction XID */
Oid database; /* OID of database it was in */
TimestampTz prepared_at; /* time of preparation */
Oid owner; /* user running the transaction */
int32 nsubxacts; /* number of following subxact XIDs */
int32 ncommitrels; /* number of delete-on-commit rels */
int32 nabortrels; /* number of delete-on-abort rels */
int32 ninvalmsgs; /* number of cache invalidation messages */
bool initfileinval; /* does relcache init file need invalidation? */
uint16 gidlen; /* length of the GID - GID follows the header */
XLogRecPtr origin_lsn; /* lsn of this record at origin node */
TimestampTz origin_timestamp; /* time of prepare at origin node */
} xl_xact_prepare;
/*
* Commit/Abort records in the above form are a bit verbose to parse, so
* there's a deconstructed versions generated by ParseCommit/AbortRecord() for
......@@ -435,6 +453,7 @@ extern const char *xact_identify(uint8 info);
/* also in xactdesc.c, so they can be shared between front/backend code */
extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed);
extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed);
extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parsed_prepare *parsed);
extern void EnterParallelMode(void);
extern void ExitParallelMode(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment