Commit 808e13b2 authored by Amit Kapila's avatar Amit Kapila

Extend the BufFile interface.

Allow BufFile to support temporary files that are used by a single
backend when the corresponding files need to survive across
transactions and need to be opened and closed multiple times. Such files
need to be created as a member of a SharedFileSet.

Additionally, this commit implements the interface for BufFileTruncate to
allow files to be truncated up to a particular offset and extends the
BufFileSeek API to support the SEEK_END case. This also adds an option to
provide a mode while opening the shared BufFiles instead of always opening
in read-only mode.

These enhancements to the BufFile interface are required for the upcoming
patch that allows the replication apply worker to handle streamed
in-progress transactions.

Author: Dilip Kumar, Amit Kapila
Reviewed-by: Amit Kapila
Tested-by: Neha Sharma
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
parent adc8fc61
...@@ -1202,6 +1202,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ...@@ -1202,6 +1202,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><literal>BufFileWrite</literal></entry> <entry><literal>BufFileWrite</literal></entry>
<entry>Waiting for a write to a buffered file.</entry> <entry>Waiting for a write to a buffered file.</entry>
</row> </row>
<row>
<entry><literal>BufFileTruncate</literal></entry>
<entry>Waiting for a buffered file to be truncated.</entry>
</row>
<row> <row>
<entry><literal>ControlFileRead</literal></entry> <entry><literal>ControlFileRead</literal></entry>
<entry>Waiting for a read from the <filename>pg_control</filename> <entry>Waiting for a read from the <filename>pg_control</filename>
......
...@@ -3940,6 +3940,9 @@ pgstat_get_wait_io(WaitEventIO w) ...@@ -3940,6 +3940,9 @@ pgstat_get_wait_io(WaitEventIO w)
case WAIT_EVENT_BUFFILE_WRITE: case WAIT_EVENT_BUFFILE_WRITE:
event_name = "BufFileWrite"; event_name = "BufFileWrite";
break; break;
case WAIT_EVENT_BUFFILE_TRUNCATE:
event_name = "BufFileTruncate";
break;
case WAIT_EVENT_CONTROL_FILE_READ: case WAIT_EVENT_CONTROL_FILE_READ:
event_name = "ControlFileRead"; event_name = "ControlFileRead";
break; break;
......
...@@ -32,10 +32,14 @@ ...@@ -32,10 +32,14 @@
* (by opening multiple fd.c temporary files). This is an essential feature * (by opening multiple fd.c temporary files). This is an essential feature
* for sorts and hashjoins on large amounts of data. * for sorts and hashjoins on large amounts of data.
* *
* BufFile supports temporary files that can be made read-only and shared with * BufFile supports temporary files that can be shared with other backends, as
* other backends, as infrastructure for parallel execution. Such files need * infrastructure for parallel execution. Such files need to be created as a
* to be created as a member of a SharedFileSet that all participants are * member of a SharedFileSet that all participants are attached to.
* attached to. *
* BufFile also supports temporary files that are used by a single backend
* when the corresponding files need to survive across transactions and
* need to be opened and closed multiple times. Such files need to be created
* as a member of a SharedFileSet.
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -277,7 +281,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name) ...@@ -277,7 +281,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name)
* backends and render it read-only. * backends and render it read-only.
*/ */
BufFile * BufFile *
BufFileOpenShared(SharedFileSet *fileset, const char *name) BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
{ {
BufFile *file; BufFile *file;
char segment_name[MAXPGPATH]; char segment_name[MAXPGPATH];
...@@ -301,7 +305,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name) ...@@ -301,7 +305,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name)
} }
/* Try to load a segment. */ /* Try to load a segment. */
SharedSegmentName(segment_name, name, nfiles); SharedSegmentName(segment_name, name, nfiles);
files[nfiles] = SharedFileSetOpen(fileset, segment_name); files[nfiles] = SharedFileSetOpen(fileset, segment_name, mode);
if (files[nfiles] <= 0) if (files[nfiles] <= 0)
break; break;
++nfiles; ++nfiles;
...@@ -321,7 +325,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name) ...@@ -321,7 +325,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name)
file = makeBufFileCommon(nfiles); file = makeBufFileCommon(nfiles);
file->files = files; file->files = files;
file->readOnly = true; /* Can't write to files opened this way */ file->readOnly = (mode == O_RDONLY) ? true : false;
file->fileset = fileset; file->fileset = fileset;
file->name = pstrdup(name); file->name = pstrdup(name);
...@@ -666,11 +670,21 @@ BufFileSeek(BufFile *file, int fileno, off_t offset, int whence) ...@@ -666,11 +670,21 @@ BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
newFile = file->curFile; newFile = file->curFile;
newOffset = (file->curOffset + file->pos) + offset; newOffset = (file->curOffset + file->pos) + offset;
break; break;
#ifdef NOT_USED
case SEEK_END: case SEEK_END:
/* could be implemented, not needed currently */
/*
* The file size of the last file gives us the end offset of that
* file.
*/
newFile = file->numFiles - 1;
newOffset = FileSize(file->files[file->numFiles - 1]);
if (newOffset < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not determine size of temporary file \"%s\" from BufFile \"%s\": %m",
FilePathName(file->files[file->numFiles - 1]),
file->name)));
break; break;
#endif
default: default:
elog(ERROR, "invalid whence: %d", whence); elog(ERROR, "invalid whence: %d", whence);
return EOF; return EOF;
...@@ -838,3 +852,98 @@ BufFileAppend(BufFile *target, BufFile *source) ...@@ -838,3 +852,98 @@ BufFileAppend(BufFile *target, BufFile *source)
return startBlock; return startBlock;
} }
/*
 * BufFileTruncateShared
 *
 * Cut back a BufFile created by BufFileCreateShared so that it ends at the
 * position (fileno, offset).
 *
 * Segment files that lie past the truncation point are closed and deleted
 * from the fileset; the segment containing the truncation point is shortened
 * with FileTruncate.  Afterwards the in-memory position and buffer
 * bookkeeping (curFile, curOffset, pos, nbytes) are adjusted if they referred
 * to data at or beyond the new end.
 */
void
BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
{
	int			remaining = file->numFiles;
	int			endFile = fileno;
	off_t		endOffset = file->curOffset;
	char		segment_name[MAXPGPATH];
	int			seg;

	/*
	 * Walk the segments from the last one down to 'fileno'.  Every segment
	 * above 'fileno' is removed.  Segment 'fileno' itself is removed as well
	 * when offset is 0, except that segment 0 is never removed: it is only
	 * ever truncated.
	 */
	for (seg = file->numFiles - 1; seg >= fileno; seg--)
	{
		if (seg == 0 || (seg == fileno && offset != 0))
		{
			/* This segment is kept; shorten it to the requested offset. */
			if (FileTruncate(file->files[seg], offset,
							 WAIT_EVENT_BUFFILE_TRUNCATE) < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not truncate file \"%s\": %m",
								FilePathName(file->files[seg]))));
			endOffset = offset;
			continue;
		}

		/* This segment goes away entirely. */
		SharedSegmentName(segment_name, file->name, seg);
		FileClose(file->files[seg]);
		if (!SharedFileSetDelete(file->fileset, segment_name, true))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not delete shared fileset \"%s\": %m",
							segment_name)));
		remaining--;
		endOffset = MAX_PHYSICAL_FILESIZE;

		/*
		 * If the segment named by the caller was itself deleted, the new end
		 * of the BufFile lives in the preceding segment.
		 */
		if (seg == fileno)
			endFile--;
	}

	file->numFiles = remaining;

	if (endFile < file->curFile)
	{
		/*
		 * The new end is in an earlier segment than the current position:
		 * move the position to the new end and drop the buffer contents.
		 */
		file->curFile = endFile;
		file->curOffset = endOffset;
		file->pos = 0;
		file->nbytes = 0;
	}
	else if (endFile == file->curFile)
	{
		if (endOffset < file->curOffset)
		{
			/*
			 * Same segment, but the new end precedes the start of the
			 * buffer: forget the buffer and reposition at the new end.
			 */
			file->curOffset = endOffset;
			file->pos = 0;
			file->nbytes = 0;
		}
		else if (endOffset <= file->curOffset + file->nbytes)
		{
			/*
			 * The new end falls inside the current buffer, so only the
			 * buffer bookkeeping needs trimming.
			 */
			int			keep = (int) (endOffset - file->curOffset);

			/* Pull pos back only if it would otherwise be past the end. */
			if (endOffset <= file->curOffset + file->pos)
				file->pos = keep;

			/* The buffer now holds valid data only up to the new end. */
			file->nbytes = keep;
		}
		/* else: new end is past the buffered range; nothing to adjust. */
	}
	/* else: new end is beyond the current segment; nothing to adjust. */
}
...@@ -1743,18 +1743,17 @@ PathNameCreateTemporaryFile(const char *path, bool error_on_failure) ...@@ -1743,18 +1743,17 @@ PathNameCreateTemporaryFile(const char *path, bool error_on_failure)
/* /*
* Open a file that was created with PathNameCreateTemporaryFile, possibly in * Open a file that was created with PathNameCreateTemporaryFile, possibly in
* another backend. Files opened this way don't count against the * another backend. Files opened this way don't count against the
* temp_file_limit of the caller, are read-only and are automatically closed * temp_file_limit of the caller, are automatically closed at the end of the
* at the end of the transaction but are not deleted on close. * transaction but are not deleted on close.
*/ */
File File
PathNameOpenTemporaryFile(const char *path) PathNameOpenTemporaryFile(const char *path, int mode)
{ {
File file; File file;
ResourceOwnerEnlargeFiles(CurrentResourceOwner); ResourceOwnerEnlargeFiles(CurrentResourceOwner);
/* We open the file read-only. */ file = PathNameOpenFile(path, mode | PG_BINARY);
file = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
/* If no such file, then we don't raise an error. */ /* If no such file, then we don't raise an error. */
if (file <= 0 && errno != ENOENT) if (file <= 0 && errno != ENOENT)
......
...@@ -13,6 +13,10 @@ ...@@ -13,6 +13,10 @@
* files can be discovered by name, and a shared ownership semantics so that * files can be discovered by name, and a shared ownership semantics so that
* shared files survive until the last user detaches. * shared files survive until the last user detaches.
* *
* SharedFileSets can be used by backends when the temporary files need to be
* opened/closed multiple times and the underlying files need to survive across
* transactions.
*
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -25,25 +29,36 @@ ...@@ -25,25 +29,36 @@
#include "common/hashfn.h" #include "common/hashfn.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/dsm.h" #include "storage/dsm.h"
#include "storage/ipc.h"
#include "storage/sharedfileset.h" #include "storage/sharedfileset.h"
#include "utils/builtins.h" #include "utils/builtins.h"
static List *filesetlist = NIL;
static void SharedFileSetOnDetach(dsm_segment *segment, Datum datum); static void SharedFileSetOnDetach(dsm_segment *segment, Datum datum);
static void SharedFileSetDeleteOnProcExit(int status, Datum arg);
static void SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace); static void SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace);
static void SharedFilePath(char *path, SharedFileSet *fileset, const char *name); static void SharedFilePath(char *path, SharedFileSet *fileset, const char *name);
static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name); static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name);
/* /*
* Initialize a space for temporary files that can be opened for read-only * Initialize a space for temporary files that can be opened by other backends.
* access by other backends. Other backends must attach to it before * Other backends must attach to it before accessing it. Associate this
* accessing it. Associate this SharedFileSet with 'seg'. Any contained * SharedFileSet with 'seg'. Any contained files will be deleted when the
* files will be deleted when the last backend detaches. * last backend detaches.
*
* We can also use this interface if the temporary files are used only by
* single backend but the files need to be opened and closed multiple times
* and also the underlying files need to survive across transactions. For
* such cases, dsm segment 'seg' should be passed as NULL. Callers are
* expected to explicitly remove such files by using SharedFileSetDelete/
* SharedFileSetDeleteAll or we remove such files on proc exit.
* *
* Files will be distributed over the tablespaces configured in * Files will be distributed over the tablespaces configured in
* temp_tablespaces. * temp_tablespaces.
* *
* Under the covers the set is one or more directories which will eventually * Under the covers the set is one or more directories which will eventually
* be deleted when there are no backends attached. * be deleted.
*/ */
void void
SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg) SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
...@@ -84,7 +99,25 @@ SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg) ...@@ -84,7 +99,25 @@ SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
} }
/* Register our cleanup callback. */ /* Register our cleanup callback. */
on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset)); if (seg)
on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset));
else
{
static bool registered_cleanup = false;
if (!registered_cleanup)
{
/*
* We must not have registered any fileset before registering the
* fileset clean up.
*/
Assert(filesetlist == NIL);
on_proc_exit(SharedFileSetDeleteOnProcExit, 0);
registered_cleanup = true;
}
filesetlist = lcons((void *) fileset, filesetlist);
}
} }
/* /*
...@@ -147,13 +180,13 @@ SharedFileSetCreate(SharedFileSet *fileset, const char *name) ...@@ -147,13 +180,13 @@ SharedFileSetCreate(SharedFileSet *fileset, const char *name)
* another backend. * another backend.
*/ */
File File
SharedFileSetOpen(SharedFileSet *fileset, const char *name) SharedFileSetOpen(SharedFileSet *fileset, const char *name, int mode)
{ {
char path[MAXPGPATH]; char path[MAXPGPATH];
File file; File file;
SharedFilePath(path, fileset, name); SharedFilePath(path, fileset, name);
file = PathNameOpenTemporaryFile(path); file = PathNameOpenTemporaryFile(path, mode);
return file; return file;
} }
...@@ -192,6 +225,9 @@ SharedFileSetDeleteAll(SharedFileSet *fileset) ...@@ -192,6 +225,9 @@ SharedFileSetDeleteAll(SharedFileSet *fileset)
SharedFileSetPath(dirpath, fileset, fileset->tablespaces[i]); SharedFileSetPath(dirpath, fileset, fileset->tablespaces[i]);
PathNameDeleteTemporaryDir(dirpath); PathNameDeleteTemporaryDir(dirpath);
} }
/* Unregister the shared fileset */
SharedFileSetUnregister(fileset);
} }
/* /*
...@@ -222,6 +258,59 @@ SharedFileSetOnDetach(dsm_segment *segment, Datum datum) ...@@ -222,6 +258,59 @@ SharedFileSetOnDetach(dsm_segment *segment, Datum datum)
SharedFileSetDeleteAll(fileset); SharedFileSetDeleteAll(fileset);
} }
/*
 * Callback function that will be invoked on the process exit.  This will
 * process the list of all the registered sharedfilesets and delete the
 * underlying files.
 *
 * 'status' and 'arg' are required by the on_proc_exit callback signature and
 * are not used.
 */
static void
SharedFileSetDeleteOnProcExit(int status, Datum arg)
{
	/*
	 * Do not iterate with foreach() here: SharedFileSetDeleteAll() calls
	 * SharedFileSetUnregister(), which removes the fileset's cell from
	 * filesetlist.  Deleting the current cell from underneath a foreach()
	 * loop makes the traversal skip entries and can leave the loop looking
	 * at a list that has already been freed.  Instead, keep consuming the
	 * head of the list until it is empty; each SharedFileSetDeleteAll() call
	 * removes the entry it was given.
	 */
	while (filesetlist != NIL)
	{
		SharedFileSet *fileset = (SharedFileSet *) linitial(filesetlist);

		SharedFileSetDeleteAll(fileset);
	}

	filesetlist = NIL;
}
/*
 * Remove 'input_fileset' from the list of filesets that are cleaned up on
 * process exit.
 *
 * When nothing is registered (as is the case when cleanup is driven by dsm
 * detach and filesetlist is not maintained) this is a no-op.  Otherwise the
 * fileset is expected to be present in the list; this is checked with an
 * assertion.
 */
void
SharedFileSetUnregister(SharedFileSet *input_fileset)
{
	ListCell   *cell;
	bool		removed = false;

	/* Nothing is being tracked for proc-exit cleanup. */
	if (filesetlist == NIL)
		return;

	foreach(cell, filesetlist)
	{
		if ((SharedFileSet *) lfirst(cell) != input_fileset)
			continue;

		/* Found it: drop the entry and stop scanning right away. */
		filesetlist = list_delete_cell(filesetlist, cell);
		removed = true;
		break;
	}

	Assert(removed);
}
/* /*
* Build the path for the directory holding the files backing a SharedFileSet * Build the path for the directory holding the files backing a SharedFileSet
* in a given tablespace. * in a given tablespace.
......
...@@ -78,6 +78,8 @@ ...@@ -78,6 +78,8 @@
#include "postgres.h" #include "postgres.h"
#include <fcntl.h>
#include "storage/buffile.h" #include "storage/buffile.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/logtape.h" #include "utils/logtape.h"
...@@ -551,7 +553,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, ...@@ -551,7 +553,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
lt = &lts->tapes[i]; lt = &lts->tapes[i];
pg_itoa(i, filename); pg_itoa(i, filename);
file = BufFileOpenShared(fileset, filename); file = BufFileOpenShared(fileset, filename, O_RDONLY);
filesize = BufFileSize(file); filesize = BufFileSize(file);
/* /*
......
...@@ -559,7 +559,7 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) ...@@ -559,7 +559,7 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
sts_filename(name, accessor, accessor->read_participant); sts_filename(name, accessor, accessor->read_participant);
accessor->read_file = accessor->read_file =
BufFileOpenShared(accessor->fileset, name); BufFileOpenShared(accessor->fileset, name, O_RDONLY);
} }
/* Seek and load the chunk header. */ /* Seek and load the chunk header. */
......
...@@ -916,6 +916,7 @@ typedef enum ...@@ -916,6 +916,7 @@ typedef enum
WAIT_EVENT_BASEBACKUP_READ = PG_WAIT_IO, WAIT_EVENT_BASEBACKUP_READ = PG_WAIT_IO,
WAIT_EVENT_BUFFILE_READ, WAIT_EVENT_BUFFILE_READ,
WAIT_EVENT_BUFFILE_WRITE, WAIT_EVENT_BUFFILE_WRITE,
WAIT_EVENT_BUFFILE_TRUNCATE,
WAIT_EVENT_CONTROL_FILE_READ, WAIT_EVENT_CONTROL_FILE_READ,
WAIT_EVENT_CONTROL_FILE_SYNC, WAIT_EVENT_CONTROL_FILE_SYNC,
WAIT_EVENT_CONTROL_FILE_SYNC_UPDATE, WAIT_EVENT_CONTROL_FILE_SYNC_UPDATE,
......
...@@ -48,7 +48,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source); ...@@ -48,7 +48,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source);
extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name); extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
extern void BufFileExportShared(BufFile *file); extern void BufFileExportShared(BufFile *file);
extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name); extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name,
int mode);
extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name); extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name);
extern void BufFileTruncateShared(BufFile *file, int fileno, off_t offset);
#endif /* BUFFILE_H */ #endif /* BUFFILE_H */
...@@ -94,7 +94,7 @@ extern mode_t FileGetRawMode(File file); ...@@ -94,7 +94,7 @@ extern mode_t FileGetRawMode(File file);
/* Operations used for sharing named temporary files */ /* Operations used for sharing named temporary files */
extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure); extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure);
extern File PathNameOpenTemporaryFile(const char *name); extern File PathNameOpenTemporaryFile(const char *path, int mode);
extern bool PathNameDeleteTemporaryFile(const char *name, bool error_on_failure); extern bool PathNameDeleteTemporaryFile(const char *name, bool error_on_failure);
extern void PathNameCreateTemporaryDir(const char *base, const char *name); extern void PathNameCreateTemporaryDir(const char *base, const char *name);
extern void PathNameDeleteTemporaryDir(const char *name); extern void PathNameDeleteTemporaryDir(const char *name);
......
...@@ -37,9 +37,11 @@ typedef struct SharedFileSet ...@@ -37,9 +37,11 @@ typedef struct SharedFileSet
extern void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg); extern void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg);
extern void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg); extern void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg);
extern File SharedFileSetCreate(SharedFileSet *fileset, const char *name); extern File SharedFileSetCreate(SharedFileSet *fileset, const char *name);
extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name); extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name,
int mode);
extern bool SharedFileSetDelete(SharedFileSet *fileset, const char *name, extern bool SharedFileSetDelete(SharedFileSet *fileset, const char *name,
bool error_on_failure); bool error_on_failure);
extern void SharedFileSetDeleteAll(SharedFileSet *fileset); extern void SharedFileSetDeleteAll(SharedFileSet *fileset);
extern void SharedFileSetUnregister(SharedFileSet *input_fileset);
#endif #endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment