Commit 007556bf authored by Tom Lane's avatar Tom Lane

Remove fixed limit on the number of concurrent AllocateFile() requests.

AllocateFile(), AllocateDir(), and some sister routines share a small array
for remembering requests, so that the files can be closed on transaction
failure.  Previously that array had a fixed size, MAX_ALLOCATED_DESCS (32).
While historically that had seemed sufficient, Steve Toutant pointed out
that this meant you couldn't scan more than 32 file_fdw foreign tables in
one query, because file_fdw depends on the COPY code which uses
AllocateFile().  There are probably other cases, or will be in the future,
where this nonconfigurable limit impedes users.

We can't completely remove any such limit, at least not without a lot of
work, since each such request requires a kernel file descriptor and most
platforms limit the number we can have.  (In principle we could
"virtualize" these descriptors, as fd.c already does for the main VFD pool,
but not without an additional layer of overhead and a lot of notational
impact on the calling code.)  But we can at least let the array size be
configurable.  Hence, change the code to allow up to max_safe_fds/2
allocated file requests.  On modern platforms this should allow several
hundred concurrent file_fdw scans, or more if one increases the value of
max_files_per_process.  To go much further than that, we'd need to do some
more work on the data structure, since the current code for closing
requests has potentially O(N^2) runtime; but it should still be all right
for request counts in this range.

Back-patch to 9.1 where contrib/file_fdw was introduced.
parent d535136b
...@@ -43,8 +43,8 @@ ...@@ -43,8 +43,8 @@
* wrappers around fopen(3), opendir(3), popen(3) and open(2), respectively. * wrappers around fopen(3), opendir(3), popen(3) and open(2), respectively.
* They behave like the corresponding native functions, except that the handle * They behave like the corresponding native functions, except that the handle
* is registered with the current subtransaction, and will be automatically * is registered with the current subtransaction, and will be automatically
* closed at abort. These are intended for short operations like reading a * closed at abort. These are intended mainly for short operations like
* configuration file, and there is a fixed limit on the number of files that * reading a configuration file; there is a limit on the number of files that
* can be opened using these functions at any one time. * can be opened using these functions at any one time.
* *
* Finally, BasicOpenFile is just a thin wrapper around open() that can * Finally, BasicOpenFile is just a thin wrapper around open() that can
...@@ -198,13 +198,7 @@ static uint64 temporary_files_size = 0; ...@@ -198,13 +198,7 @@ static uint64 temporary_files_size = 0;
/* /*
* List of OS handles opened with AllocateFile, AllocateDir and * List of OS handles opened with AllocateFile, AllocateDir and
* OpenTransientFile. * OpenTransientFile.
*
* Since we don't want to encourage heavy use of those functions,
* it seems OK to put a pretty small maximum limit on the number of
* simultaneously allocated descs.
*/ */
#define MAX_ALLOCATED_DESCS 32
typedef enum typedef enum
{ {
AllocateDescFile, AllocateDescFile,
...@@ -216,17 +210,18 @@ typedef enum ...@@ -216,17 +210,18 @@ typedef enum
typedef struct typedef struct
{ {
AllocateDescKind kind; AllocateDescKind kind;
SubTransactionId create_subid;
union union
{ {
FILE *file; FILE *file;
DIR *dir; DIR *dir;
int fd; int fd;
} desc; } desc;
SubTransactionId create_subid;
} AllocateDesc; } AllocateDesc;
static int numAllocatedDescs = 0; static int numAllocatedDescs = 0;
static AllocateDesc allocatedDescs[MAX_ALLOCATED_DESCS]; static int maxAllocatedDescs = 0;
static AllocateDesc *allocatedDescs = NULL;
/* /*
* Number of temporary files opened during the current session; * Number of temporary files opened during the current session;
...@@ -252,6 +247,7 @@ static int nextTempTableSpace = 0; ...@@ -252,6 +247,7 @@ static int nextTempTableSpace = 0;
* Insert - put a file at the front of the Lru ring * Insert - put a file at the front of the Lru ring
* LruInsert - put a file at the front of the Lru ring and open it * LruInsert - put a file at the front of the Lru ring and open it
* ReleaseLruFile - Release an fd by closing the last entry in the Lru ring * ReleaseLruFile - Release an fd by closing the last entry in the Lru ring
* ReleaseLruFiles - Release fd(s) until we're under the max_safe_fds limit
* AllocateVfd - grab a free (or new) file record (from VfdArray) * AllocateVfd - grab a free (or new) file record (from VfdArray)
* FreeVfd - free a file record * FreeVfd - free a file record
* *
...@@ -279,11 +275,14 @@ static void LruDelete(File file); ...@@ -279,11 +275,14 @@ static void LruDelete(File file);
static void Insert(File file); static void Insert(File file);
static int LruInsert(File file); static int LruInsert(File file);
static bool ReleaseLruFile(void); static bool ReleaseLruFile(void);
static void ReleaseLruFiles(void);
static File AllocateVfd(void); static File AllocateVfd(void);
static void FreeVfd(File file); static void FreeVfd(File file);
static int FileAccess(File file); static int FileAccess(File file);
static File OpenTemporaryFileInTablespace(Oid tblspcOid, bool rejectError); static File OpenTemporaryFileInTablespace(Oid tblspcOid, bool rejectError);
static bool reserveAllocatedDesc(void);
static int FreeDesc(AllocateDesc *desc);
static void AtProcExit_Files(int code, Datum arg); static void AtProcExit_Files(int code, Datum arg);
static void CleanupTempFiles(bool isProcExit); static void CleanupTempFiles(bool isProcExit);
static void RemovePgTempFilesInDir(const char *tmpdirname); static void RemovePgTempFilesInDir(const char *tmpdirname);
...@@ -693,11 +692,8 @@ LruInsert(File file) ...@@ -693,11 +692,8 @@ LruInsert(File file)
if (FileIsNotOpen(file)) if (FileIsNotOpen(file))
{ {
while (nfile + numAllocatedDescs >= max_safe_fds) /* Close excess kernel FDs. */
{ ReleaseLruFiles();
if (!ReleaseLruFile())
break;
}
/* /*
* The open could still fail for lack of file descriptors, eg due to * The open could still fail for lack of file descriptors, eg due to
...@@ -736,6 +732,9 @@ LruInsert(File file) ...@@ -736,6 +732,9 @@ LruInsert(File file)
return 0; return 0;
} }
/*
* Release one kernel FD by closing the least-recently-used VFD.
*/
static bool static bool
ReleaseLruFile(void) ReleaseLruFile(void)
{ {
...@@ -754,6 +753,20 @@ ReleaseLruFile(void) ...@@ -754,6 +753,20 @@ ReleaseLruFile(void)
return false; /* no files available to free */ return false; /* no files available to free */
} }
/*
* Release kernel FDs as needed to get under the max_safe_fds limit.
* After calling this, it's OK to try to open another file.
*/
static void
ReleaseLruFiles(void)
{
while (nfile + numAllocatedDescs >= max_safe_fds)
{
if (!ReleaseLruFile())
break;
}
}
static File static File
AllocateVfd(void) AllocateVfd(void)
{ {
...@@ -907,11 +920,8 @@ PathNameOpenFile(FileName fileName, int fileFlags, int fileMode) ...@@ -907,11 +920,8 @@ PathNameOpenFile(FileName fileName, int fileFlags, int fileMode)
file = AllocateVfd(); file = AllocateVfd();
vfdP = &VfdCache[file]; vfdP = &VfdCache[file];
while (nfile + numAllocatedDescs >= max_safe_fds) /* Close excess kernel FDs. */
{ ReleaseLruFiles();
if (!ReleaseLruFile())
break;
}
vfdP->fd = BasicOpenFile(fileName, fileFlags, fileMode); vfdP->fd = BasicOpenFile(fileName, fileFlags, fileMode);
...@@ -1490,6 +1500,66 @@ FilePathName(File file) ...@@ -1490,6 +1500,66 @@ FilePathName(File file)
} }
/*
* Make room for another allocatedDescs[] array entry if needed and possible.
* Returns true if an array element is available.
*/
static bool
reserveAllocatedDesc(void)
{
AllocateDesc *newDescs;
int newMax;
/* Quick out if array already has a free slot. */
if (numAllocatedDescs < maxAllocatedDescs)
return true;
/*
* If the array hasn't yet been created in the current process, initialize
* it with FD_MINFREE / 2 elements. In many scenarios this is as many as
* we will ever need, anyway. We don't want to look at max_safe_fds
* immediately because set_max_safe_fds() may not have run yet.
*/
if (allocatedDescs == NULL)
{
newMax = FD_MINFREE / 2;
newDescs = (AllocateDesc *) malloc(newMax * sizeof(AllocateDesc));
/* Out of memory already? Treat as fatal error. */
if (newDescs == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
allocatedDescs = newDescs;
maxAllocatedDescs = newMax;
return true;
}
/*
* Consider enlarging the array beyond the initial allocation used above.
* By the time this happens, max_safe_fds should be known accurately.
*
* We mustn't let allocated descriptors hog all the available FDs, and in
* practice we'd better leave a reasonable number of FDs for VFD use. So
* set the maximum to max_safe_fds / 2. (This should certainly be at
* least as large as the initial size, FD_MINFREE / 2.)
*/
newMax = max_safe_fds / 2;
if (newMax > maxAllocatedDescs)
{
newDescs = (AllocateDesc *) realloc(allocatedDescs,
newMax * sizeof(AllocateDesc));
/* Treat out-of-memory as a non-fatal error. */
if (newDescs == NULL)
return false;
allocatedDescs = newDescs;
maxAllocatedDescs = newMax;
return true;
}
/* Can't enlarge allocatedDescs[] any more. */
return false;
}
/* /*
* Routines that want to use stdio (ie, FILE*) should use AllocateFile * Routines that want to use stdio (ie, FILE*) should use AllocateFile
* rather than plain fopen(). This lets fd.c deal with freeing FDs if * rather than plain fopen(). This lets fd.c deal with freeing FDs if
...@@ -1515,16 +1585,15 @@ AllocateFile(const char *name, const char *mode) ...@@ -1515,16 +1585,15 @@ AllocateFile(const char *name, const char *mode)
DO_DB(elog(LOG, "AllocateFile: Allocated %d (%s)", DO_DB(elog(LOG, "AllocateFile: Allocated %d (%s)",
numAllocatedDescs, name)); numAllocatedDescs, name));
/* /* Can we allocate another non-virtual FD? */
* The test against MAX_ALLOCATED_DESCS prevents us from overflowing if (!reserveAllocatedDesc())
* allocatedFiles[]; the test against max_safe_fds prevents AllocateFile ereport(ERROR,
* from hogging every one of the available FDs, which'd lead to infinite (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
* looping. errmsg("exceeded maxAllocatedDescs (%d) while trying to open file \"%s\"",
*/ maxAllocatedDescs, name)));
if (numAllocatedDescs >= MAX_ALLOCATED_DESCS ||
numAllocatedDescs >= max_safe_fds - 1) /* Close excess kernel FDs. */
elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to open file \"%s\"", ReleaseLruFiles();
name);
TryAgain: TryAgain:
if ((file = fopen(name, mode)) != NULL) if ((file = fopen(name, mode)) != NULL)
...@@ -1566,16 +1635,15 @@ OpenTransientFile(FileName fileName, int fileFlags, int fileMode) ...@@ -1566,16 +1635,15 @@ OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
DO_DB(elog(LOG, "OpenTransientFile: Allocated %d (%s)", DO_DB(elog(LOG, "OpenTransientFile: Allocated %d (%s)",
numAllocatedDescs, fileName)); numAllocatedDescs, fileName));
/* /* Can we allocate another non-virtual FD? */
* The test against MAX_ALLOCATED_DESCS prevents us from overflowing if (!reserveAllocatedDesc())
* allocatedFiles[]; the test against max_safe_fds prevents BasicOpenFile ereport(ERROR,
* from hogging every one of the available FDs, which'd lead to infinite (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
* looping. errmsg("exceeded maxAllocatedDescs (%d) while trying to open file \"%s\"",
*/ maxAllocatedDescs, fileName)));
if (numAllocatedDescs >= MAX_ALLOCATED_DESCS ||
numAllocatedDescs >= max_safe_fds - 1) /* Close excess kernel FDs. */
elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to open file \"%s\"", ReleaseLruFiles();
fileName);
fd = BasicOpenFile(fileName, fileFlags, fileMode); fd = BasicOpenFile(fileName, fileFlags, fileMode);
...@@ -1607,16 +1675,15 @@ OpenPipeStream(const char *command, const char *mode) ...@@ -1607,16 +1675,15 @@ OpenPipeStream(const char *command, const char *mode)
DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)", DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)",
numAllocatedDescs, command)); numAllocatedDescs, command));
/* /* Can we allocate another non-virtual FD? */
* The test against MAX_ALLOCATED_DESCS prevents us from overflowing if (!reserveAllocatedDesc())
* allocatedFiles[]; the test against max_safe_fds prevents AllocateFile ereport(ERROR,
* from hogging every one of the available FDs, which'd lead to infinite (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
* looping. errmsg("exceeded maxAllocatedDescs (%d) while trying to execute command \"%s\"",
*/ maxAllocatedDescs, command)));
if (numAllocatedDescs >= MAX_ALLOCATED_DESCS ||
numAllocatedDescs >= max_safe_fds - 1) /* Close excess kernel FDs. */
elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to execute command \"%s\"", ReleaseLruFiles();
command);
TryAgain: TryAgain:
fflush(stdout); fflush(stdout);
...@@ -1759,16 +1826,15 @@ AllocateDir(const char *dirname) ...@@ -1759,16 +1826,15 @@ AllocateDir(const char *dirname)
DO_DB(elog(LOG, "AllocateDir: Allocated %d (%s)", DO_DB(elog(LOG, "AllocateDir: Allocated %d (%s)",
numAllocatedDescs, dirname)); numAllocatedDescs, dirname));
/* /* Can we allocate another non-virtual FD? */
* The test against MAX_ALLOCATED_DESCS prevents us from overflowing if (!reserveAllocatedDesc())
* allocatedDescs[]; the test against max_safe_fds prevents AllocateDir ereport(ERROR,
* from hogging every one of the available FDs, which'd lead to infinite (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
* looping. errmsg("exceeded maxAllocatedDescs (%d) while trying to open directory \"%s\"",
*/ maxAllocatedDescs, dirname)));
if (numAllocatedDescs >= MAX_ALLOCATED_DESCS ||
numAllocatedDescs >= max_safe_fds - 1) /* Close excess kernel FDs. */
elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to open directory \"%s\"", ReleaseLruFiles();
dirname);
TryAgain: TryAgain:
if ((dir = opendir(dirname)) != NULL) if ((dir = opendir(dirname)) != NULL)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment