Commit 323cbe7c authored by Thomas Munro's avatar Thomas Munro

Remove read_page callback from XLogReader.

Previously, the XLogReader module would fetch new input data using a
callback function.  Redesign the interface so that it tells the caller
to insert more data with a special return value instead.  This API suits
later patches for prefetching, encryption and maybe other future
projects that would otherwise require continually extending the callback
interface.

As incidental cleanup work, move global variables readOff, readLen and
readSegNo inside XlogReaderState.

Author: Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>
Author: Heikki Linnakangas <hlinnaka@iki.fi> (parts of earlier version)
Reviewed-by: default avatarAntonin Houska <ah@cybertec.at>
Reviewed-by: default avatarAlvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: default avatarTakashi Menjo <takashi.menjo@gmail.com>
Reviewed-by: default avatarAndres Freund <andres@anarazel.de>
Reviewed-by: default avatarThomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
parent 5ac9c430
...@@ -1330,11 +1330,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) ...@@ -1330,11 +1330,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
char *errormsg; char *errormsg;
TimeLineID save_currtli = ThisTimeLineID; TimeLineID save_currtli = ThisTimeLineID;
xlogreader = XLogReaderAllocate(wal_segment_size, NULL, xlogreader = XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
XL_ROUTINE(.page_read = &read_local_xlog_page,
.segment_open = &wal_segment_open,
.segment_close = &wal_segment_close),
NULL);
if (!xlogreader) if (!xlogreader)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY), (errcode(ERRCODE_OUT_OF_MEMORY),
...@@ -1342,7 +1339,12 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) ...@@ -1342,7 +1339,12 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
errdetail("Failed while allocating a WAL reading processor."))); errdetail("Failed while allocating a WAL reading processor.")));
XLogBeginRead(xlogreader, lsn); XLogBeginRead(xlogreader, lsn);
record = XLogReadRecord(xlogreader, &errormsg); while (XLogReadRecord(xlogreader, &record, &errormsg) ==
XLREAD_NEED_DATA)
{
if (!read_local_xlog_page(xlogreader))
break;
}
/* /*
* Restore immediately the timeline where it was previously, as * Restore immediately the timeline where it was previously, as
......
...@@ -811,17 +811,13 @@ static XLogSegNo openLogSegNo = 0; ...@@ -811,17 +811,13 @@ static XLogSegNo openLogSegNo = 0;
* These variables are used similarly to the ones above, but for reading * These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset * the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which * of the page just read, not the seek position of the FD itself, which
* will be just past that page. readLen indicates how much of the current * will be just past that page. readSource indicates where we got the
* page has been read into readBuf, and readSource indicates where we got * currently open file from.
* the currently open file from.
* Note: we could use Reserve/ReleaseExternalFD to track consumption of * Note: we could use Reserve/ReleaseExternalFD to track consumption of
* this FD too; but it doesn't currently seem worthwhile, since the XLOG is * this FD too; but it doesn't currently seem worthwhile, since the XLOG is
* not read by general-purpose sessions. * not read by general-purpose sessions.
*/ */
static int readFile = -1; static int readFile = -1;
static XLogSegNo readSegNo = 0;
static uint32 readOff = 0;
static uint32 readLen = 0;
static XLogSource readSource = XLOG_FROM_ANY; static XLogSource readSource = XLOG_FROM_ANY;
/* /*
...@@ -838,13 +834,6 @@ static XLogSource currentSource = XLOG_FROM_ANY; ...@@ -838,13 +834,6 @@ static XLogSource currentSource = XLOG_FROM_ANY;
static bool lastSourceFailed = false; static bool lastSourceFailed = false;
static bool pendingWalRcvRestart = false; static bool pendingWalRcvRestart = false;
typedef struct XLogPageReadPrivate
{
int emode;
bool fetching_ckpt; /* are we fetching a checkpoint record? */
bool randAccess;
} XLogPageReadPrivate;
/* /*
* These variables track when we last obtained some WAL data to process, * These variables track when we last obtained some WAL data to process,
* and where we got it from. (XLogReceiptSource is initially the same as * and where we got it from. (XLogReceiptSource is initially the same as
...@@ -920,10 +909,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, ...@@ -920,10 +909,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
XLogSource source, bool notfoundOk); XLogSource source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, static bool XLogPageRead(XLogReaderState *state,
int reqLen, XLogRecPtr targetRecPtr, char *readBuf); bool fetching_ckpt, int emode, bool randAccess);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt, XLogRecPtr tliRecPtr); bool fetching_ckpt,
XLogRecPtr tliRecPtr,
XLogSegNo readSegNo);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
static void XLogFileClose(void); static void XLogFileClose(void);
static void PreallocXlogFiles(XLogRecPtr endptr); static void PreallocXlogFiles(XLogRecPtr endptr);
...@@ -1234,8 +1225,7 @@ XLogInsertRecord(XLogRecData *rdata, ...@@ -1234,8 +1225,7 @@ XLogInsertRecord(XLogRecData *rdata,
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
if (!debug_reader) if (!debug_reader)
debug_reader = XLogReaderAllocate(wal_segment_size, NULL, debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
XL_ROUTINE(), NULL);
if (!debug_reader) if (!debug_reader)
{ {
...@@ -4373,12 +4363,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode, ...@@ -4373,12 +4363,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
bool fetching_ckpt) bool fetching_ckpt)
{ {
XLogRecord *record; XLogRecord *record;
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
/* Pass through parameters to XLogPageRead */
private->fetching_ckpt = fetching_ckpt;
private->emode = emode;
private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
/* This is the first attempt to read this page. */ /* This is the first attempt to read this page. */
lastSourceFailed = false; lastSourceFailed = false;
...@@ -4386,8 +4371,16 @@ ReadRecord(XLogReaderState *xlogreader, int emode, ...@@ -4386,8 +4371,16 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
for (;;) for (;;)
{ {
char *errormsg; char *errormsg;
XLogReadRecordResult result;
while ((result = XLogReadRecord(xlogreader, &record, &errormsg))
== XLREAD_NEED_DATA)
{
if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess))
break;
}
record = XLogReadRecord(xlogreader, &errormsg);
ReadRecPtr = xlogreader->ReadRecPtr; ReadRecPtr = xlogreader->ReadRecPtr;
EndRecPtr = xlogreader->EndRecPtr; EndRecPtr = xlogreader->EndRecPtr;
if (record == NULL) if (record == NULL)
...@@ -6457,7 +6450,6 @@ StartupXLOG(void) ...@@ -6457,7 +6450,6 @@ StartupXLOG(void)
bool backupFromStandby = false; bool backupFromStandby = false;
DBState dbstate_at_startup; DBState dbstate_at_startup;
XLogReaderState *xlogreader; XLogReaderState *xlogreader;
XLogPageReadPrivate private;
bool promoted = false; bool promoted = false;
struct stat st; struct stat st;
...@@ -6616,13 +6608,9 @@ StartupXLOG(void) ...@@ -6616,13 +6608,9 @@ StartupXLOG(void)
OwnLatch(&XLogCtl->recoveryWakeupLatch); OwnLatch(&XLogCtl->recoveryWakeupLatch);
/* Set up XLOG reader facility */ /* Set up XLOG reader facility */
MemSet(&private, 0, sizeof(XLogPageReadPrivate));
xlogreader = xlogreader =
XLogReaderAllocate(wal_segment_size, NULL, XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
XL_ROUTINE(.page_read = &XLogPageRead,
.segment_open = NULL,
.segment_close = wal_segment_close),
&private);
if (!xlogreader) if (!xlogreader)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY), (errcode(ERRCODE_OUT_OF_MEMORY),
...@@ -7819,7 +7807,8 @@ StartupXLOG(void) ...@@ -7819,7 +7807,8 @@ StartupXLOG(void)
XLogRecPtr pageBeginPtr; XLogRecPtr pageBeginPtr;
pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ); pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size)); Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) ==
XLogSegmentOffset(pageBeginPtr, wal_segment_size));
firstIdx = XLogRecPtrToBufIdx(EndOfLog); firstIdx = XLogRecPtrToBufIdx(EndOfLog);
...@@ -12107,13 +12096,15 @@ CancelBackup(void) ...@@ -12107,13 +12096,15 @@ CancelBackup(void)
* XLogPageRead() to try fetching the record from another source, or to * XLogPageRead() to try fetching the record from another source, or to
* sleep and retry. * sleep and retry.
*/ */
static int static bool
XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogPageRead(XLogReaderState *state,
XLogRecPtr targetRecPtr, char *readBuf) bool fetching_ckpt, int emode, bool randAccess)
{ {
XLogPageReadPrivate *private = char *readBuf = state->readBuf;
(XLogPageReadPrivate *) xlogreader->private_data; XLogRecPtr targetPagePtr = state->readPagePtr;
int emode = private->emode; int reqLen = state->reqLen;
int readLen = 0;
XLogRecPtr targetRecPtr = state->ReadRecPtr;
uint32 targetPageOff; uint32 targetPageOff;
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
int r; int r;
...@@ -12126,7 +12117,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, ...@@ -12126,7 +12117,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
* is not in the currently open one. * is not in the currently open one.
*/ */
if (readFile >= 0 && if (readFile >= 0 &&
!XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size)) !XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size))
{ {
/* /*
* Request a restartpoint if we've replayed too much xlog since the * Request a restartpoint if we've replayed too much xlog since the
...@@ -12134,10 +12125,10 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, ...@@ -12134,10 +12125,10 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
*/ */
if (bgwriterLaunched) if (bgwriterLaunched)
{ {
if (XLogCheckpointNeeded(readSegNo)) if (XLogCheckpointNeeded(state->seg.ws_segno))
{ {
(void) GetRedoRecPtr(); (void) GetRedoRecPtr();
if (XLogCheckpointNeeded(readSegNo)) if (XLogCheckpointNeeded(state->seg.ws_segno))
RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
} }
} }
...@@ -12147,7 +12138,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, ...@@ -12147,7 +12138,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
readSource = XLOG_FROM_ANY; readSource = XLOG_FROM_ANY;
} }
XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size); XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size);
retry: retry:
/* See if we need to retrieve more data */ /* See if we need to retrieve more data */
...@@ -12156,17 +12147,15 @@ retry: ...@@ -12156,17 +12147,15 @@ retry:
flushedUpto < targetPagePtr + reqLen)) flushedUpto < targetPagePtr + reqLen))
{ {
if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
private->randAccess, randAccess, fetching_ckpt,
private->fetching_ckpt, targetRecPtr, state->seg.ws_segno))
targetRecPtr))
{ {
if (readFile >= 0) if (readFile >= 0)
close(readFile); close(readFile);
readFile = -1; readFile = -1;
readLen = 0;
readSource = XLOG_FROM_ANY; readSource = XLOG_FROM_ANY;
XLogReaderSetInputData(state, -1);
return -1; return false;
} }
} }
...@@ -12193,40 +12182,36 @@ retry: ...@@ -12193,40 +12182,36 @@ retry:
else else
readLen = XLOG_BLCKSZ; readLen = XLOG_BLCKSZ;
/* Read the requested page */
readOff = targetPageOff;
pgstat_report_wait_start(WAIT_EVENT_WAL_READ); pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff); r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff);
if (r != XLOG_BLCKSZ) if (r != XLOG_BLCKSZ)
{ {
char fname[MAXFNAMELEN]; char fname[MAXFNAMELEN];
int save_errno = errno; int save_errno = errno;
pgstat_report_wait_end(); pgstat_report_wait_end();
XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size); XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size);
if (r < 0) if (r < 0)
{ {
errno = save_errno; errno = save_errno;
ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
(errcode_for_file_access(), (errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u: %m", errmsg("could not read from log segment %s, offset %u: %m",
fname, readOff))); fname, targetPageOff)));
} }
else else
ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
(errcode(ERRCODE_DATA_CORRUPTED), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read from log segment %s, offset %u: read %d of %zu", errmsg("could not read from log segment %s, offset %u: read %d of %zu",
fname, readOff, r, (Size) XLOG_BLCKSZ))); fname, targetPageOff, r, (Size) XLOG_BLCKSZ)));
goto next_record_is_invalid; goto next_record_is_invalid;
} }
pgstat_report_wait_end(); pgstat_report_wait_end();
Assert(targetSegNo == readSegNo); Assert(targetSegNo == state->seg.ws_segno);
Assert(targetPageOff == readOff); Assert(readLen >= reqLen);
Assert(reqLen <= readLen);
xlogreader->seg.ws_tli = curFileTLI; state->seg.ws_tli = curFileTLI;
/* /*
* Check the page header immediately, so that we can retry immediately if * Check the page header immediately, so that we can retry immediately if
...@@ -12254,14 +12239,16 @@ retry: ...@@ -12254,14 +12239,16 @@ retry:
* Validating the page header is cheap enough that doing it twice * Validating the page header is cheap enough that doing it twice
* shouldn't be a big deal from a performance point of view. * shouldn't be a big deal from a performance point of view.
*/ */
if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf)) if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf))
{ {
/* reset any error XLogReaderValidatePageHeader() might have set */ /* reset any error StateValidatePageHeader() might have set */
xlogreader->errormsg_buf[0] = '\0'; state->errormsg_buf[0] = '\0';
goto next_record_is_invalid; goto next_record_is_invalid;
} }
return readLen; Assert(state->readPagePtr == targetPagePtr);
XLogReaderSetInputData(state, readLen);
return true;
next_record_is_invalid: next_record_is_invalid:
lastSourceFailed = true; lastSourceFailed = true;
...@@ -12269,14 +12256,14 @@ next_record_is_invalid: ...@@ -12269,14 +12256,14 @@ next_record_is_invalid:
if (readFile >= 0) if (readFile >= 0)
close(readFile); close(readFile);
readFile = -1; readFile = -1;
readLen = 0;
readSource = XLOG_FROM_ANY; readSource = XLOG_FROM_ANY;
/* In standby-mode, keep trying */ /* In standby-mode, keep trying */
if (StandbyMode) if (StandbyMode)
goto retry; goto retry;
else
return -1; XLogReaderSetInputData(state, -1);
return false;
} }
/* /*
...@@ -12307,7 +12294,8 @@ next_record_is_invalid: ...@@ -12307,7 +12294,8 @@ next_record_is_invalid:
*/ */
static bool static bool
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt, XLogRecPtr tliRecPtr) bool fetching_ckpt, XLogRecPtr tliRecPtr,
XLogSegNo readSegNo)
{ {
static TimestampTz last_fail_time = 0; static TimestampTz last_fail_time = 0;
TimestampTz now; TimestampTz now;
......
...@@ -36,11 +36,11 @@ ...@@ -36,11 +36,11 @@
static void report_invalid_record(XLogReaderState *state, const char *fmt,...) static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
pg_attribute_printf(2, 3); pg_attribute_printf(2, 3);
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
int reqLen); int reqLen, bool header_inclusive);
static void XLogReaderInvalReadState(XLogReaderState *state); static void XLogReaderInvalReadState(XLogReaderState *state);
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); XLogRecPtr PrevRecPtr, XLogRecord *record);
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
XLogRecPtr recptr); XLogRecPtr recptr);
static void ResetDecoder(XLogReaderState *state); static void ResetDecoder(XLogReaderState *state);
...@@ -73,7 +73,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) ...@@ -73,7 +73,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
*/ */
XLogReaderState * XLogReaderState *
XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderAllocate(int wal_segment_size, const char *waldir,
XLogReaderRoutine *routine, void *private_data) WALSegmentCleanupCB cleanup_cb)
{ {
XLogReaderState *state; XLogReaderState *state;
...@@ -84,7 +84,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, ...@@ -84,7 +84,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
return NULL; return NULL;
/* initialize caller-provided support functions */ /* initialize caller-provided support functions */
state->routine = *routine; state->cleanup_cb = cleanup_cb;
state->max_block_id = -1; state->max_block_id = -1;
...@@ -107,9 +107,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, ...@@ -107,9 +107,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
waldir); waldir);
/* system_identifier initialized to zeroes above */ /* ReadRecPtr, EndRecPtr, reqLen and readLen initialized to zeroes above */
state->private_data = private_data;
/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
MCXT_ALLOC_NO_OOM); MCXT_ALLOC_NO_OOM);
if (!state->errormsg_buf) if (!state->errormsg_buf)
...@@ -140,8 +138,8 @@ XLogReaderFree(XLogReaderState *state) ...@@ -140,8 +138,8 @@ XLogReaderFree(XLogReaderState *state)
{ {
int block_id; int block_id;
if (state->seg.ws_file != -1) if (state->seg.ws_file >= 0)
state->routine.segment_close(state); state->cleanup_cb(state);
for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
{ {
...@@ -246,6 +244,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) ...@@ -246,6 +244,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
/* Begin at the passed-in record pointer. */ /* Begin at the passed-in record pointer. */
state->EndRecPtr = RecPtr; state->EndRecPtr = RecPtr;
state->ReadRecPtr = InvalidXLogRecPtr; state->ReadRecPtr = InvalidXLogRecPtr;
state->readRecordState = XLREAD_NEXT_RECORD;
} }
/* /*
...@@ -254,140 +253,191 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) ...@@ -254,140 +253,191 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* XLogBeginRead() or XLogFindNextRecord() must be called before the first call * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
* to XLogReadRecord(). * to XLogReadRecord().
* *
* If the page_read callback fails to read the requested data, NULL is * This function may return XLREAD_NEED_DATA several times before returning a
* returned. The callback is expected to have reported the error; errormsg * result record. The caller shall read in some new data then call this
* is set to NULL. * function again with the same parameters.
*
* When a record is successfully read, returns XLREAD_SUCCESS with result
* record being stored in *record. Otherwise *record is NULL.
*
* Returns XLREAD_NEED_DATA if more data is needed to finish decoding the
* current record. In that case, state->readPagePtr and state->reqLen inform
* the desired position and minimum length of data needed. The caller shall
* read in the requested data and set state->readBuf to point to a buffer
* containing it. The caller must also set state->seg->ws_tli and
* state->readLen to indicate the timeline that it was read from, and the
* length of data that is now available (which must be >= given reqLen),
* respectively.
*
* If invalid data is encountered, returns XLREAD_FAIL and sets *record to
* NULL. *errormsg is set to a string with details of the failure. The
* returned pointer (or *errormsg) points to an internal buffer that's valid
* until the next call to XLogReadRecord.
*
*
* This function runs a state machine consisting of the following states.
* *
* If the reading fails for some other reason, NULL is also returned, and * XLREAD_NEXT_RECORD:
* *errormsg is set to a string with details of the failure. * The initial state. If called with a valid XLogRecPtr, try to read a
* record at that position. If invalid RecPtr is given try to read a record
* just after the last one read. The next state is XLREAD_TOT_LEN.
* *
* The returned pointer (or *errormsg) points to an internal buffer that's * XLREAD_TOT_LEN:
* valid until the next call to XLogReadRecord. * Examining record header. Ends after reading record length.
* recordRemainLen and recordGotLen are initialized. The next state is
* XLREAD_FIRST_FRAGMENT.
*
* XLREAD_FIRST_FRAGMENT:
* Reading the first fragment. Goes to XLREAD_NEXT_RECORD if that's all or
* XLREAD_CONTINUATION if we need more data.
* XLREAD_CONTINUATION:
* Reading continuation of record. If the whole record is now decoded, goes
* to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates how
* much is left.
*
* If invalid data is found in any state, the state machine stays at the
* current state. This behavior allows us to continue reading a record
* after switching to a different source, during streaming replication.
*/ */
XLogRecord * XLogReadRecordResult
XLogReadRecord(XLogReaderState *state, char **errormsg) XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
{ {
XLogRecPtr RecPtr; XLogRecord *prec;
XLogRecord *record;
XLogRecPtr targetPagePtr;
bool randAccess;
uint32 len,
total_len;
uint32 targetRecOff;
uint32 pageHeaderSize;
bool gotheader;
int readOff;
/* *record = NULL;
* randAccess indicates whether to verify the previous-record pointer of
* the record we're reading. We only do this if we're reading
* sequentially, which is what we initially assume.
*/
randAccess = false;
/* reset error state */ /* reset error state */
*errormsg = NULL; *errormsg = NULL;
state->errormsg_buf[0] = '\0'; state->errormsg_buf[0] = '\0';
switch (state->readRecordState)
{
case XLREAD_NEXT_RECORD:
ResetDecoder(state); ResetDecoder(state);
RecPtr = state->EndRecPtr;
if (state->ReadRecPtr != InvalidXLogRecPtr) if (state->ReadRecPtr != InvalidXLogRecPtr)
{ {
/* read the record after the one we just read */ /* read the record after the one we just read */
/* /*
* EndRecPtr is pointing to end+1 of the previous WAL record. If * EndRecPtr is pointing to end+1 of the previous WAL record.
* we're at a page boundary, no more records can fit on the current * If we're at a page boundary, no more records can fit on the
* page. We must skip over the page header, but we can't do that until * current page. We must skip over the page header, but we
* we've read in the page, since the header size is variable. * can't do that until we've read in the page, since the
* header size is variable.
*/ */
state->PrevRecPtr = state->ReadRecPtr;
state->ReadRecPtr = state->EndRecPtr;
} }
else else
{ {
/* /*
* Caller supplied a position to start at. * Caller supplied a position to start at.
* *
* In this case, EndRecPtr should already be pointing to a valid * In this case, EndRecPtr should already be pointing to a
* record starting position. * valid record starting position.
*/
Assert(XRecOffIsValid(state->EndRecPtr));
state->ReadRecPtr = state->EndRecPtr;
/*
* We cannot verify the previous-record pointer when we're
* seeking to a particular record. Reset PrevRecPtr so that we
* won't try doing that.
*/ */
Assert(XRecOffIsValid(RecPtr)); state->PrevRecPtr = InvalidXLogRecPtr;
randAccess = true; state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */
} }
state->currRecPtr = RecPtr; state->record_verified = false;
state->readRecordState = XLREAD_TOT_LEN;
/* fall through */
case XLREAD_TOT_LEN:
{
uint32 total_len;
uint32 pageHeaderSize;
XLogRecPtr targetPagePtr;
uint32 targetRecOff;
XLogPageHeader pageHeader;
targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); targetPagePtr =
targetRecOff = RecPtr % XLOG_BLCKSZ; state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
/* /*
* Read the page containing the record into state->readBuf. Request enough * Check if we have enough data. For the first record in the
* byte to cover the whole record header, or at least the part of it that * page, the requesting length doesn't contain page header.
* fits on the same page.
*/ */
readOff = ReadPageInternal(state, targetPagePtr, if (XLogNeedData(state, targetPagePtr,
Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ),
if (readOff < 0) targetRecOff != 0))
return XLREAD_NEED_DATA;
/* error out if caller supplied bogus page */
if (!state->page_verified)
goto err; goto err;
/* /* examine page header now. */
* ReadPageInternal always returns at least the page header, so we can pageHeaderSize =
* examine it now. XLogPageHeaderSize((XLogPageHeader) state->readBuf);
*/
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
if (targetRecOff == 0) if (targetRecOff == 0)
{ {
/* /* At page start, so skip over page header. */
* At page start, so skip over page header. state->ReadRecPtr += pageHeaderSize;
*/
RecPtr += pageHeaderSize;
targetRecOff = pageHeaderSize; targetRecOff = pageHeaderSize;
} }
else if (targetRecOff < pageHeaderSize) else if (targetRecOff < pageHeaderSize)
{ {
report_invalid_record(state, "invalid record offset at %X/%X", report_invalid_record(state, "invalid record offset at %X/%X",
LSN_FORMAT_ARGS(RecPtr)); LSN_FORMAT_ARGS(state->ReadRecPtr));
goto err; goto err;
} }
if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && pageHeader = (XLogPageHeader) state->readBuf;
if ((pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
targetRecOff == pageHeaderSize) targetRecOff == pageHeaderSize)
{ {
report_invalid_record(state, "contrecord is requested by %X/%X", report_invalid_record(state, "contrecord is requested by %X/%X",
LSN_FORMAT_ARGS(RecPtr)); (uint32) (state->ReadRecPtr >> 32),
(uint32) state->ReadRecPtr);
goto err; goto err;
} }
/* ReadPageInternal has verified the page header */ /* XLogNeedData has verified the page header */
Assert(pageHeaderSize <= readOff); Assert(pageHeaderSize <= state->readLen);
/* /*
* Read the record length. * Read the record length.
* *
* NB: Even though we use an XLogRecord pointer here, the whole record * NB: Even though we use an XLogRecord pointer here, the
* header might not fit on this page. xl_tot_len is the first field of the * whole record header might not fit on this page. xl_tot_len
* struct, so it must be on this page (the records are MAXALIGNed), but we * is the first field of the struct, so it must be on this
* cannot access any other fields until we've verified that we got the * page (the records are MAXALIGNed), but we cannot access any
* whole header. * other fields until we've verified that we got the whole
* header.
*/ */
record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); prec = (XLogRecord *) (state->readBuf +
total_len = record->xl_tot_len; state->ReadRecPtr % XLOG_BLCKSZ);
total_len = prec->xl_tot_len;
/* /*
* If the whole record header is on this page, validate it immediately. * If the whole record header is on this page, validate it
* Otherwise do just a basic sanity check on xl_tot_len, and validate the * immediately. Otherwise do just a basic sanity check on
* rest of the header after reading it from the next page. The xl_tot_len * xl_tot_len, and validate the rest of the header after
* check is necessary here to ensure that we enter the "Need to reassemble * reading it from the next page. The xl_tot_len check is
* record" code path below; otherwise we might fail to apply * necessary here to ensure that we enter the
* ValidXLogRecordHeader at all. * XLREAD_CONTINUATION state below; otherwise we might fail to
* apply ValidXLogRecordHeader at all.
*/ */
if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
{ {
if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record, if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
randAccess)) state->PrevRecPtr, prec))
goto err; goto err;
gotheader = true;
state->record_verified = true;
} }
else else
{ {
...@@ -396,161 +446,263 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) ...@@ -396,161 +446,263 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
{ {
report_invalid_record(state, report_invalid_record(state,
"invalid record length at %X/%X: wanted %u, got %u", "invalid record length at %X/%X: wanted %u, got %u",
LSN_FORMAT_ARGS(RecPtr), LSN_FORMAT_ARGS(state->ReadRecPtr),
(uint32) SizeOfXLogRecord, total_len); (uint32) SizeOfXLogRecord, total_len);
goto err; goto err;
} }
gotheader = false;
} }
len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; /*
if (total_len > len) * Wait for the rest of the record, or the part of the record
* that fit on the first page if crossed a page boundary, to
* become available.
*/
state->recordGotLen = 0;
state->recordRemainLen = total_len;
state->readRecordState = XLREAD_FIRST_FRAGMENT;
}
/* fall through */
case XLREAD_FIRST_FRAGMENT:
{ {
/* Need to reassemble record */ uint32 total_len = state->recordRemainLen;
char *contdata; uint32 request_len;
XLogPageHeader pageHeader; uint32 record_len;
char *buffer; XLogRecPtr targetPagePtr;
uint32 gotlen; uint32 targetRecOff;
/*
* Wait for the rest of the record on the first page to become
* available
*/
targetPagePtr =
state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ);
record_len = request_len - targetRecOff;
/* ReadRecPtr contains page header */
Assert(targetRecOff != 0);
if (XLogNeedData(state, targetPagePtr, request_len, true))
return XLREAD_NEED_DATA;
/* error out if caller supplied bogus page */
if (!state->page_verified)
goto err;
prec = (XLogRecord *) (state->readBuf + targetRecOff);
/* validate record header if not yet */
if (!state->record_verified && record_len >= SizeOfXLogRecord)
{
if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
state->PrevRecPtr, prec))
goto err;
state->record_verified = true;
}
if (total_len == record_len)
{
/* Record does not cross a page boundary */
Assert(state->record_verified);
if (!ValidXLogRecord(state, prec, state->ReadRecPtr))
goto err;
state->record_verified = true; /* to be tidy */
/* We already checked the header earlier */
state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len);
*record = prec;
state->readRecordState = XLREAD_NEXT_RECORD;
break;
}
/* /*
* Enlarge readRecordBuf as needed. * The record continues on the next page. Need to reassemble
* record
*/ */
Assert(total_len > record_len);
/* Enlarge readRecordBuf as needed. */
if (total_len > state->readRecordBufSize && if (total_len > state->readRecordBufSize &&
!allocate_recordbuf(state, total_len)) !allocate_recordbuf(state, total_len))
{ {
/* We treat this as a "bogus data" condition */ /* We treat this as a "bogus data" condition */
report_invalid_record(state, "record length %u at %X/%X too long", report_invalid_record(state,
total_len, LSN_FORMAT_ARGS(RecPtr)); "record length %u at %X/%X too long",
total_len,
LSN_FORMAT_ARGS(state->ReadRecPtr));
goto err; goto err;
} }
/* Copy the first fragment of the record from the first page. */ /* Copy the first fragment of the record from the first page. */
memcpy(state->readRecordBuf, memcpy(state->readRecordBuf, state->readBuf + targetRecOff,
state->readBuf + RecPtr % XLOG_BLCKSZ, len); record_len);
buffer = state->readRecordBuf + len; state->recordGotLen += record_len;
gotlen = len; state->recordRemainLen -= record_len;
do
{
/* Calculate pointer to beginning of next page */ /* Calculate pointer to beginning of next page */
targetPagePtr += XLOG_BLCKSZ; state->recordContRecPtr = state->ReadRecPtr + record_len;
Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0);
state->readRecordState = XLREAD_CONTINUATION;
}
/* fall through */
case XLREAD_CONTINUATION:
{
XLogPageHeader pageHeader = NULL;
uint32 pageHeaderSize;
XLogRecPtr targetPagePtr = InvalidXLogRecPtr;
/*
* we enter this state only if we haven't read the whole
* record.
*/
Assert(state->recordRemainLen > 0);
while (state->recordRemainLen > 0)
{
char *contdata;
uint32 request_len PG_USED_FOR_ASSERTS_ONLY;
uint32 record_len;
/* Wait for the next page to become available */ /* Wait for the next page to become available */
readOff = ReadPageInternal(state, targetPagePtr, targetPagePtr = state->recordContRecPtr;
Min(total_len - gotlen + SizeOfXLogShortPHD,
XLOG_BLCKSZ));
if (readOff < 0) /* this request contains page header */
Assert(targetPagePtr != 0);
if (XLogNeedData(state, targetPagePtr,
Min(state->recordRemainLen, XLOG_BLCKSZ),
false))
return XLREAD_NEED_DATA;
if (!state->page_verified)
goto err; goto err;
Assert(SizeOfXLogShortPHD <= readOff); Assert(SizeOfXLogShortPHD <= state->readLen);
/* Check that the continuation on next page looks valid */ /* Check that the continuation on next page looks valid */
pageHeader = (XLogPageHeader) state->readBuf; pageHeader = (XLogPageHeader) state->readBuf;
if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
{ {
report_invalid_record(state, report_invalid_record(
"there is no contrecord flag at %X/%X", state,
LSN_FORMAT_ARGS(RecPtr)); "there is no contrecord flag at %X/%X reading %X/%X",
(uint32) (state->recordContRecPtr >> 32),
(uint32) state->recordContRecPtr,
(uint32) (state->ReadRecPtr >> 32),
(uint32) state->ReadRecPtr);
goto err; goto err;
} }
/* /*
* Cross-check that xlp_rem_len agrees with how much of the record * Cross-check that xlp_rem_len agrees with how much of
* we expect there to be left. * the record we expect there to be left.
*/ */
if (pageHeader->xlp_rem_len == 0 || if (pageHeader->xlp_rem_len == 0 ||
total_len != (pageHeader->xlp_rem_len + gotlen)) pageHeader->xlp_rem_len != state->recordRemainLen)
{ {
report_invalid_record(state, report_invalid_record(
"invalid contrecord length %u (expected %lld) at %X/%X", state,
"invalid contrecord length %u at %X/%X reading %X/%X, expected %u",
pageHeader->xlp_rem_len, pageHeader->xlp_rem_len,
((long long) total_len) - gotlen, (uint32) (state->recordContRecPtr >> 32),
LSN_FORMAT_ARGS(RecPtr)); (uint32) state->recordContRecPtr,
(uint32) (state->ReadRecPtr >> 32),
(uint32) state->ReadRecPtr,
state->recordRemainLen);
goto err; goto err;
} }
/* Append the continuation from this page to the buffer */ /* Append the continuation from this page to the buffer */
pageHeaderSize = XLogPageHeaderSize(pageHeader); pageHeaderSize = XLogPageHeaderSize(pageHeader);
if (readOff < pageHeaderSize) /*
readOff = ReadPageInternal(state, targetPagePtr, * XLogNeedData should have ensured that the whole page
pageHeaderSize); * header was read
*/
Assert(pageHeaderSize <= readOff); Assert(pageHeaderSize <= state->readLen);
contdata = (char *) state->readBuf + pageHeaderSize; contdata = (char *) state->readBuf + pageHeaderSize;
len = XLOG_BLCKSZ - pageHeaderSize; record_len = XLOG_BLCKSZ - pageHeaderSize;
if (pageHeader->xlp_rem_len < len) if (pageHeader->xlp_rem_len < record_len)
len = pageHeader->xlp_rem_len; record_len = pageHeader->xlp_rem_len;
request_len = record_len + pageHeaderSize;
if (readOff < pageHeaderSize + len) /*
readOff = ReadPageInternal(state, targetPagePtr, * XLogNeedData should have ensured all needed data was
pageHeaderSize + len); * read
*/
Assert(request_len <= state->readLen);
memcpy(buffer, (char *) contdata, len); memcpy(state->readRecordBuf + state->recordGotLen,
buffer += len; (char *) contdata, record_len);
gotlen += len; state->recordGotLen += record_len;
state->recordRemainLen -= record_len;
/* If we just reassembled the record header, validate it. */ /* If we just reassembled the record header, validate it. */
if (!gotheader) if (!state->record_verified)
{ {
record = (XLogRecord *) state->readRecordBuf; Assert(state->recordGotLen >= SizeOfXLogRecord);
if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
record, randAccess)) state->PrevRecPtr,
(XLogRecord *) state->readRecordBuf))
goto err; goto err;
gotheader = true;
state->record_verified = true;
} }
} while (gotlen < total_len);
Assert(gotheader); /*
* Calculate pointer to beginning of next page, and
* continue
*/
state->recordContRecPtr += XLOG_BLCKSZ;
}
record = (XLogRecord *) state->readRecordBuf; /* targetPagePtr is pointing the last-read page here */
if (!ValidXLogRecord(state, record, RecPtr)) prec = (XLogRecord *) state->readRecordBuf;
if (!ValidXLogRecord(state, prec, state->ReadRecPtr))
goto err; goto err;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); pageHeaderSize =
state->ReadRecPtr = RecPtr; XLogPageHeaderSize((XLogPageHeader) state->readBuf);
state->EndRecPtr = targetPagePtr + pageHeaderSize state->EndRecPtr = targetPagePtr + pageHeaderSize
+ MAXALIGN(pageHeader->xlp_rem_len); + MAXALIGN(pageHeader->xlp_rem_len);
}
else
{
/* Wait for the record data to become available */
readOff = ReadPageInternal(state, targetPagePtr,
Min(targetRecOff + total_len, XLOG_BLCKSZ));
if (readOff < 0)
goto err;
/* Record does not cross a page boundary */
if (!ValidXLogRecord(state, record, RecPtr))
goto err;
state->EndRecPtr = RecPtr + MAXALIGN(total_len);
state->ReadRecPtr = RecPtr; *record = prec;
state->readRecordState = XLREAD_NEXT_RECORD;
break;
}
} }
/* /*
* Special processing if it's an XLOG SWITCH record * Special processing if it's an XLOG SWITCH record
*/ */
if (record->xl_rmid == RM_XLOG_ID && if ((*record)->xl_rmid == RM_XLOG_ID &&
(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
{ {
/* Pretend it extends to end of segment */ /* Pretend it extends to end of segment */
state->EndRecPtr += state->segcxt.ws_segsize - 1; state->EndRecPtr += state->segcxt.ws_segsize - 1;
state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
} }
if (DecodeXLogRecord(state, record, errormsg)) if (DecodeXLogRecord(state, *record, errormsg))
return record; return XLREAD_SUCCESS;
else
return NULL; *record = NULL;
return XLREAD_FAIL;
err: err:
/* /*
* Invalidate the read state. We might read from a different source after * Invalidate the read page. We might read from a different source after
* failure. * failure.
*/ */
XLogReaderInvalReadState(state); XLogReaderInvalReadState(state);
...@@ -558,113 +710,141 @@ err: ...@@ -558,113 +710,141 @@ err:
if (state->errormsg_buf[0] != '\0') if (state->errormsg_buf[0] != '\0')
*errormsg = state->errormsg_buf; *errormsg = state->errormsg_buf;
return NULL; *record = NULL;
return XLREAD_FAIL;
} }
/* /*
* Read a single xlog page including at least [pageptr, reqLen] of valid data * Checks that an xlog page loaded in state->readBuf is including at least
* via the page_read() callback. * [pageptr, reqLen] and the page is valid. header_inclusive indicates that
* reqLen is calculated including page header length.
* *
* Returns -1 if the required page cannot be read for some reason; errormsg_buf * Returns false if the buffer already contains the requested data, or found
* is set in that case (unless the error occurs in the page_read callback). * error. state->page_verified is set to true for the former and false for the
* latter.
* *
* We fetch the page from a reader-local cache if we know we have the required * Otherwise returns true and requests data loaded onto state->readBuf by
* data and if there hasn't been any error since caching the data. * state->readPagePtr and state->readLen. The caller shall call this function
* again after filling the buffer at least with that portion of data and set
* state->readLen to the length of actually loaded data.
*
* If header_inclusive is false, corrects reqLen internally by adding the
* actual page header length and may request caller for new data.
*/ */
static int static bool
ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen,
bool header_inclusive)
{ {
int readLen;
uint32 targetPageOff; uint32 targetPageOff;
XLogSegNo targetSegNo; XLogSegNo targetSegNo;
XLogPageHeader hdr; uint32 addLen = 0;
Assert((pageptr % XLOG_BLCKSZ) == 0); /* Some data is loaded, but page header is not verified yet. */
if (!state->page_verified &&
!XLogRecPtrIsInvalid(state->readPagePtr) && state->readLen >= 0)
{
uint32 pageHeaderSize;
XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); /* just loaded new data so needs to verify page header */
targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
/* check whether we have all the requested data already */ /* The caller must have loaded at least page header */
if (targetSegNo == state->seg.ws_segno && Assert(state->readLen >= SizeOfXLogShortPHD);
targetPageOff == state->segoff && reqLen <= state->readLen)
return state->readLen;
/* /*
* Data is not in our buffer. * We have enough data to check the header length. Recheck the loaded
* * length against the actual header length.
* Every time we actually read the segment, even if we looked at parts of
* it before, we need to do verification as the page_read callback might
* now be rereading data from a different source.
*
* Whenever switching to a new WAL segment, we read the first page of the
* file and validate its header, even if that's not where the target
* record is. This is so that we can check the additional identification
* info that is present in the first page's "long" header.
*/ */
if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
/* Request more data if we don't have the full header. */
if (state->readLen < pageHeaderSize)
{ {
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; state->reqLen = pageHeaderSize;
return true;
}
readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, /* Now that we know we have the full header, validate it. */
state->currRecPtr, if (!XLogReaderValidatePageHeader(state, state->readPagePtr,
state->readBuf); (char *) state->readBuf))
if (readLen < 0) {
goto err; /* That's bad. Force reading the page again. */
XLogReaderInvalReadState(state);
return false;
}
/* we can be sure to have enough WAL available, we scrolled back */ state->page_verified = true;
Assert(readLen == XLOG_BLCKSZ);
if (!XLogReaderValidatePageHeader(state, targetSegmentPtr, XLByteToSeg(state->readPagePtr, state->seg.ws_segno,
state->readBuf)) state->segcxt.ws_segsize);
goto err;
} }
/* /*
* First, read the requested data length, but at least a short page header * The loaded page may not be the one caller is supposing to read when we
* so that we can validate it. * are verifying the first page of new segment. In that case, skip further
* verification and immediately load the target page.
*/ */
readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), if (state->page_verified && pageptr == state->readPagePtr)
state->currRecPtr, {
state->readBuf); /*
if (readLen < 0) * calculate additional length for page header keeping the total
goto err; * length within the block size.
*/
if (!header_inclusive)
{
uint32 pageHeaderSize =
XLogPageHeaderSize((XLogPageHeader) state->readBuf);
Assert(readLen <= XLOG_BLCKSZ); addLen = pageHeaderSize;
if (reqLen + pageHeaderSize <= XLOG_BLCKSZ)
addLen = pageHeaderSize;
else
addLen = XLOG_BLCKSZ - reqLen;
}
/* Do we have enough data to check the header length? */ /* Return if we already have it. */
if (readLen <= SizeOfXLogShortPHD) if (reqLen + addLen <= state->readLen)
goto err; return false;
}
Assert(readLen >= reqLen); /* Data is not in our buffer, request the caller for it. */
XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
Assert((pageptr % XLOG_BLCKSZ) == 0);
hdr = (XLogPageHeader) state->readBuf; /*
* Every time we request to load new data of a page to the caller, even if
* we looked at a part of it before, we need to do verification on the
* next invocation as the caller might now be rereading data from a
* different source.
*/
state->page_verified = false;
/* still not enough */ /*
if (readLen < XLogPageHeaderSize(hdr)) * Whenever switching to a new WAL segment, we read the first page of the
* file and validate its header, even if that's not where the target
* record is. This is so that we can check the additional identification
* info that is present in the first page's "long" header. Don't do this
* if the caller requested the first page in the segment.
*/
if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
{ {
readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), /*
state->currRecPtr, * Then we'll see that the targetSegNo now matches the ws_segno, and
state->readBuf); * will not come back here, but will request the actual target page.
if (readLen < 0) */
goto err; state->readPagePtr = pageptr - targetPageOff;
state->reqLen = XLOG_BLCKSZ;
return true;
} }
/* /*
* Now that we know we have the full header, validate it. * Request the caller to load the page. We need at least a short page
* header so that we can validate it.
*/ */
if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr)) state->readPagePtr = pageptr;
goto err; state->reqLen = Max(reqLen + addLen, SizeOfXLogShortPHD);
return true;
/* update read state information */
state->seg.ws_segno = targetSegNo;
state->segoff = targetPageOff;
state->readLen = readLen;
return readLen;
err:
XLogReaderInvalReadState(state);
return -1;
} }
/* /*
...@@ -673,9 +853,7 @@ err: ...@@ -673,9 +853,7 @@ err:
static void static void
XLogReaderInvalReadState(XLogReaderState *state) XLogReaderInvalReadState(XLogReaderState *state)
{ {
state->seg.ws_segno = 0; state->readPagePtr = InvalidXLogRecPtr;
state->segoff = 0;
state->readLen = 0;
} }
/* /*
...@@ -683,11 +861,12 @@ XLogReaderInvalReadState(XLogReaderState *state) ...@@ -683,11 +861,12 @@ XLogReaderInvalReadState(XLogReaderState *state)
* *
* This is just a convenience subroutine to avoid duplicated code in * This is just a convenience subroutine to avoid duplicated code in
* XLogReadRecord. It's not intended for use from anywhere else. * XLogReadRecord. It's not intended for use from anywhere else.
*
* If PrevRecPtr is valid, the xl_prev is is cross-checked with it.
*/ */
static bool static bool
ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
XLogRecPtr PrevRecPtr, XLogRecord *record, XLogRecPtr PrevRecPtr, XLogRecord *record)
bool randAccess)
{ {
if (record->xl_tot_len < SizeOfXLogRecord) if (record->xl_tot_len < SizeOfXLogRecord)
{ {
...@@ -704,7 +883,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, ...@@ -704,7 +883,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
record->xl_rmid, LSN_FORMAT_ARGS(RecPtr)); record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
return false; return false;
} }
if (randAccess) if (PrevRecPtr == InvalidXLogRecPtr)
{ {
/* /*
* We can't exactly verify the prev-link, but surely it should be less * We can't exactly verify the prev-link, but surely it should be less
...@@ -922,6 +1101,22 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, ...@@ -922,6 +1101,22 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
* here. * here.
*/ */
XLogFindNextRecordState *
InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr)
{
XLogFindNextRecordState *state = (XLogFindNextRecordState *)
palloc_extended(sizeof(XLogFindNextRecordState),
MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
if (!state)
return NULL;
state->reader_state = reader_state;
state->targetRecPtr = start_ptr;
state->currRecPtr = start_ptr;
return state;
}
/* /*
* Find the first record with an lsn >= RecPtr. * Find the first record with an lsn >= RecPtr.
* *
...@@ -933,27 +1128,25 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, ...@@ -933,27 +1128,25 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
* This positions the reader, like XLogBeginRead(), so that the next call to * This positions the reader, like XLogBeginRead(), so that the next call to
* XLogReadRecord() will read the next valid record. * XLogReadRecord() will read the next valid record.
*/ */
XLogRecPtr bool
XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogFindNextRecord(XLogFindNextRecordState *state)
{ {
XLogRecPtr tmpRecPtr;
XLogRecPtr found = InvalidXLogRecPtr;
XLogPageHeader header; XLogPageHeader header;
XLogRecord *record;
XLogReadRecordResult result;
char *errormsg; char *errormsg;
Assert(!XLogRecPtrIsInvalid(RecPtr)); Assert(!XLogRecPtrIsInvalid(state->currRecPtr));
/* /*
* skip over potential continuation data, keeping in mind that it may span * skip over potential continuation data, keeping in mind that it may span
* multiple pages * multiple pages
*/ */
tmpRecPtr = RecPtr;
while (true) while (true)
{ {
XLogRecPtr targetPagePtr; XLogRecPtr targetPagePtr;
int targetRecOff; int targetRecOff;
uint32 pageHeaderSize; uint32 pageHeaderSize;
int readLen;
/* /*
* Compute targetRecOff. It should typically be equal or greater than * Compute targetRecOff. It should typically be equal or greater than
...@@ -961,27 +1154,27 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) ...@@ -961,27 +1154,27 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
* that, except when caller has explicitly specified the offset that * that, except when caller has explicitly specified the offset that
* falls somewhere there or when we are skipping multi-page * falls somewhere there or when we are skipping multi-page
* continuation record. It doesn't matter though because * continuation record. It doesn't matter though because
* ReadPageInternal() is prepared to handle that and will read at * XLogNeedData() is prepared to handle that and will read at least
* least short page-header worth of data * short page-header worth of data
*/ */
targetRecOff = tmpRecPtr % XLOG_BLCKSZ; targetRecOff = state->currRecPtr % XLOG_BLCKSZ;
/* scroll back to page boundary */ /* scroll back to page boundary */
targetPagePtr = tmpRecPtr - targetRecOff; targetPagePtr = state->currRecPtr - targetRecOff;
/* Read the page containing the record */ if (XLogNeedData(state->reader_state, targetPagePtr, targetRecOff,
readLen = ReadPageInternal(state, targetPagePtr, targetRecOff); targetRecOff != 0))
if (readLen < 0) return true;
if (!state->reader_state->page_verified)
goto err; goto err;
header = (XLogPageHeader) state->readBuf; header = (XLogPageHeader) state->reader_state->readBuf;
pageHeaderSize = XLogPageHeaderSize(header); pageHeaderSize = XLogPageHeaderSize(header);
/* make sure we have enough data for the page header */ /* we should have read the page header */
readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize); Assert(state->reader_state->readLen >= pageHeaderSize);
if (readLen < 0)
goto err;
/* skip over potential continuation data */ /* skip over potential continuation data */
if (header->xlp_info & XLP_FIRST_IS_CONTRECORD) if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
...@@ -996,21 +1189,21 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) ...@@ -996,21 +1189,21 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
* Note that record headers are MAXALIGN'ed * Note that record headers are MAXALIGN'ed
*/ */
if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize)) if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
tmpRecPtr = targetPagePtr + XLOG_BLCKSZ; state->currRecPtr = targetPagePtr + XLOG_BLCKSZ;
else else
{ {
/* /*
* The previous continuation record ends in this page. Set * The previous continuation record ends in this page. Set
* tmpRecPtr to point to the first valid record * state->currRecPtr to point to the first valid record
*/ */
tmpRecPtr = targetPagePtr + pageHeaderSize state->currRecPtr = targetPagePtr + pageHeaderSize
+ MAXALIGN(header->xlp_rem_len); + MAXALIGN(header->xlp_rem_len);
break; break;
} }
} }
else else
{ {
tmpRecPtr = targetPagePtr + pageHeaderSize; state->currRecPtr = targetPagePtr + pageHeaderSize;
break; break;
} }
} }
...@@ -1020,31 +1213,36 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) ...@@ -1020,31 +1213,36 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
* because either we're at the first record after the beginning of a page * because either we're at the first record after the beginning of a page
* or we just jumped over the remaining data of a continuation. * or we just jumped over the remaining data of a continuation.
*/ */
XLogBeginRead(state, tmpRecPtr); XLogBeginRead(state->reader_state, state->currRecPtr);
while (XLogReadRecord(state, &errormsg) != NULL) while ((result = XLogReadRecord(state->reader_state, &record, &errormsg)) !=
XLREAD_FAIL)
{ {
if (result == XLREAD_NEED_DATA)
return true;
/* past the record we've found, break out */ /* past the record we've found, break out */
if (RecPtr <= state->ReadRecPtr) if (state->targetRecPtr <= state->reader_state->ReadRecPtr)
{ {
/* Rewind the reader to the beginning of the last record. */ /* Rewind the reader to the beginning of the last record. */
found = state->ReadRecPtr; state->currRecPtr = state->reader_state->ReadRecPtr;
XLogBeginRead(state, found); XLogBeginRead(state->reader_state, state->currRecPtr);
return found; return false;
} }
} }
err: err:
XLogReaderInvalReadState(state); XLogReaderInvalReadState(state->reader_state);
return InvalidXLogRecPtr; state->currRecPtr = InvalidXLogRecPtr;;
return false;
} }
#endif /* FRONTEND */ #endif /* FRONTEND */
/* /*
* Helper function to ease writing of XLogRoutine->page_read callbacks. * Helper function to ease writing of routines that read raw WAL data.
* If this function is used, caller must supply a segment_open callback in * If this function is used, caller must supply a segment_open callback and
* 'state', as that is used here. * segment_close callback as that is used here.
* *
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'. * fetched from timeline 'tli'.
...@@ -1057,6 +1255,7 @@ err: ...@@ -1057,6 +1255,7 @@ err:
*/ */
bool bool
WALRead(XLogReaderState *state, WALRead(XLogReaderState *state,
WALSegmentOpenCB segopenfn, WALSegmentCloseCB segclosefn,
char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
WALReadError *errinfo) WALReadError *errinfo)
{ {
...@@ -1088,10 +1287,10 @@ WALRead(XLogReaderState *state, ...@@ -1088,10 +1287,10 @@ WALRead(XLogReaderState *state,
XLogSegNo nextSegNo; XLogSegNo nextSegNo;
if (state->seg.ws_file >= 0) if (state->seg.ws_file >= 0)
state->routine.segment_close(state); segclosefn(state);
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
state->routine.segment_open(state, nextSegNo, &tli); segopenfn(state, nextSegNo, &tli);
/* This shouldn't happen -- indicates a bug in segment_open */ /* This shouldn't happen -- indicates a bug in segment_open */
Assert(state->seg.ws_file >= 0); Assert(state->seg.ws_file >= 0);
......
...@@ -686,8 +686,7 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, ...@@ -686,8 +686,7 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
void void
XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
{ {
const XLogRecPtr lastReadPage = (state->seg.ws_segno * const XLogRecPtr lastReadPage = state->readPagePtr;
state->segcxt.ws_segsize + state->segoff);
Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
Assert(wantLength <= XLOG_BLCKSZ); Assert(wantLength <= XLOG_BLCKSZ);
...@@ -702,7 +701,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa ...@@ -702,7 +701,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
* current TLI has since become historical. * current TLI has since become historical.
*/ */
if (lastReadPage == wantPage && if (lastReadPage == wantPage &&
state->readLen != 0 && state->page_verified &&
lastReadPage + state->readLen >= wantPage + Min(wantLength, XLOG_BLCKSZ - 1)) lastReadPage + state->readLen >= wantPage + Min(wantLength, XLOG_BLCKSZ - 1))
return; return;
...@@ -824,10 +823,12 @@ wal_segment_close(XLogReaderState *state) ...@@ -824,10 +823,12 @@ wal_segment_close(XLogReaderState *state)
* exists for normal backends, so we have to do a check/sleep/repeat style of * exists for normal backends, so we have to do a check/sleep/repeat style of
* loop for now. * loop for now.
*/ */
int bool
read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_local_xlog_page(XLogReaderState *state)
int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
{ {
XLogRecPtr targetPagePtr = state->readPagePtr;
int reqLen = state->reqLen;
char *cur_page = state->readBuf;
XLogRecPtr read_upto, XLogRecPtr read_upto,
loc; loc;
TimeLineID tli; TimeLineID tli;
...@@ -926,7 +927,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, ...@@ -926,7 +927,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
else if (targetPagePtr + reqLen > read_upto) else if (targetPagePtr + reqLen > read_upto)
{ {
/* not enough data there */ /* not enough data there */
return -1; XLogReaderSetInputData(state, -1);
return false;
} }
else else
{ {
...@@ -939,12 +941,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, ...@@ -939,12 +941,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* as 'count', read the whole page anyway. It's guaranteed to be * as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete. * zero-padded up to the page boundary if it's incomplete.
*/ */
if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, if (!WALRead(state, wal_segment_open, wal_segment_close,
&errinfo)) cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &errinfo))
WALReadRaiseError(&errinfo); WALReadRaiseError(&errinfo);
/* number of valid bytes in the buffer */ /* number of valid bytes in the buffer */
return count; state->readPagePtr = targetPagePtr;
XLogReaderSetInputData(state, count);
return true;
} }
/* /*
......
...@@ -148,7 +148,8 @@ StartupDecodingContext(List *output_plugin_options, ...@@ -148,7 +148,8 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon, TransactionId xmin_horizon,
bool need_full_snapshot, bool need_full_snapshot,
bool fast_forward, bool fast_forward,
XLogReaderRoutine *xl_routine, LogicalDecodingXLogPageReadCB page_read,
WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress) LogicalOutputPluginWriterUpdateProgress update_progress)
...@@ -198,11 +199,12 @@ StartupDecodingContext(List *output_plugin_options, ...@@ -198,11 +199,12 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot; ctx->slot = slot;
ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx); ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb);
if (!ctx->reader) if (!ctx->reader)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY), (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory"))); errmsg("out of memory")));
ctx->page_read = page_read;
ctx->reorder = ReorderBufferAllocate(); ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder = ctx->snapshot_builder =
...@@ -319,7 +321,8 @@ CreateInitDecodingContext(const char *plugin, ...@@ -319,7 +321,8 @@ CreateInitDecodingContext(const char *plugin,
List *output_plugin_options, List *output_plugin_options,
bool need_full_snapshot, bool need_full_snapshot,
XLogRecPtr restart_lsn, XLogRecPtr restart_lsn,
XLogReaderRoutine *xl_routine, LogicalDecodingXLogPageReadCB page_read,
WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress) LogicalOutputPluginWriterUpdateProgress update_progress)
...@@ -422,7 +425,7 @@ CreateInitDecodingContext(const char *plugin, ...@@ -422,7 +425,7 @@ CreateInitDecodingContext(const char *plugin,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false, need_full_snapshot, false,
xl_routine, prepare_write, do_write, page_read, cleanup_cb, prepare_write, do_write,
update_progress); update_progress);
/* call output plugin initialization callback */ /* call output plugin initialization callback */
...@@ -476,7 +479,8 @@ LogicalDecodingContext * ...@@ -476,7 +479,8 @@ LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn, CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options, List *output_plugin_options,
bool fast_forward, bool fast_forward,
XLogReaderRoutine *xl_routine, LogicalDecodingXLogPageReadCB page_read,
WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress) LogicalOutputPluginWriterUpdateProgress update_progress)
...@@ -528,8 +532,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ...@@ -528,8 +532,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options, ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false, start_lsn, InvalidTransactionId, false,
fast_forward, xl_routine, prepare_write, fast_forward, page_read, cleanup_cb,
do_write, update_progress); prepare_write, do_write, update_progress);
/* call output plugin initialization callback */ /* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context); old_context = MemoryContextSwitchTo(ctx->context);
...@@ -585,7 +589,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) ...@@ -585,7 +589,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
char *err = NULL; char *err = NULL;
/* the read_page callback waits for new WAL */ /* the read_page callback waits for new WAL */
record = XLogReadRecord(ctx->reader, &err); while (XLogReadRecord(ctx->reader, &record, &err) ==
XLREAD_NEED_DATA)
{
if (!ctx->page_read(ctx->reader))
break;
}
if (err) if (err)
elog(ERROR, "%s", err); elog(ERROR, "%s", err);
if (!record) if (!record)
......
...@@ -233,9 +233,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ...@@ -233,9 +233,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
ctx = CreateDecodingContext(InvalidXLogRecPtr, ctx = CreateDecodingContext(InvalidXLogRecPtr,
options, options,
false, false,
XL_ROUTINE(.page_read = read_local_xlog_page, read_local_xlog_page,
.segment_open = wal_segment_open, wal_segment_close,
.segment_close = wal_segment_close),
LogicalOutputPrepareWrite, LogicalOutputPrepareWrite,
LogicalOutputWrite, NULL); LogicalOutputWrite, NULL);
...@@ -284,7 +283,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ...@@ -284,7 +283,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
XLogRecord *record; XLogRecord *record;
char *errm = NULL; char *errm = NULL;
record = XLogReadRecord(ctx->reader, &errm); while (XLogReadRecord(ctx->reader, &record, &errm) ==
XLREAD_NEED_DATA)
{
if (!ctx->page_read(ctx->reader))
break;
}
if (errm) if (errm)
elog(ERROR, "%s", errm); elog(ERROR, "%s", errm);
......
...@@ -153,9 +153,8 @@ create_logical_replication_slot(char *name, char *plugin, ...@@ -153,9 +153,8 @@ create_logical_replication_slot(char *name, char *plugin,
ctx = CreateInitDecodingContext(plugin, NIL, ctx = CreateInitDecodingContext(plugin, NIL,
false, /* just catalogs is OK */ false, /* just catalogs is OK */
restart_lsn, restart_lsn,
XL_ROUTINE(.page_read = read_local_xlog_page, read_local_xlog_page,
.segment_open = wal_segment_open, wal_segment_close,
.segment_close = wal_segment_close),
NULL, NULL, NULL); NULL, NULL, NULL);
/* /*
...@@ -512,9 +511,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) ...@@ -512,9 +511,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
ctx = CreateDecodingContext(InvalidXLogRecPtr, ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL, NIL,
true, /* fast_forward */ true, /* fast_forward */
XL_ROUTINE(.page_read = read_local_xlog_page, read_local_xlog_page,
.segment_open = wal_segment_open, wal_segment_close,
.segment_close = wal_segment_close),
NULL, NULL, NULL); NULL, NULL, NULL);
/* /*
...@@ -536,7 +534,13 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) ...@@ -536,7 +534,13 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
* Read records. No changes are generated in fast_forward mode, * Read records. No changes are generated in fast_forward mode,
* but snapbuilder/slot statuses are updated properly. * but snapbuilder/slot statuses are updated properly.
*/ */
record = XLogReadRecord(ctx->reader, &errm); while (XLogReadRecord(ctx->reader, &record, &errm) ==
XLREAD_NEED_DATA)
{
if (!ctx->page_read(ctx->reader))
break;
}
if (errm) if (errm)
elog(ERROR, "%s", errm); elog(ERROR, "%s", errm);
......
...@@ -580,10 +580,7 @@ StartReplication(StartReplicationCmd *cmd) ...@@ -580,10 +580,7 @@ StartReplication(StartReplicationCmd *cmd)
/* create xlogreader for physical replication */ /* create xlogreader for physical replication */
xlogreader = xlogreader =
XLogReaderAllocate(wal_segment_size, NULL, XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
XL_ROUTINE(.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close),
NULL);
if (!xlogreader) if (!xlogreader)
ereport(ERROR, ereport(ERROR,
...@@ -806,10 +803,12 @@ StartReplication(StartReplicationCmd *cmd) ...@@ -806,10 +803,12 @@ StartReplication(StartReplicationCmd *cmd)
* which has to do a plain sleep/busy loop, because the walsender's latch gets * which has to do a plain sleep/busy loop, because the walsender's latch gets
* set every time WAL is flushed. * set every time WAL is flushed.
*/ */
static int static bool
logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, logical_read_xlog_page(XLogReaderState *state)
XLogRecPtr targetRecPtr, char *cur_page)
{ {
XLogRecPtr targetPagePtr = state->readPagePtr;
int reqLen = state->reqLen;
char *cur_page = state->readBuf;
XLogRecPtr flushptr; XLogRecPtr flushptr;
int count; int count;
WALReadError errinfo; WALReadError errinfo;
...@@ -826,7 +825,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req ...@@ -826,7 +825,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
/* fail if not (implies we are going to shut down) */ /* fail if not (implies we are going to shut down) */
if (flushptr < targetPagePtr + reqLen) if (flushptr < targetPagePtr + reqLen)
return -1; {
XLogReaderSetInputData(state, -1);
return false;
}
if (targetPagePtr + XLOG_BLCKSZ <= flushptr) if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
count = XLOG_BLCKSZ; /* more than one block available */ count = XLOG_BLCKSZ; /* more than one block available */
...@@ -834,7 +836,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req ...@@ -834,7 +836,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */ count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */ /* now actually read the data, we know it's there */
if (!WALRead(state, if (!WALRead(state, WalSndSegmentOpen, wal_segment_close,
cur_page, cur_page,
targetPagePtr, targetPagePtr,
XLOG_BLCKSZ, XLOG_BLCKSZ,
...@@ -854,7 +856,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req ...@@ -854,7 +856,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize); XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
CheckXLogRemoved(segno, state->seg.ws_tli); CheckXLogRemoved(segno, state->seg.ws_tli);
return count; XLogReaderSetInputData(state, count);
return true;
} }
/* /*
...@@ -1007,9 +1010,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ...@@ -1007,9 +1010,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
InvalidXLogRecPtr, InvalidXLogRecPtr,
XL_ROUTINE(.page_read = logical_read_xlog_page, logical_read_xlog_page,
.segment_open = WalSndSegmentOpen, wal_segment_close,
.segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData, WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress); WalSndUpdateProgress);
...@@ -1167,9 +1169,8 @@ StartLogicalReplication(StartReplicationCmd *cmd) ...@@ -1167,9 +1169,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
*/ */
logical_decoding_ctx = logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false, CreateDecodingContext(cmd->startpoint, cmd->options, false,
XL_ROUTINE(.page_read = logical_read_xlog_page, logical_read_xlog_page,
.segment_open = WalSndSegmentOpen, wal_segment_close,
.segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData, WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress); WalSndUpdateProgress);
xlogreader = logical_decoding_ctx->reader; xlogreader = logical_decoding_ctx->reader;
...@@ -2745,7 +2746,7 @@ XLogSendPhysical(void) ...@@ -2745,7 +2746,7 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes); enlargeStringInfo(&output_message, nbytes);
retry: retry:
if (!WALRead(xlogreader, if (!WALRead(xlogreader, WalSndSegmentOpen, wal_segment_close,
&output_message.data[output_message.len], &output_message.data[output_message.len],
startptr, startptr,
nbytes, nbytes,
...@@ -2843,7 +2844,12 @@ XLogSendLogical(void) ...@@ -2843,7 +2844,12 @@ XLogSendLogical(void)
*/ */
WalSndCaughtUp = false; WalSndCaughtUp = false;
record = XLogReadRecord(logical_decoding_ctx->reader, &errm); while (XLogReadRecord(logical_decoding_ctx->reader, &record, &errm) ==
XLREAD_NEED_DATA)
{
if (!logical_decoding_ctx->page_read(logical_decoding_ctx->reader))
break;
}
/* xlog record was invalid */ /* xlog record was invalid */
if (errm != NULL) if (errm != NULL)
......
...@@ -41,15 +41,9 @@ static int xlogreadfd = -1; ...@@ -41,15 +41,9 @@ static int xlogreadfd = -1;
static XLogSegNo xlogreadsegno = -1; static XLogSegNo xlogreadsegno = -1;
static char xlogfpath[MAXPGPATH]; static char xlogfpath[MAXPGPATH];
typedef struct XLogPageReadPrivate static bool SimpleXLogPageRead(XLogReaderState *xlogreader,
{ const char *datadir, int *tliIndex,
const char *restoreCommand; const char *restoreCommand);
int tliIndex;
} XLogPageReadPrivate;
static int SimpleXLogPageRead(XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
/* /*
* Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline
...@@ -66,20 +60,22 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, ...@@ -66,20 +60,22 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
XLogRecord *record; XLogRecord *record;
XLogReaderState *xlogreader; XLogReaderState *xlogreader;
char *errormsg; char *errormsg;
XLogPageReadPrivate private;
private.tliIndex = tliIndex; xlogreader = XLogReaderAllocate(WalSegSz, datadir, NULL);
private.restoreCommand = restoreCommand;
xlogreader = XLogReaderAllocate(WalSegSz, datadir,
XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL) if (xlogreader == NULL)
pg_fatal("out of memory"); pg_fatal("out of memory");
XLogBeginRead(xlogreader, startpoint); XLogBeginRead(xlogreader, startpoint);
do do
{ {
record = XLogReadRecord(xlogreader, &errormsg); while (XLogReadRecord(xlogreader, &record, &errormsg) ==
XLREAD_NEED_DATA)
{
if (!SimpleXLogPageRead(xlogreader, datadir,
&tliIndex, restoreCommand))
break;
}
if (record == NULL) if (record == NULL)
{ {
...@@ -123,19 +119,19 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex, ...@@ -123,19 +119,19 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
XLogRecord *record; XLogRecord *record;
XLogReaderState *xlogreader; XLogReaderState *xlogreader;
char *errormsg; char *errormsg;
XLogPageReadPrivate private;
XLogRecPtr endptr; XLogRecPtr endptr;
private.tliIndex = tliIndex; xlogreader = XLogReaderAllocate(WalSegSz, datadir, NULL);
private.restoreCommand = restoreCommand;
xlogreader = XLogReaderAllocate(WalSegSz, datadir,
XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL) if (xlogreader == NULL)
pg_fatal("out of memory"); pg_fatal("out of memory");
XLogBeginRead(xlogreader, ptr); XLogBeginRead(xlogreader, ptr);
record = XLogReadRecord(xlogreader, &errormsg); while (XLogReadRecord(xlogreader, &record, &errormsg) ==
XLREAD_NEED_DATA)
{
if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex, restoreCommand))
break;
}
if (record == NULL) if (record == NULL)
{ {
if (errormsg) if (errormsg)
...@@ -170,7 +166,6 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, ...@@ -170,7 +166,6 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
XLogRecPtr searchptr; XLogRecPtr searchptr;
XLogReaderState *xlogreader; XLogReaderState *xlogreader;
char *errormsg; char *errormsg;
XLogPageReadPrivate private;
/* /*
* The given fork pointer points to the end of the last common record, * The given fork pointer points to the end of the last common record,
...@@ -186,11 +181,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, ...@@ -186,11 +181,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
forkptr += SizeOfXLogShortPHD; forkptr += SizeOfXLogShortPHD;
} }
private.tliIndex = tliIndex; xlogreader = XLogReaderAllocate(WalSegSz, datadir, NULL);
private.restoreCommand = restoreCommand;
xlogreader = XLogReaderAllocate(WalSegSz, datadir,
XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL) if (xlogreader == NULL)
pg_fatal("out of memory"); pg_fatal("out of memory");
...@@ -200,7 +191,13 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, ...@@ -200,7 +191,13 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
uint8 info; uint8 info;
XLogBeginRead(xlogreader, searchptr); XLogBeginRead(xlogreader, searchptr);
record = XLogReadRecord(xlogreader, &errormsg); while (XLogReadRecord(xlogreader, &record, &errormsg) ==
XLREAD_NEED_DATA)
{
if (!SimpleXLogPageRead(xlogreader, datadir,
&tliIndex, restoreCommand))
break;
}
if (record == NULL) if (record == NULL)
{ {
...@@ -246,16 +243,19 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, ...@@ -246,16 +243,19 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
} }
/* XLogReader callback function, to read a WAL page */ /* XLogReader callback function, to read a WAL page */
static int static bool
SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir,
int reqLen, XLogRecPtr targetRecPtr, char *readBuf) int *tliIndex, const char *restoreCommand)
{ {
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; XLogRecPtr targetPagePtr = xlogreader->readPagePtr;
char *readBuf = xlogreader->readBuf;
uint32 targetPageOff; uint32 targetPageOff;
XLogRecPtr targetSegEnd; XLogRecPtr targetSegEnd;
XLogSegNo targetSegNo; XLogSegNo targetSegNo;
int r; int r;
Assert(xlogreader->reqLen <= XLOG_BLCKSZ);
XLByteToSeg(targetPagePtr, targetSegNo, WalSegSz); XLByteToSeg(targetPagePtr, targetSegNo, WalSegSz);
XLogSegNoOffsetToRecPtr(targetSegNo + 1, 0, WalSegSz, targetSegEnd); XLogSegNoOffsetToRecPtr(targetSegNo + 1, 0, WalSegSz, targetSegEnd);
targetPageOff = XLogSegmentOffset(targetPagePtr, WalSegSz); targetPageOff = XLogSegmentOffset(targetPagePtr, WalSegSz);
...@@ -283,14 +283,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, ...@@ -283,14 +283,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
* be done both forward and backward, consider also switching timeline * be done both forward and backward, consider also switching timeline
* accordingly. * accordingly.
*/ */
while (private->tliIndex < targetNentries - 1 && while (*tliIndex < targetNentries - 1 &&
targetHistory[private->tliIndex].end < targetSegEnd) targetHistory[*tliIndex].end < targetSegEnd)
private->tliIndex++; (*tliIndex)++;
while (private->tliIndex > 0 && while (*tliIndex > 0 &&
targetHistory[private->tliIndex].begin >= targetSegEnd) targetHistory[*tliIndex].begin >= targetSegEnd)
private->tliIndex--; (*tliIndex)--;
XLogFileName(xlogfname, targetHistory[private->tliIndex].tli, XLogFileName(xlogfname, targetHistory[*tliIndex].tli,
xlogreadsegno, WalSegSz); xlogreadsegno, WalSegSz);
snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s",
...@@ -303,10 +303,11 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, ...@@ -303,10 +303,11 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
/* /*
* If we have no restore_command to execute, then exit. * If we have no restore_command to execute, then exit.
*/ */
if (private->restoreCommand == NULL) if (restoreCommand == NULL)
{ {
pg_log_error("could not open file \"%s\": %m", xlogfpath); pg_log_error("could not open file \"%s\": %m", xlogfpath);
return -1; XLogReaderSetInputData(xlogreader, -1);
return false;
} }
/* /*
...@@ -316,10 +317,13 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, ...@@ -316,10 +317,13 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
xlogreadfd = RestoreArchivedFile(xlogreader->segcxt.ws_dir, xlogreadfd = RestoreArchivedFile(xlogreader->segcxt.ws_dir,
xlogfname, xlogfname,
WalSegSz, WalSegSz,
private->restoreCommand); restoreCommand);
if (xlogreadfd < 0) if (xlogreadfd < 0)
return -1; {
XLogReaderSetInputData(xlogreader, -1);
return false;
}
else else
pg_log_debug("using file \"%s\" restored from archive", pg_log_debug("using file \"%s\" restored from archive",
xlogfpath); xlogfpath);
...@@ -335,7 +339,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, ...@@ -335,7 +339,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0) if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
{ {
pg_log_error("could not seek in file \"%s\": %m", xlogfpath); pg_log_error("could not seek in file \"%s\": %m", xlogfpath);
return -1; XLogReaderSetInputData(xlogreader, -1);
return false;
} }
...@@ -348,13 +353,15 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, ...@@ -348,13 +353,15 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
pg_log_error("could not read file \"%s\": read %d of %zu", pg_log_error("could not read file \"%s\": read %d of %zu",
xlogfpath, r, (Size) XLOG_BLCKSZ); xlogfpath, r, (Size) XLOG_BLCKSZ);
return -1; XLogReaderSetInputData(xlogreader, -1);
return false;
} }
Assert(targetSegNo == xlogreadsegno); Assert(targetSegNo == xlogreadsegno);
xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli; xlogreader->seg.ws_tli = targetHistory[*tliIndex].tli;
return XLOG_BLCKSZ; XLogReaderSetInputData(xlogreader, XLOG_BLCKSZ);
return true;
} }
/* /*
......
...@@ -29,14 +29,6 @@ static const char *progname; ...@@ -29,14 +29,6 @@ static const char *progname;
static int WalSegSz; static int WalSegSz;
typedef struct XLogDumpPrivate
{
TimeLineID timeline;
XLogRecPtr startptr;
XLogRecPtr endptr;
bool endptr_reached;
} XLogDumpPrivate;
typedef struct XLogDumpConfig typedef struct XLogDumpConfig
{ {
/* display options */ /* display options */
...@@ -330,30 +322,41 @@ WALDumpCloseSegment(XLogReaderState *state) ...@@ -330,30 +322,41 @@ WALDumpCloseSegment(XLogReaderState *state)
state->seg.ws_file = -1; state->seg.ws_file = -1;
} }
/* pg_waldump's XLogReaderRoutine->page_read callback */ /*
static int * pg_waldump's WAL page rader
WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, *
XLogRecPtr targetPtr, char *readBuff) * timeline and startptr specifies the LSN, and reads up to endptr.
*/
static bool
WALDumpReadPage(XLogReaderState *state, TimeLineID timeline,
XLogRecPtr startptr, XLogRecPtr endptr)
{ {
XLogDumpPrivate *private = state->private_data; XLogRecPtr targetPagePtr = state->readPagePtr;
int reqLen = state->reqLen;
char *readBuff = state->readBuf;
int count = XLOG_BLCKSZ; int count = XLOG_BLCKSZ;
WALReadError errinfo; WALReadError errinfo;
if (private->endptr != InvalidXLogRecPtr) /* determine the number of bytes to read on the page */
if (endptr != InvalidXLogRecPtr)
{ {
if (targetPagePtr + XLOG_BLCKSZ <= private->endptr) if (targetPagePtr + XLOG_BLCKSZ <= endptr)
count = XLOG_BLCKSZ; count = XLOG_BLCKSZ;
else if (targetPagePtr + reqLen <= private->endptr) else if (targetPagePtr + reqLen <= endptr)
count = private->endptr - targetPagePtr; count = endptr - targetPagePtr;
else else
{ {
private->endptr_reached = true; /* Notify xlogreader that we didn't read at all */
return -1; XLogReaderSetInputData(state, -1);
return false;
} }
} }
if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline, /* We should read more than requested by xlogreader */
&errinfo)) Assert(count >= state->readLen);
if (!WALRead(state, WALDumpOpenSegment, WALDumpCloseSegment,
readBuff, targetPagePtr, count, timeline, &errinfo))
{ {
WALOpenSegment *seg = &errinfo.wre_seg; WALOpenSegment *seg = &errinfo.wre_seg;
char fname[MAXPGPATH]; char fname[MAXPGPATH];
...@@ -373,7 +376,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, ...@@ -373,7 +376,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
(Size) errinfo.wre_req); (Size) errinfo.wre_req);
} }
return count; /* Notify xlogreader of how many bytes we have read */
XLogReaderSetInputData(state, count);
return true;
} }
/* /*
...@@ -754,7 +759,10 @@ main(int argc, char **argv) ...@@ -754,7 +759,10 @@ main(int argc, char **argv)
uint32 xlogid; uint32 xlogid;
uint32 xrecoff; uint32 xrecoff;
XLogReaderState *xlogreader_state; XLogReaderState *xlogreader_state;
XLogDumpPrivate private; XLogFindNextRecordState *findnext_state;
TimeLineID timeline;
XLogRecPtr startptr;
XLogRecPtr endptr;
XLogDumpConfig config; XLogDumpConfig config;
XLogDumpStats stats; XLogDumpStats stats;
XLogRecord *record; XLogRecord *record;
...@@ -800,14 +808,12 @@ main(int argc, char **argv) ...@@ -800,14 +808,12 @@ main(int argc, char **argv)
} }
} }
memset(&private, 0, sizeof(XLogDumpPrivate));
memset(&config, 0, sizeof(XLogDumpConfig)); memset(&config, 0, sizeof(XLogDumpConfig));
memset(&stats, 0, sizeof(XLogDumpStats)); memset(&stats, 0, sizeof(XLogDumpStats));
private.timeline = 1; timeline = 1;
private.startptr = InvalidXLogRecPtr; startptr = InvalidXLogRecPtr;
private.endptr = InvalidXLogRecPtr; endptr = InvalidXLogRecPtr;
private.endptr_reached = false;
config.quiet = false; config.quiet = false;
config.bkp_details = false; config.bkp_details = false;
...@@ -841,7 +847,7 @@ main(int argc, char **argv) ...@@ -841,7 +847,7 @@ main(int argc, char **argv)
optarg); optarg);
goto bad_argument; goto bad_argument;
} }
private.endptr = (uint64) xlogid << 32 | xrecoff; endptr = (uint64) xlogid << 32 | xrecoff;
break; break;
case 'f': case 'f':
config.follow = true; config.follow = true;
...@@ -894,10 +900,10 @@ main(int argc, char **argv) ...@@ -894,10 +900,10 @@ main(int argc, char **argv)
goto bad_argument; goto bad_argument;
} }
else else
private.startptr = (uint64) xlogid << 32 | xrecoff; startptr = (uint64) xlogid << 32 | xrecoff;
break; break;
case 't': case 't':
if (sscanf(optarg, "%d", &private.timeline) != 1) if (sscanf(optarg, "%d", &timeline) != 1)
{ {
pg_log_error("could not parse timeline \"%s\"", optarg); pg_log_error("could not parse timeline \"%s\"", optarg);
goto bad_argument; goto bad_argument;
...@@ -974,21 +980,21 @@ main(int argc, char **argv) ...@@ -974,21 +980,21 @@ main(int argc, char **argv)
close(fd); close(fd);
/* parse position from file */ /* parse position from file */
XLogFromFileName(fname, &private.timeline, &segno, WalSegSz); XLogFromFileName(fname, &timeline, &segno, WalSegSz);
if (XLogRecPtrIsInvalid(private.startptr)) if (XLogRecPtrIsInvalid(startptr))
XLogSegNoOffsetToRecPtr(segno, 0, WalSegSz, private.startptr); XLogSegNoOffsetToRecPtr(segno, 0, WalSegSz, startptr);
else if (!XLByteInSeg(private.startptr, segno, WalSegSz)) else if (!XLByteInSeg(startptr, segno, WalSegSz))
{ {
pg_log_error("start WAL location %X/%X is not inside file \"%s\"", pg_log_error("start WAL location %X/%X is not inside file \"%s\"",
LSN_FORMAT_ARGS(private.startptr), LSN_FORMAT_ARGS(startptr),
fname); fname);
goto bad_argument; goto bad_argument;
} }
/* no second file specified, set end position */ /* no second file specified, set end position */
if (!(optind + 1 < argc) && XLogRecPtrIsInvalid(private.endptr)) if (!(optind + 1 < argc) && XLogRecPtrIsInvalid(endptr))
XLogSegNoOffsetToRecPtr(segno + 1, 0, WalSegSz, private.endptr); XLogSegNoOffsetToRecPtr(segno + 1, 0, WalSegSz, endptr);
/* parse ENDSEG if passed */ /* parse ENDSEG if passed */
if (optind + 1 < argc) if (optind + 1 < argc)
...@@ -1004,26 +1010,26 @@ main(int argc, char **argv) ...@@ -1004,26 +1010,26 @@ main(int argc, char **argv)
close(fd); close(fd);
/* parse position from file */ /* parse position from file */
XLogFromFileName(fname, &private.timeline, &endsegno, WalSegSz); XLogFromFileName(fname, &timeline, &endsegno, WalSegSz);
if (endsegno < segno) if (endsegno < segno)
fatal_error("ENDSEG %s is before STARTSEG %s", fatal_error("ENDSEG %s is before STARTSEG %s",
argv[optind + 1], argv[optind]); argv[optind + 1], argv[optind]);
if (XLogRecPtrIsInvalid(private.endptr)) if (XLogRecPtrIsInvalid(endptr))
XLogSegNoOffsetToRecPtr(endsegno + 1, 0, WalSegSz, XLogSegNoOffsetToRecPtr(endsegno + 1, 0, WalSegSz,
private.endptr); endptr);
/* set segno to endsegno for check of --end */ /* set segno to endsegno for check of --end */
segno = endsegno; segno = endsegno;
} }
if (!XLByteInSeg(private.endptr, segno, WalSegSz) && if (!XLByteInSeg(endptr, segno, WalSegSz) &&
private.endptr != (segno + 1) * WalSegSz) endptr != (segno + 1) * WalSegSz)
{ {
pg_log_error("end WAL location %X/%X is not inside file \"%s\"", pg_log_error("end WAL location %X/%X is not inside file \"%s\"",
LSN_FORMAT_ARGS(private.endptr), LSN_FORMAT_ARGS(endptr),
argv[argc - 1]); argv[argc - 1]);
goto bad_argument; goto bad_argument;
} }
...@@ -1032,7 +1038,7 @@ main(int argc, char **argv) ...@@ -1032,7 +1038,7 @@ main(int argc, char **argv)
waldir = identify_target_directory(waldir, NULL); waldir = identify_target_directory(waldir, NULL);
/* we don't know what to print */ /* we don't know what to print */
if (XLogRecPtrIsInvalid(private.startptr)) if (XLogRecPtrIsInvalid(startptr))
{ {
pg_log_error("no start WAL location given"); pg_log_error("no start WAL location given");
goto bad_argument; goto bad_argument;
...@@ -1042,42 +1048,56 @@ main(int argc, char **argv) ...@@ -1042,42 +1048,56 @@ main(int argc, char **argv)
/* we have everything we need, start reading */ /* we have everything we need, start reading */
xlogreader_state = xlogreader_state =
XLogReaderAllocate(WalSegSz, waldir, XLogReaderAllocate(WalSegSz, waldir, WALDumpCloseSegment);
XL_ROUTINE(.page_read = WALDumpReadPage,
.segment_open = WALDumpOpenSegment,
.segment_close = WALDumpCloseSegment),
&private);
if (!xlogreader_state) if (!xlogreader_state)
fatal_error("out of memory"); fatal_error("out of memory");
findnext_state =
InitXLogFindNextRecord(xlogreader_state, startptr);
if (!findnext_state)
fatal_error("out of memory");
/* first find a valid recptr to start from */ /* first find a valid recptr to start from */
first_record = XLogFindNextRecord(xlogreader_state, private.startptr); while (XLogFindNextRecord(findnext_state))
{
if (!WALDumpReadPage(xlogreader_state, timeline, startptr, endptr))
break;
}
first_record = findnext_state->currRecPtr;
if (first_record == InvalidXLogRecPtr) if (first_record == InvalidXLogRecPtr)
fatal_error("could not find a valid record after %X/%X", fatal_error("could not find a valid record after %X/%X",
LSN_FORMAT_ARGS(private.startptr)); LSN_FORMAT_ARGS(startptr));
/* /*
* Display a message that we're skipping data if `from` wasn't a pointer * Display a message that we're skipping data if `from` wasn't a pointer
* to the start of a record and also wasn't a pointer to the beginning of * to the start of a record and also wasn't a pointer to the beginning of
* a segment (e.g. we were used in file mode). * a segment (e.g. we were used in file mode).
*/ */
if (first_record != private.startptr && if (first_record != startptr &&
XLogSegmentOffset(private.startptr, WalSegSz) != 0) XLogSegmentOffset(startptr, WalSegSz) != 0)
printf(ngettext("first record is after %X/%X, at %X/%X, skipping over %u byte\n", printf(ngettext("first record is after %X/%X, at %X/%X, skipping over %u byte\n",
"first record is after %X/%X, at %X/%X, skipping over %u bytes\n", "first record is after %X/%X, at %X/%X, skipping over %u bytes\n",
(first_record - private.startptr)), (first_record - startptr)),
LSN_FORMAT_ARGS(private.startptr), LSN_FORMAT_ARGS(startptr),
LSN_FORMAT_ARGS(first_record), LSN_FORMAT_ARGS(first_record),
(uint32) (first_record - private.startptr)); (uint32) (first_record - startptr));
for (;;) for (;;)
{ {
/* try to read the next record */ /* try to read the next record */
record = XLogReadRecord(xlogreader_state, &errormsg); while (XLogReadRecord(xlogreader_state, &record, &errormsg) ==
XLREAD_NEED_DATA)
{
if (!WALDumpReadPage(xlogreader_state, timeline, startptr, endptr))
break;
}
if (!record) if (!record)
{ {
if (!config.follow || private.endptr_reached) if (!config.follow)
break; break;
else else
{ {
......
...@@ -56,65 +56,17 @@ typedef struct WALSegmentContext ...@@ -56,65 +56,17 @@ typedef struct WALSegmentContext
} WALSegmentContext; } WALSegmentContext;
typedef struct XLogReaderState XLogReaderState; typedef struct XLogReaderState XLogReaderState;
typedef struct XLogFindNextRecordState XLogFindNextRecordState;
/* Function type definitions for various xlogreader interactions */ /* Function type definition for the segment cleanup callback */
typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, typedef void (*WALSegmentCleanupCB) (XLogReaderState *xlogreader);
XLogRecPtr targetPagePtr,
int reqLen, /* Function type definition for the open/close callbacks for WALRead() */
XLogRecPtr targetRecPtr,
char *readBuf);
typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader, typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
XLogSegNo nextSegNo, XLogSegNo nextSegNo,
TimeLineID *tli_p); TimeLineID *tli_p);
typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader); typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
typedef struct XLogReaderRoutine
{
/*
* Data input callback
*
* This callback shall read at least reqLen valid bytes of the xlog page
* starting at targetPagePtr, and store them in readBuf. The callback
* shall return the number of bytes read (never more than XLOG_BLCKSZ), or
* -1 on failure. The callback shall sleep, if necessary, to wait for the
* requested bytes to become available. The callback will not be invoked
* again for the same page unless more than the returned number of bytes
* are needed.
*
* targetRecPtr is the position of the WAL record we're reading. Usually
* it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
* to read and verify the page or segment header, before it reads the
* actual WAL record it's interested in. In that case, targetRecPtr can
* be used to determine which timeline to read the page from.
*
* The callback shall set ->seg.ws_tli to the TLI of the file the page was
* read from.
*/
XLogPageReadCB page_read;
/*
* Callback to open the specified WAL segment for reading. ->seg.ws_file
* shall be set to the file descriptor of the opened segment. In case of
* failure, an error shall be raised by the callback and it shall not
* return.
*
* "nextSegNo" is the number of the segment to be opened.
*
* "tli_p" is an input/output argument. WALRead() uses it to pass the
* timeline in which the new segment should be found, but the callback can
* use it to return the TLI that it actually opened.
*/
WALSegmentOpenCB segment_open;
/*
* WAL segment close callback. ->seg.ws_file shall be set to a negative
* number.
*/
WALSegmentCloseCB segment_close;
} XLogReaderRoutine;
#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__}
typedef struct typedef struct
{ {
/* Is this block ref in use? */ /* Is this block ref in use? */
...@@ -144,12 +96,36 @@ typedef struct ...@@ -144,12 +96,36 @@ typedef struct
uint16 data_bufsz; uint16 data_bufsz;
} DecodedBkpBlock; } DecodedBkpBlock;
/* Return code from XLogReadRecord */
typedef enum XLogReadRecordResult
{
XLREAD_SUCCESS, /* record is successfully read */
XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */
XLREAD_FAIL /* failed during reading a record */
} XLogReadRecordResult;
/*
* internal state of XLogReadRecord
*
* XLogReadState runs a state machine while reading a record. Theses states
* are not seen outside the function. Each state may repeat several times
* exiting requesting caller for new data. See the comment of XLogReadRecrod
* for details.
*/
typedef enum XLogReadRecordState
{
XLREAD_NEXT_RECORD,
XLREAD_TOT_LEN,
XLREAD_FIRST_FRAGMENT,
XLREAD_CONTINUATION
} XLogReadRecordState;
struct XLogReaderState struct XLogReaderState
{ {
/* /*
* Operational callbacks * Operational callbacks
*/ */
XLogReaderRoutine routine; WALSegmentCleanupCB cleanup_cb;
/* ---------------------------------------- /* ----------------------------------------
* Public parameters * Public parameters
...@@ -162,19 +138,31 @@ struct XLogReaderState ...@@ -162,19 +138,31 @@ struct XLogReaderState
*/ */
uint64 system_identifier; uint64 system_identifier;
/*
* Opaque data for callbacks to use. Not used by XLogReader.
*/
void *private_data;
/* /*
* Start and end point of last record read. EndRecPtr is also used as the * Start and end point of last record read. EndRecPtr is also used as the
* position to read next. Calling XLogBeginRead() sets EndRecPtr to the * position to read next. Calling XLogBeginRead() sets EndRecPtr to the
* starting position and ReadRecPtr to invalid. * starting position and ReadRecPtr to invalid.
*/ */
XLogRecPtr ReadRecPtr; /* start of last record read */ XLogRecPtr ReadRecPtr; /* start of last record read or being read */
XLogRecPtr EndRecPtr; /* end+1 of last record read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */
XLogRecPtr PrevRecPtr; /* start of previous record read */
/* ----------------------------------------
* Communication with page reader
* readBuf is XLOG_BLCKSZ bytes, valid up to at least reqLen bytes.
* ----------------------------------------
*/
/* variables the clients of xlogreader can examine */
XLogRecPtr readPagePtr; /* page pointer to read */
int32 reqLen; /* bytes requested to the caller */
char *readBuf; /* buffer to store data */
bool page_verified; /* is the page header on the buffer verified? */
bool record_verified;/* is the current record header verified? */
/* variables set by the client of xlogreader */
int32 readLen; /* actual bytes copied into readBuf by client,
* which should be >= reqLen. Client should
* use XLogReaderSetInputData() to set. */
/* ---------------------------------------- /* ----------------------------------------
* Decoded representation of current record * Decoded representation of current record
...@@ -203,13 +191,6 @@ struct XLogReaderState ...@@ -203,13 +191,6 @@ struct XLogReaderState
* ---------------------------------------- * ----------------------------------------
*/ */
/*
* Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
* readLen bytes)
*/
char *readBuf;
uint32 readLen;
/* last read XLOG position for data currently in readBuf */ /* last read XLOG position for data currently in readBuf */
WALSegmentContext segcxt; WALSegmentContext segcxt;
WALOpenSegment seg; WALOpenSegment seg;
...@@ -222,8 +203,6 @@ struct XLogReaderState ...@@ -222,8 +203,6 @@ struct XLogReaderState
XLogRecPtr latestPagePtr; XLogRecPtr latestPagePtr;
TimeLineID latestPageTLI; TimeLineID latestPageTLI;
/* beginning of the WAL record being read. */
XLogRecPtr currRecPtr;
/* timeline to read it from, 0 if a lookup is required */ /* timeline to read it from, 0 if a lookup is required */
TimeLineID currTLI; TimeLineID currTLI;
...@@ -250,16 +229,37 @@ struct XLogReaderState ...@@ -250,16 +229,37 @@ struct XLogReaderState
char *readRecordBuf; char *readRecordBuf;
uint32 readRecordBufSize; uint32 readRecordBufSize;
/*
* XLogReadRecord() state
*/
XLogReadRecordState readRecordState; /* state machine state */
int recordGotLen; /* amount of current record that has already
* been read */
int recordRemainLen; /* length of current record that remains */
XLogRecPtr recordContRecPtr; /* where the current record continues */
/* Buffer to hold error message */ /* Buffer to hold error message */
char *errormsg_buf; char *errormsg_buf;
}; };
struct XLogFindNextRecordState
{
XLogReaderState *reader_state;
XLogRecPtr targetRecPtr;
XLogRecPtr currRecPtr;
};
/* Report that data is available for decoding. */
static inline void
XLogReaderSetInputData(XLogReaderState *state, int32 len)
{
state->readLen = len;
}
/* Get a new XLogReader */ /* Get a new XLogReader */
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
const char *waldir, const char *waldir,
XLogReaderRoutine *routine, WALSegmentCleanupCB cleanup_cb);
void *private_data);
extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
/* Free an XLogReader */ /* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state); extern void XLogReaderFree(XLogReaderState *state);
...@@ -267,11 +267,13 @@ extern void XLogReaderFree(XLogReaderState *state); ...@@ -267,11 +267,13 @@ extern void XLogReaderFree(XLogReaderState *state);
/* Position the XLogReader to given record */ /* Position the XLogReader to given record */
extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
#ifdef FRONTEND #ifdef FRONTEND
extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); extern XLogFindNextRecordState *InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr);
extern bool XLogFindNextRecord(XLogFindNextRecordState *state);
#endif /* FRONTEND */ #endif /* FRONTEND */
/* Read the next XLog record. Returns NULL on end-of-WAL or failure */ /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state,
XLogRecord **record,
char **errormsg); char **errormsg);
/* Validate a page */ /* Validate a page */
...@@ -292,6 +294,7 @@ typedef struct WALReadError ...@@ -292,6 +294,7 @@ typedef struct WALReadError
} WALReadError; } WALReadError;
extern bool WALRead(XLogReaderState *state, extern bool WALRead(XLogReaderState *state,
WALSegmentOpenCB segopenfn, WALSegmentCloseCB sgclosefn,
char *buf, XLogRecPtr startptr, Size count, char *buf, XLogRecPtr startptr, Size count,
TimeLineID tli, WALReadError *errinfo); TimeLineID tli, WALReadError *errinfo);
......
...@@ -47,9 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, ...@@ -47,9 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
extern void FreeFakeRelcacheEntry(Relation fakerel); extern void FreeFakeRelcacheEntry(Relation fakerel);
extern int read_local_xlog_page(XLogReaderState *state, extern bool read_local_xlog_page(XLogReaderState *state);
XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *cur_page);
extern void wal_segment_open(XLogReaderState *state, extern void wal_segment_open(XLogReaderState *state,
XLogSegNo nextSegNo, XLogSegNo nextSegNo,
TimeLineID *tli_p); TimeLineID *tli_p);
......
...@@ -29,6 +29,10 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC ...@@ -29,6 +29,10 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC
TransactionId xid TransactionId xid
); );
typedef struct LogicalDecodingContext LogicalDecodingContext;
typedef bool (*LogicalDecodingXLogPageReadCB)(XLogReaderState *ctx);
typedef struct LogicalDecodingContext typedef struct LogicalDecodingContext
{ {
/* memory context this is all allocated in */ /* memory context this is all allocated in */
...@@ -39,6 +43,7 @@ typedef struct LogicalDecodingContext ...@@ -39,6 +43,7 @@ typedef struct LogicalDecodingContext
/* infrastructure pieces for decoding */ /* infrastructure pieces for decoding */
XLogReaderState *reader; XLogReaderState *reader;
LogicalDecodingXLogPageReadCB page_read;
struct ReorderBuffer *reorder; struct ReorderBuffer *reorder;
struct SnapBuild *snapshot_builder; struct SnapBuild *snapshot_builder;
...@@ -105,14 +110,16 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin, ...@@ -105,14 +110,16 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
List *output_plugin_options, List *output_plugin_options,
bool need_full_snapshot, bool need_full_snapshot,
XLogRecPtr restart_lsn, XLogRecPtr restart_lsn,
XLogReaderRoutine *xl_routine, LogicalDecodingXLogPageReadCB page_read,
WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress); LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options, List *output_plugin_options,
bool fast_forward, bool fast_forward,
XLogReaderRoutine *xl_routine, LogicalDecodingXLogPageReadCB page_read,
WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress); LogicalOutputPluginWriterUpdateProgress update_progress);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment