Commit ec5896ae authored by Andres Freund's avatar Andres Freund

Fix several weaknesses in slot and logical replication on-disk serialization.

Heikki noticed in 544E23C0.8090605@vmware.com that slot.c and
snapbuild.c were missing the FIN_CRC32 call when computing/checking
checksums of on disk files. That doesn't lower the the error detection
capabilities of the checksum, but is inconsistent with other usages.

In a followup mail Heikki also noticed that, contrary to a comment,
the 'version' and 'length' struct fields of replication slot's on disk
data where not covered by the checksum. That's not likely to lead to
actually missed corruption as those fields are cross checked with the
expected version and the actual file length. But it's wrong
nonetheless.

As fixing these issues makes existing on disk files unreadable, bump
the expected versions of on disk files for both slots and logical
decoding historic catalog snapshots.  This means that loading old
files will fail with
ERROR: "replication slot file ... has unsupported version 1"
and
ERROR: "snapbuild state file ... has unsupported version 1 instead of
2" respectively. Given the low likelihood of anybody already using
these new features in a production setup that seems acceptable.

Fixing these issues made me notice that there's no regression test
covering the loading of historic snapshot from disk - so add one.

Backpatch to 9.4 where these features were introduced.
parent bd4ae0f3
...@@ -53,7 +53,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding ...@@ -53,7 +53,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding
--extra-install=contrib/test_decoding \ --extra-install=contrib/test_decoding \
$(REGRESSCHECKS) $(REGRESSCHECKS)
ISOLATIONCHECKS=mxact delayed_startup concurrent_ddl_dml ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml
isolationcheck: all | submake-isolation submake-test_decoding isolationcheck: all | submake-isolation submake-test_decoding
$(MKDIR_P) isolation_output $(MKDIR_P) isolation_output
......
Parsed test spec with 3 sessions
starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
?column?
f
step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
?column?
f
step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int;
step s2c: COMMIT;
step s1init: <... completed>
?column?
init
step s1insert: INSERT INTO do_write DEFAULT VALUES;
step s1checkpoint: CHECKPOINT;
step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');
data
BEGIN
table public.do_write: INSERT: id[integer]:1 addedbys2[integer]:null
COMMIT
step s1insert: INSERT INTO do_write DEFAULT VALUES;
step s1alter: ALTER TABLE do_write ADD COLUMN addedbys1 int;
step s1insert: INSERT INTO do_write DEFAULT VALUES;
step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');
data
BEGIN
table public.do_write: INSERT: id[integer]:2 addedbys2[integer]:null
COMMIT
BEGIN
COMMIT
BEGIN
table public.do_write: INSERT: id[integer]:3 addedbys2[integer]:null addedbys1[integer]:null
COMMIT
?column?
stop
# Force usage of ondisk decoding snapshots to test that code path.
setup
{
DROP TABLE IF EXISTS do_write;
CREATE TABLE do_write(id serial primary key);
}
teardown
{
DROP TABLE do_write;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
session "s1"
setup { SET synchronous_commit=on; }
step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');}
step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');}
step "s1insert" { INSERT INTO do_write DEFAULT VALUES; }
step "s1checkpoint" { CHECKPOINT; }
step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; }
session "s2"
setup { SET synchronous_commit=on; }
step "s2txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; }
step "s2c" { COMMIT; }
session "s3"
setup { SET synchronous_commit=on; }
step "s3txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
step "s3c" { COMMIT; }
# Force usage of ondisk snapshot by starting and not finishing a
# transaction with a assigned xid after consistency has been
# reached. In combination with a checkpoint forcing a snapshot to be
# written and a new restart point computed that'll lead to the usage
# of the snapshot.
permutation "s2txid" "s1init" "s3txid" "s2alter" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"
...@@ -1406,7 +1406,7 @@ typedef struct SnapBuildOnDisk ...@@ -1406,7 +1406,7 @@ typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version) offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001 #define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 1 #define SNAPBUILD_VERSION 2
/* /*
* Store/Load a snapshot from disk, depending on the snapshot builder's state. * Store/Load a snapshot from disk, depending on the snapshot builder's state.
...@@ -1552,6 +1552,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ...@@ -1552,6 +1552,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
COMP_CRC32C(ondisk->checksum, ondisk_c, sz); COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
ondisk_c += sz; ondisk_c += sz;
FIN_CRC32C(ondisk->checksum);
/* we have valid data now, open tempfile and write it there */ /* we have valid data now, open tempfile and write it there */
fd = OpenTransientFile(tmppath, fd = OpenTransientFile(tmppath,
O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
...@@ -1724,6 +1726,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) ...@@ -1724,6 +1726,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
CloseTransientFile(fd); CloseTransientFile(fd);
FIN_CRC32C(checksum);
/* verify checksum of what we've read */ /* verify checksum of what we've read */
if (!EQ_CRC32C(checksum, ondisk.checksum)) if (!EQ_CRC32C(checksum, ondisk.checksum))
ereport(ERROR, ereport(ERROR,
......
...@@ -61,18 +61,29 @@ typedef struct ReplicationSlotOnDisk ...@@ -61,18 +61,29 @@ typedef struct ReplicationSlotOnDisk
uint32 version; uint32 version;
uint32 length; uint32 length;
/*
* The actual data in the slot that follows can differ based on the above
* 'version'.
*/
ReplicationSlotPersistentData slotdata; ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk; } ReplicationSlotOnDisk;
/* size of the part of the slot that is version independent */ /* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \ #define ReplicationSlotOnDiskConstantSize \
offsetof(ReplicationSlotOnDisk, slotdata) offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the slots that is not version indepenent */ /* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskDynamicSize \ #define SnapBuildOnDiskNotChecksummedSize \
offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define SnapBuildOnDiskChecksummedSize \
sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
/* size of the slot data that is version dependant */
#define ReplicationSlotOnDiskV2Size \
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
#define SLOT_MAGIC 0x1051CA1 /* format identifier */ #define SLOT_MAGIC 0x1051CA1 /* format identifier */
#define SLOT_VERSION 1 /* version for new files */ #define SLOT_VERSION 2 /* version for new files */
/* Control array for replication slot management */ /* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL; ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
...@@ -992,8 +1003,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) ...@@ -992,8 +1003,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
cp.magic = SLOT_MAGIC; cp.magic = SLOT_MAGIC;
INIT_CRC32C(cp.checksum); INIT_CRC32C(cp.checksum);
cp.version = 1; cp.version = SLOT_VERSION;
cp.length = ReplicationSlotOnDiskDynamicSize; cp.length = ReplicationSlotOnDiskV2Size;
SpinLockAcquire(&slot->mutex); SpinLockAcquire(&slot->mutex);
...@@ -1002,8 +1013,9 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) ...@@ -1002,8 +1013,9 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
SpinLockRelease(&slot->mutex); SpinLockRelease(&slot->mutex);
COMP_CRC32C(cp.checksum, COMP_CRC32C(cp.checksum,
(char *) (&cp) + ReplicationSlotOnDiskConstantSize, (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
ReplicationSlotOnDiskDynamicSize); SnapBuildOnDiskChecksummedSize);
FIN_CRC32C(cp.checksum);
if ((write(fd, &cp, sizeof(cp))) != sizeof(cp)) if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
{ {
...@@ -1155,7 +1167,7 @@ RestoreSlotFromDisk(const char *name) ...@@ -1155,7 +1167,7 @@ RestoreSlotFromDisk(const char *name)
path, cp.version))); path, cp.version)));
/* boundary check on length */ /* boundary check on length */
if (cp.length != ReplicationSlotOnDiskDynamicSize) if (cp.length != ReplicationSlotOnDiskV2Size)
ereport(PANIC, ereport(PANIC,
(errcode_for_file_access(), (errcode_for_file_access(),
errmsg("replication slot file \"%s\" has corrupted length %u", errmsg("replication slot file \"%s\" has corrupted length %u",
...@@ -1182,8 +1194,9 @@ RestoreSlotFromDisk(const char *name) ...@@ -1182,8 +1194,9 @@ RestoreSlotFromDisk(const char *name)
/* now verify the CRC */ /* now verify the CRC */
INIT_CRC32C(checksum); INIT_CRC32C(checksum);
COMP_CRC32C(checksum, COMP_CRC32C(checksum,
(char *) &cp + ReplicationSlotOnDiskConstantSize, (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
ReplicationSlotOnDiskDynamicSize); SnapBuildOnDiskChecksummedSize);
FIN_CRC32C(checksum);
if (!EQ_CRC32C(checksum, cp.checksum)) if (!EQ_CRC32C(checksum, cp.checksum))
ereport(PANIC, ereport(PANIC,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment