Commit 24c5f1a1 authored by Alvaro Herrera

Enable logical slots to follow timeline switches

When decoding from a logical slot, it's necessary for xlog reading to be
able to read xlog from historical (i.e. not current) timelines;
otherwise, decoding fails after failover, because the archives are in
the historical timeline.  This is required to make "failover logical
slots" possible; it currently has no other use, although theoretically
it could be used by an extension that creates a slot on a standby and
continues to replay from the slot when the standby is promoted.

This commit includes a module in src/test/modules with functions to
manipulate the slots (which is not otherwise possible in SQL code) in
order to enable testing, and a new test in src/test/recovery to ensure
that the behavior is as expected.

Author: Craig Ringer
Reviewed-By: Oleksii Kliukin, Andres Freund, Petr Jelínek
parent 3b02ea4f
......@@ -118,6 +118,11 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
return NULL;
}
#ifndef FRONTEND
/* Will be loaded on first read */
state->timelineHistory = NIL;
#endif
return state;
}
......@@ -137,6 +142,10 @@ XLogReaderFree(XLogReaderState *state)
pfree(state->errormsg_buf);
if (state->readRecordBuf)
pfree(state->readRecordBuf);
#ifndef FRONTEND
if (state->timelineHistory)
list_free_deep(state->timelineHistory);
#endif
pfree(state->readBuf);
pfree(state);
}
......
......@@ -19,6 +19,7 @@
#include <unistd.h>
#include "access/timeline.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
......@@ -659,6 +660,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
/* state maintained across calls */
static int sendFile = -1;
static XLogSegNo sendSegNo = 0;
static TimeLineID sendTLI = 0;
static uint32 sendOff = 0;
p = buf;
......@@ -674,7 +676,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
startoff = recptr % XLogSegSize;
/* Do we need to switch to a different xlog segment? */
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
sendTLI != tli)
{
char path[MAXPGPATH];
......@@ -701,6 +704,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
path)));
}
sendOff = 0;
sendTLI = tli;
}
/* Need to seek in the file? */
......@@ -748,6 +752,147 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
}
}
/*
* Determine XLogReaderState->currTLI and ->currTLIValidUntil;
* XLogReaderState->EndRecPtr, ->currRecPtr and ThisTimeLineID affect the
* decision. This may later be used to determine which xlog segment file to
* open, etc.
*
* We switch to an xlog segment from the new timeline eagerly when on a
* historical timeline, as soon as we reach the start of the xlog segment
* containing the timeline switch. The server copied the segment to the new
* timeline so all the data up to the switch point is the same, but there's no
* guarantee the old segment will still exist. It may have been deleted or
* renamed with a .partial suffix so we can't necessarily keep reading from
* the old TLI even though tliSwitchPoint says it's OK.
*
* Because of this, callers MAY NOT assume that currTLI is the timeline that
* will be in a page's xlp_tli; the page may begin on an older timeline or we
* might be reading from historical timeline data on a segment that's been
* copied to a new timeline.
*
* Inputs read from *state: currRecPtr, EndRecPtr, and the cached currTLI,
* currTLIValidUntil and timelineHistory.
*
* Outputs written to *state: currTLI, currTLIValidUntil, and (when it is
* first loaded or re-read) timelineHistory.
*
* The function returns nothing; it works in four steps:
*   1. load the timeline history if it has not been loaded yet;
*   2. drop the cached TLI info if the read is not sequential;
*   3. drop the cached TLI info (and re-read the history) if the cached
*      timeline has become historical, or if the read position has reached
*      the cached validity limit;
*   4. if the cache was dropped, look the timeline up again.
*/
static void
XLogReadDetermineTimeline(XLogReaderState *state)
{
/* Read the history on first time through */
if (state->timelineHistory == NIL)
state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
/*
* Are we reading the record immediately following the one we read last
* time? If not, then don't use the cached timeline info.
*
* Setting currTLI to 0 forces the lookup at the bottom of this function.
*/
if (state->currRecPtr != state->EndRecPtr)
{
state->currTLI = 0;
state->currTLIValidUntil = InvalidXLogRecPtr;
}
/*
* Are we reading a timeline that used to be the latest one, but became
* historical? This can happen in a replica that gets promoted, and in a
* cascading replica whose upstream gets promoted. In either case,
* re-read the timeline history data. We cannot read past the timeline
* switch point, because either the records in the old timeline might be
* invalid, or worse, they may be valid but *different* from the ones we
* should be reading.
*
* The test below detects that case as: a cached TLI exists (non-zero), it
* has no validity limit recorded (which is how a "current" timeline is
* cached), yet it no longer matches ThisTimeLineID.
*/
if (state->currTLIValidUntil == InvalidXLogRecPtr &&
state->currTLI != ThisTimeLineID &&
state->currTLI != 0)
{
/* re-read timeline history */
list_free_deep(state->timelineHistory);
state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
elog(DEBUG2, "timeline %u became historical during decoding",
state->currTLI);
/* then invalidate the cached timeline info */
state->currTLI = 0;
state->currTLIValidUntil = InvalidXLogRecPtr;
}
/*
* Are we reading a record immediately following a timeline switch? If
* so, we must follow the switch too.
*
* This only applies to a sequential read on a cached historical timeline
* (one with a validity limit) whose limit has been reached or passed.
*/
if (state->currRecPtr == state->EndRecPtr &&
state->currTLI != 0 &&
state->currTLIValidUntil != InvalidXLogRecPtr &&
state->currRecPtr >= state->currTLIValidUntil)
{
elog(DEBUG2,
"requested record %X/%X is on segment containing end of timeline %u valid until %X/%X, switching to next timeline",
(uint32) (state->currRecPtr >> 32),
(uint32) state->currRecPtr,
state->currTLI,
(uint32) (state->currTLIValidUntil >> 32),
(uint32) (state->currTLIValidUntil));
/* invalidate TLI info so we look up the next TLI */
state->currTLI = 0;
state->currTLIValidUntil = InvalidXLogRecPtr;
}
/* A zero currTLI means one of the checks above dropped the cache. */
if (state->currTLI == 0)
{
/*
* Something changed; work out what timeline this record is on. We
* might read it from the segment on this TLI or, if the segment is
* also contained by newer timelines, the copy from a newer TLI.
*/
state->currTLI = tliOfPointInHistory(state->currRecPtr,
state->timelineHistory);
/*
* Look for the most recent timeline that's on the same xlog segment
* as this record, since that's the only one we can assume is still
* readable.
*
* Each iteration either records a validity limit that lies beyond
* currRecPtr (which ends the loop) or advances currTLI to the next
* timeline and clears the limit again. The loop also ends as soon as
* currTLI reaches ThisTimeLineID, in which case currTLIValidUntil is
* left as InvalidXLogRecPtr.
*/
while (state->currTLI != ThisTimeLineID &&
state->currTLIValidUntil == InvalidXLogRecPtr)
{
XLogRecPtr tliSwitch;
TimeLineID nextTLI;
CHECK_FOR_INTERRUPTS();
tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
&nextTLI);
/* round ValidUntil down to start of seg containing the switch */
state->currTLIValidUntil =
((tliSwitch / XLogSegSize) * XLogSegSize);
if (state->currRecPtr >= state->currTLIValidUntil)
{
/*
* The new currTLI ends on this WAL segment so check the next
* TLI to see if it's the last one on the segment.
*
* If that's the current TLI we'll stop searching.
*/
state->currTLI = nextTLI;
state->currTLIValidUntil = InvalidXLogRecPtr;
}
}
/*
* We're now either reading from the first xlog segment in the current
* server's timeline or the most recent historical timeline that
* exists on the target segment.
*/
elog(DEBUG2, "XLog read ptr %X/%X is on segment with TLI %u valid until %X/%X, server current TLI is %u",
(uint32) (state->currRecPtr >> 32),
(uint32) state->currRecPtr,
state->currTLI,
(uint32) (state->currTLIValidUntil >> 32),
(uint32) (state->currTLIValidUntil),
ThisTimeLineID);
}
}
/*
* read_page callback for reading local xlog files
*
......@@ -761,48 +906,101 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
*/
int
read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
TimeLineID *pageTLI)
{
XLogRecPtr flushptr,
XLogRecPtr read_upto,
loc;
int count;
loc = targetPagePtr + reqLen;
/* Make sure enough xlog is available... */
while (1)
{
/*
* TODO: we're going to have to do something more intelligent about
* timelines on standbys. Use readTimeLineHistory() and
* tliOfPointInHistory() to get the proper LSN? For now we'll catch
* that case earlier, but the code and TODO is left in here for when
* that changes.
* Check which timeline to get the record from.
*
* We have to do it each time through the loop because if we're in
* recovery as a cascading standby, the current timeline might've
* become historical.
*/
if (!RecoveryInProgress())
XLogReadDetermineTimeline(state);
if (state->currTLI == ThisTimeLineID)
{
*pageTLI = ThisTimeLineID;
flushptr = GetFlushRecPtr();
/*
* We're reading from the current timeline so we might have to
* wait for the desired record to be generated (or, for a standby,
* received & replayed)
*/
if (!RecoveryInProgress())
{
*pageTLI = ThisTimeLineID;
read_upto = GetFlushRecPtr();
}
else
read_upto = GetXLogReplayRecPtr(pageTLI);
if (loc <= read_upto)
break;
CHECK_FOR_INTERRUPTS();
pg_usleep(1000L);
}
else
flushptr = GetXLogReplayRecPtr(pageTLI);
if (loc <= flushptr)
{
/*
* We're on a historical timeline, so limit reading to the switch
* point where we moved to the next timeline.
*
* We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
* about the new timeline, so we must've received past the end of
* it.
*/
read_upto = state->currTLIValidUntil;
/*
* Setting pageTLI to our wanted record's TLI is slightly wrong;
* the page might begin on an older timeline if it contains a
* timeline switch, since its xlog segment will have been copied
* from the prior timeline. This is pretty harmless though, as
* nothing cares so long as the timeline doesn't go backwards. We
* should read the page header instead; FIXME someday.
*/
*pageTLI = state->currTLI;
/* No need to wait on a historical timeline */
break;
CHECK_FOR_INTERRUPTS();
pg_usleep(1000L);
}
}
/* more than one block available */
if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
{
/*
* more than one block available; read only that block, have caller
* come back if they need more.
*/
count = XLOG_BLCKSZ;
/* not enough data there */
else if (targetPagePtr + reqLen > flushptr)
}
else if (targetPagePtr + reqLen > read_upto)
{
/* not enough data there */
return -1;
/* part of the page available */
}
else
count = flushptr - targetPagePtr;
{
/* enough bytes available to satisfy the request */
count = read_upto - targetPagePtr;
}
/*
* Even though we just determined how much of the page can be validly read
* as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete.
*/
XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
/* number of valid bytes in the buffer */
return count;
}
......@@ -231,12 +231,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
rsinfo->setResult = p->tupstore;
rsinfo->setDesc = p->tupdesc;
/* compute the current end-of-wal */
if (!RecoveryInProgress())
end_of_wal = GetFlushRecPtr();
else
end_of_wal = GetXLogReplayRecPtr(NULL);
ReplicationSlotAcquire(NameStr(*name));
PG_TRY();
......@@ -273,7 +267,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* slot's confirmed_flush. This means we might read xlog we don't
* actually decode rows from, but the snapshot builder might need it
* to get to a consistent point. The point we start returning data to
* *users* at is the candidate restart lsn from the decoding context.
* *users* at is the confirmed_flush lsn set up in the decoding
* context.
*/
startptr = MyReplicationSlot->data.restart_lsn;
......@@ -282,8 +277,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
if (!RecoveryInProgress())
end_of_wal = GetFlushRecPtr();
else
end_of_wal = GetXLogReplayRecPtr(NULL);
/* Decode until we run out of records */
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
(ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
{
XLogRecord *record;
char *errm = NULL;
......
......@@ -27,6 +27,10 @@
#include "access/xlogrecord.h"
#ifndef FRONTEND
#include "nodes/pg_list.h"
#endif
typedef struct XLogReaderState XLogReaderState;
/* Function type definition for the read_page callback */
......@@ -160,11 +164,25 @@ struct XLogReaderState
/* beginning of the WAL record being read. */
XLogRecPtr currRecPtr;
/* timeline to read it from, 0 if a lookup is required */
TimeLineID currTLI;
/*
* Safe point to read to in currTLI. If currTLI is historical, then this
* is set to the end of the last whole segment that contains that TLI;
* if currTLI is ThisTimeLineID, this is InvalidXLogRecPtr. This is *not*
* the tliSwitchPoint.
*/
XLogRecPtr currTLIValidUntil;
/* Buffer for current ReadRecord result (expandable) */
char *readRecordBuf;
uint32 readRecordBufSize;
#ifndef FRONTEND
/* cached timeline history, only available in backend */
List *timelineHistory;
#endif
/* Buffer to hold error message */
char *errormsg_buf;
};
......
......@@ -13,6 +13,7 @@ SUBDIRS = \
test_parser \
test_rls_hooks \
test_shm_mq \
test_slot_timelines \
worker_spi
all: submake-errcodes
......
# src/test/modules/test_slot_timelines/Makefile
MODULES = test_slot_timelines
PGFILEDESC = "test_slot_timelines - test utility for slot timeline following"
EXTENSION = test_slot_timelines
DATA = test_slot_timelines--1.0.sql
EXTRA_INSTALL=contrib/test_decoding
REGRESS=load_extension
REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_slot_timelines/test_slot_timelines.conf
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = src/test/modules/test_slot_timelines
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
A test module for logical decoding failover and timeline following.
This module provides a minimal way to maintain logical slots on replicas
that mirror the state on the master. It doesn't make decoding possible;
it only tracks slot state so that a decoding client that's using the master
can follow a physical failover to the standby. The master doesn't know
about the slots on the standby; they're synced by a client that connects
to both.
This is intentionally not part of the test_decoding module because that's meant
to serve as example code, where this module exercises internal server features
by unsafely exposing internal state to SQL. It's not the right way to do
failover, it's just a simple way to test it from the perl TAP framework to
prove the feature works.
In a practical implementation of this approach a bgworker on the master would
monitor slot positions and relay them to a bgworker on the standby that applies
the position updates without exposing slot internals to SQL. That's too complex
for this test framework though.
CREATE EXTENSION test_slot_timelines;
SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
test_slot_timelines_create_logical_slot
-----------------------------------------
(1 row)
SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location());
test_slot_timelines_advance_logical_slot
------------------------------------------
(1 row)
SELECT pg_drop_replication_slot('test_slot');
pg_drop_replication_slot
--------------------------
(1 row)
CREATE EXTENSION test_slot_timelines;
SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location());
SELECT pg_drop_replication_slot('test_slot');
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION test_slot_timelines" to load this file. \quit
CREATE OR REPLACE FUNCTION test_slot_timelines_create_logical_slot(slot_name text, plugin text)
RETURNS void
LANGUAGE c AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION test_slot_timelines_create_logical_slot(text, text)
IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.';
CREATE OR REPLACE FUNCTION test_slot_timelines_advance_logical_slot(slot_name text, new_xmin bigint, new_catalog_xmin bigint, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn)
RETURNS void
LANGUAGE c AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION test_slot_timelines_advance_logical_slot(text, bigint, bigint, pg_lsn, pg_lsn)
IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.';
/*--------------------------------------------------------------------------
*
* test_slot_timelines.c
* Test harness code for slot timeline following
*
* Copyright (c) 2016, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/test/modules/test_slot_timelines/test_slot_timelines.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/transam.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "replication/slot.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(test_slot_timelines_create_logical_slot);
PG_FUNCTION_INFO_V1(test_slot_timelines_advance_logical_slot);
static void clear_slot_transient_state(void);
/*
 * test_slot_timelines_create_logical_slot(slot_name text, plugin text)
 *
 * Directly create a persistent logical slot whose LSNs and xids are all
 * invalid placeholders.  The snapshot builder and the logical decoding
 * machinery are bypassed entirely; the only purpose is to set up a slot on a
 * replica that will mirror a slot living on an upstream master.  The
 * placeholders are expected to be overwritten later through
 * test_slot_timelines_advance_logical_slot.
 *
 * Arguments:
 *	0 - name of the slot to create
 *	1 - name of the output plugin to record in the slot
 *
 * Returns void.
 *
 * This is test harness code only.  Exposing slot internals to SQL like this
 * is not appropriate for real world usage.  See the README.
 */
Datum
test_slot_timelines_create_logical_slot(PG_FUNCTION_ARGS)
{
	char	   *new_slot_name;
	char	   *output_plugin;

	new_slot_name = text_to_cstring(PG_GETARG_TEXT_P(0));
	output_plugin = text_to_cstring(PG_GETARG_TEXT_P(1));

	CheckSlotRequirements();

	/* On success the new slot becomes MyReplicationSlot. */
	ReplicationSlotCreate(new_slot_name, true, RS_PERSISTENT);

	/* Remember which output plugin this slot belongs to. */
	StrNCpy(NameStr(MyReplicationSlot->data.plugin), output_plugin, NAMEDATALEN);

	/*
	 * Nothing is known about the upstream slot yet, so every persistent
	 * position starts out invalid.
	 */
	MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr;
	MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
	MyReplicationSlot->data.catalog_xmin = InvalidTransactionId;
	MyReplicationSlot->data.xmin = InvalidTransactionId;

	/* Bring the in-memory-only fields in line with the placeholders. */
	clear_slot_transient_state();

	ReplicationSlotRelease();

	PG_RETURN_VOID();
}
/*
* Set the state of a slot.
*
* This doesn't maintain the non-persistent state at all,
* but since the slot isn't in use that's OK.
*
* There's intentionally no check to prevent slots going backwards
* because they can actually go backwards if the master crashes when
* it hasn't yet flushed slot state to disk then we copy the older
* slot state after recovery.
*
* There's no checking done for xmin or catalog xmin either, since
* we can't really do anything useful that accounts for xid wrap-around.
*
* Note that this is test harness code. You shouldn't expose slot internals
* to SQL like this for any real world usage. See the README.
*/
Datum
test_slot_timelines_advance_logical_slot(PG_FUNCTION_ARGS)
{
char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
TransactionId new_xmin = (TransactionId) PG_GETARG_INT64(1);
TransactionId new_catalog_xmin = (TransactionId) PG_GETARG_INT64(2);
XLogRecPtr restart_lsn = PG_GETARG_LSN(3);
XLogRecPtr confirmed_lsn = PG_GETARG_LSN(4);
CheckSlotRequirements();
ReplicationSlotAcquire(slotname);
if (MyReplicationSlot->data.database != MyDatabaseId)
elog(ERROR, "Trying to update a slot on a different database");
MyReplicationSlot->data.xmin = new_xmin;
MyReplicationSlot->data.catalog_xmin = new_catalog_xmin;
MyReplicationSlot->data.restart_lsn = restart_lsn;
MyReplicationSlot->data.confirmed_flush = confirmed_lsn;
clear_slot_transient_state();
ReplicationSlotMarkDirty();
ReplicationSlotSave();
ReplicationSlotRelease();
ReplicationSlotsComputeRequiredXmin(false);
ReplicationSlotsComputeRequiredLSN();
PG_RETURN_VOID();
}
/*
 * Reset the fields of MyReplicationSlot that live outside its persistent
 * "data" part, so that the slot looks as it would had it just been loaded
 * from disk on recovery: the effective xmins are copied from the persistent
 * ones and every candidate value is invalidated.
 *
 * The caller must currently hold a slot (MyReplicationSlot is asserted to be
 * non-NULL).
 */
static void
clear_slot_transient_state(void)
{
	Assert(MyReplicationSlot != NULL);

	/* No pending candidates. */
	MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
	MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
	MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
	MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;

	/* Effective values simply follow the persistent ones. */
	MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
	MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin;
}
# test_slot_timelines extension
comment = 'Test utility for slot timeline following and logical decoding'
default_version = '1.0'
module_pathname = '$libdir/test_slot_timelines'
relocatable = true
......@@ -9,6 +9,8 @@
#
#-------------------------------------------------------------------------
EXTRA_INSTALL=contrib/test_decoding src/test/modules/test_slot_timelines
subdir = src/test/recovery
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment