Commit 234a2660 authored by Alvaro Herrera's avatar Alvaro Herrera

Fix code comments regarding logical decoding

Back in 3b02ea4f I added some comments in various places to explain
how logical decoding and other things worked.  Not all of the changes
were welcome, because they were misleading or wrong.  This changes them
a little bit to make them more accurate.

Some other comments are also changed to be more accurate.  Also, fix a
bunch of typos.

Author: Álvaro Herrera, Craig Ringer

Andres Freund reviewed some parts of this.
parent 21c2b1c6
...@@ -305,10 +305,18 @@ CreateInitDecodingContext(char *plugin, ...@@ -305,10 +305,18 @@ CreateInitDecodingContext(char *plugin,
* Create a new decoding context, for a logical slot that has previously been * Create a new decoding context, for a logical slot that has previously been
* used already. * used already.
* *
* start_lsn contains the LSN of the last received data or InvalidXLogRecPtr * start_lsn
* output_plugin_options contains options passed to the output plugin * The LSN at which to start decoding. If InvalidXLogRecPtr, restart
* read_page, prepare_write, do_write are callbacks that have to be filled to * from the slot's confirmed_flush; otherwise, start from the specified
* perform the use-case dependent, actual, work. * location (but move it forwards to confirmed_flush if it's older than
* that, see below).
*
* output_plugin_options
* contains options passed to the output plugin.
*
* read_page, prepare_write, do_write
* callbacks that have to be filled to perform the use-case dependent,
* actual work.
* *
* Needs to be called while in a memory context that's at least as long lived * Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created * as the decoding context because further memory contexts will be created
...@@ -745,7 +753,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ...@@ -745,7 +753,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
* replication slot. * replication slot.
* *
* Note that in the most cases, we won't be able to immediately use the xmin * Note that in the most cases, we won't be able to immediately use the xmin
* to increase the xmin horizon, we need to wait till the client has confirmed * to increase the xmin horizon: we need to wait till the client has confirmed
* receiving current_lsn with LogicalConfirmReceivedLocation(). * receiving current_lsn with LogicalConfirmReceivedLocation().
*/ */
void void
...@@ -890,7 +898,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) ...@@ -890,7 +898,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
MyReplicationSlot->data.confirmed_flush = lsn; MyReplicationSlot->data.confirmed_flush = lsn;
/* if were past the location required for bumping xmin, do so */ /* if we're past the location required for bumping xmin, do so */
if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr && if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
MyReplicationSlot->candidate_xmin_lsn <= lsn) MyReplicationSlot->candidate_xmin_lsn <= lsn)
{ {
...@@ -926,7 +934,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) ...@@ -926,7 +934,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
SpinLockRelease(&MyReplicationSlot->mutex); SpinLockRelease(&MyReplicationSlot->mutex);
/* first write new xmin to disk, so we know whats up after a crash */ /* first write new xmin to disk, so we know what's up after a crash */
if (updated_xmin || updated_restart) if (updated_xmin || updated_restart)
{ {
ReplicationSlotMarkDirty(); ReplicationSlotMarkDirty();
......
...@@ -238,10 +238,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ...@@ -238,10 +238,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
PG_TRY(); PG_TRY();
{ {
/* /* restart at slot's confirmed_flush */
* Passing InvalidXLogRecPtr here causes replay to start at the slot's
* confirmed_flush.
*/
ctx = CreateDecodingContext(InvalidXLogRecPtr, ctx = CreateDecodingContext(InvalidXLogRecPtr,
options, options,
logical_read_local_xlog_page, logical_read_local_xlog_page,
...@@ -265,13 +262,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ...@@ -265,13 +262,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
ctx->output_writer_private = p; ctx->output_writer_private = p;
/* /*
* We start reading xlog from the restart lsn, even though in * Decoding of WAL must start at restart_lsn so that the entirety of
* CreateDecodingContext we set the snapshot builder up using the * xacts that committed after the slot's confirmed_flush can be
* slot's confirmed_flush. This means we might read xlog we don't * accumulated into reorder buffers.
* actually decode rows from, but the snapshot builder might need it
* to get to a consistent point. The point we start returning data to
* *users* at is the confirmed_flush lsn set up in the decoding
* context.
*/ */
startptr = MyReplicationSlot->data.restart_lsn; startptr = MyReplicationSlot->data.restart_lsn;
......
...@@ -66,7 +66,12 @@ typedef struct ReplicationSlotPersistentData ...@@ -66,7 +66,12 @@ typedef struct ReplicationSlotPersistentData
/* oldest LSN that might be required by this replication slot */ /* oldest LSN that might be required by this replication slot */
XLogRecPtr restart_lsn; XLogRecPtr restart_lsn;
/* oldest LSN that the client has acked receipt for */ /*
* Oldest LSN that the client has acked receipt for. This is used as the
* start_lsn point in case the client doesn't specify one, and also as a
* safety measure to back off in case the client specifies a start_lsn
* that's further in the future than this value.
*/
XLogRecPtr confirmed_flush; XLogRecPtr confirmed_flush;
/* plugin name */ /* plugin name */
...@@ -113,11 +118,10 @@ typedef struct ReplicationSlot ...@@ -113,11 +118,10 @@ typedef struct ReplicationSlot
/* all the remaining data is only used for logical slots */ /* all the remaining data is only used for logical slots */
/* ---- /*
* When the client has confirmed flushes >= candidate_xmin_lsn we can * When the client has confirmed flushes >= candidate_xmin_lsn we can
* advance the catalog xmin, when restart_valid has been passed, * advance the catalog xmin. When restart_valid has been passed,
* restart_lsn can be increased. * restart_lsn can be increased.
* ----
*/ */
TransactionId candidate_catalog_xmin; TransactionId candidate_catalog_xmin;
XLogRecPtr candidate_xmin_lsn; XLogRecPtr candidate_xmin_lsn;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment