Commit e74e0906 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Treat 2PC commit/abort the same as regular xacts in recovery.

There were several oversights in recovery code where COMMIT/ABORT PREPARED
records were ignored:

* pg_last_xact_replay_timestamp() (wasn't updated for 2PC commits)
* recovery_min_apply_delay (2PC commits were applied immediately)
* recovery_target_xid (recovery would not stop if the XID used 2PC)

The first of those was reported by Sergiy Zuban in bug #11032, analyzed by
Tom Lane and Andres Freund. The bug was always there, but was masked before
commit d19bd29f, because COMMIT PREPARED
always created an extra regular transaction that was WAL-logged.

Backpatch to all supported versions (older versions didn't have all the
features and therefore didn't have all of the above bugs).
parent 61e48efb
...@@ -5457,11 +5457,21 @@ getRecordTimestamp(XLogRecord *record, TimestampTz *recordXtime) ...@@ -5457,11 +5457,21 @@ getRecordTimestamp(XLogRecord *record, TimestampTz *recordXtime)
*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time; *recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
return true; return true;
} }
if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED)
{
*recordXtime = ((xl_xact_commit_prepared *) XLogRecGetData(record))->crec.xact_time;
return true;
}
if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT) if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
{ {
*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time; *recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
return true; return true;
} }
if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED)
{
*recordXtime = ((xl_xact_abort_prepared *) XLogRecGetData(record))->arec.xact_time;
return true;
}
return false; return false;
} }
...@@ -5480,6 +5490,7 @@ recoveryStopsBefore(XLogRecord *record) ...@@ -5480,6 +5490,7 @@ recoveryStopsBefore(XLogRecord *record)
uint8 record_info; uint8 record_info;
bool isCommit; bool isCommit;
TimestampTz recordXtime = 0; TimestampTz recordXtime = 0;
TransactionId recordXid;
/* Check if we should stop as soon as reaching consistency */ /* Check if we should stop as soon as reaching consistency */
if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE && reachedConsistency) if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE && reachedConsistency)
...@@ -5498,10 +5509,27 @@ recoveryStopsBefore(XLogRecord *record) ...@@ -5498,10 +5509,27 @@ recoveryStopsBefore(XLogRecord *record)
if (record->xl_rmid != RM_XACT_ID) if (record->xl_rmid != RM_XACT_ID)
return false; return false;
record_info = record->xl_info & ~XLR_INFO_MASK; record_info = record->xl_info & ~XLR_INFO_MASK;
if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT) if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
{
isCommit = true;
recordXid = record->xl_xid;
}
if (record_info == XLOG_XACT_COMMIT_PREPARED)
{
isCommit = true; isCommit = true;
recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
}
else if (record_info == XLOG_XACT_ABORT) else if (record_info == XLOG_XACT_ABORT)
{
isCommit = false;
recordXid = record->xl_xid;
}
else if (record_info == XLOG_XACT_ABORT_PREPARED)
{
isCommit = false; isCommit = false;
recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
}
else else
return false; return false;
...@@ -5516,7 +5544,7 @@ recoveryStopsBefore(XLogRecord *record) ...@@ -5516,7 +5544,7 @@ recoveryStopsBefore(XLogRecord *record)
* they complete. A higher numbered xid will complete before you about * they complete. A higher numbered xid will complete before you about
* 50% of the time... * 50% of the time...
*/ */
stopsHere = (record->xl_xid == recoveryTargetXid); stopsHere = (recordXid == recoveryTargetXid);
} }
if (recoveryTarget == RECOVERY_TARGET_TIME && if (recoveryTarget == RECOVERY_TARGET_TIME &&
...@@ -5536,7 +5564,7 @@ recoveryStopsBefore(XLogRecord *record) ...@@ -5536,7 +5564,7 @@ recoveryStopsBefore(XLogRecord *record)
if (stopsHere) if (stopsHere)
{ {
recoveryStopAfter = false; recoveryStopAfter = false;
recoveryStopXid = record->xl_xid; recoveryStopXid = recordXid;
recoveryStopTime = recordXtime; recoveryStopTime = recordXtime;
recoveryStopName[0] = '\0'; recoveryStopName[0] = '\0';
...@@ -5602,12 +5630,24 @@ recoveryStopsAfter(XLogRecord *record) ...@@ -5602,12 +5630,24 @@ recoveryStopsAfter(XLogRecord *record)
if (record->xl_rmid == RM_XACT_ID && if (record->xl_rmid == RM_XACT_ID &&
(record_info == XLOG_XACT_COMMIT_COMPACT || (record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT || record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_ABORT)) record_info == XLOG_XACT_COMMIT_PREPARED ||
record_info == XLOG_XACT_ABORT ||
record_info == XLOG_XACT_ABORT_PREPARED))
{ {
TransactionId recordXid;
/* Update the last applied transaction timestamp */ /* Update the last applied transaction timestamp */
if (getRecordTimestamp(record, &recordXtime)) if (getRecordTimestamp(record, &recordXtime))
SetLatestXTime(recordXtime); SetLatestXTime(recordXtime);
/* Extract the XID of the committed/aborted transaction */
if (record_info == XLOG_XACT_COMMIT_PREPARED)
recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
else if (record_info == XLOG_XACT_ABORT_PREPARED)
recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
else
recordXid = record->xl_xid;
/* /*
* There can be only one transaction end record with this exact * There can be only one transaction end record with this exact
* transactionid * transactionid
...@@ -5618,21 +5658,24 @@ recoveryStopsAfter(XLogRecord *record) ...@@ -5618,21 +5658,24 @@ recoveryStopsAfter(XLogRecord *record)
* 50% of the time... * 50% of the time...
*/ */
if (recoveryTarget == RECOVERY_TARGET_XID && recoveryTargetInclusive && if (recoveryTarget == RECOVERY_TARGET_XID && recoveryTargetInclusive &&
record->xl_xid == recoveryTargetXid) recordXid == recoveryTargetXid)
{ {
recoveryStopAfter = true; recoveryStopAfter = true;
recoveryStopXid = record->xl_xid; recoveryStopXid = recordXid;
recoveryStopTime = recordXtime; recoveryStopTime = recordXtime;
recoveryStopName[0] = '\0'; recoveryStopName[0] = '\0';
if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT) if (record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED)
{ {
ereport(LOG, ereport(LOG,
(errmsg("recovery stopping after commit of transaction %u, time %s", (errmsg("recovery stopping after commit of transaction %u, time %s",
recoveryStopXid, recoveryStopXid,
timestamptz_to_str(recoveryStopTime)))); timestamptz_to_str(recoveryStopTime))));
} }
else if (record_info == XLOG_XACT_ABORT) else if (record_info == XLOG_XACT_ABORT ||
record_info == XLOG_XACT_ABORT_PREPARED)
{ {
ereport(LOG, ereport(LOG,
(errmsg("recovery stopping after abort of transaction %u, time %s", (errmsg("recovery stopping after abort of transaction %u, time %s",
...@@ -5745,7 +5788,8 @@ recoveryApplyDelay(XLogRecord *record) ...@@ -5745,7 +5788,8 @@ recoveryApplyDelay(XLogRecord *record)
record_info = record->xl_info & ~XLR_INFO_MASK; record_info = record->xl_info & ~XLR_INFO_MASK;
if (!(record->xl_rmid == RM_XACT_ID && if (!(record->xl_rmid == RM_XACT_ID &&
(record_info == XLOG_XACT_COMMIT_COMPACT || (record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT))) record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED)))
return false; return false;
if (!getRecordTimestamp(record, &xtime)) if (!getRecordTimestamp(record, &xtime))
......
...@@ -180,8 +180,7 @@ typedef struct xl_xact_abort ...@@ -180,8 +180,7 @@ typedef struct xl_xact_abort
/* /*
* COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records * COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records
* except that we have to store the XID of the prepared transaction explicitly * except that we have to store the XID of the prepared transaction explicitly
* --- the XID in the record header will be for the transaction doing the * --- the XID in the record header will be invalid.
* COMMIT PREPARED or ABORT PREPARED command.
*/ */
typedef struct xl_xact_commit_prepared typedef struct xl_xact_commit_prepared
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment