Commit f731cfa9 authored by Michael Paquier's avatar Michael Paquier

Fix a couple of bugs with replication slot advancing feature

A review of the code has showed up a couple of issues fixed by this
commit:
- Physical slots have been using the confirmed LSN position as a start
comparison point which is always 0/0, instead use the restart LSN
position (logical slots need to use the confirmed LSN position, which
was correct).
- The actual slot update was incorrect for both physical and logical
slots.  Physical slots need to use their restart_lsn as base comparison
point (confirmed_flush was used because of previous point), and logical
slots need to begin reading WAL from restart_lsn (confirmed_flush was
used as well), while confirmed_flush is compiled depending on the
decoding context and record read, and is the LSN position returned back
to the caller.
- Never return 0/0 if a slot cannot be advanced.  This way, if a slot is
advanced while the activity is idle, then the same position is returned
to the caller over and over without raising an error.  Instead return
the LSN the slot has been advanced to.  With repetitive calls, the same
position is returned hence caller can directly monitor the difference in
progress in bytes by doing simply LSN difference calculations, which
should be monotonic.

Note that as the slot is owned by the backend advancing it, then the
read of those fields is fine lock-less, while updates need to happen
while the slot mutex is held, so fix that on the way as well.  Other
locks for in-memory data of replication slots have been already fixed
previously.

Some of those issues have been pointed out by Petr and Simon during the
patch, while I noticed some of them after looking at the code.  This
also visibly takes of a recently-discovered bug causing assertion
failures which can be triggered by a two-step slot forwarding which
first advanced the slot to a WAL page boundary and secondly advanced it
to the latest position, say 'FF/FFFFFFF' to make sure that the newest
LSN is used as forward point.  It would have been nice to drop a test
for that, but the set of operators working on pg_lsn limits it, so this
is left for a future exercise.

Author: Michael Paquier
Reviewed-by: Petr Jelinek, Simon Riggs
Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/2840048a-1184-417a-9da8-3299d207a1d7%40postgrespro.ru
parent 321f648a
...@@ -318,32 +318,43 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) ...@@ -318,32 +318,43 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
/* /*
* Helper function for advancing physical replication slot forward. * Helper function for advancing physical replication slot forward.
* The LSN position to move to is compared simply to the slot's
* restart_lsn, knowing that any position older than that would be
* removed by successive checkpoints.
*/ */
static XLogRecPtr static XLogRecPtr
pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) pg_physical_replication_slot_advance(XLogRecPtr moveto)
{ {
XLogRecPtr retlsn = InvalidXLogRecPtr; XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
XLogRecPtr retlsn = startlsn;
SpinLockAcquire(&MyReplicationSlot->mutex); if (startlsn < moveto)
if (MyReplicationSlot->data.restart_lsn < moveto)
{ {
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.restart_lsn = moveto; MyReplicationSlot->data.restart_lsn = moveto;
SpinLockRelease(&MyReplicationSlot->mutex);
retlsn = moveto; retlsn = moveto;
} }
SpinLockRelease(&MyReplicationSlot->mutex);
return retlsn; return retlsn;
} }
/* /*
* Helper function for advancing logical replication slot forward. * Helper function for advancing logical replication slot forward.
* The slot's restart_lsn is used as start point for reading records,
* while confirmed_lsn is used as base point for the decoding context.
* The LSN position to move to is checked by doing a per-record scan and
* logical decoding which makes sure that confirmed_lsn is updated to a
* LSN which allows the future slot consumer to get consistent logical
* changes.
*/ */
static XLogRecPtr static XLogRecPtr
pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) pg_logical_replication_slot_advance(XLogRecPtr moveto)
{ {
LogicalDecodingContext *ctx; LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner; ResourceOwner old_resowner = CurrentResourceOwner;
XLogRecPtr retlsn = InvalidXLogRecPtr; XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
XLogRecPtr retlsn = MyReplicationSlot->data.confirmed_flush;
PG_TRY(); PG_TRY();
{ {
...@@ -384,7 +395,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) ...@@ -384,7 +395,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
if (record != NULL) if (record != NULL)
LogicalDecodingProcessRecord(ctx, ctx->reader); LogicalDecodingProcessRecord(ctx, ctx->reader);
/* check limits */ /* Stop once the moving point wanted by caller has been reached */
if (moveto <= ctx->reader->EndRecPtr) if (moveto <= ctx->reader->EndRecPtr)
break; break;
...@@ -441,7 +452,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) ...@@ -441,7 +452,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
Name slotname = PG_GETARG_NAME(0); Name slotname = PG_GETARG_NAME(0);
XLogRecPtr moveto = PG_GETARG_LSN(1); XLogRecPtr moveto = PG_GETARG_LSN(1);
XLogRecPtr endlsn; XLogRecPtr endlsn;
XLogRecPtr startlsn; XLogRecPtr minlsn;
TupleDesc tupdesc; TupleDesc tupdesc;
Datum values[2]; Datum values[2];
bool nulls[2]; bool nulls[2];
...@@ -472,21 +483,32 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) ...@@ -472,21 +483,32 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
/* Acquire the slot so we "own" it */ /* Acquire the slot so we "own" it */
ReplicationSlotAcquire(NameStr(*slotname), true); ReplicationSlotAcquire(NameStr(*slotname), true);
startlsn = MyReplicationSlot->data.confirmed_flush; /*
if (moveto < startlsn) * Check if the slot is not moving backwards. Physical slots rely simply
* on restart_lsn as a minimum point, while logical slots have confirmed
* consumption up to confirmed_lsn, meaning that in both cases data older
* than that is not available anymore.
*/
if (OidIsValid(MyReplicationSlot->data.database))
minlsn = MyReplicationSlot->data.confirmed_flush;
else
minlsn = MyReplicationSlot->data.restart_lsn;
if (moveto < minlsn)
{ {
ReplicationSlotRelease(); ReplicationSlotRelease();
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot move slot to %X/%X, minimum is %X/%X", errmsg("cannot move slot to %X/%X, minimum is %X/%X",
(uint32) (moveto >> 32), (uint32) moveto, (uint32) (moveto >> 32), (uint32) moveto,
(uint32) (startlsn >> 32), (uint32) startlsn))); (uint32) (minlsn >> 32), (uint32) minlsn)));
} }
/* Do the actual slot update, depending on the slot type */
if (OidIsValid(MyReplicationSlot->data.database)) if (OidIsValid(MyReplicationSlot->data.database))
endlsn = pg_logical_replication_slot_advance(startlsn, moveto); endlsn = pg_logical_replication_slot_advance(moveto);
else else
endlsn = pg_physical_replication_slot_advance(startlsn, moveto); endlsn = pg_physical_replication_slot_advance(moveto);
values[0] = NameGetDatum(&MyReplicationSlot->data.name); values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false; nulls[0] = false;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment