Commit 0bead9af authored by Amit Kapila's avatar Amit Kapila

Immediately WAL-log subtransaction and top-level XID association.

The logical decoding infrastructure needs to know which top-level
transaction the subxact belongs to, in order to decode all the
changes. Until now that might be delayed until commit, due to the
caching (GPROC_MAX_CACHED_SUBXIDS), preventing features requiring
incremental decoding.

So we also write the assignment info into WAL immediately, as part
of the next WAL record (to minimize overhead) only when wal_level=logical.
We can not remove the existing XLOG_XACT_ASSIGNMENT WAL as that is
required for avoiding overflow in the hot standby snapshot.

Bump XLOG_PAGE_MAGIC, since this introduces XLR_BLOCK_ID_TOPLEVEL_XID.

Author: Tomas Vondra, Dilip Kumar, Amit Kapila
Reviewed-by: Amit Kapila
Tested-by: Neha Sharma and Mahendra Singh Thalor
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
parent d05b172a
...@@ -191,6 +191,7 @@ typedef struct TransactionStateData ...@@ -191,6 +191,7 @@ typedef struct TransactionStateData
bool didLogXid; /* has xid been included in WAL record? */ bool didLogXid; /* has xid been included in WAL record? */
int parallelModeLevel; /* Enter/ExitParallelMode counter */ int parallelModeLevel; /* Enter/ExitParallelMode counter */
bool chain; /* start a new block after this one */ bool chain; /* start a new block after this one */
bool assigned; /* assigned to top-level XID */
struct TransactionStateData *parent; /* back link to parent */ struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData; } TransactionStateData;
...@@ -223,6 +224,7 @@ typedef struct SerializedTransactionState ...@@ -223,6 +224,7 @@ typedef struct SerializedTransactionState
static TransactionStateData TopTransactionStateData = { static TransactionStateData TopTransactionStateData = {
.state = TRANS_DEFAULT, .state = TRANS_DEFAULT,
.blockState = TBLOCK_DEFAULT, .blockState = TBLOCK_DEFAULT,
.assigned = false,
}; };
/* /*
...@@ -5120,6 +5122,7 @@ PushTransaction(void) ...@@ -5120,6 +5122,7 @@ PushTransaction(void)
GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
s->prevXactReadOnly = XactReadOnly; s->prevXactReadOnly = XactReadOnly;
s->parallelModeLevel = 0; s->parallelModeLevel = 0;
s->assigned = false;
CurrentTransactionState = s; CurrentTransactionState = s;
...@@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record) ...@@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record)
else else
elog(PANIC, "xact_redo: unknown op code %u", info); elog(PANIC, "xact_redo: unknown op code %u", info);
} }
/*
* IsSubTransactionAssignmentPending
*
* This is used to decide whether we need to WAL log the top-level XID for
* operation in a subtransaction. We require that for logical decoding, see
* LogicalDecodingProcessRecord.
*
* This returns true if wal_level >= logical and we are inside a valid
* subtransaction, for which the assignment was not yet written to any WAL
* record.
*/
bool
IsSubTransactionAssignmentPending(void)
{
/* wal_level has to be logical */
if (!XLogLogicalInfoActive())
return false;
/* we need to be in a transaction state */
if (!IsTransactionState())
return false;
/* it has to be a subtransaction */
if (!IsSubTransaction())
return false;
/* the subtransaction has to have a XID assigned */
if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
return false;
/* and it should not be already 'assigned' */
return !CurrentTransactionState->assigned;
}
/*
* MarkSubTransactionAssigned
*
* Mark the subtransaction assignment as completed.
*/
void
MarkSubTransactionAssigned(void)
{
Assert(IsSubTransactionAssignmentPending());
CurrentTransactionState->assigned = true;
}
...@@ -89,11 +89,13 @@ static XLogRecData hdr_rdt; ...@@ -89,11 +89,13 @@ static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL; static char *hdr_scratch = NULL;
#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
#define HEADER_SCRATCH_SIZE \ #define HEADER_SCRATCH_SIZE \
(SizeOfXLogRecord + \ (SizeOfXLogRecord + \
MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin) SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
SizeOfXLogTransactionId)
/* /*
* An array of XLogRecData structs, to hold registered data. * An array of XLogRecData structs, to hold registered data.
...@@ -195,6 +197,10 @@ XLogResetInsertion(void) ...@@ -195,6 +197,10 @@ XLogResetInsertion(void)
{ {
int i; int i;
/* reset the subxact assignment flag (if needed) */
if (curinsert_flags & XLOG_INCLUDE_XID)
MarkSubTransactionAssigned();
for (i = 0; i < max_registered_block_id; i++) for (i = 0; i < max_registered_block_id; i++)
registered_buffers[i].in_use = false; registered_buffers[i].in_use = false;
...@@ -398,7 +404,7 @@ void ...@@ -398,7 +404,7 @@ void
XLogSetRecordFlags(uint8 flags) XLogSetRecordFlags(uint8 flags)
{ {
Assert(begininsert_called); Assert(begininsert_called);
curinsert_flags = flags; curinsert_flags |= flags;
} }
/* /*
...@@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, ...@@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
scratch += sizeof(replorigin_session_origin); scratch += sizeof(replorigin_session_origin);
} }
/* followed by toplevel XID, if not already included in previous record */
if (IsSubTransactionAssignmentPending())
{
TransactionId xid = GetTopTransactionIdIfAny();
/* update the flag (later used by XLogResetInsertion) */
XLogSetRecordFlags(XLOG_INCLUDE_XID);
*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
memcpy(scratch, &xid, sizeof(TransactionId));
scratch += sizeof(TransactionId);
}
/* followed by main data, if any */ /* followed by main data, if any */
if (mainrdata_len > 0) if (mainrdata_len > 0)
{ {
......
...@@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
state->decoded_record = record; state->decoded_record = record;
state->record_origin = InvalidRepOriginId; state->record_origin = InvalidRepOriginId;
state->toplevel_xid = InvalidTransactionId;
ptr = (char *) record; ptr = (char *) record;
ptr += SizeOfXLogRecord; ptr += SizeOfXLogRecord;
...@@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
{ {
COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
} }
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
{
COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
}
else if (block_id <= XLR_MAX_BLOCK_ID) else if (block_id <= XLR_MAX_BLOCK_ID)
{ {
/* XLogRecordBlockHeader */ /* XLogRecordBlockHeader */
......
...@@ -94,11 +94,27 @@ void ...@@ -94,11 +94,27 @@ void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
{ {
XLogRecordBuffer buf; XLogRecordBuffer buf;
TransactionId txid;
buf.origptr = ctx->reader->ReadRecPtr; buf.origptr = ctx->reader->ReadRecPtr;
buf.endptr = ctx->reader->EndRecPtr; buf.endptr = ctx->reader->EndRecPtr;
buf.record = record; buf.record = record;
txid = XLogRecGetTopXid(record);
/*
* If the top-level xid is valid, we need to assign the subxact to the
* top-level xact. We need to do this for all records, hence we do it
* before the switch.
*/
if (TransactionIdIsValid(txid))
{
ReorderBufferAssignChild(ctx->reorder,
txid,
record->decoded_record->xl_xid,
buf.origptr);
}
/* cast so we get a warning when new rmgrs are added */ /* cast so we get a warning when new rmgrs are added */
switch ((RmgrId) XLogRecGetRmid(record)) switch ((RmgrId) XLogRecGetRmid(record))
{ {
...@@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* /*
* If the snapshot isn't yet fully built, we cannot decode anything, so * If the snapshot isn't yet fully built, we cannot decode anything, so
* bail out. * bail out.
*
* However, it's critical to process XLOG_XACT_ASSIGNMENT records even
* when the snapshot is being built: it is possible to get later records
* that require subxids to be properly assigned.
*/ */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT && if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
info != XLOG_XACT_ASSIGNMENT)
return; return;
switch (info) switch (info)
...@@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break; break;
} }
case XLOG_XACT_ASSIGNMENT: case XLOG_XACT_ASSIGNMENT:
{
xl_xact_assignment *xlrec;
int i;
TransactionId *sub_xid;
xlrec = (xl_xact_assignment *) XLogRecGetData(r); /*
* We assign subxact to the toplevel xact while processing each
sub_xid = &xlrec->xsub[0]; * record if required. So, we don't need to do anything here.
* See LogicalDecodingProcessRecord.
for (i = 0; i < xlrec->nsubxacts; i++) */
{ break;
ReorderBufferAssignChild(reorder, xlrec->xtop,
*(sub_xid++), buf->origptr);
}
break;
}
case XLOG_XACT_PREPARE: case XLOG_XACT_PREPARE:
/* /*
......
...@@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg); ...@@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
extern void RegisterSubXactCallback(SubXactCallback callback, void *arg); extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg); extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
extern bool IsSubTransactionAssignmentPending(void);
extern void MarkSubTransactionAssigned(void);
extern int xactGetCommittedChildren(TransactionId **ptr); extern int xactGetCommittedChildren(TransactionId **ptr);
extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
......
...@@ -237,6 +237,7 @@ extern bool XLOG_DEBUG; ...@@ -237,6 +237,7 @@ extern bool XLOG_DEBUG;
*/ */
#define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */ #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */
#define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */ #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */
#define XLOG_INCLUDE_XID 0x04 /* include XID of top-level xact */
/* Checkpoint statistics */ /* Checkpoint statistics */
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
/* /*
* Each page of XLOG file has a header like this: * Each page of XLOG file has a header like this:
*/ */
#define XLOG_PAGE_MAGIC 0xD106 /* can be used as WAL version indicator */ #define XLOG_PAGE_MAGIC 0xD107 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData typedef struct XLogPageHeaderData
{ {
......
...@@ -191,6 +191,8 @@ struct XLogReaderState ...@@ -191,6 +191,8 @@ struct XLogReaderState
RepOriginId record_origin; RepOriginId record_origin;
TransactionId toplevel_xid; /* XID of top-level transaction */
/* information about blocks referenced by the record. */ /* information about blocks referenced by the record. */
DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
...@@ -304,6 +306,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, ...@@ -304,6 +306,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
#define XLogRecGetOrigin(decoder) ((decoder)->record_origin) #define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
#define XLogRecGetData(decoder) ((decoder)->main_data) #define XLogRecGetData(decoder) ((decoder)->main_data)
#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
......
...@@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong ...@@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong
#define XLR_BLOCK_ID_DATA_SHORT 255 #define XLR_BLOCK_ID_DATA_SHORT 255
#define XLR_BLOCK_ID_DATA_LONG 254 #define XLR_BLOCK_ID_DATA_LONG 254
#define XLR_BLOCK_ID_ORIGIN 253 #define XLR_BLOCK_ID_ORIGIN 253
#define XLR_BLOCK_ID_TOPLEVEL_XID 252
#endif /* XLOGRECORD_H */ #endif /* XLOGRECORD_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment