Commit 8aa75e13 authored by Alvaro Herrera's avatar Alvaro Herrera

Refrain from duplicating data in reorderbuffers

If a walsender exits leaving data in reorderbuffers, the next walsender
that tries to decode the same transaction would append its decoded data
in the same spill files without truncating it first, which effectively
duplicate the data.  Avoid that by removing any leftover reorderbuffer
spill files when a walsender starts.

Backpatch to 9.4; this bug has been there from the very beginning of
logical decoding.

Author: Craig Ringer, revised by me
Reviewed by: Álvaro Herrera, Petr Jelínek, Masahiko Sawada
parent 286c0ab2
......@@ -196,6 +196,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
......@@ -214,7 +217,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
/*
* Allocate a new ReorderBuffer
* Allocate a new ReorderBuffer and clean out any old serialized state from
* prior ReorderBuffer instances for the same slot.
*/
ReorderBuffer *
ReorderBufferAllocate(void)
......@@ -223,6 +227,8 @@ ReorderBufferAllocate(void)
HASHCTL hash_ctl;
MemoryContext new_ctx;
Assert(MyReplicationSlot != NULL);
/* allocate memory in own context, to have better accountability */
new_ctx = AllocSetContextCreate(CurrentMemoryContext,
"ReorderBuffer",
......@@ -269,6 +275,13 @@ ReorderBufferAllocate(void)
dlist_init(&buffer->toplevel_by_lsn);
/*
* Ensure there's no stale data from prior uses of this slot, in case some
* prior exit avoided calling ReorderBufferFree. Failure to do this can
* produce duplicated txns, and it's very cheap if there's nothing there.
*/
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
return buffer;
}
......@@ -285,6 +298,9 @@ ReorderBufferFree(ReorderBuffer *rb)
* memory context.
*/
MemoryContextDelete(context);
/* Free disk space used by unconsumed reorder buffers */
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
}
/*
......@@ -2030,7 +2046,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
int fd = -1;
XLogSegNo curOpenSegNo = 0;
Size spilled = 0;
char path[MAXPGPATH];
elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid);
......@@ -2058,21 +2073,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
if (fd == -1 ||
!XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
{
XLogRecPtr recptr;
char path[MAXPGPATH];
if (fd != -1)
CloseTransientFile(fd);
XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr, wal_segment_size);
/*
* No need to care about TLIs here, only used during a single run,
* so each LSN only maps to a specific WAL record.
*/
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
NameStr(MyReplicationSlot->data.name), txn->xid,
(uint32) (recptr >> 32), (uint32) recptr);
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
curOpenSegNo);
/* open segment, create it if necessary */
fd = OpenTransientFile(path,
......@@ -2081,8 +2094,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
if (fd < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m",
path)));
errmsg("could not open file \"%s\": %m", path)));
}
ReorderBufferSerializeChange(rb, txn, fd, change);
......@@ -2300,25 +2312,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (*fd == -1)
{
XLogRecPtr recptr;
char path[MAXPGPATH];
/* first time in */
if (*segno == 0)
{
XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
}
Assert(*segno != 0 || dlist_is_empty(&txn->changes));
XLogSegNoOffsetToRecPtr(*segno, 0, recptr, wal_segment_size);
/*
* No need to care about TLIs here, only used during a single run,
* so each LSN only maps to a specific WAL record.
*/
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
NameStr(MyReplicationSlot->data.name), txn->xid,
(uint32) (recptr >> 32), (uint32) recptr);
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
*segno);
*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
if (*fd < 0 && errno == ENOENT)
......@@ -2332,7 +2339,6 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m",
path)));
}
/*
......@@ -2554,13 +2560,8 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
for (cur = first; cur <= last; cur++)
{
char path[MAXPGPATH];
XLogRecPtr recptr;
XLogSegNoOffsetToRecPtr(cur, 0, recptr, wal_segment_size);
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
NameStr(MyReplicationSlot->data.name), txn->xid,
(uint32) (recptr >> 32), (uint32) recptr);
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
if (unlink(path) != 0 && errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
......@@ -2568,6 +2569,63 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
}
/*
* Remove any leftover serialized reorder buffers from a slot directory after a
* prior crash or decoding session exit.
*/
static void
ReorderBufferCleanupSerializedTXNs(const char *slotname)
{
DIR *spill_dir;
struct dirent *spill_de;
struct stat statbuf;
char path[MAXPGPATH * 2 + 12];
sprintf(path, "pg_replslot/%s", slotname);
/* we're only handling directories here, skip if it's not ours */
if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
return;
spill_dir = AllocateDir(path);
while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
{
/* only look at names that can be ours */
if (strncmp(spill_de->d_name, "xid", 3) == 0)
{
snprintf(path, sizeof(path),
"pg_replslot/%s/%s", slotname,
spill_de->d_name);
if (unlink(path) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
path, slotname)));
}
}
FreeDir(spill_dir);
}
/*
* Given a replication slot, transaction ID and segment number, fill in the
* corresponding spill file into 'path', which is a caller-owned buffer of size
* at least MAXPGPATH.
*/
static void
ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
XLogSegNo segno)
{
XLogRecPtr recptr;
XLogSegNoOffsetToRecPtr(segno, 0, recptr, wal_segment_size);
snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
NameStr(MyReplicationSlot->data.name),
xid,
(uint32) (recptr >> 32), (uint32) recptr);
}
/*
* Delete all data spilled to disk after we've restarted/crashed. It will be
* recreated when the respective slots are reused.
......@@ -2578,15 +2636,9 @@ StartupReorderBuffer(void)
DIR *logical_dir;
struct dirent *logical_de;
DIR *spill_dir;
struct dirent *spill_de;
logical_dir = AllocateDir("pg_replslot");
while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
{
struct stat statbuf;
char path[MAXPGPATH * 2 + 12];
if (strcmp(logical_de->d_name, ".") == 0 ||
strcmp(logical_de->d_name, "..") == 0)
continue;
......@@ -2599,33 +2651,7 @@ StartupReorderBuffer(void)
* ok, has to be a surviving logical slot, iterate and delete
* everything starting with xid-*
*/
sprintf(path, "pg_replslot/%s", logical_de->d_name);
/* we're only creating directories here, skip if it's not our's */
if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
continue;
spill_dir = AllocateDir(path);
while ((spill_de = ReadDir(spill_dir, path)) != NULL)
{
if (strcmp(spill_de->d_name, ".") == 0 ||
strcmp(spill_de->d_name, "..") == 0)
continue;
/* only look at names that can be ours */
if (strncmp(spill_de->d_name, "xid", 3) == 0)
{
sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
spill_de->d_name);
if (unlink(path) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m",
path)));
}
}
FreeDir(spill_dir);
ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
}
FreeDir(logical_dir);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment