Commit b060dbe0 authored by Alvaro Herrera's avatar Alvaro Herrera

Rework XLogReader callback system

Code review for 0dc8ead4, prompted by a bug closed by 91c40548.

XLogReader's system for opening and closing segments had gotten too
complicated, with callbacks being passed at both the XLogReaderAllocate
level (read_page) as well as at the WALRead level (segment_open).  This
was confusing and hard to follow, so restructure things so that these
callbacks are passed together at XLogReaderAllocate time, and add
another callback to the set (segment_close) to make it a coherent whole.
Also, ensure XLogReaderState is an argument to all the callbacks, so
that they can grab at the ->private data if necessary.

Document the whole arrangement more clearly.

Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: default avatarKyotaro Horiguchi <horikyota.ntt@gmail.com>
Discussion: https://postgr.es/m/20200422175754.GA19858@alvherre.pgsql
parent 871696ba
...@@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) ...@@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
char *errormsg; char *errormsg;
xlogreader = XLogReaderAllocate(wal_segment_size, NULL, xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
&read_local_xlog_page, NULL); XL_ROUTINE(.page_read = &read_local_xlog_page,
.segment_open = &wal_segment_open,
.segment_close = &wal_segment_close),
NULL);
if (!xlogreader) if (!xlogreader)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY), (errcode(ERRCODE_OUT_OF_MEMORY),
......
...@@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata, ...@@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata,
if (!debug_reader) if (!debug_reader)
debug_reader = XLogReaderAllocate(wal_segment_size, NULL, debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
NULL, NULL); XL_ROUTINE(), NULL);
if (!debug_reader) if (!debug_reader)
{ {
...@@ -6478,8 +6478,12 @@ StartupXLOG(void) ...@@ -6478,8 +6478,12 @@ StartupXLOG(void)
/* Set up XLOG reader facility */ /* Set up XLOG reader facility */
MemSet(&private, 0, sizeof(XLogPageReadPrivate)); MemSet(&private, 0, sizeof(XLogPageReadPrivate));
xlogreader = XLogReaderAllocate(wal_segment_size, NULL, xlogreader =
&XLogPageRead, &private); XLogReaderAllocate(wal_segment_size, NULL,
XL_ROUTINE(.page_read = &XLogPageRead,
.segment_open = NULL,
.segment_close = wal_segment_close),
&private);
if (!xlogreader) if (!xlogreader)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY), (errcode(ERRCODE_OUT_OF_MEMORY),
......
...@@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) ...@@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
*/ */
XLogReaderState * XLogReaderState *
XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderAllocate(int wal_segment_size, const char *waldir,
XLogPageReadCB pagereadfunc, void *private_data) XLogReaderRoutine *routine, void *private_data)
{ {
XLogReaderState *state; XLogReaderState *state;
...@@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, ...@@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
if (!state) if (!state)
return NULL; return NULL;
/* initialize caller-provided support functions */
state->routine = *routine;
state->max_block_id = -1; state->max_block_id = -1;
/* /*
...@@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, ...@@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
waldir); waldir);
state->read_page = pagereadfunc;
/* system_identifier initialized to zeroes above */ /* system_identifier initialized to zeroes above */
state->private_data = private_data; state->private_data = private_data;
/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
...@@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state) ...@@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state)
int block_id; int block_id;
if (state->seg.ws_file != -1) if (state->seg.ws_file != -1)
close(state->seg.ws_file); state->routine.segment_close(state);
for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
{ {
...@@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) ...@@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* XLogBeginRead() or XLogFindNextRecord() must be called before the first call * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
* to XLogReadRecord(). * to XLogReadRecord().
* *
* If the read_page callback fails to read the requested data, NULL is * If the page_read callback fails to read the requested data, NULL is
* returned. The callback is expected to have reported the error; errormsg * returned. The callback is expected to have reported the error; errormsg
* is set to NULL. * is set to NULL.
* *
...@@ -559,10 +561,10 @@ err: ...@@ -559,10 +561,10 @@ err:
/* /*
* Read a single xlog page including at least [pageptr, reqLen] of valid data * Read a single xlog page including at least [pageptr, reqLen] of valid data
* via the read_page() callback. * via the page_read() callback.
* *
* Returns -1 if the required page cannot be read for some reason; errormsg_buf * Returns -1 if the required page cannot be read for some reason; errormsg_buf
* is set in that case (unless the error occurs in the read_page callback). * is set in that case (unless the error occurs in the page_read callback).
* *
* We fetch the page from a reader-local cache if we know we have the required * We fetch the page from a reader-local cache if we know we have the required
* data and if there hasn't been any error since caching the data. * data and if there hasn't been any error since caching the data.
...@@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) ...@@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* Data is not in our buffer. * Data is not in our buffer.
* *
* Every time we actually read the segment, even if we looked at parts of * Every time we actually read the segment, even if we looked at parts of
* it before, we need to do verification as the read_page callback might * it before, we need to do verification as the page_read callback might
* now be rereading data from a different source. * now be rereading data from a different source.
* *
* Whenever switching to a new WAL segment, we read the first page of the * Whenever switching to a new WAL segment, we read the first page of the
...@@ -601,7 +603,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) ...@@ -601,7 +603,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{ {
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
state->currRecPtr, state->currRecPtr,
state->readBuf); state->readBuf);
if (readLen < 0) if (readLen < 0)
...@@ -619,7 +621,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) ...@@ -619,7 +621,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* First, read the requested data length, but at least a short page header * First, read the requested data length, but at least a short page header
* so that we can validate it. * so that we can validate it.
*/ */
readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
state->currRecPtr, state->currRecPtr,
state->readBuf); state->readBuf);
if (readLen < 0) if (readLen < 0)
...@@ -638,7 +640,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) ...@@ -638,7 +640,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
/* still not enough */ /* still not enough */
if (readLen < XLogPageHeaderSize(hdr)) if (readLen < XLogPageHeaderSize(hdr))
{ {
readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
state->currRecPtr, state->currRecPtr,
state->readBuf); state->readBuf);
if (readLen < 0) if (readLen < 0)
...@@ -1041,11 +1043,14 @@ err: ...@@ -1041,11 +1043,14 @@ err:
#endif /* FRONTEND */ #endif /* FRONTEND */
/* /*
* Helper function to ease writing of XLogRoutine->page_read callbacks.
* If this function is used, caller must supply an open_segment callback in
* 'state', as that is used here.
*
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'. * fetched from timeline 'tli'.
* *
* 'seg/segcxt' identify the last segment used. 'openSegment' is a callback * 'seg/segcxt' identify the last segment used.
* to open the next segment, if necessary.
* *
* Returns true if succeeded, false if an error occurs, in which case * Returns true if succeeded, false if an error occurs, in which case
* 'errinfo' receives error details. * 'errinfo' receives error details.
...@@ -1054,9 +1059,10 @@ err: ...@@ -1054,9 +1059,10 @@ err:
* WAL buffers when possible. * WAL buffers when possible.
*/ */
bool bool
WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALRead(XLogReaderState *state,
char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
WALOpenSegment *seg, WALSegmentContext *segcxt, WALOpenSegment *seg, WALSegmentContext *segcxt,
WALSegmentOpen openSegment, WALReadError *errinfo) WALReadError *errinfo)
{ {
char *p; char *p;
XLogRecPtr recptr; XLogRecPtr recptr;
...@@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, ...@@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
XLogSegNo nextSegNo; XLogSegNo nextSegNo;
if (seg->ws_file >= 0) if (seg->ws_file >= 0)
close(seg->ws_file); state->routine.segment_close(state);
XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
seg->ws_file = openSegment(nextSegNo, segcxt, &tli); seg->ws_file = state->routine.segment_open(state, nextSegNo,
segcxt, &tli);
/* Update the current segment info. */ /* Update the current segment info. */
seg->ws_tli = tli; seg->ws_tli = tli;
......
...@@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa ...@@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
} }
} }
/* openSegment callback for WALRead */ /* XLogReaderRoutine->segment_open callback for local pg_wal files */
static int int
wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
TimeLineID *tli_p) WALSegmentContext *segcxt, TimeLineID *tli_p)
{ {
TimeLineID tli = *tli_p; TimeLineID tli = *tli_p;
char path[MAXPGPATH]; char path[MAXPGPATH];
...@@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, ...@@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
return -1; /* keep compiler quiet */ return -1; /* keep compiler quiet */
} }
/* stock XLogReaderRoutine->segment_close callback */
void
wal_segment_close(XLogReaderState *state)
{
close(state->seg.ws_file);
/* need to check errno? */
state->seg.ws_file = -1;
}
/* /*
* read_page callback for reading local xlog files * XLogReaderRoutine->page_read callback for reading local xlog files
* *
* Public because it would likely be very helpful for someone writing another * Public because it would likely be very helpful for someone writing another
* output method outside walsender, e.g. in a bgworker. * output method outside walsender, e.g. in a bgworker.
...@@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, ...@@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* as 'count', read the whole page anyway. It's guaranteed to be * as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete. * zero-padded up to the page boundary if it's incomplete.
*/ */
if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg, if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
&state->segcxt, wal_segment_open, &errinfo)) &state->seg, &state->segcxt,
&errinfo))
WALReadRaiseError(&errinfo); WALReadRaiseError(&errinfo);
/* number of valid bytes in the buffer */ /* number of valid bytes in the buffer */
......
...@@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options, ...@@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon, TransactionId xmin_horizon,
bool need_full_snapshot, bool need_full_snapshot,
bool fast_forward, bool fast_forward,
XLogPageReadCB read_page, XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress) LogicalOutputPluginWriterUpdateProgress update_progress)
...@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options, ...@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot; ctx->slot = slot;
ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx); ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
if (!ctx->reader) if (!ctx->reader)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY), (errcode(ERRCODE_OUT_OF_MEMORY),
...@@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options, ...@@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options,
* Otherwise, we set for decoding to start from the given LSN without * Otherwise, we set for decoding to start from the given LSN without
* marking WAL reserved beforehand. In that scenario, it's up to the * marking WAL reserved beforehand. In that scenario, it's up to the
* caller to guarantee that WAL remains available. * caller to guarantee that WAL remains available.
* read_page, prepare_write, do_write, update_progress -- * xl_routine -- XLogReaderRoutine for underlying XLogReader
* prepare_write, do_write, update_progress --
* callbacks that perform the use-case dependent, actual, work. * callbacks that perform the use-case dependent, actual, work.
* *
* Needs to be called while in a memory context that's at least as long lived * Needs to be called while in a memory context that's at least as long lived
...@@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin, ...@@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin,
List *output_plugin_options, List *output_plugin_options,
bool need_full_snapshot, bool need_full_snapshot,
XLogRecPtr restart_lsn, XLogRecPtr restart_lsn,
XLogPageReadCB read_page, XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress) LogicalOutputPluginWriterUpdateProgress update_progress)
...@@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin, ...@@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false, need_full_snapshot, false,
read_page, prepare_write, do_write, xl_routine, prepare_write, do_write,
update_progress); update_progress);
/* call output plugin initialization callback */ /* call output plugin initialization callback */
...@@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin, ...@@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin,
* fast_forward * fast_forward
* bypass the generation of logical changes. * bypass the generation of logical changes.
* *
* read_page, prepare_write, do_write, update_progress * xl_routine
* XLogReaderRoutine used by underlying xlogreader
*
* prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent, * callbacks that have to be filled to perform the use-case dependent,
* actual work. * actual work.
* *
...@@ -372,7 +376,7 @@ LogicalDecodingContext * ...@@ -372,7 +376,7 @@ LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn, CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options, List *output_plugin_options,
bool fast_forward, bool fast_forward,
XLogPageReadCB read_page, XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress) LogicalOutputPluginWriterUpdateProgress update_progress)
...@@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, ...@@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options, ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false, start_lsn, InvalidTransactionId, false,
fast_forward, read_page, prepare_write, fast_forward, xl_routine, prepare_write,
do_write, update_progress); do_write, update_progress);
/* call output plugin initialization callback */ /* call output plugin initialization callback */
......
...@@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ...@@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
ctx = CreateDecodingContext(InvalidXLogRecPtr, ctx = CreateDecodingContext(InvalidXLogRecPtr,
options, options,
false, false,
read_local_xlog_page, XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
.segment_close = wal_segment_close),
LogicalOutputPrepareWrite, LogicalOutputPrepareWrite,
LogicalOutputWrite, NULL); LogicalOutputWrite, NULL);
......
...@@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin, ...@@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin,
ctx = CreateInitDecodingContext(plugin, NIL, ctx = CreateInitDecodingContext(plugin, NIL,
false, /* just catalogs is OK */ false, /* just catalogs is OK */
restart_lsn, restart_lsn,
read_local_xlog_page, NULL, NULL, XL_ROUTINE(.page_read = read_local_xlog_page,
NULL); .segment_open = wal_segment_open,
.segment_close = wal_segment_close),
NULL, NULL, NULL);
/* /*
* If caller needs us to determine the decoding start point, do so now. * If caller needs us to determine the decoding start point, do so now.
...@@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) ...@@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
ctx = CreateDecodingContext(InvalidXLogRecPtr, ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL, NIL,
true, /* fast_forward */ true, /* fast_forward */
read_local_xlog_page, XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
.segment_close = wal_segment_close),
NULL, NULL, NULL); NULL, NULL, NULL);
/* /*
......
...@@ -54,8 +54,8 @@ ...@@ -54,8 +54,8 @@
#include "access/transam.h" #include "access/transam.h"
#include "access/xact.h" #include "access/xact.h"
#include "access/xlog_internal.h" #include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h" #include "access/xlogutils.h"
#include "catalog/pg_authid.h" #include "catalog/pg_authid.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
...@@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); ...@@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
TimeLineID *tli_p); WALSegmentContext *segcxt, TimeLineID *tli_p);
static void UpdateSpillStats(LogicalDecodingContext *ctx); static void UpdateSpillStats(LogicalDecodingContext *ctx);
...@@ -798,7 +798,8 @@ StartReplication(StartReplicationCmd *cmd) ...@@ -798,7 +798,8 @@ StartReplication(StartReplicationCmd *cmd)
} }
/* /*
* read_page callback for logical decoding contexts, as a walsender process. * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
* walsender process.
* *
* Inside the walsender we can do better than read_local_xlog_page, * Inside the walsender we can do better than read_local_xlog_page,
* which has to do a plain sleep/busy loop, because the walsender's latch gets * which has to do a plain sleep/busy loop, because the walsender's latch gets
...@@ -832,7 +833,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req ...@@ -832,7 +833,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */ count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */ /* now actually read the data, we know it's there */
if (!WALRead(cur_page, if (!WALRead(state,
cur_page,
targetPagePtr, targetPagePtr,
XLOG_BLCKSZ, XLOG_BLCKSZ,
sendSeg->ws_tli, /* Pass the current TLI because only sendSeg->ws_tli, /* Pass the current TLI because only
...@@ -840,7 +842,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req ...@@ -840,7 +842,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
* TLI is needed. */ * TLI is needed. */
sendSeg, sendSeg,
sendCxt, sendCxt,
WalSndSegmentOpen,
&errinfo)) &errinfo))
WALReadRaiseError(&errinfo); WALReadRaiseError(&errinfo);
...@@ -1005,7 +1006,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ...@@ -1005,7 +1006,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
InvalidXLogRecPtr, InvalidXLogRecPtr,
logical_read_xlog_page, XL_ROUTINE(.page_read = logical_read_xlog_page,
.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData, WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress); WalSndUpdateProgress);
...@@ -1168,7 +1171,9 @@ StartLogicalReplication(StartReplicationCmd *cmd) ...@@ -1168,7 +1171,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
*/ */
logical_decoding_ctx = logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false, CreateDecodingContext(cmd->startpoint, cmd->options, false,
logical_read_xlog_page, XL_ROUTINE(.page_read = logical_read_xlog_page,
.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData, WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress); WalSndUpdateProgress);
...@@ -2441,9 +2446,10 @@ WalSndKill(int code, Datum arg) ...@@ -2441,9 +2446,10 @@ WalSndKill(int code, Datum arg)
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
} }
/* walsender's openSegment callback for WALRead */ /* XLogReaderRoutine->segment_open callback */
static int static int
WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, WalSndSegmentOpen(XLogReaderState *state,
XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p) TimeLineID *tli_p)
{ {
char path[MAXPGPATH]; char path[MAXPGPATH];
...@@ -2531,6 +2537,12 @@ XLogSendPhysical(void) ...@@ -2531,6 +2537,12 @@ XLogSendPhysical(void)
Size nbytes; Size nbytes;
XLogSegNo segno; XLogSegNo segno;
WALReadError errinfo; WALReadError errinfo;
static XLogReaderState fake_xlogreader =
{
/* Fake xlogreader state for WALRead */
.routine.segment_open = WalSndSegmentOpen,
.routine.segment_close = wal_segment_close
};
/* If requested switch the WAL sender to the stopping state. */ /* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING) if (got_STOPPING)
...@@ -2748,7 +2760,8 @@ XLogSendPhysical(void) ...@@ -2748,7 +2760,8 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes); enlargeStringInfo(&output_message, nbytes);
retry: retry:
if (!WALRead(&output_message.data[output_message.len], if (!WALRead(&fake_xlogreader,
&output_message.data[output_message.len],
startptr, startptr,
nbytes, nbytes,
sendSeg->ws_tli, /* Pass the current TLI because only sendSeg->ws_tli, /* Pass the current TLI because only
...@@ -2756,7 +2769,6 @@ retry: ...@@ -2756,7 +2769,6 @@ retry:
* TLI is needed. */ * TLI is needed. */
sendSeg, sendSeg,
sendCxt, sendCxt,
WalSndSegmentOpen,
&errinfo)) &errinfo))
WALReadRaiseError(&errinfo); WALReadRaiseError(&errinfo);
......
...@@ -66,7 +66,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, ...@@ -66,7 +66,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
private.tliIndex = tliIndex; private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand; private.restoreCommand = restoreCommand;
xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, xlogreader = XLogReaderAllocate(WalSegSz, datadir,
XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private); &private);
if (xlogreader == NULL) if (xlogreader == NULL)
pg_fatal("out of memory"); pg_fatal("out of memory");
...@@ -117,7 +118,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex, ...@@ -117,7 +118,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
private.tliIndex = tliIndex; private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand; private.restoreCommand = restoreCommand;
xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, xlogreader = XLogReaderAllocate(WalSegSz, datadir,
XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private); &private);
if (xlogreader == NULL) if (xlogreader == NULL)
pg_fatal("out of memory"); pg_fatal("out of memory");
...@@ -176,7 +178,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, ...@@ -176,7 +178,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
private.tliIndex = tliIndex; private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand; private.restoreCommand = restoreCommand;
xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, xlogreader = XLogReaderAllocate(WalSegSz, datadir,
XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private); &private);
if (xlogreader == NULL) if (xlogreader == NULL)
pg_fatal("out of memory"); pg_fatal("out of memory");
......
...@@ -279,9 +279,10 @@ identify_target_directory(char *directory, char *fname) ...@@ -279,9 +279,10 @@ identify_target_directory(char *directory, char *fname)
return NULL; /* not reached */ return NULL; /* not reached */
} }
/* pg_waldump's openSegment callback for WALRead */ /* pg_waldump's XLogReaderRoutine->segment_open callback */
static int static int
WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt, WALDumpOpenSegment(XLogReaderState *state,
XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p) TimeLineID *tli_p)
{ {
TimeLineID tli = *tli_p; TimeLineID tli = *tli_p;
...@@ -321,8 +322,18 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt, ...@@ -321,8 +322,18 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
} }
/* /*
* XLogReader read_page callback * pg_waldump's XLogReaderRoutine->segment_close callback. Same as
* wal_segment_close
*/ */
static void
WALDumpCloseSegment(XLogReaderState *state)
{
close(state->seg.ws_file);
/* need to check errno? */
state->seg.ws_file = -1;
}
/* pg_waldump's XLogReaderRoutine->page_read callback */
static int static int
WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetPtr, char *readBuff) XLogRecPtr targetPtr, char *readBuff)
...@@ -344,8 +355,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, ...@@ -344,8 +355,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
} }
} }
if (!WALRead(readBuff, targetPagePtr, count, private->timeline, if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
&state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo)) &state->seg, &state->segcxt,
&errinfo))
{ {
WALOpenSegment *seg = &errinfo.wre_seg; WALOpenSegment *seg = &errinfo.wre_seg;
char fname[MAXPGPATH]; char fname[MAXPGPATH];
...@@ -1031,7 +1043,11 @@ main(int argc, char **argv) ...@@ -1031,7 +1043,11 @@ main(int argc, char **argv)
/* done with argument parsing, do the actual work */ /* done with argument parsing, do the actual work */
/* we have everything we need, start reading */ /* we have everything we need, start reading */
xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage, xlogreader_state =
XLogReaderAllocate(WalSegSz, waldir,
XL_ROUTINE(.page_read = WALDumpReadPage,
.segment_open = WALDumpOpenSegment,
.segment_close = WALDumpCloseSegment),
&private); &private);
if (!xlogreader_state) if (!xlogreader_state)
fatal_error("out of memory"); fatal_error("out of memory");
......
...@@ -17,6 +17,13 @@ ...@@ -17,6 +17,13 @@
* XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord() * XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord()
* until it returns NULL. * until it returns NULL.
* *
* Callers supply a page_read callback if they want to to call
* XLogReadRecord or XLogFindNextRecord; it can be passed in as NULL
* otherwise. The WALRead function can be used as a helper to write
* page_read callbacks, but it is not mandatory; callers that use it,
* must supply open_segment callbacks. The close_segment callback
* must always be supplied.
*
* After reading a record with XLogReadRecord(), it's decomposed into * After reading a record with XLogReadRecord(), it's decomposed into
* the per-block and main data parts, and the parts can be accessed * the per-block and main data parts, and the parts can be accessed
* with the XLogRec* macros and functions. You can also decode a * with the XLogRec* macros and functions. You can also decode a
...@@ -50,12 +57,69 @@ typedef struct WALSegmentContext ...@@ -50,12 +57,69 @@ typedef struct WALSegmentContext
typedef struct XLogReaderState XLogReaderState; typedef struct XLogReaderState XLogReaderState;
/* Function type definition for the read_page callback */ /* Function type definitions for various xlogreader interactions */
typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr, XLogRecPtr targetPagePtr,
int reqLen, int reqLen,
XLogRecPtr targetRecPtr, XLogRecPtr targetRecPtr,
char *readBuf); char *readBuf);
typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
XLogSegNo nextSegNo,
WALSegmentContext *segcxt,
TimeLineID *tli_p);
typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
typedef struct XLogReaderRoutine
{
/*
* Data input callback
*
* This callback shall read at least reqLen valid bytes of the xlog page
* starting at targetPagePtr, and store them in readBuf. The callback
* shall return the number of bytes read (never more than XLOG_BLCKSZ), or
* -1 on failure. The callback shall sleep, if necessary, to wait for the
* requested bytes to become available. The callback will not be invoked
* again for the same page unless more than the returned number of bytes
* are needed.
*
* targetRecPtr is the position of the WAL record we're reading. Usually
* it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
* to read and verify the page or segment header, before it reads the
* actual WAL record it's interested in. In that case, targetRecPtr can
* be used to determine which timeline to read the page from.
*
* The callback shall set ->seg.ws_tli to the TLI of the file the page was
* read from.
*/
XLogPageReadCB page_read;
/*
* Callback to open the specified WAL segment for reading. The file
* descriptor of the opened segment shall be returned. In case of
* failure, an error shall be raised by the callback and it shall not
* return.
*
* "nextSegNo" is the number of the segment to be opened.
*
* "segcxt" is additional information about the segment.
*
* "tli_p" is an input/output argument. XLogRead() uses it to pass the
* timeline in which the new segment should be found, but the callback can
* use it to return the TLI that it actually opened.
*
* BasicOpenFile() is the preferred way to open the segment file in
* backend code, whereas open(2) should be used in frontend.
*/
WALSegmentOpenCB segment_open;
/*
* WAL segment close callback. ->seg.ws_file shall be set to a negative
* number.
*/
WALSegmentCloseCB segment_close;
} XLogReaderRoutine;
#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__}
typedef struct typedef struct
{ {
...@@ -88,33 +152,16 @@ typedef struct ...@@ -88,33 +152,16 @@ typedef struct
struct XLogReaderState struct XLogReaderState
{ {
/*
* Operational callbacks
*/
XLogReaderRoutine routine;
/* ---------------------------------------- /* ----------------------------------------
* Public parameters * Public parameters
* ---------------------------------------- * ----------------------------------------
*/ */
/*
* Data input callback (mandatory).
*
* This callback shall read at least reqLen valid bytes of the xlog page
* starting at targetPagePtr, and store them in readBuf. The callback
* shall return the number of bytes read (never more than XLOG_BLCKSZ), or
* -1 on failure. The callback shall sleep, if necessary, to wait for the
* requested bytes to become available. The callback will not be invoked
* again for the same page unless more than the returned number of bytes
* are needed.
*
* targetRecPtr is the position of the WAL record we're reading. Usually
* it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
* to read and verify the page or segment header, before it reads the
* actual WAL record it's interested in. In that case, targetRecPtr can
* be used to determine which timeline to read the page from.
*
* The callback shall set ->seg.ws_tli to the TLI of the file the page was
* read from.
*/
XLogPageReadCB read_page;
/* /*
* System identifier of the xlog files we're about to read. Set to zero * System identifier of the xlog files we're about to read. Set to zero
* (the default value) if unknown or unimportant. * (the default value) if unknown or unimportant.
...@@ -214,30 +261,13 @@ struct XLogReaderState ...@@ -214,30 +261,13 @@ struct XLogReaderState
/* Get a new XLogReader */ /* Get a new XLogReader */
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
const char *waldir, const char *waldir,
XLogPageReadCB pagereadfunc, XLogReaderRoutine *routine,
void *private_data); void *private_data);
extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
/* Free an XLogReader */ /* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state); extern void XLogReaderFree(XLogReaderState *state);
/*
* Callback to open the specified WAL segment for reading. Returns a valid
* file descriptor when the file was opened successfully.
*
* "nextSegNo" is the number of the segment to be opened.
*
* "segcxt" is additional information about the segment.
*
* "tli_p" is an input/output argument. XLogRead() uses it to pass the
* timeline in which the new segment should be found, but the callback can use
* it to return the TLI that it actually opened.
*
* BasicOpenFile() is the preferred way to open the segment file in backend
* code, whereas open(2) should be used in frontend.
*/
typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p);
/* Initialize supporting structures */ /* Initialize supporting structures */
extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir); int segsize, const char *waldir);
...@@ -269,9 +299,10 @@ typedef struct WALReadError ...@@ -269,9 +299,10 @@ typedef struct WALReadError
WALOpenSegment wre_seg; /* Segment we tried to read from. */ WALOpenSegment wre_seg; /* Segment we tried to read from. */
} WALReadError; } WALReadError;
extern bool WALRead(char *buf, XLogRecPtr startptr, Size count, extern bool WALRead(XLogReaderState *state,
char *buf, XLogRecPtr startptr, Size count,
TimeLineID tli, WALOpenSegment *seg, TimeLineID tli, WALOpenSegment *seg,
WALSegmentContext *segcxt, WALSegmentOpen openSegment, WALSegmentContext *segcxt,
WALReadError *errinfo); WALReadError *errinfo);
/* Functions for decoding an XLogRecord */ /* Functions for decoding an XLogRecord */
......
...@@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel); ...@@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
extern int read_local_xlog_page(XLogReaderState *state, extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *cur_page); XLogRecPtr targetRecPtr, char *cur_page);
extern int wal_segment_open(XLogReaderState *state,
XLogSegNo nextSegNo,
WALSegmentContext *segcxt,
TimeLineID *tli_p);
extern void wal_segment_close(XLogReaderState *state);
extern void XLogReadDetermineTimeline(XLogReaderState *state, extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength); XLogRecPtr wantPage, uint32 wantLength);
......
...@@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, ...@@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options, List *output_plugin_options,
bool need_full_snapshot, bool need_full_snapshot,
XLogRecPtr restart_lsn, XLogRecPtr restart_lsn,
XLogPageReadCB read_page, XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress); LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options, List *output_plugin_options,
bool fast_forward, bool fast_forward,
XLogPageReadCB read_page, XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress); LogicalOutputPluginWriterUpdateProgress update_progress);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment