Commit 1573995f authored by Alvaro Herrera's avatar Alvaro Herrera

Rewrite comments in replication slot advance implementation

The code added by 9c7d06d6 was a bit obscure; clarify that by
rewriting the comments.  Lack of clarity has already caused bugs, so
it's a worthy goal.
Co-authored-by: default avatarArseny Sher <a.sher@postgrespro.ru>
Co-authored-by: default avatarMichaël Paquier <michael@paquier.xyz>
Co-authored-by: default avatarÁlvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: default avatarPetr Jelínek <petr.jelinek@2ndquadrant.com>
Discussion: https://postgr.es/m/87y3fgoyrn.fsf@ars-thinkpad
parent 309765fa
...@@ -338,7 +338,10 @@ CreateInitDecodingContext(char *plugin, ...@@ -338,7 +338,10 @@ CreateInitDecodingContext(char *plugin,
* that, see below). * that, see below).
* *
* output_plugin_options * output_plugin_options
* contains options passed to the output plugin. * options passed to the output plugin.
*
* fast_forward
* bypass the generation of logical changes.
* *
* read_page, prepare_write, do_write, update_progress * read_page, prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent, * callbacks that have to be filled to perform the use-case dependent,
......
...@@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) ...@@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
} }
/* /*
* Helper function for advancing physical replication slot forward. * Helper function for advancing our physical replication slot forward.
* The LSN position to move to is compared simply to the slot's *
* restart_lsn, knowing that any position older than that would be * The LSN position to move to is compared simply to the slot's restart_lsn,
* removed by successive checkpoints. * knowing that any position older than that would be removed by successive
* checkpoints.
*/ */
static XLogRecPtr static XLogRecPtr
pg_physical_replication_slot_advance(XLogRecPtr moveto) pg_physical_replication_slot_advance(XLogRecPtr moveto)
...@@ -340,59 +341,78 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) ...@@ -340,59 +341,78 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
} }
/* /*
* Helper function for advancing logical replication slot forward. * Helper function for advancing our logical replication slot forward.
*
* The slot's restart_lsn is used as start point for reading records, * The slot's restart_lsn is used as start point for reading records,
* while confirmed_lsn is used as base point for the decoding context. * while confirmed_lsn is used as base point for the decoding context.
* The LSN position to move to is checked by doing a per-record scan and *
* logical decoding which makes sure that confirmed_lsn is updated to a * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
* LSN which allows the future slot consumer to get consistent logical * because we need to digest WAL to advance restart_lsn allowing to recycle
* changes. * WAL and removal of old catalog tuples. As decoding is done in fast_forward
* mode, no changes are generated anyway.
*/ */
static XLogRecPtr static XLogRecPtr
pg_logical_replication_slot_advance(XLogRecPtr moveto) pg_logical_replication_slot_advance(XLogRecPtr moveto)
{ {
LogicalDecodingContext *ctx; LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner; ResourceOwner old_resowner = CurrentResourceOwner;
XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; XLogRecPtr startlsn;
XLogRecPtr retlsn = MyReplicationSlot->data.confirmed_flush; XLogRecPtr retlsn;
PG_TRY(); PG_TRY();
{ {
/* restart at slot's confirmed_flush */ /*
* Create our decoding context in fast_forward mode, passing start_lsn
* as InvalidXLogRecPtr, so that we start processing from my slot's
* confirmed_flush.
*/
ctx = CreateDecodingContext(InvalidXLogRecPtr, ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL, NIL,
true, true, /* fast_forward */
logical_read_local_xlog_page, logical_read_local_xlog_page,
NULL, NULL, NULL); NULL, NULL, NULL);
/*
* Start reading at the slot's restart_lsn, which we know to point to
* a valid record.
*/
startlsn = MyReplicationSlot->data.restart_lsn;
/* Initialize our return value in case we don't do anything */
retlsn = MyReplicationSlot->data.confirmed_flush;
/* invalidate non-timetravel entries */ /* invalidate non-timetravel entries */
InvalidateSystemCaches(); InvalidateSystemCaches();
/* Decode until we run out of records */ /* Decode at least one record, until we run out of records */
while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) || while ((!XLogRecPtrIsInvalid(startlsn) &&
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto)) startlsn < moveto) ||
(!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
ctx->reader->EndRecPtr < moveto))
{ {
XLogRecord *record;
char *errm = NULL; char *errm = NULL;
XLogRecord *record;
/*
* Read records. No changes are generated in fast_forward mode,
* but snapbuilder/slot statuses are updated properly.
*/
record = XLogReadRecord(ctx->reader, startlsn, &errm); record = XLogReadRecord(ctx->reader, startlsn, &errm);
if (errm) if (errm)
elog(ERROR, "%s", errm); elog(ERROR, "%s", errm);
/* /* Read sequentially from now on */
* Now that we've set up the xlog reader state, subsequent calls
* pass InvalidXLogRecPtr to say "continue from last record"
*/
startlsn = InvalidXLogRecPtr; startlsn = InvalidXLogRecPtr;
/* /*
* The {begin_txn,change,commit_txn}_wrapper callbacks above will * Process the record. Storage-level changes are ignored in
* store the description into our tuplestore. * fast_forward mode, but other modules (such as snapbuilder)
* might still have critical updates to do.
*/ */
if (record != NULL) if (record)
LogicalDecodingProcessRecord(ctx, ctx->reader); LogicalDecodingProcessRecord(ctx, ctx->reader);
/* Stop once the moving point wanted by caller has been reached */ /* Stop once the requested target has been reached */
if (moveto <= ctx->reader->EndRecPtr) if (moveto <= ctx->reader->EndRecPtr)
break; break;
...@@ -411,7 +431,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) ...@@ -411,7 +431,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
LogicalConfirmReceivedLocation(moveto); LogicalConfirmReceivedLocation(moveto);
/* /*
* If only the confirmed_flush_lsn has changed the slot won't get * If only the confirmed_flush LSN has changed the slot won't get
* marked as dirty by the above. Callers on the walsender * marked as dirty by the above. Callers on the walsender
* interface are expected to keep track of their own progress and * interface are expected to keep track of their own progress and
* don't need it written out. But SQL-interface users cannot * don't need it written out. But SQL-interface users cannot
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment