Commit f003d9f8 authored by Thomas Munro's avatar Thomas Munro

Add circular WAL decoding buffer.

Teach xlogreader.c to decode its output into a circular buffer, to
support optimizations based on looking ahead.

 * XLogReadRecord() works as before, consuming records one by one, and
   allowing them to be examined via the traditional XLogRecGetXXX()
   macros.

 * An alternative new interface XLogNextRecord() is added that returns
   pointers to DecodedXLogRecord structs that can be examined directly.

 * XLogReadAhead() provides a second cursor that lets you see
   further ahead, as long as data is available and there is enough space
   in the decoding buffer.  This returns DecodedXLogRecord pointers to the
   caller, but also adds them to a queue of records that will later be
   consumed by XLogNextRecord()/XLogReadRecord().

The buffer's size is controlled with wal_decode_buffer_size.  The buffer
could potentially be placed into shared memory, for future projects.
Large records that don't fit in the circular buffer are called
"oversized" and allocated separately with palloc().

Discussion: https://postgr.es/m/CA+hUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq=AovOddfHpA@mail.gmail.com
parent 323cbe7c
...@@ -482,10 +482,10 @@ generic_redo(XLogReaderState *record) ...@@ -482,10 +482,10 @@ generic_redo(XLogReaderState *record)
uint8 block_id; uint8 block_id;
/* Protect limited size of buffers[] array */ /* Protect limited size of buffers[] array */
Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES); Assert(XLogRecMaxBlockId(record) < MAX_GENERIC_XLOG_PAGES);
/* Iterate over blocks */ /* Iterate over blocks */
for (block_id = 0; block_id <= record->max_block_id; block_id++) for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{ {
XLogRedoAction action; XLogRedoAction action;
...@@ -525,7 +525,7 @@ generic_redo(XLogReaderState *record) ...@@ -525,7 +525,7 @@ generic_redo(XLogReaderState *record)
} }
/* Changes are done: unlock and release all buffers */ /* Changes are done: unlock and release all buffers */
for (block_id = 0; block_id <= record->max_block_id; block_id++) for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{ {
if (BufferIsValid(buffers[block_id])) if (BufferIsValid(buffers[block_id]))
UnlockReleaseBuffer(buffers[block_id]); UnlockReleaseBuffer(buffers[block_id]);
......
...@@ -1209,6 +1209,7 @@ XLogInsertRecord(XLogRecData *rdata, ...@@ -1209,6 +1209,7 @@ XLogInsertRecord(XLogRecData *rdata,
StringInfoData recordBuf; StringInfoData recordBuf;
char *errormsg = NULL; char *errormsg = NULL;
MemoryContext oldCxt; MemoryContext oldCxt;
DecodedXLogRecord *decoded;
oldCxt = MemoryContextSwitchTo(walDebugCxt); oldCxt = MemoryContextSwitchTo(walDebugCxt);
...@@ -1224,6 +1225,9 @@ XLogInsertRecord(XLogRecData *rdata, ...@@ -1224,6 +1225,9 @@ XLogInsertRecord(XLogRecData *rdata,
for (; rdata != NULL; rdata = rdata->next) for (; rdata != NULL; rdata = rdata->next)
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
/* How much space would it take to decode this record? */
decoded = palloc(DecodeXLogRecordRequiredSpace(recordBuf.len));
if (!debug_reader) if (!debug_reader)
debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
...@@ -1231,7 +1235,9 @@ XLogInsertRecord(XLogRecData *rdata, ...@@ -1231,7 +1235,9 @@ XLogInsertRecord(XLogRecData *rdata,
{ {
appendStringInfoString(&buf, "error decoding record: out of memory"); appendStringInfoString(&buf, "error decoding record: out of memory");
} }
else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data, else if (!DecodeXLogRecord(debug_reader, decoded,
(XLogRecord *) recordBuf.data,
EndPos,
&errormsg)) &errormsg))
{ {
appendStringInfo(&buf, "error decoding record: %s", appendStringInfo(&buf, "error decoding record: %s",
...@@ -1240,10 +1246,17 @@ XLogInsertRecord(XLogRecData *rdata, ...@@ -1240,10 +1246,17 @@ XLogInsertRecord(XLogRecData *rdata,
else else
{ {
appendStringInfoString(&buf, " - "); appendStringInfoString(&buf, " - ");
/*
* Temporarily make this decoded record the current record for
* XLogRecGetXXX() macros.
*/
debug_reader->record = decoded;
xlog_outdesc(&buf, debug_reader); xlog_outdesc(&buf, debug_reader);
debug_reader->record = NULL;
} }
elog(LOG, "%s", buf.data); elog(LOG, "%s", buf.data);
pfree(decoded);
pfree(buf.data); pfree(buf.data);
pfree(recordBuf.data); pfree(recordBuf.data);
MemoryContextSwitchTo(oldCxt); MemoryContextSwitchTo(oldCxt);
...@@ -1417,7 +1430,7 @@ checkXLogConsistency(XLogReaderState *record) ...@@ -1417,7 +1430,7 @@ checkXLogConsistency(XLogReaderState *record)
Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0); Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0);
for (block_id = 0; block_id <= record->max_block_id; block_id++) for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{ {
Buffer buf; Buffer buf;
Page page; Page page;
...@@ -4383,6 +4396,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode, ...@@ -4383,6 +4396,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
ReadRecPtr = xlogreader->ReadRecPtr; ReadRecPtr = xlogreader->ReadRecPtr;
EndRecPtr = xlogreader->EndRecPtr; EndRecPtr = xlogreader->EndRecPtr;
if (record == NULL) if (record == NULL)
{ {
if (readFile >= 0) if (readFile >= 0)
...@@ -10300,7 +10314,7 @@ xlog_redo(XLogReaderState *record) ...@@ -10300,7 +10314,7 @@ xlog_redo(XLogReaderState *record)
* XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info * XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info
* code just to distinguish them for statistics purposes. * code just to distinguish them for statistics purposes.
*/ */
for (uint8 block_id = 0; block_id <= record->max_block_id; block_id++) for (uint8 block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{ {
Buffer buffer; Buffer buffer;
...@@ -10435,7 +10449,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record) ...@@ -10435,7 +10449,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record)
int block_id; int block_id;
/* decode block references */ /* decode block references */
for (block_id = 0; block_id <= record->max_block_id; block_id++) for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{ {
RelFileNode rnode; RelFileNode rnode;
ForkNumber forknum; ForkNumber forknum;
...@@ -12104,7 +12118,7 @@ XLogPageRead(XLogReaderState *state, ...@@ -12104,7 +12118,7 @@ XLogPageRead(XLogReaderState *state,
XLogRecPtr targetPagePtr = state->readPagePtr; XLogRecPtr targetPagePtr = state->readPagePtr;
int reqLen = state->reqLen; int reqLen = state->reqLen;
int readLen = 0; int readLen = 0;
XLogRecPtr targetRecPtr = state->ReadRecPtr; XLogRecPtr targetRecPtr = state->DecodeRecPtr;
uint32 targetPageOff; uint32 targetPageOff;
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
int r; int r;
...@@ -12122,6 +12136,9 @@ XLogPageRead(XLogReaderState *state, ...@@ -12122,6 +12136,9 @@ XLogPageRead(XLogReaderState *state,
/* /*
* Request a restartpoint if we've replayed too much xlog since the * Request a restartpoint if we've replayed too much xlog since the
* last one. * last one.
*
* XXX Why is this here? Move it to recovery loop, since it's based
* on replay position, not read position?
*/ */
if (bgwriterLaunched) if (bgwriterLaunched)
{ {
...@@ -12613,6 +12630,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, ...@@ -12613,6 +12630,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
* be updated on each cycle. When we are behind, * be updated on each cycle. When we are behind,
* XLogReceiptTime will not advance, so the grace time * XLogReceiptTime will not advance, so the grace time
* allotted to conflicting queries will decrease. * allotted to conflicting queries will decrease.
*
*/ */
if (RecPtr < flushedUpto) if (RecPtr < flushedUpto)
havedata = true; havedata = true;
......
...@@ -38,6 +38,9 @@ static void report_invalid_record(XLogReaderState *state, const char *fmt,...) ...@@ -38,6 +38,9 @@ static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
int reqLen, bool header_inclusive); int reqLen, bool header_inclusive);
size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len);
static XLogReadRecordResult XLogDecodeOneRecord(XLogReaderState *state,
bool allow_oversized);
static void XLogReaderInvalReadState(XLogReaderState *state); static void XLogReaderInvalReadState(XLogReaderState *state);
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
XLogRecPtr PrevRecPtr, XLogRecord *record); XLogRecPtr PrevRecPtr, XLogRecord *record);
...@@ -50,6 +53,8 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, ...@@ -50,6 +53,8 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
/* size of the buffer allocated for error message. */ /* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000 #define MAX_ERRORMSG_LEN 1000
#define DEFAULT_DECODE_BUFFER_SIZE 0x10000
/* /*
* Construct a string in state->errormsg_buf explaining what's wrong with * Construct a string in state->errormsg_buf explaining what's wrong with
* the current record being read. * the current record being read.
...@@ -64,6 +69,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) ...@@ -64,6 +69,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
va_start(args, fmt); va_start(args, fmt);
vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args); vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
va_end(args); va_end(args);
state->errormsg_deferred = true;
} }
/* /*
...@@ -86,8 +93,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, ...@@ -86,8 +93,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
/* initialize caller-provided support functions */ /* initialize caller-provided support functions */
state->cleanup_cb = cleanup_cb; state->cleanup_cb = cleanup_cb;
state->max_block_id = -1;
/* /*
* Permanently allocate readBuf. We do it this way, rather than just * Permanently allocate readBuf. We do it this way, rather than just
* making a static array, for two reasons: (1) no need to waste the * making a static array, for two reasons: (1) no need to waste the
...@@ -136,18 +141,11 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, ...@@ -136,18 +141,11 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
void void
XLogReaderFree(XLogReaderState *state) XLogReaderFree(XLogReaderState *state)
{ {
int block_id;
if (state->seg.ws_file >= 0) if (state->seg.ws_file >= 0)
state->cleanup_cb(state); state->cleanup_cb(state);
for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) if (state->decode_buffer && state->free_decode_buffer)
{ pfree(state->decode_buffer);
if (state->blocks[block_id].data)
pfree(state->blocks[block_id].data);
}
if (state->main_data)
pfree(state->main_data);
pfree(state->errormsg_buf); pfree(state->errormsg_buf);
if (state->readRecordBuf) if (state->readRecordBuf)
...@@ -156,6 +154,22 @@ XLogReaderFree(XLogReaderState *state) ...@@ -156,6 +154,22 @@ XLogReaderFree(XLogReaderState *state)
pfree(state); pfree(state);
} }
/*
* Set the size of the decoding buffer. A pointer to a caller supplied memory
* region may also be passed in, in which case non-oversized records will be
* decoded there.
*/
void
XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size)
{
Assert(state->decode_buffer == NULL);
state->decode_buffer = buffer;
state->decode_buffer_size = size;
state->decode_buffer_head = buffer;
state->decode_buffer_tail = buffer;
}
/* /*
* Allocate readRecordBuf to fit a record of at least the given length. * Allocate readRecordBuf to fit a record of at least the given length.
* Returns true if successful, false if out of memory. * Returns true if successful, false if out of memory.
...@@ -243,22 +257,123 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) ...@@ -243,22 +257,123 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
/* Begin at the passed-in record pointer. */ /* Begin at the passed-in record pointer. */
state->EndRecPtr = RecPtr; state->EndRecPtr = RecPtr;
state->NextRecPtr = RecPtr;
state->ReadRecPtr = InvalidXLogRecPtr; state->ReadRecPtr = InvalidXLogRecPtr;
state->DecodeRecPtr = InvalidXLogRecPtr;
state->readRecordState = XLREAD_NEXT_RECORD; state->readRecordState = XLREAD_NEXT_RECORD;
} }
/* /*
* Attempt to read an XLOG record. * See if we can release the last record that was returned by
* * XLogReadRecord(), to free up space.
* XLogBeginRead() or XLogFindNextRecord() must be called before the first call */
* to XLogReadRecord(). static void
XLogReleasePreviousRecord(XLogReaderState *state)
{
DecodedXLogRecord *record;
/*
* Remove it from the decoded record queue. It must be the oldest
* item decoded, decode_queue_tail.
*/
record = state->record;
Assert(record == state->decode_queue_tail);
state->record = NULL;
state->decode_queue_tail = record->next;
/* It might also be the newest item decoded, decode_queue_head. */
if (state->decode_queue_head == record)
state->decode_queue_head = NULL;
/* Release the space. */
if (unlikely(record->oversized))
{
/* It's not in the the decode buffer, so free it to release space. */
pfree(record);
}
else
{
/* It must be the tail record in the decode buffer. */
Assert(state->decode_buffer_tail == (char *) record);
/*
* We need to update tail to point to the next record that is in the
* decode buffer, if any, being careful to skip oversized ones
* (they're not in the decode buffer).
*/
record = record->next;
while (unlikely(record && record->oversized))
record = record->next;
if (record)
{
/* Adjust tail to release space up to the next record. */
state->decode_buffer_tail = (char *) record;
}
else if (state->decoding && !state->decoding->oversized)
{
/*
* We're releasing the last fully decoded record in
* XLogReadRecord(), but some time earlier we partially decoded a
* record in XLogReadAhead() and were unable to complete the job.
* We'll set the buffer head and tail to point to the record we
* started working on, so that we can continue (perhaps from a
* different source).
*/
state->decode_buffer_tail = (char *) state->decoding;
state->decode_buffer_head = (char *) state->decoding;
}
else
{
/*
* Otherwise we might as well just reset head and tail to the
* start of the buffer space, because we're empty. This means
* we'll keep overwriting the same piece of memory if we're not
* doing any prefetching.
*/
state->decode_buffer_tail = state->decode_buffer;
state->decode_buffer_head = state->decode_buffer;
}
}
}
/*
* Similar to XLogNextRecord(), but this traditional interface is for code
* that just wants the header, not the decoded record. Callers can access the
* decoded record through the XLogRecGetXXX() macros.
*/
XLogReadRecordResult
XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
{
XLogReadRecordResult result;
DecodedXLogRecord *decoded;
/* Consume the next decoded record. */
result = XLogNextRecord(state, &decoded, errormsg);
if (result == XLREAD_SUCCESS)
{
/*
* The traditional interface just returns the header, not the decoded
* record. The caller will access the decoded record through the
* XLogRecGetXXX() macros.
*/
*record = &decoded->header;
}
else
*record = NULL;
return result;
}
/*
* Consume the next record. XLogBeginRead() or XLogFindNextRecord() must be
* called before the first call to XLogNextRecord().
* *
* This function may return XLREAD_NEED_DATA several times before returning a * This function may return XLREAD_NEED_DATA several times before returning a
* result record. The caller shall read in some new data then call this * result record. The caller shall read in some new data then call this
* function again with the same parameters. * function again with the same parameters.
* *
* When a record is successfully read, returns XLREAD_SUCCESS with result * When a record is successfully read, returns XLREAD_SUCCESS with result
* record being stored in *record. Otherwise *record is NULL. * record being stored in *record. Otherwise *record is set to NULL.
* *
* Returns XLREAD_NEED_DATA if more data is needed to finish decoding the * Returns XLREAD_NEED_DATA if more data is needed to finish decoding the
* current record. In that case, state->readPagePtr and state->reqLen inform * current record. In that case, state->readPagePtr and state->reqLen inform
...@@ -269,11 +384,249 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) ...@@ -269,11 +384,249 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* length of data that is now available (which must be >= given reqLen), * length of data that is now available (which must be >= given reqLen),
* respectively. * respectively.
* *
* If invalid data is encountered, returns XLREAD_FAIL and sets *record to * Returns XLREAD_FULL if allow_oversized is true, and no space is available.
* NULL. *errormsg is set to a string with details of the failure. The * This is intended for readahead.
*
* If invalid data is encountered, returns XLREAD_FAIL with *record being set
* to NULL. *errormsg is set to a string with details of the failure. The
* returned pointer (or *errormsg) points to an internal buffer that's valid * returned pointer (or *errormsg) points to an internal buffer that's valid
* until the next call to XLogReadRecord. * until the next call to XLogReadRecord.
* *
*/
XLogReadRecordResult
XLogNextRecord(XLogReaderState *state,
DecodedXLogRecord **record,
char **errormsg)
{
/* Release the space occupied by the last record we returned. */
if (state->record)
XLogReleasePreviousRecord(state);
for (;;)
{
XLogReadRecordResult result;
/* We can now return the oldest item in the queue, if there is one. */
if (state->decode_queue_tail)
{
/*
* Record this as the most recent record returned, so that we'll
* release it next time. This also exposes it to the
* XLogRecXXX(decoder) macros, which pass in the decoder rather
* than the record for historical reasons.
*/
state->record = state->decode_queue_tail;
/*
* It should be immediately after the last the record returned by
* XLogReadRecord(), or at the position set by XLogBeginRead() if
* XLogReadRecord() hasn't been called yet. It may be after a
* page header, though.
*/
Assert(state->record->lsn == state->EndRecPtr ||
(state->EndRecPtr % XLOG_BLCKSZ == 0 &&
(state->record->lsn == state->EndRecPtr + SizeOfXLogShortPHD ||
state->record->lsn == state->EndRecPtr + SizeOfXLogLongPHD)));
/*
* Set ReadRecPtr and EndRecPtr to correspond to that
* record.
*
* Calling code could access these through the returned decoded
* record, but for now we'll update them directly here, for the
* benefit of all the existing code that accesses these variables
* directly.
*/
state->ReadRecPtr = state->record->lsn;
state->EndRecPtr = state->record->next_lsn;
*errormsg = NULL;
*record = state->record;
return XLREAD_SUCCESS;
}
else if (state->errormsg_deferred)
{
/*
* If we've run out of records, but we have a deferred error, now
* is the time to report it.
*/
state->errormsg_deferred = false;
if (state->errormsg_buf[0] != '\0')
*errormsg = state->errormsg_buf;
else
*errormsg = NULL;
*record = NULL;
state->EndRecPtr = state->DecodeRecPtr;
return XLREAD_FAIL;
}
/* We need to get a decoded record into our queue first. */
result = XLogDecodeOneRecord(state, true /* allow_oversized */ );
switch(result)
{
case XLREAD_NEED_DATA:
*errormsg = NULL;
*record = NULL;
return result;
case XLREAD_SUCCESS:
Assert(state->decode_queue_tail != NULL);
break;
case XLREAD_FULL:
/* Not expected because we passed allow_oversized = true */
Assert(false);
break;
case XLREAD_FAIL:
/*
* If that produced neither a queued record nor a queued error,
* then we're at the end (for example, archive recovery with no
* more files available).
*/
Assert(state->decode_queue_tail == NULL);
if (!state->errormsg_deferred)
{
state->EndRecPtr = state->DecodeRecPtr;
*errormsg = NULL;
*record = NULL;
return result;
}
break;
}
}
/* unreachable */
return XLREAD_FAIL;
}
/*
* Try to decode the next available record. The next record will also be
* returned to XLogRecordRead().
*
* In addition to the values that XLogReadRecord() can return, XLogReadAhead()
* can also return XLREAD_FULL to indicate that further readahead is not
* possible yet due to lack of space.
*/
XLogReadRecordResult
XLogReadAhead(XLogReaderState *state, DecodedXLogRecord **record, char **errormsg)
{
XLogReadRecordResult result;
/* We stop trying after encountering an error. */
if (unlikely(state->errormsg_deferred))
{
/* We only report the error message the first time, see below. */
*errormsg = NULL;
return XLREAD_FAIL;
}
/*
* Try to decode one more record, if we have space. Pass allow_oversized
* = false, so that this call returns fast if the decode buffer is full.
*/
result = XLogDecodeOneRecord(state, false);
switch (result)
{
case XLREAD_SUCCESS:
/* New record at head of decode record queue. */
Assert(state->decode_queue_head != NULL);
*record = state->decode_queue_head;
return result;
case XLREAD_FULL:
/* No space in circular decode buffer. */
return result;
case XLREAD_NEED_DATA:
/* The caller needs to insert more data. */
return result;
case XLREAD_FAIL:
/* Report the error. XLogReadRecord() will also report it. */
Assert(state->errormsg_deferred);
if (state->errormsg_buf[0] != '\0')
*errormsg = state->errormsg_buf;
return result;
}
/* Unreachable. */
return XLREAD_FAIL;
}
/*
* Allocate space for a decoded record. The only member of the returned
* object that is initialized is the 'oversized' flag, indicating that the
* decoded record wouldn't fit in the decode buffer and must eventually be
* freed explicitly.
*
* Return NULL if there is no space in the decode buffer and allow_oversized
* is false, or if memory allocation fails for an oversized buffer.
*/
static DecodedXLogRecord *
XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
{
size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
DecodedXLogRecord *decoded = NULL;
/* Allocate a circular decode buffer if we don't have one already. */
if (unlikely(state->decode_buffer == NULL))
{
if (state->decode_buffer_size == 0)
state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
state->decode_buffer = palloc(state->decode_buffer_size);
state->decode_buffer_head = state->decode_buffer;
state->decode_buffer_tail = state->decode_buffer;
state->free_decode_buffer = true;
}
if (state->decode_buffer_head >= state->decode_buffer_tail)
{
/* Empty, or head is to the right of tail. */
if (state->decode_buffer_head + required_space <=
state->decode_buffer + state->decode_buffer_size)
{
/* There is space between head and end. */
decoded = (DecodedXLogRecord *) state->decode_buffer_head;
decoded->oversized = false;
return decoded;
}
else if (state->decode_buffer + required_space <
state->decode_buffer_tail)
{
/* There is space between start and tail. */
decoded = (DecodedXLogRecord *) state->decode_buffer;
decoded->oversized = false;
return decoded;
}
}
else
{
/* Head is to the left of tail. */
if (state->decode_buffer_head + required_space <
state->decode_buffer_tail)
{
/* There is space between head and tail. */
decoded = (DecodedXLogRecord *) state->decode_buffer_head;
decoded->oversized = false;
return decoded;
}
}
/* Not enough space in the decode buffer. Are we allowed to allocate? */
if (allow_oversized)
{
decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM);
if (decoded == NULL)
return NULL;
decoded->oversized = true;
return decoded;
}
return decoded;
}
/*
* Try to read and decode the next record and add it to the head of the
* decoded record queue. If 'allow_oversized' is false, then XLREAD_FULL can
* be returned to indicate the decoding buffer is full. XLogBeginRead() or
* XLogFindNextRecord() must be called before the first call to
* XLogReadRecord().
* *
* This function runs a state machine consisting of the following states. * This function runs a state machine consisting of the following states.
* *
...@@ -300,35 +653,35 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) ...@@ -300,35 +653,35 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* current state. This behavior allows us to continue reading a record * current state. This behavior allows us to continue reading a record
* after switching to a different source, during streaming replication. * after switching to a different source, during streaming replication.
*/ */
XLogReadRecordResult static XLogReadRecordResult
XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) XLogDecodeOneRecord(XLogReaderState *state, bool allow_oversized)
{ {
XLogRecord *record;
char *errormsg; /* not used */
XLogRecord *prec; XLogRecord *prec;
*record = NULL;
/* reset error state */ /* reset error state */
*errormsg = NULL;
state->errormsg_buf[0] = '\0'; state->errormsg_buf[0] = '\0';
record = NULL;
switch (state->readRecordState) switch (state->readRecordState)
{ {
case XLREAD_NEXT_RECORD: case XLREAD_NEXT_RECORD:
ResetDecoder(state); Assert(!state->decoding);
if (state->ReadRecPtr != InvalidXLogRecPtr) if (state->DecodeRecPtr != InvalidXLogRecPtr)
{ {
/* read the record after the one we just read */ /* read the record after the one we just read */
/* /*
* EndRecPtr is pointing to end+1 of the previous WAL record. * NextRecPtr is pointing to end+1 of the previous WAL record.
* If we're at a page boundary, no more records can fit on the * If we're at a page boundary, no more records can fit on the
* current page. We must skip over the page header, but we * current page. We must skip over the page header, but we
* can't do that until we've read in the page, since the * can't do that until we've read in the page, since the
* header size is variable. * header size is variable.
*/ */
state->PrevRecPtr = state->ReadRecPtr; state->PrevRecPtr = state->DecodeRecPtr;
state->ReadRecPtr = state->EndRecPtr; state->DecodeRecPtr = state->NextRecPtr;
} }
else else
{ {
...@@ -338,8 +691,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -338,8 +691,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
* In this case, EndRecPtr should already be pointing to a * In this case, EndRecPtr should already be pointing to a
* valid record starting position. * valid record starting position.
*/ */
Assert(XRecOffIsValid(state->EndRecPtr)); Assert(XRecOffIsValid(state->NextRecPtr));
state->ReadRecPtr = state->EndRecPtr; state->DecodeRecPtr = state->NextRecPtr;
/* /*
* We cannot verify the previous-record pointer when we're * We cannot verify the previous-record pointer when we're
...@@ -347,7 +700,6 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -347,7 +700,6 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
* won't try doing that. * won't try doing that.
*/ */
state->PrevRecPtr = InvalidXLogRecPtr; state->PrevRecPtr = InvalidXLogRecPtr;
state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */
} }
state->record_verified = false; state->record_verified = false;
...@@ -362,9 +714,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -362,9 +714,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
uint32 targetRecOff; uint32 targetRecOff;
XLogPageHeader pageHeader; XLogPageHeader pageHeader;
Assert(!state->decoding);
targetPagePtr = targetPagePtr =
state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); state->DecodeRecPtr - (state->DecodeRecPtr % XLOG_BLCKSZ);
targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; targetRecOff = state->DecodeRecPtr % XLOG_BLCKSZ;
/* /*
* Check if we have enough data. For the first record in the * Check if we have enough data. For the first record in the
...@@ -385,13 +739,13 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -385,13 +739,13 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
if (targetRecOff == 0) if (targetRecOff == 0)
{ {
/* At page start, so skip over page header. */ /* At page start, so skip over page header. */
state->ReadRecPtr += pageHeaderSize; state->DecodeRecPtr += pageHeaderSize;
targetRecOff = pageHeaderSize; targetRecOff = pageHeaderSize;
} }
else if (targetRecOff < pageHeaderSize) else if (targetRecOff < pageHeaderSize)
{ {
report_invalid_record(state, "invalid record offset at %X/%X", report_invalid_record(state, "invalid record offset at %X/%X",
LSN_FORMAT_ARGS(state->ReadRecPtr)); LSN_FORMAT_ARGS(state->DecodeRecPtr));
goto err; goto err;
} }
...@@ -400,8 +754,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -400,8 +754,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
targetRecOff == pageHeaderSize) targetRecOff == pageHeaderSize)
{ {
report_invalid_record(state, "contrecord is requested by %X/%X", report_invalid_record(state, "contrecord is requested by %X/%X",
(uint32) (state->ReadRecPtr >> 32), (uint32) (state->DecodeRecPtr >> 32),
(uint32) state->ReadRecPtr); (uint32) state->DecodeRecPtr);
goto err; goto err;
} }
...@@ -419,9 +773,26 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -419,9 +773,26 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
* header. * header.
*/ */
prec = (XLogRecord *) (state->readBuf + prec = (XLogRecord *) (state->readBuf +
state->ReadRecPtr % XLOG_BLCKSZ); state->DecodeRecPtr % XLOG_BLCKSZ);
total_len = prec->xl_tot_len; total_len = prec->xl_tot_len;
/* Find space to decode this record. */
Assert(state->decoding == NULL);
state->decoding = XLogReadRecordAlloc(state, total_len,
allow_oversized);
if (state->decoding == NULL)
{
/*
* We couldn't get space. If allow_oversized was true,
* then palloc() must have failed. Otherwise, report that
* our decoding buffer is full. This means that weare
* trying to read too far ahead.
*/
if (allow_oversized)
goto err;
return XLREAD_FULL;
}
/* /*
* If the whole record header is on this page, validate it * If the whole record header is on this page, validate it
* immediately. Otherwise do just a basic sanity check on * immediately. Otherwise do just a basic sanity check on
...@@ -433,7 +804,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -433,7 +804,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
*/ */
if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
{ {
if (!ValidXLogRecordHeader(state, state->ReadRecPtr, if (!ValidXLogRecordHeader(state, state->DecodeRecPtr,
state->PrevRecPtr, prec)) state->PrevRecPtr, prec))
goto err; goto err;
...@@ -446,7 +817,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -446,7 +817,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
{ {
report_invalid_record(state, report_invalid_record(state,
"invalid record length at %X/%X: wanted %u, got %u", "invalid record length at %X/%X: wanted %u, got %u",
LSN_FORMAT_ARGS(state->ReadRecPtr), LSN_FORMAT_ARGS(state->DecodeRecPtr),
(uint32) SizeOfXLogRecord, total_len); (uint32) SizeOfXLogRecord, total_len);
goto err; goto err;
} }
...@@ -471,13 +842,15 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -471,13 +842,15 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
XLogRecPtr targetPagePtr; XLogRecPtr targetPagePtr;
uint32 targetRecOff; uint32 targetRecOff;
Assert(state->decoding);
/* /*
* Wait for the rest of the record on the first page to become * Wait for the rest of the record on the first page to become
* available * available
*/ */
targetPagePtr = targetPagePtr =
state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); state->DecodeRecPtr - (state->DecodeRecPtr % XLOG_BLCKSZ);
targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; targetRecOff = state->DecodeRecPtr % XLOG_BLCKSZ;
request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ); request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ);
record_len = request_len - targetRecOff; record_len = request_len - targetRecOff;
...@@ -496,7 +869,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -496,7 +869,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
/* validate record header if not yet */ /* validate record header if not yet */
if (!state->record_verified && record_len >= SizeOfXLogRecord) if (!state->record_verified && record_len >= SizeOfXLogRecord)
{ {
if (!ValidXLogRecordHeader(state, state->ReadRecPtr, if (!ValidXLogRecordHeader(state, state->DecodeRecPtr,
state->PrevRecPtr, prec)) state->PrevRecPtr, prec))
goto err; goto err;
...@@ -509,15 +882,15 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -509,15 +882,15 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
/* Record does not cross a page boundary */ /* Record does not cross a page boundary */
Assert(state->record_verified); Assert(state->record_verified);
if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) if (!ValidXLogRecord(state, prec, state->DecodeRecPtr))
goto err; goto err;
state->record_verified = true; /* to be tidy */ state->record_verified = true; /* to be tidy */
/* We already checked the header earlier */ /* We already checked the header earlier */
state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len); state->NextRecPtr = state->DecodeRecPtr + MAXALIGN(record_len);
*record = prec; record = prec;
state->readRecordState = XLREAD_NEXT_RECORD; state->readRecordState = XLREAD_NEXT_RECORD;
break; break;
} }
...@@ -536,7 +909,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -536,7 +909,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
report_invalid_record(state, report_invalid_record(state,
"record length %u at %X/%X too long", "record length %u at %X/%X too long",
total_len, total_len,
LSN_FORMAT_ARGS(state->ReadRecPtr)); LSN_FORMAT_ARGS(state->DecodeRecPtr));
goto err; goto err;
} }
...@@ -547,7 +920,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -547,7 +920,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
state->recordRemainLen -= record_len; state->recordRemainLen -= record_len;
/* Calculate pointer to beginning of next page */ /* Calculate pointer to beginning of next page */
state->recordContRecPtr = state->ReadRecPtr + record_len; state->recordContRecPtr = state->DecodeRecPtr + record_len;
Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0); Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0);
state->readRecordState = XLREAD_CONTINUATION; state->readRecordState = XLREAD_CONTINUATION;
...@@ -564,6 +937,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -564,6 +937,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
* we enter this state only if we haven't read the whole * we enter this state only if we haven't read the whole
* record. * record.
*/ */
Assert(state->decoding);
Assert(state->recordRemainLen > 0); Assert(state->recordRemainLen > 0);
while (state->recordRemainLen > 0) while (state->recordRemainLen > 0)
...@@ -583,7 +957,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -583,7 +957,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
return XLREAD_NEED_DATA; return XLREAD_NEED_DATA;
if (!state->page_verified) if (!state->page_verified)
goto err; goto err_continue;
Assert(SizeOfXLogShortPHD <= state->readLen); Assert(SizeOfXLogShortPHD <= state->readLen);
...@@ -596,8 +970,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -596,8 +970,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
"there is no contrecord flag at %X/%X reading %X/%X", "there is no contrecord flag at %X/%X reading %X/%X",
(uint32) (state->recordContRecPtr >> 32), (uint32) (state->recordContRecPtr >> 32),
(uint32) state->recordContRecPtr, (uint32) state->recordContRecPtr,
(uint32) (state->ReadRecPtr >> 32), (uint32) (state->DecodeRecPtr >> 32),
(uint32) state->ReadRecPtr); (uint32) state->DecodeRecPtr);
goto err; goto err;
} }
...@@ -614,8 +988,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -614,8 +988,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
pageHeader->xlp_rem_len, pageHeader->xlp_rem_len,
(uint32) (state->recordContRecPtr >> 32), (uint32) (state->recordContRecPtr >> 32),
(uint32) state->recordContRecPtr, (uint32) state->recordContRecPtr,
(uint32) (state->ReadRecPtr >> 32), (uint32) (state->DecodeRecPtr >> 32),
(uint32) state->ReadRecPtr, (uint32) state->DecodeRecPtr,
state->recordRemainLen); state->recordRemainLen);
goto err; goto err;
} }
...@@ -651,7 +1025,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -651,7 +1025,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
if (!state->record_verified) if (!state->record_verified)
{ {
Assert(state->recordGotLen >= SizeOfXLogRecord); Assert(state->recordGotLen >= SizeOfXLogRecord);
if (!ValidXLogRecordHeader(state, state->ReadRecPtr, if (!ValidXLogRecordHeader(state, state->DecodeRecPtr,
state->PrevRecPtr, state->PrevRecPtr,
(XLogRecord *) state->readRecordBuf)) (XLogRecord *) state->readRecordBuf))
goto err; goto err;
...@@ -668,16 +1042,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -668,16 +1042,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
/* targetPagePtr is pointing the last-read page here */ /* targetPagePtr is pointing the last-read page here */
prec = (XLogRecord *) state->readRecordBuf; prec = (XLogRecord *) state->readRecordBuf;
if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) if (!ValidXLogRecord(state, prec, state->DecodeRecPtr))
goto err; goto err;
pageHeaderSize = pageHeaderSize =
XLogPageHeaderSize((XLogPageHeader) state->readBuf); XLogPageHeaderSize((XLogPageHeader) state->readBuf);
state->EndRecPtr = targetPagePtr + pageHeaderSize state->NextRecPtr = targetPagePtr + pageHeaderSize
+ MAXALIGN(pageHeader->xlp_rem_len); + MAXALIGN(pageHeader->xlp_rem_len);
*record = prec; record = prec;
state->readRecordState = XLREAD_NEXT_RECORD; state->readRecordState = XLREAD_NEXT_RECORD;
break; break;
} }
} }
...@@ -685,32 +1060,65 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) ...@@ -685,32 +1060,65 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
/* /*
* Special processing if it's an XLOG SWITCH record * Special processing if it's an XLOG SWITCH record
*/ */
if ((*record)->xl_rmid == RM_XLOG_ID && if (record->xl_rmid == RM_XLOG_ID &&
((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
{ {
/* Pretend it extends to end of segment */ /* Pretend it extends to end of segment */
state->EndRecPtr += state->segcxt.ws_segsize - 1; state->NextRecPtr += state->segcxt.ws_segsize - 1;
state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
} }
if (DecodeXLogRecord(state, *record, errormsg)) Assert(!record || state->readLen >= 0);
return XLREAD_SUCCESS; if (DecodeXLogRecord(state, state->decoding, record, state->DecodeRecPtr, &errormsg))
{
/* Record the location of the next record. */
state->decoding->next_lsn = state->NextRecPtr;
*record = NULL; /*
return XLREAD_FAIL; * If it's in the decode buffer (not an "oversized" record allocated
* with palloc()), mark the decode buffer space as occupied.
*/
if (!state->decoding->oversized)
{
/* The new decode buffer head must be MAXALIGNed. */
Assert(state->decoding->size == MAXALIGN(state->decoding->size));
if ((char *) state->decoding == state->decode_buffer)
state->decode_buffer_head = state->decode_buffer +
state->decoding->size;
else
state->decode_buffer_head += state->decoding->size;
}
/* Insert it into the queue of decoded records. */
Assert(state->decode_queue_head != state->decoding);
if (state->decode_queue_head)
state->decode_queue_head->next = state->decoding;
state->decode_queue_head = state->decoding;
if (!state->decode_queue_tail)
state->decode_queue_tail = state->decoding;
state->decoding = NULL;
return XLREAD_SUCCESS;
}
err: err:
if (state->decoding && state->decoding->oversized)
pfree(state->decoding);
state->decoding = NULL;
err_continue:
/* /*
* Invalidate the read page. We might read from a different source after * Invalidate the read page. We might read from a different source after
* failure. * failure.
*/ */
XLogReaderInvalReadState(state); XLogReaderInvalReadState(state);
if (state->errormsg_buf[0] != '\0') /*
*errormsg = state->errormsg_buf; * If an error was written to errmsg_buf, it'll be returned to the caller
* of XLogReadRecord() after all successfully decoded records from the
* read queue.
*/
*record = NULL;
return XLREAD_FAIL; return XLREAD_FAIL;
} }
...@@ -1342,34 +1750,84 @@ WALRead(XLogReaderState *state, ...@@ -1342,34 +1750,84 @@ WALRead(XLogReaderState *state,
* ---------------------------------------- * ----------------------------------------
*/ */
/* private function to reset the state between records */ /*
* Private function to reset the state, forgetting all decoded records, if we
* are asked to move to a new read position.
*/
static void static void
ResetDecoder(XLogReaderState *state) ResetDecoder(XLogReaderState *state)
{ {
int block_id; DecodedXLogRecord *r;
state->decoded_record = NULL;
state->main_data_len = 0; /* Reset the decoded record queue, freeing any oversized records. */
while ((r = state->decode_queue_tail))
for (block_id = 0; block_id <= state->max_block_id; block_id++)
{ {
state->blocks[block_id].in_use = false; state->decode_queue_tail = r->next;
state->blocks[block_id].has_image = false; if (r->oversized)
state->blocks[block_id].has_data = false; pfree(r);
state->blocks[block_id].apply_image = false;
} }
state->max_block_id = -1; state->decode_queue_head = NULL;
state->decode_queue_tail = NULL;
state->record = NULL;
state->decoding = NULL;
/* Reset the decode buffer to empty. */
state->decode_buffer_head = state->decode_buffer;
state->decode_buffer_tail = state->decode_buffer;
/* Clear error state. */
state->errormsg_buf[0] = '\0';
state->errormsg_deferred = false;
}
/*
* Compute the maximum possible amount of padding that could be required to
* decode a record, given xl_tot_len from the record's header. This is the
* amount of output buffer space that we need to decode a record, though we
* might not finish up using it all.
*
* This computation is pessimistic and assumes the maximum possible number of
* blocks, due to lack of better information.
*/
size_t
DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
{
size_t size = 0;
/* Account for the fixed size part of the decoded record struct. */
size += offsetof(DecodedXLogRecord, blocks[0]);
/* Account for the flexible blocks array of maximum possible size. */
size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1);
/* Account for all the raw main and block data. */
size += xl_tot_len;
/* We might insert padding before main_data. */
size += (MAXIMUM_ALIGNOF - 1);
/* We might insert padding before each block's data. */
size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1);
/* We might insert padding at the end. */
size += (MAXIMUM_ALIGNOF - 1);
return size;
} }
/* /*
* Decode the previously read record. * Decode a record. "decoded" must point to a MAXALIGNed memory area that has
* space for at least DecodeXLogRecordRequiredSpace(record) bytes. On
* success, decoded->size contains the actual space occupied by the decoded
* record, which may turn out to be less.
*
* Only decoded->oversized member must be initialized already, and will not be
* modified. Other members will be initialized as required.
* *
* On error, a human-readable error message is returned in *errormsg, and * On error, a human-readable error message is returned in *errormsg, and
* the return value is false. * the return value is false.
*/ */
bool bool
DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) DecodeXLogRecord(XLogReaderState *state,
DecodedXLogRecord *decoded,
XLogRecord *record,
XLogRecPtr lsn,
char **errormsg)
{ {
/* /*
* read next _size bytes from record buffer, but check for overrun first. * read next _size bytes from record buffer, but check for overrun first.
...@@ -1384,17 +1842,20 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1384,17 +1842,20 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
} while(0) } while(0)
char *ptr; char *ptr;
char *out;
uint32 remaining; uint32 remaining;
uint32 datatotal; uint32 datatotal;
RelFileNode *rnode = NULL; RelFileNode *rnode = NULL;
uint8 block_id; uint8 block_id;
ResetDecoder(state); decoded->header = *record;
decoded->lsn = lsn;
state->decoded_record = record; decoded->next = NULL;
state->record_origin = InvalidRepOriginId; decoded->record_origin = InvalidRepOriginId;
state->toplevel_xid = InvalidTransactionId; decoded->toplevel_xid = InvalidTransactionId;
decoded->main_data = NULL;
decoded->main_data_len = 0;
decoded->max_block_id = -1;
ptr = (char *) record; ptr = (char *) record;
ptr += SizeOfXLogRecord; ptr += SizeOfXLogRecord;
remaining = record->xl_tot_len - SizeOfXLogRecord; remaining = record->xl_tot_len - SizeOfXLogRecord;
...@@ -1412,7 +1873,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1412,7 +1873,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
COPY_HEADER_FIELD(&main_data_len, sizeof(uint8)); COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
state->main_data_len = main_data_len; decoded->main_data_len = main_data_len;
datatotal += main_data_len; datatotal += main_data_len;
break; /* by convention, the main data fragment is break; /* by convention, the main data fragment is
* always last */ * always last */
...@@ -1423,18 +1884,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1423,18 +1884,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
uint32 main_data_len; uint32 main_data_len;
COPY_HEADER_FIELD(&main_data_len, sizeof(uint32)); COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
state->main_data_len = main_data_len; decoded->main_data_len = main_data_len;
datatotal += main_data_len; datatotal += main_data_len;
break; /* by convention, the main data fragment is break; /* by convention, the main data fragment is
* always last */ * always last */
} }
else if (block_id == XLR_BLOCK_ID_ORIGIN) else if (block_id == XLR_BLOCK_ID_ORIGIN)
{ {
COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId));
} }
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
{ {
COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId));
} }
else if (block_id <= XLR_MAX_BLOCK_ID) else if (block_id <= XLR_MAX_BLOCK_ID)
{ {
...@@ -1442,7 +1903,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1442,7 +1903,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
DecodedBkpBlock *blk; DecodedBkpBlock *blk;
uint8 fork_flags; uint8 fork_flags;
if (block_id <= state->max_block_id) /* mark any intervening block IDs as not in use */
for (int i = decoded->max_block_id + 1; i < block_id; ++i)
decoded->blocks[i].in_use = false;
if (block_id <= decoded->max_block_id)
{ {
report_invalid_record(state, report_invalid_record(state,
"out-of-order block_id %u at %X/%X", "out-of-order block_id %u at %X/%X",
...@@ -1450,9 +1915,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1450,9 +1915,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
LSN_FORMAT_ARGS(state->ReadRecPtr)); LSN_FORMAT_ARGS(state->ReadRecPtr));
goto err; goto err;
} }
state->max_block_id = block_id; decoded->max_block_id = block_id;
blk = &state->blocks[block_id]; blk = &decoded->blocks[block_id];
blk->in_use = true; blk->in_use = true;
blk->apply_image = false; blk->apply_image = false;
...@@ -1596,17 +2061,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1596,17 +2061,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
/* /*
* Ok, we've parsed the fragment headers, and verified that the total * Ok, we've parsed the fragment headers, and verified that the total
* length of the payload in the fragments is equal to the amount of data * length of the payload in the fragments is equal to the amount of data
* left. Copy the data of each fragment to a separate buffer. * left. Copy the data of each fragment to contiguous space after the
* * blocks array, inserting alignment padding before the data fragments so
* We could just set up pointers into readRecordBuf, but we want to align * they can be cast to struct pointers by REDO routines.
* the data for the convenience of the callers. Backup images are not
* copied, however; they don't need alignment.
*/ */
out = ((char *) decoded) +
offsetof(DecodedXLogRecord, blocks) +
sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1);
/* block data first */ /* block data first */
for (block_id = 0; block_id <= state->max_block_id; block_id++) for (block_id = 0; block_id <= decoded->max_block_id; block_id++)
{ {
DecodedBkpBlock *blk = &state->blocks[block_id]; DecodedBkpBlock *blk = &decoded->blocks[block_id];
if (!blk->in_use) if (!blk->in_use)
continue; continue;
...@@ -1615,58 +2081,37 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1615,58 +2081,37 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
if (blk->has_image) if (blk->has_image)
{ {
blk->bkp_image = ptr; /* no need to align image */
blk->bkp_image = out;
memcpy(out, ptr, blk->bimg_len);
ptr += blk->bimg_len; ptr += blk->bimg_len;
out += blk->bimg_len;
} }
if (blk->has_data) if (blk->has_data)
{ {
if (!blk->data || blk->data_len > blk->data_bufsz) out = (char *) MAXALIGN(out);
{ blk->data = out;
if (blk->data)
pfree(blk->data);
/*
* Force the initial request to be BLCKSZ so that we don't
* waste time with lots of trips through this stanza as a
* result of WAL compression.
*/
blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
blk->data = palloc(blk->data_bufsz);
}
memcpy(blk->data, ptr, blk->data_len); memcpy(blk->data, ptr, blk->data_len);
ptr += blk->data_len; ptr += blk->data_len;
out += blk->data_len;
} }
} }
/* and finally, the main data */ /* and finally, the main data */
if (state->main_data_len > 0) if (decoded->main_data_len > 0)
{ {
if (!state->main_data || state->main_data_len > state->main_data_bufsz) out = (char *) MAXALIGN(out);
{ decoded->main_data = out;
if (state->main_data) memcpy(decoded->main_data, ptr, decoded->main_data_len);
pfree(state->main_data); ptr += decoded->main_data_len;
out += decoded->main_data_len;
/*
* main_data_bufsz must be MAXALIGN'ed. In many xlog record
* types, we omit trailing struct padding on-disk to save a few
* bytes; but compilers may generate accesses to the xlog struct
* that assume that padding bytes are present. If the palloc
* request is not large enough to include such padding bytes then
* we'll get valgrind complaints due to otherwise-harmless fetches
* of the padding bytes.
*
* In addition, force the initial request to be reasonably large
* so that we don't waste time with lots of trips through this
* stanza. BLCKSZ / 2 seems like a good compromise choice.
*/
state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
BLCKSZ / 2));
state->main_data = palloc(state->main_data_bufsz);
}
memcpy(state->main_data, ptr, state->main_data_len);
ptr += state->main_data_len;
} }
/* Report the actual size we used. */
decoded->size = MAXALIGN(out - (char *) decoded);
Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >=
decoded->size);
return true; return true;
shortdata_err: shortdata_err:
...@@ -1692,10 +2137,11 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, ...@@ -1692,10 +2137,11 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
{ {
DecodedBkpBlock *bkpb; DecodedBkpBlock *bkpb;
if (!record->blocks[block_id].in_use) if (block_id > record->record->max_block_id ||
!record->record->blocks[block_id].in_use)
return false; return false;
bkpb = &record->blocks[block_id]; bkpb = &record->record->blocks[block_id];
if (rnode) if (rnode)
*rnode = bkpb->rnode; *rnode = bkpb->rnode;
if (forknum) if (forknum)
...@@ -1715,10 +2161,11 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len) ...@@ -1715,10 +2161,11 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
{ {
DecodedBkpBlock *bkpb; DecodedBkpBlock *bkpb;
if (!record->blocks[block_id].in_use) if (block_id > record->record->max_block_id ||
!record->record->blocks[block_id].in_use)
return NULL; return NULL;
bkpb = &record->blocks[block_id]; bkpb = &record->record->blocks[block_id];
if (!bkpb->has_data) if (!bkpb->has_data)
{ {
...@@ -1746,12 +2193,13 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) ...@@ -1746,12 +2193,13 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
char *ptr; char *ptr;
PGAlignedBlock tmp; PGAlignedBlock tmp;
if (!record->blocks[block_id].in_use) if (block_id > record->record->max_block_id ||
!record->record->blocks[block_id].in_use)
return false; return false;
if (!record->blocks[block_id].has_image) if (!record->record->blocks[block_id].has_image)
return false; return false;
bkpb = &record->blocks[block_id]; bkpb = &record->record->blocks[block_id];
ptr = bkpb->bkp_image; ptr = bkpb->bkp_image;
if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED) if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
......
...@@ -350,7 +350,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, ...@@ -350,7 +350,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
* going to initialize it. And vice versa. * going to initialize it. And vice versa.
*/ */
zeromode = (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); zeromode = (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
willinit = (record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0; willinit = (record->record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0;
if (willinit && !zeromode) if (willinit && !zeromode)
elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine"); elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine");
if (!willinit && zeromode) if (!willinit && zeromode)
......
...@@ -123,7 +123,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor ...@@ -123,7 +123,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
{ {
ReorderBufferAssignChild(ctx->reorder, ReorderBufferAssignChild(ctx->reorder,
txid, txid,
record->decoded_record->xl_xid, XLogRecGetXid(record),
buf.origptr); buf.origptr);
} }
......
...@@ -439,7 +439,7 @@ extractPageInfo(XLogReaderState *record) ...@@ -439,7 +439,7 @@ extractPageInfo(XLogReaderState *record)
RmgrNames[rmid], info); RmgrNames[rmid], info);
} }
for (block_id = 0; block_id <= record->max_block_id; block_id++) for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{ {
RelFileNode rnode; RelFileNode rnode;
ForkNumber forknum; ForkNumber forknum;
......
...@@ -397,10 +397,10 @@ XLogDumpRecordLen(XLogReaderState *record, uint32 *rec_len, uint32 *fpi_len) ...@@ -397,10 +397,10 @@ XLogDumpRecordLen(XLogReaderState *record, uint32 *rec_len, uint32 *fpi_len)
* add an accessor macro for this. * add an accessor macro for this.
*/ */
*fpi_len = 0; *fpi_len = 0;
for (block_id = 0; block_id <= record->max_block_id; block_id++) for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{ {
if (XLogRecHasBlockImage(record, block_id)) if (XLogRecHasBlockImage(record, block_id))
*fpi_len += record->blocks[block_id].bimg_len; *fpi_len += record->record->blocks[block_id].bimg_len;
} }
/* /*
...@@ -498,7 +498,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) ...@@ -498,7 +498,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
if (!config->bkp_details) if (!config->bkp_details)
{ {
/* print block references (short format) */ /* print block references (short format) */
for (block_id = 0; block_id <= record->max_block_id; block_id++) for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{ {
if (!XLogRecHasBlockRef(record, block_id)) if (!XLogRecHasBlockRef(record, block_id))
continue; continue;
...@@ -529,7 +529,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) ...@@ -529,7 +529,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
{ {
/* print block references (detailed format) */ /* print block references (detailed format) */
putchar('\n'); putchar('\n');
for (block_id = 0; block_id <= record->max_block_id; block_id++) for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{ {
if (!XLogRecHasBlockRef(record, block_id)) if (!XLogRecHasBlockRef(record, block_id))
continue; continue;
...@@ -542,26 +542,26 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) ...@@ -542,26 +542,26 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
blk); blk);
if (XLogRecHasBlockImage(record, block_id)) if (XLogRecHasBlockImage(record, block_id))
{ {
if (record->blocks[block_id].bimg_info & if (record->record->blocks[block_id].bimg_info &
BKPIMAGE_IS_COMPRESSED) BKPIMAGE_IS_COMPRESSED)
{ {
printf(" (FPW%s); hole: offset: %u, length: %u, " printf(" (FPW%s); hole: offset: %u, length: %u, "
"compression saved: %u", "compression saved: %u",
XLogRecBlockImageApply(record, block_id) ? XLogRecBlockImageApply(record, block_id) ?
"" : " for WAL verification", "" : " for WAL verification",
record->blocks[block_id].hole_offset, record->record->blocks[block_id].hole_offset,
record->blocks[block_id].hole_length, record->record->blocks[block_id].hole_length,
BLCKSZ - BLCKSZ -
record->blocks[block_id].hole_length - record->record->blocks[block_id].hole_length -
record->blocks[block_id].bimg_len); record->record->blocks[block_id].bimg_len);
} }
else else
{ {
printf(" (FPW%s); hole: offset: %u, length: %u", printf(" (FPW%s); hole: offset: %u, length: %u",
XLogRecBlockImageApply(record, block_id) ? XLogRecBlockImageApply(record, block_id) ?
"" : " for WAL verification", "" : " for WAL verification",
record->blocks[block_id].hole_offset, record->record->blocks[block_id].hole_offset,
record->blocks[block_id].hole_length); record->record->blocks[block_id].hole_length);
} }
} }
putchar('\n'); putchar('\n');
......
...@@ -101,6 +101,7 @@ typedef enum XLogReadRecordResult ...@@ -101,6 +101,7 @@ typedef enum XLogReadRecordResult
{ {
XLREAD_SUCCESS, /* record is successfully read */ XLREAD_SUCCESS, /* record is successfully read */
XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */ XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */
XLREAD_FULL, /* cannot hold more data while reading ahead */
XLREAD_FAIL /* failed during reading a record */ XLREAD_FAIL /* failed during reading a record */
} XLogReadRecordResult; } XLogReadRecordResult;
...@@ -120,6 +121,30 @@ typedef enum XLogReadRecordState ...@@ -120,6 +121,30 @@ typedef enum XLogReadRecordState
XLREAD_CONTINUATION XLREAD_CONTINUATION
} XLogReadRecordState; } XLogReadRecordState;
/*
* The decoded contents of a record. This occupies a contiguous region of
* memory, with main_data and blocks[n].data pointing to memory after the
* members declared here.
*/
typedef struct DecodedXLogRecord
{
/* Private member used for resource management. */
size_t size; /* total size of decoded record */
bool oversized; /* outside the regular decode buffer? */
struct DecodedXLogRecord *next; /* decoded record queue link */
/* Public members. */
XLogRecPtr lsn; /* location */
XLogRecPtr next_lsn; /* location of next record */
XLogRecord header; /* header */
RepOriginId record_origin;
TransactionId toplevel_xid; /* XID of top-level transaction */
char *main_data; /* record's main data portion */
uint32 main_data_len; /* main data portion's length */
int max_block_id; /* highest block_id in use (-1 if none) */
DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER];
} DecodedXLogRecord;
struct XLogReaderState struct XLogReaderState
{ {
/* /*
...@@ -142,10 +167,12 @@ struct XLogReaderState ...@@ -142,10 +167,12 @@ struct XLogReaderState
* Start and end point of last record read. EndRecPtr is also used as the * Start and end point of last record read. EndRecPtr is also used as the
* position to read next. Calling XLogBeginRead() sets EndRecPtr to the * position to read next. Calling XLogBeginRead() sets EndRecPtr to the
* starting position and ReadRecPtr to invalid. * starting position and ReadRecPtr to invalid.
*
* Start and end point of last record returned by XLogReadRecord(). These
* are also available as record->lsn and record->next_lsn.
*/ */
XLogRecPtr ReadRecPtr; /* start of last record read or being read */ XLogRecPtr ReadRecPtr; /* start of last record read or being read */
XLogRecPtr EndRecPtr; /* end+1 of last record read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */
XLogRecPtr PrevRecPtr; /* start of previous record read */
/* ---------------------------------------- /* ----------------------------------------
* Communication with page reader * Communication with page reader
...@@ -170,27 +197,43 @@ struct XLogReaderState ...@@ -170,27 +197,43 @@ struct XLogReaderState
* Use XLogRecGet* functions to investigate the record; these fields * Use XLogRecGet* functions to investigate the record; these fields
* should not be accessed directly. * should not be accessed directly.
* ---------------------------------------- * ----------------------------------------
* Start and end point of the last record read and decoded by
* XLogReadRecordInternal(). NextRecPtr is also used as the position to
* decode next. Calling XLogBeginRead() sets NextRecPtr and EndRecPtr to
* the requested starting position.
*/ */
XLogRecord *decoded_record; /* currently decoded record */ XLogRecPtr DecodeRecPtr; /* start of last record decoded */
XLogRecPtr NextRecPtr; /* end+1 of last record decoded */
char *main_data; /* record's main data portion */ XLogRecPtr PrevRecPtr; /* start of previous record decoded */
uint32 main_data_len; /* main data portion's length */
uint32 main_data_bufsz; /* allocated size of the buffer */
RepOriginId record_origin;
TransactionId toplevel_xid; /* XID of top-level transaction */ /* Last record returned by XLogReadRecord(). */
DecodedXLogRecord *record;
/* information about blocks referenced by the record. */
DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
int max_block_id; /* highest block_id in use (-1 if none) */
/* ---------------------------------------- /* ----------------------------------------
* private/internal state * private/internal state
* ---------------------------------------- * ----------------------------------------
*/ */
/*
* Buffer for decoded records. This is a circular buffer, though
* individual records can't be split in the middle, so some space is often
* wasted at the end. Oversized records that don't fit in this space are
* allocated separately.
*/
char *decode_buffer;
size_t decode_buffer_size;
bool free_decode_buffer; /* need to free? */
char *decode_buffer_head; /* write head */
char *decode_buffer_tail; /* read head */
/*
* Queue of records that have been decoded. This is a linked list that
* usually consists of consecutive records in decode_buffer, but may also
* contain oversized records allocated with palloc().
*/
DecodedXLogRecord *decode_queue_head; /* newest decoded record */
DecodedXLogRecord *decode_queue_tail; /* oldest decoded record */
/* last read XLOG position for data currently in readBuf */ /* last read XLOG position for data currently in readBuf */
WALSegmentContext segcxt; WALSegmentContext segcxt;
WALOpenSegment seg; WALOpenSegment seg;
...@@ -230,7 +273,7 @@ struct XLogReaderState ...@@ -230,7 +273,7 @@ struct XLogReaderState
uint32 readRecordBufSize; uint32 readRecordBufSize;
/* /*
* XLogReadRecord() state * XLogReadRecordInternal() state
*/ */
XLogReadRecordState readRecordState; /* state machine state */ XLogReadRecordState readRecordState; /* state machine state */
int recordGotLen; /* amount of current record that has already int recordGotLen; /* amount of current record that has already
...@@ -238,8 +281,11 @@ struct XLogReaderState ...@@ -238,8 +281,11 @@ struct XLogReaderState
int recordRemainLen; /* length of current record that remains */ int recordRemainLen; /* length of current record that remains */
XLogRecPtr recordContRecPtr; /* where the current record continues */ XLogRecPtr recordContRecPtr; /* where the current record continues */
DecodedXLogRecord *decoding; /* record currently being decoded */
/* Buffer to hold error message */ /* Buffer to hold error message */
char *errormsg_buf; char *errormsg_buf;
bool errormsg_deferred;
}; };
struct XLogFindNextRecordState struct XLogFindNextRecordState
...@@ -264,6 +310,11 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, ...@@ -264,6 +310,11 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
/* Free an XLogReader */ /* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state); extern void XLogReaderFree(XLogReaderState *state);
/* Optionally provide a circular decoding buffer to allow readahead. */
extern void XLogReaderSetDecodeBuffer(XLogReaderState *state,
void *buffer,
size_t size);
/* Position the XLogReader to given record */ /* Position the XLogReader to given record */
extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
#ifdef FRONTEND #ifdef FRONTEND
...@@ -271,11 +322,21 @@ extern XLogFindNextRecordState *InitXLogFindNextRecord(XLogReaderState *reader_s ...@@ -271,11 +322,21 @@ extern XLogFindNextRecordState *InitXLogFindNextRecord(XLogReaderState *reader_s
extern bool XLogFindNextRecord(XLogFindNextRecordState *state); extern bool XLogFindNextRecord(XLogFindNextRecordState *state);
#endif /* FRONTEND */ #endif /* FRONTEND */
/* Read the next XLog record. Returns NULL on end-of-WAL or failure */ /* Read the next record's header. Returns NULL on end-of-WAL or failure. */
extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state, extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state,
XLogRecord **record, XLogRecord **record,
char **errormsg); char **errormsg);
/* Read the next decoded record. Returns NULL on end-of-WAL or failure. */
extern XLogReadRecordResult XLogNextRecord(XLogReaderState *state,
DecodedXLogRecord **record,
char **errormsg);
/* Try to read ahead, if there is space in the decoding buffer. */
extern XLogReadRecordResult XLogReadAhead(XLogReaderState *state,
DecodedXLogRecord **record,
char **errormsg);
/* Validate a page */ /* Validate a page */
extern bool XLogReaderValidatePageHeader(XLogReaderState *state, extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
XLogRecPtr recptr, char *phdr); XLogRecPtr recptr, char *phdr);
...@@ -300,25 +361,32 @@ extern bool WALRead(XLogReaderState *state, ...@@ -300,25 +361,32 @@ extern bool WALRead(XLogReaderState *state,
/* Functions for decoding an XLogRecord */ /* Functions for decoding an XLogRecord */
extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len);
extern bool DecodeXLogRecord(XLogReaderState *state,
DecodedXLogRecord *decoded,
XLogRecord *record,
XLogRecPtr lsn,
char **errmsg); char **errmsg);
#define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len) #define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len)
#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev) #define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev)
#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info) #define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info)
#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetRmid(decoder) ((decoder)->record->header.xl_rmid)
#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) #define XLogRecGetXid(decoder) ((decoder)->record->header.xl_xid)
#define XLogRecGetOrigin(decoder) ((decoder)->record_origin) #define XLogRecGetOrigin(decoder) ((decoder)->record->record_origin)
#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid) #define XLogRecGetTopXid(decoder) ((decoder)->record->toplevel_xid)
#define XLogRecGetData(decoder) ((decoder)->main_data) #define XLogRecGetData(decoder) ((decoder)->record->main_data)
#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) #define XLogRecGetDataLen(decoder) ((decoder)->record->main_data_len)
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->record->max_block_id >= 0)
#define XLogRecMaxBlockId(decoder) ((decoder)->record->max_block_id)
#define XLogRecGetBlock(decoder, i) (&(decoder)->record->blocks[(i)])
#define XLogRecHasBlockRef(decoder, block_id) \ #define XLogRecHasBlockRef(decoder, block_id) \
((decoder)->blocks[block_id].in_use) ((decoder)->record->max_block_id >= (block_id)) && \
((decoder)->record->blocks[block_id].in_use)
#define XLogRecHasBlockImage(decoder, block_id) \ #define XLogRecHasBlockImage(decoder, block_id) \
((decoder)->blocks[block_id].has_image) ((decoder)->record->blocks[block_id].has_image)
#define XLogRecBlockImageApply(decoder, block_id) \ #define XLogRecBlockImageApply(decoder, block_id) \
((decoder)->blocks[block_id].apply_image) ((decoder)->record->blocks[block_id].apply_image)
#ifndef FRONTEND #ifndef FRONTEND
extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record); extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment