Commit 0fd38e13 authored by Andres Freund's avatar Andres Freund

Don't skip SQL backends in logical decoding for visibility computation.

The logical decoding patchset introduced PROC_IN_LOGICAL_DECODING flag
PGXACT flag, that allows such backends to be skipped when computing
the xmin horizon/snapshots. That's fine and sensible for walsenders
streaming out logical changes, but not at all fine for SQL backends
doing logical decoding. If the latter set that flag any change they
have performed outside of logical decoding will not be regarded as
visible - which e.g. can lead to that change being vacuumed away.

Note that not setting the flag for SQL backends isn't particularly
bothersome - the SQL backend doesn't do streaming, so it only runs for
a limited amount of time.

Per buildfarm member 'tick' and Alvaro.

Backpatch to 9.4, where logical decoding was introduced.
parent 75ef4352
-- test that we can insert the result of a get_changes call into a
-- logged relation. That's really not a good idea in practical terms,
-- but provides a nice test.
-- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
......
-- test that we can insert the result of a get_changes call into a
-- logged relation. That's really not a good idea in practical terms,
-- but provides a nice test.
-- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
-- slot works
......
......@@ -146,10 +146,19 @@ StartupDecodingContext(List *output_plugin_options,
* logical decoding backend which doesn't need to be checked individually
* when computing the xmin horizon because the xmin is enforced via
* replication slots.
*
* We can only do so if we're outside of a transaction (i.e. the case when
* streaming changes via walsender), otherwise a already setup
* snapshot/xid would end up being ignored. That's not a particularly
* bothersome restriction since the SQL interface can't be used for
* streaming anyway.
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING;
LWLockRelease(ProcArrayLock);
if (!IsTransactionOrTransactionBlock())
{
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING;
LWLockRelease(ProcArrayLock);
}
ctx->slot = slot;
......
......@@ -43,7 +43,7 @@ struct XidCache
#define PROC_IN_ANALYZE 0x04 /* currently running analyze */
#define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */
#define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical
* decoding */
* decoding outside xact */
/* flags reset at EOXact */
#define PROC_VACUUM_STATE_MASK \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment