Commit d2070380 authored by Amit Kapila's avatar Amit Kapila

Fix running out of file descriptors for spill files.

Currently while decoding changes, if the number of changes exceeds a
certain threshold, we spill those to disk.  And this happens for each
(sub)transaction.  Now, while reading all these files, we don't close them
until we read all the files.  While reading these files, if the number of
such files exceeds the maximum number of file descriptors, the operation
errors out.

Use PathNameOpenFile interface to open these files as that internally has
the mechanism to release kernel FDs as needed to get us under the
max_safe_fds limit.

Reported-by: Amit Khandekar
Author: Amit Khandekar
Reviewed-by: Amit Kapila
Backpatch-through: 9.4
Discussion: https://postgr.es/m/CAJ3gD9c-sECEn79zXw4yBnBdOttacoE-6gAyP0oy60nfs_sabQ@mail.gmail.com
parent 4b25f5d0
...@@ -131,13 +131,21 @@ typedef struct ReorderBufferTupleCidEnt ...@@ -131,13 +131,21 @@ typedef struct ReorderBufferTupleCidEnt
CommandId combocid; /* just for debugging */ CommandId combocid; /* just for debugging */
} ReorderBufferTupleCidEnt; } ReorderBufferTupleCidEnt;
/* Virtual file descriptor with file offset tracking */
typedef struct TXNEntryFile
{
File vfd; /* -1 when the file is closed */
off_t curOffset; /* offset for next write or read. Reset to 0
* when vfd is opened. */
} TXNEntryFile;
/* k-way in-order change iteration support structures */ /* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry typedef struct ReorderBufferIterTXNEntry
{ {
XLogRecPtr lsn; XLogRecPtr lsn;
ReorderBufferChange *change; ReorderBufferChange *change;
ReorderBufferTXN *txn; ReorderBufferTXN *txn;
int fd; TXNEntryFile file;
XLogSegNo segno; XLogSegNo segno;
} ReorderBufferIterTXNEntry; } ReorderBufferIterTXNEntry;
...@@ -207,7 +215,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb); ...@@ -207,7 +215,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
* subtransactions * subtransactions
* --------------------------------------- * ---------------------------------------
*/ */
static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferIterTXNState *volatile *iter_state);
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
ReorderBufferIterTXNState *state); ReorderBufferIterTXNState *state);
...@@ -223,7 +232,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); ...@@ -223,7 +232,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change); int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
int *fd, XLogSegNo *segno); TXNEntryFile *file, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change); char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
...@@ -996,15 +1005,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg) ...@@ -996,15 +1005,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
/* /*
* Allocate & initialize an iterator which iterates in lsn order over a * Allocate & initialize an iterator which iterates in lsn order over a
* transaction and all its subtransactions. * transaction and all its subtransactions.
*
* Note: The iterator state is returned through iter_state parameter rather
* than the function's return value. This is because the state gets cleaned up
* in a PG_CATCH block in the caller, so we want to make sure the caller gets
* back the state even if this function throws an exception.
*/ */
static ReorderBufferIterTXNState * static void
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferIterTXNState *volatile *iter_state)
{ {
Size nr_txns = 0; Size nr_txns = 0;
ReorderBufferIterTXNState *state; ReorderBufferIterTXNState *state;
dlist_iter cur_txn_i; dlist_iter cur_txn_i;
int32 off; int32 off;
*iter_state = NULL;
/* /*
* Calculate the size of our heap: one element for every transaction that * Calculate the size of our heap: one element for every transaction that
* contains changes. (Besides the transactions already in the reorder * contains changes. (Besides the transactions already in the reorder
...@@ -1039,7 +1056,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1039,7 +1056,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
for (off = 0; off < state->nr_txns; off++) for (off = 0; off < state->nr_txns; off++)
{ {
state->entries[off].fd = -1; state->entries[off].file.vfd = -1;
state->entries[off].segno = 0; state->entries[off].segno = 0;
} }
...@@ -1048,6 +1065,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1048,6 +1065,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferIterCompare, ReorderBufferIterCompare,
state); state);
/* Now that the state fields are initialized, it is safe to return it. */
*iter_state = state;
/* /*
* Now insert items into the binary heap, in an unordered fashion. (We * Now insert items into the binary heap, in an unordered fashion. (We
* will run a heap assembly step at the end; this is more efficient.) * will run a heap assembly step at the end; this is more efficient.)
...@@ -1064,7 +1084,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1064,7 +1084,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{ {
/* serialize remaining changes */ /* serialize remaining changes */
ReorderBufferSerializeTXN(rb, txn); ReorderBufferSerializeTXN(rb, txn);
ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd, ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
&state->entries[off].segno); &state->entries[off].segno);
} }
...@@ -1094,7 +1114,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1094,7 +1114,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* serialize remaining changes */ /* serialize remaining changes */
ReorderBufferSerializeTXN(rb, cur_txn); ReorderBufferSerializeTXN(rb, cur_txn);
ReorderBufferRestoreChanges(rb, cur_txn, ReorderBufferRestoreChanges(rb, cur_txn,
&state->entries[off].fd, &state->entries[off].file,
&state->entries[off].segno); &state->entries[off].segno);
} }
cur_change = dlist_head_element(ReorderBufferChange, node, cur_change = dlist_head_element(ReorderBufferChange, node,
...@@ -1110,8 +1130,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -1110,8 +1130,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* assemble a valid binary heap */ /* assemble a valid binary heap */
binaryheap_build(state->heap); binaryheap_build(state->heap);
return state;
} }
/* /*
...@@ -1175,7 +1193,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) ...@@ -1175,7 +1193,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
dlist_delete(&change->node); dlist_delete(&change->node);
dlist_push_tail(&state->old_change, &change->node); dlist_push_tail(&state->old_change, &change->node);
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd, if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
&state->entries[off].segno)) &state->entries[off].segno))
{ {
/* successfully restored changes from disk */ /* successfully restored changes from disk */
...@@ -1214,8 +1232,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, ...@@ -1214,8 +1232,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
for (off = 0; off < state->nr_txns; off++) for (off = 0; off < state->nr_txns; off++)
{ {
if (state->entries[off].fd != -1) if (state->entries[off].file.vfd != -1)
CloseTransientFile(state->entries[off].fd); FileClose(state->entries[off].file.vfd);
} }
/* free memory we might have "leaked" in the last *Next call */ /* free memory we might have "leaked" in the last *Next call */
...@@ -1558,7 +1576,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ...@@ -1558,7 +1576,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
rb->begin(rb, txn); rb->begin(rb, txn);
iterstate = ReorderBufferIterTXNInit(rb, txn); ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
{ {
Relation relation = NULL; Relation relation = NULL;
...@@ -2765,11 +2783,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change) ...@@ -2765,11 +2783,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
*/ */
static Size static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
int *fd, XLogSegNo *segno) TXNEntryFile *file, XLogSegNo *segno)
{ {
Size restored = 0; Size restored = 0;
XLogSegNo last_segno; XLogSegNo last_segno;
dlist_mutable_iter cleanup_iter; dlist_mutable_iter cleanup_iter;
File *fd = &file->vfd;
Assert(txn->first_lsn != InvalidXLogRecPtr); Assert(txn->first_lsn != InvalidXLogRecPtr);
Assert(txn->final_lsn != InvalidXLogRecPtr); Assert(txn->final_lsn != InvalidXLogRecPtr);
...@@ -2810,7 +2829,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -2810,7 +2829,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
*segno); *segno);
*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
/* No harm in resetting the offset even in case of failure */
file->curOffset = 0;
if (*fd < 0 && errno == ENOENT) if (*fd < 0 && errno == ENOENT)
{ {
*fd = -1; *fd = -1;
...@@ -2830,14 +2853,14 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -2830,14 +2853,14 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
* end of this file. * end of this file.
*/ */
ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); readBytes = FileRead(file->vfd, rb->outbuf,
readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); sizeof(ReorderBufferDiskChange),
pgstat_report_wait_end(); file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
/* eof */ /* eof */
if (readBytes == 0) if (readBytes == 0)
{ {
CloseTransientFile(*fd); FileClose(*fd);
*fd = -1; *fd = -1;
(*segno)++; (*segno)++;
continue; continue;
...@@ -2853,16 +2876,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -2853,16 +2876,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
readBytes, readBytes,
(uint32) sizeof(ReorderBufferDiskChange)))); (uint32) sizeof(ReorderBufferDiskChange))));
file->curOffset += readBytes;
ondisk = (ReorderBufferDiskChange *) rb->outbuf; ondisk = (ReorderBufferDiskChange *) rb->outbuf;
ReorderBufferSerializeReserve(rb, ReorderBufferSerializeReserve(rb,
sizeof(ReorderBufferDiskChange) + ondisk->size); sizeof(ReorderBufferDiskChange) + ondisk->size);
ondisk = (ReorderBufferDiskChange *) rb->outbuf; ondisk = (ReorderBufferDiskChange *) rb->outbuf;
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); readBytes = FileRead(file->vfd,
readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), rb->outbuf + sizeof(ReorderBufferDiskChange),
ondisk->size - sizeof(ReorderBufferDiskChange)); ondisk->size - sizeof(ReorderBufferDiskChange),
pgstat_report_wait_end(); file->curOffset,
WAIT_EVENT_REORDER_BUFFER_READ);
if (readBytes < 0) if (readBytes < 0)
ereport(ERROR, ereport(ERROR,
...@@ -2875,6 +2901,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -2875,6 +2901,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
readBytes, readBytes,
(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
file->curOffset += readBytes;
/* /*
* ok, read a full change from disk, now restore it into proper * ok, read a full change from disk, now restore it into proper
* in-memory format * in-memory format
......
...@@ -7,7 +7,7 @@ use strict; ...@@ -7,7 +7,7 @@ use strict;
use warnings; use warnings;
use PostgresNode; use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 10; use Test::More tests => 11;
use Config; use Config;
# Initialize master node # Initialize master node
...@@ -135,5 +135,43 @@ is($node_master->psql('postgres', 'DROP DATABASE otherdb'), ...@@ -135,5 +135,43 @@ is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
is($node_master->slot('otherdb_slot')->{'slot_name'}, is($node_master->slot('otherdb_slot')->{'slot_name'},
undef, 'logical slot was actually dropped with DB'); undef, 'logical slot was actually dropped with DB');
# Test to ensure that we don't run out of file descriptors even if there
# are more spill files than maxAllocatedDescs.
# Set max_files_per_process to a small value to make it more likely to run out
# of max open file descriptors.
$node_master->safe_psql('postgres',
'ALTER SYSTEM SET max_files_per_process = 26;');
$node_master->restart;
$node_master->safe_psql(
'postgres', q{
do $$
BEGIN
FOR i IN 1..10 LOOP
BEGIN
INSERT INTO decoding_test(x) SELECT generate_series(1,5000);
EXCEPTION
when division_by_zero then perform 'dummy';
END;
END LOOP;
END $$;
});
$result = $node_master->safe_psql('postgres',
qq[
set logical_decoding_work_mem to 64; -- generate plenty of .spill files
SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL)
WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1;
]);
$expected = q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null};
is($result, $expected, 'got expected output from spilling subxacts session');
# Reset back max_files_per_process
$node_master->safe_psql('postgres',
'ALTER SYSTEM SET max_files_per_process = DEFAULT;');
$node_master->restart;
# done with the node # done with the node
$node_master->stop; $node_master->stop;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment