Commit 56e19d93 authored by Andres Freund

Don't use on-disk snapshots for exported logical decoding snapshot.

Logical decoding stores historical snapshots on disk, so that logical
decoding can restart without having to reconstruct a snapshot from
scratch (for which the resources are not guaranteed to be present
anymore).  These serialized snapshots were also used when creating a
new slot via the walsender interface, which can export a "full"
snapshot (i.e. one that can read all tables, not just catalog ones).

The problem is that the serialized snapshots are only useful for
catalogs and not for normal user tables.  Thus the use of such a
serialized snapshot could result in an inconsistent snapshot being
exported, which could lead to queries returning wrong data.  This
would only happen if logical slots are created while another logical
slot already exists.

Author: Petr Jelinek
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/f37e975c-908f-858e-707f-058d3b1eb214@2ndquadrant.com
Backport: 9.4, where logical decoding was introduced.
parent 7834d20b
...@@ -114,6 +114,7 @@ static LogicalDecodingContext * ...@@ -114,6 +114,7 @@ static LogicalDecodingContext *
StartupDecodingContext(List *output_plugin_options, StartupDecodingContext(List *output_plugin_options,
XLogRecPtr start_lsn, XLogRecPtr start_lsn,
TransactionId xmin_horizon, TransactionId xmin_horizon,
bool need_full_snapshot,
XLogPageReadCB read_page, XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write) LogicalOutputPluginWriterWrite do_write)
...@@ -171,7 +172,8 @@ StartupDecodingContext(List *output_plugin_options, ...@@ -171,7 +172,8 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder = ReorderBufferAllocate(); ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder = ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn); AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
need_full_snapshot);
ctx->reorder->private_data = ctx; ctx->reorder->private_data = ctx;
...@@ -297,7 +299,8 @@ CreateInitDecodingContext(char *plugin, ...@@ -297,7 +299,8 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave(); ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
read_page, prepare_write, do_write); need_full_snapshot, read_page, prepare_write,
do_write);
/* call output plugin initialization callback */ /* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context); old_context = MemoryContextSwitchTo(ctx->context);
...@@ -386,7 +389,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, ...@@ -386,7 +389,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
} }
ctx = StartupDecodingContext(output_plugin_options, ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, start_lsn, InvalidTransactionId, false,
read_page, prepare_write, do_write); read_page, prepare_write, do_write);
/* call output plugin initialization callback */ /* call output plugin initialization callback */
......
...@@ -165,6 +165,9 @@ struct SnapBuild ...@@ -165,6 +165,9 @@ struct SnapBuild
*/ */
TransactionId initial_xmin_horizon; TransactionId initial_xmin_horizon;
/* Indicates if we are building full snapshot or just catalog one. */
bool building_full_snapshot;
/* /*
* Snapshot that's valid to see the catalog state seen at this moment. * Snapshot that's valid to see the catalog state seen at this moment.
*/ */
...@@ -281,7 +284,8 @@ static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); ...@@ -281,7 +284,8 @@ static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
SnapBuild * SnapBuild *
AllocateSnapshotBuilder(ReorderBuffer *reorder, AllocateSnapshotBuilder(ReorderBuffer *reorder,
TransactionId xmin_horizon, TransactionId xmin_horizon,
XLogRecPtr start_lsn) XLogRecPtr start_lsn,
bool need_full_snapshot)
{ {
MemoryContext context; MemoryContext context;
MemoryContext oldcontext; MemoryContext oldcontext;
...@@ -308,6 +312,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, ...@@ -308,6 +312,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
builder->initial_xmin_horizon = xmin_horizon; builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn; builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
...@@ -1245,7 +1250,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn ...@@ -1245,7 +1250,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* *
* a) There were no running transactions when the xl_running_xacts record * a) There were no running transactions when the xl_running_xacts record
* was inserted, jump to CONSISTENT immediately. We might find such a * was inserted, jump to CONSISTENT immediately. We might find such a
* state we were waiting for b) and c). * state we were waiting for b) or c).
* *
* b) Wait for all toplevel transactions that were running to end. We * b) Wait for all toplevel transactions that were running to end. We
* simply track the number of in-progress toplevel transactions and * simply track the number of in-progress toplevel transactions and
...@@ -1260,7 +1265,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn ...@@ -1260,7 +1265,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* at all. * at all.
* *
* c) This (in a previous run) or another decoding slot serialized a * c) This (in a previous run) or another decoding slot serialized a
* snapshot to disk that we can use. * snapshot to disk that we can use. Can't use this method for the
* initial snapshot when slot is being created and needs full snapshot
* for export or direct use, as that snapshot will only contain catalog
* modifying transactions.
* --- * ---
*/ */
...@@ -1315,8 +1323,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn ...@@ -1315,8 +1323,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
return false; return false;
} }
/* c) valid on disk state */ /* c) valid on disk state and not building full snapshot */
else if (SnapBuildRestore(builder, lsn)) else if (!builder->building_full_snapshot &&
SnapBuildRestore(builder, lsn))
{ {
/* there won't be any state to cleanup */ /* there won't be any state to cleanup */
return false; return false;
......
...@@ -54,7 +54,8 @@ struct xl_running_xacts; ...@@ -54,7 +54,8 @@ struct xl_running_xacts;
extern void CheckPointSnapBuild(void); extern void CheckPointSnapBuild(void);
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
TransactionId xmin_horizon, XLogRecPtr start_lsn); TransactionId xmin_horizon, XLogRecPtr start_lsn,
bool need_full_snapshot);
extern void FreeSnapshotBuilder(SnapBuild *cache); extern void FreeSnapshotBuilder(SnapBuild *cache);
extern void SnapBuildSnapDecRefcount(Snapshot snap); extern void SnapBuildSnapDecRefcount(Snapshot snap);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment