Commit dc6c4c9d authored by Andres Freund's avatar Andres Freund

Add infrastructure for sharing temporary files between backends.

SharedFileSet allows temporary files to be created by one backend and
then exported for read-only access by other backends, with clean-up
managed by reference counting associated with a DSM segment.  This
includes changes to fd.c and buffile.c to support the new kind of
temporary file.

This will be used by an upcoming patch adding support for parallel
hash joins.

Author: Thomas Munro
Reviewed-By: Peter Geoghegan, Andres Freund, Robert Haas, Rushabh Lathia
Discussion:
    https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
    https://postgr.es/m/CAH2-WznJ_UgLux=_jTgCQ4yFz0iBntudsNKa1we3kN1BAG=88w@mail.gmail.com
parent 35438e57
......@@ -12,6 +12,6 @@ subdir = src/backend/storage/file
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = fd.o buffile.o copydir.o reinit.o
OBJS = fd.o buffile.o copydir.o reinit.o sharedfileset.o
include $(top_srcdir)/src/backend/common.mk
......@@ -31,12 +31,18 @@
* BufFile also supports temporary files that exceed the OS file size limit
* (by opening multiple fd.c temporary files). This is an essential feature
* for sorts and hashjoins on large amounts of data.
*
* BufFile supports temporary files that can be made read-only and shared with
* other backends, as infrastructure for parallel execution. Such files need
* to be created as a member of a SharedFileSet that all participants are
* attached to.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "executor/instrument.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/buffile.h"
......@@ -70,6 +76,10 @@ struct BufFile
bool isInterXact; /* keep open over transactions? */
bool dirty; /* does buffer need to be written? */
bool readOnly; /* has the file been set to read only? */
SharedFileSet *fileset; /* space for segment files if shared */
const char *name; /* name of this BufFile if shared */
/*
* resowner is the ResourceOwner to use for underlying temp files. (We
......@@ -94,6 +104,7 @@ static void extendBufFile(BufFile *file);
static void BufFileLoadBuffer(BufFile *file);
static void BufFileDumpBuffer(BufFile *file);
static int BufFileFlush(BufFile *file);
static File MakeNewSharedSegment(BufFile *file, int segment);
/*
......@@ -117,6 +128,9 @@ makeBufFile(File firstfile)
file->curOffset = 0L;
file->pos = 0;
file->nbytes = 0;
file->readOnly = false;
file->fileset = NULL;
file->name = NULL;
return file;
}
......@@ -134,7 +148,11 @@ extendBufFile(BufFile *file)
oldowner = CurrentResourceOwner;
CurrentResourceOwner = file->resowner;
pfile = OpenTemporaryFile(file->isInterXact);
if (file->fileset == NULL)
pfile = OpenTemporaryFile(file->isInterXact);
else
pfile = MakeNewSharedSegment(file, file->numFiles);
Assert(pfile >= 0);
CurrentResourceOwner = oldowner;
......@@ -175,6 +193,189 @@ BufFileCreateTemp(bool interXact)
return file;
}
/*
 * Format the name of segment number 'segment' of the shared BufFile called
 * 'buffile_name' into 'name', as "<buffile_name>.<segment>".
 *
 * 'name' must point to a buffer of at least MAXPGPATH bytes; longer results
 * are truncated by snprintf.
 */
static void
SharedSegmentName(char *name, const char *buffile_name, int segment)
{
	snprintf(name,
			 MAXPGPATH,
			 "%s.%d",
			 buffile_name,
			 segment);
}
/*
 * Create the file backing segment number 'segment' of the shared BufFile
 * 'buffile', inside the BufFile's SharedFileSet, and return its File handle.
 */
static File
MakeNewSharedSegment(BufFile *buffile, int segment)
{
	char		segname[MAXPGPATH];
	File		newfile;

	/* Derive the per-segment file name from the BufFile's own name. */
	SharedSegmentName(segname, buffile->name, segment);

	newfile = SharedFileSetCreate(buffile->fileset, segname);

	/* SharedFileSetCreate would've errored out */
	Assert(newfile > 0);

	return newfile;
}
/*
 * Create a BufFile that can be discovered and opened read-only by other
 * backends that are attached to the same SharedFileSet using the same name.
 *
 * The naming scheme for shared BufFiles is left up to the calling code.  The
 * name will appear as part of one or more filenames on disk, and might
 * provide clues to administrators about which subsystem is generating
 * temporary file data.  Since each SharedFileSet object is backed by one or
 * more uniquely named temporary directories, names don't conflict with
 * unrelated SharedFileSet objects.
 *
 * The returned BufFile is writable, starts out with a single segment
 * (segment 0), and holds its own palloc'd copy of 'name'.
 */
BufFile *
BufFileCreateShared(SharedFileSet *fileset, const char *name)
{
	BufFile    *file;

	file = (BufFile *) palloc(sizeof(BufFile));

	/*
	 * fileset and name must be filled in before the first segment is
	 * created, because MakeNewSharedSegment reads both of them.  The name is
	 * copied exactly once; copying it again later would leak the first copy.
	 */
	file->fileset = fileset;
	file->name = pstrdup(name);

	file->numFiles = 1;
	file->files = (File *) palloc(sizeof(File));
	file->files[0] = MakeNewSharedSegment(file, 0);
	file->offsets = (off_t *) palloc(sizeof(off_t));
	file->offsets[0] = 0L;
	file->isInterXact = false;
	file->dirty = false;
	file->resowner = CurrentResourceOwner;
	file->curFile = 0;
	file->curOffset = 0L;
	file->pos = 0;
	file->nbytes = 0;
	file->readOnly = false;

	return file;
}
/*
 * Open a file that was previously created in another backend (or this one)
 * with BufFileCreateShared in the same SharedFileSet using the same name.
 * The backend that created the file must have called BufFileClose() or
 * BufFileExportShared() to make sure that it is ready to be opened by other
 * backends and render it read-only.
 *
 * Returns a read-only BufFile covering every segment that could be opened,
 * or NULL if not even the first segment exists under this name.
 */
BufFile *
BufFileOpenShared(SharedFileSet *fileset, const char *name)
{
	BufFile    *file;
	char		segment_name[MAXPGPATH];
	Size		capacity = 16;
	File	   *files;
	int			nfiles = 0;

	files = palloc(sizeof(File) * capacity);

	/*
	 * We don't know how many segments there are, so we'll probe the
	 * filesystem to find out.
	 */
	for (;;)
	{
		/* See if we need to expand our file segment array. */
		if (nfiles + 1 > capacity)
		{
			capacity *= 2;
			files = repalloc(files, sizeof(File) * capacity);
		}

		/* Try to load a segment. */
		SharedSegmentName(segment_name, name, nfiles);
		files[nfiles] = SharedFileSetOpen(fileset, segment_name);
		if (files[nfiles] <= 0)
			break;
		++nfiles;

		CHECK_FOR_INTERRUPTS();
	}

	/*
	 * If we didn't find any files at all, then no BufFile exists with this
	 * name.  Release the probe array rather than leaving it behind.
	 */
	if (nfiles == 0)
	{
		pfree(files);
		return NULL;
	}

	/* Only now that we know the file exists do we build the BufFile. */
	file = (BufFile *) palloc(sizeof(BufFile));
	file->numFiles = nfiles;
	file->files = files;
	file->offsets = (off_t *) palloc0(sizeof(off_t) * nfiles);
	file->isInterXact = false;
	file->dirty = false;
	file->resowner = CurrentResourceOwner;	/* Unused, can't extend */
	file->curFile = 0;
	file->curOffset = 0L;
	file->pos = 0;
	file->nbytes = 0;
	file->readOnly = true;		/* Can't write to files opened this way */
	file->fileset = fileset;
	file->name = pstrdup(name);

	return file;
}
/*
 * Delete a BufFile that was created by BufFileCreateShared in the given
 * SharedFileSet using the given name.
 *
 * Calling this is optional: it exists so that files can be removed
 * proactively instead of waiting for the SharedFileSet to be cleaned up.
 *
 * Only one backend should attempt to delete a given name, and it should know
 * that the file exists and has been exported or closed.
 */
void
BufFileDeleteShared(SharedFileSet *fileset, const char *name)
{
	char		segname[MAXPGPATH];
	int			ndeleted = 0;

	/*
	 * The number of segments is unknown, so delete segment 0, 1, 2, ...
	 * until a deletion reports that there was nothing to remove.  Not
	 * finding even the initial segment is an error.
	 */
	while (true)
	{
		SharedSegmentName(segname, name, ndeleted);
		if (!SharedFileSetDelete(fileset, segname, true))
			break;
		ndeleted++;

		CHECK_FOR_INTERRUPTS();
	}

	if (ndeleted == 0)
		elog(ERROR, "could not delete unknown shared BufFile \"%s\"", name);
}
/*
 * BufFileExportShared --- flush buffered data and mark the BufFile read-only,
 * in preparation for sharing it with other backends.
 *
 * NOTE(review): the result of BufFileFlush is discarded here, exactly as
 * before; confirm whether a flush failure needs to be reported.
 */
void
BufFileExportShared(BufFile *file)
{
	/*
	 * Preconditions: the file must belong to a SharedFileSet, and it must
	 * not have been exported already (a second call is probably a bug).
	 */
	Assert(file->fileset != NULL);
	Assert(!file->readOnly);

	(void) BufFileFlush(file);

	file->readOnly = true;
}
/*
* Close a BufFile
*
......@@ -390,6 +591,8 @@ BufFileWrite(BufFile *file, void *ptr, size_t size)
size_t nwritten = 0;
size_t nthistime;
Assert(!file->readOnly);
while (size > 0)
{
if (file->pos >= BLCKSZ)
......
This diff is collapsed.
/*-------------------------------------------------------------------------
*
* sharedfileset.c
* Shared temporary file management.
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/storage/file/sharedfileset.c
*
* SharedFileSets provide a temporary namespace (think directory) so that
* files can be discovered by name, and shared ownership semantics so that
* shared files survive until the last user detaches.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/hash.h"
#include "catalog/pg_tablespace.h"
#include "commands/tablespace.h"
#include "miscadmin.h"
#include "storage/dsm.h"
#include "storage/sharedfileset.h"
#include "utils/builtins.h"
/* Forward declarations of the file-local helpers defined later in this file. */
static void SharedFileSetOnDetach(dsm_segment *segment, Datum datum);
static void SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace);
static void SharedFilePath(char *path, SharedFileSet *fileset, const char *name);
static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name);
/*
 * Initialize a space for temporary files that can be opened for read-only
 * access by other backends.  Other backends must attach to it (see
 * SharedFileSetAttach) before accessing it.  The set is associated with
 * 'seg': a detach callback is registered on that DSM segment, and any
 * contained files are deleted when the last backend detaches.
 *
 * Files will be distributed over the tablespaces configured in
 * temp_tablespaces; if none are configured, the default tablespace is used.
 *
 * Under the covers the set is one or more directories which will eventually
 * be deleted when there are no backends attached.
 */
void
SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
{
	/* Per-backend sequence used to tell apart sets created by one PID. */
	static uint32 next_number = 0;
	int			ntablespaces;

	SpinLockInit(&fileset->mutex);
	fileset->refcnt = 1;		/* the creator counts as attached */
	fileset->creator_pid = MyProcPid;
	fileset->number = next_number++;
	next_number %= INT_MAX;

	/* Capture the tablespace OIDs so that all backends agree on them. */
	PrepareTempTablespaces();
	ntablespaces = GetTempTablespaces(fileset->tablespaces,
									  lengthof(fileset->tablespaces));
	if (ntablespaces != 0)
		fileset->ntablespaces = ntablespaces;
	else
	{
		/* Nothing configured: fall back to the default tablespace. */
		fileset->tablespaces[0] = DEFAULTTABLESPACE_OID;
		fileset->ntablespaces = 1;
	}

	/* Register our cleanup callback. */
	on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset));
}
/*
 * Attach to a set of directories that was created with SharedFileSetInit.
 *
 * Bumps the set's reference count and registers a detach callback on 'seg'.
 * Raises an error if the reference count has already dropped to zero, i.e.
 * the set has been destroyed.
 */
void
SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
{
	bool		attached;

	/* Only take a reference if somebody else still holds one. */
	SpinLockAcquire(&fileset->mutex);
	attached = (fileset->refcnt != 0);
	if (attached)
		fileset->refcnt++;
	SpinLockRelease(&fileset->mutex);

	/* Report the failure only after the spinlock has been released. */
	if (!attached)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not attach to a SharedFileSet that is already destroyed")));

	/* Register our cleanup callback. */
	on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset));
}
/*
 * Create a new file called 'name' in the given set and return its handle.
 *
 * The first attempt is made without raising an error.  If it fails, the
 * set's directory in the chosen tablespace is created on demand and the
 * creation is retried, this time with error reporting enabled.
 */
File
SharedFileSetCreate(SharedFileSet *fileset, const char *name)
{
	char		path[MAXPGPATH];
	char		tempdirpath[MAXPGPATH];
	char		filesetpath[MAXPGPATH];
	Oid			tablespace;
	File		file;

	SharedFilePath(path, fileset, name);

	/* Optimistic attempt: the directory usually exists already. */
	file = PathNameCreateTemporaryFile(path, false);
	if (file > 0)
		return file;

	/* If we failed, see if we need to create the directory on demand. */
	tablespace = ChooseTablespace(fileset, name);
	TempTablespacePath(tempdirpath, tablespace);
	SharedFileSetPath(filesetpath, fileset, tablespace);
	PathNameCreateTemporaryDir(tempdirpath, filesetpath);

	return PathNameCreateTemporaryFile(path, true);
}
/*
 * Open a file that was created with SharedFileSetCreate(), possibly in
 * another backend.  The result of PathNameOpenTemporaryFile() is returned
 * unchanged.
 */
File
SharedFileSetOpen(SharedFileSet *fileset, const char *name)
{
	char		filepath[MAXPGPATH];

	SharedFilePath(filepath, fileset, name);

	return PathNameOpenTemporaryFile(filepath);
}
/*
 * Delete a file that was created with SharedFileSetCreate().
 * Return true if the file existed, false if it didn't.
 *
 * 'error_on_failure' is passed straight through to
 * PathNameDeleteTemporaryFile().
 */
bool
SharedFileSetDelete(SharedFileSet *fileset, const char *name,
					bool error_on_failure)
{
	char		filepath[MAXPGPATH];
	bool		existed;

	SharedFilePath(filepath, fileset, name);
	existed = PathNameDeleteTemporaryFile(filepath, error_on_failure);

	return existed;
}
/*
 * Delete all files in the set, by removing the set's directory from every
 * tablespace recorded in it.
 */
void
SharedFileSetDeleteAll(SharedFileSet *fileset)
{
	char		setdir[MAXPGPATH];
	int			ts;

	/*
	 * Delete the directory we created in each tablespace.  Doesn't fail
	 * because we use this in error cleanup paths, but can generate LOG
	 * message on IO error.
	 */
	for (ts = 0; ts < fileset->ntablespaces; ts++)
	{
		Oid			tablespace = fileset->tablespaces[ts];

		SharedFileSetPath(setdir, fileset, tablespace);
		PathNameDeleteTemporaryDir(setdir);
	}
}
/*
 * Callback function that will be invoked when this backend detaches from a
 * DSM segment holding a SharedFileSet that it has created or attached to.
 * Drops one reference; if that was the last one, tries to remove the
 * directories and everything in them.  We can't raise an error on failures,
 * because this runs in error cleanup paths.
 *
 * 'datum' carries the SharedFileSet pointer that was registered with
 * on_dsm_detach().
 */
static void
SharedFileSetOnDetach(dsm_segment *segment, Datum datum)
{
	SharedFileSet *fileset = (SharedFileSet *) DatumGetPointer(datum);
	bool		last_detach;

	SpinLockAcquire(&fileset->mutex);
	Assert(fileset->refcnt > 0);
	last_detach = (--fileset->refcnt == 0);
	SpinLockRelease(&fileset->mutex);

	/*
	 * If we are the last to detach, we delete the directory in all
	 * tablespaces.  Note that we are still actually attached for the rest of
	 * this function so we can safely access its data.
	 */
	if (last_detach)
		SharedFileSetDeleteAll(fileset);
}
/*
 * Build the path for the directory holding the files backing a SharedFileSet
 * in a given tablespace.
 *
 * The result is written to 'path', which must have room for MAXPGPATH bytes,
 * and has the form
 *     <temp dir of tablespace>/<PG_TEMP_FILE_PREFIX><creator pid>.<number>.sharedfileset
 */
static void
SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace)
{
	char		tempdirpath[MAXPGPATH];

	TempTablespacePath(tempdirpath, tablespace);

	/*
	 * creator_pid is a pid_t, whose underlying integer type is
	 * platform-dependent; cast it explicitly so the argument always matches
	 * the %d conversion.
	 */
	snprintf(path, MAXPGPATH, "%s/%s%d.%u.sharedfileset",
			 tempdirpath, PG_TEMP_FILE_PREFIX,
			 (int) fileset->creator_pid, fileset->number);
}
/*
 * Sorting hat to determine which tablespace a given shared temporary file
 * belongs in.
 *
 * The choice depends only on the file name and the set's tablespace list,
 * so every caller using the same set picks the same tablespace for a name.
 */
static Oid
ChooseTablespace(const SharedFileSet *fileset, const char *name)
{
	uint32		namehash;
	uint32		slot;

	namehash = hash_any((const unsigned char *) name, strlen(name));
	slot = namehash % fileset->ntablespaces;

	return fileset->tablespaces[slot];
}
/*
 * Compute the full path of the file called 'name' in a SharedFileSet: the
 * set's directory in the tablespace chosen for that name, followed by
 * "/<name>".  'path' must have room for MAXPGPATH bytes.
 */
static void
SharedFilePath(char *path, SharedFileSet *fileset, const char *name)
{
	char		setdir[MAXPGPATH];
	Oid			tablespace;

	tablespace = ChooseTablespace(fileset, name);
	SharedFileSetPath(setdir, fileset, tablespace);

	snprintf(path, MAXPGPATH, "%s/%s", setdir, name);
}
......@@ -26,6 +26,8 @@
#ifndef BUFFILE_H
#define BUFFILE_H
#include "storage/sharedfileset.h"
/* BufFile is an opaque type whose details are not known outside buffile.c. */
typedef struct BufFile BufFile;
......@@ -42,4 +44,9 @@ extern int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence);
extern void BufFileTell(BufFile *file, int *fileno, off_t *offset);
extern int BufFileSeekBlock(BufFile *file, long blknum);
/* Operations on BufFiles that are members of a SharedFileSet */
extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
extern void BufFileExportShared(BufFile *file);
extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name);
extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name);
#endif /* BUFFILE_H */
......@@ -79,6 +79,14 @@ extern int FileGetRawDesc(File file);
extern int FileGetRawFlags(File file);
extern mode_t FileGetRawMode(File file);
/* Operations used for sharing named temporary files */
extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure);
extern File PathNameOpenTemporaryFile(const char *name);
extern bool PathNameDeleteTemporaryFile(const char *name, bool error_on_failure);
extern void PathNameCreateTemporaryDir(const char *base, const char *name);
extern void PathNameDeleteTemporaryDir(const char *name);
extern void TempTablespacePath(char *path, Oid tablespace);
/* Operations that allow use of regular stdio --- USE WITH CAUTION */
extern FILE *AllocateFile(const char *name, const char *mode);
extern int FreeFile(FILE *file);
......@@ -107,6 +115,7 @@ extern void set_max_safe_fds(void);
extern void closeAllVfds(void);
extern void SetTempTablespaces(Oid *tableSpaces, int numSpaces);
extern bool TempTablespacesAreSet(void);
extern int GetTempTablespaces(Oid *tableSpaces, int numSpaces);
extern Oid GetNextTempTableSpace(void);
extern void AtEOXact_Files(void);
extern void AtEOSubXact_Files(bool isCommit, SubTransactionId mySubid,
......@@ -124,7 +133,7 @@ extern int durable_unlink(const char *fname, int loglevel);
extern int durable_link_or_rename(const char *oldfile, const char *newfile, int loglevel);
extern void SyncDataDirectory(void);
/* Filename components for OpenTemporaryFile */
/* Filename components */
#define PG_TEMP_FILES_DIR "pgsql_tmp"
#define PG_TEMP_FILE_PREFIX "pgsql_tmp"
......
/*-------------------------------------------------------------------------
*
* sharedfileset.h
* Shared temporary file management.
*
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/storage/sharedfileset.h
*
*-------------------------------------------------------------------------
*/
#ifndef SHAREDFILESET_H
#define SHAREDFILESET_H
#include "storage/dsm.h"
#include "storage/fd.h"
#include "storage/spin.h"
/*
* A set of temporary files that can be shared by multiple backends.
*
* creator_pid and number together name the set's directory on disk; refcnt
* is protected by mutex and counts the backends currently attached.
*/
typedef struct SharedFileSet
{
pid_t creator_pid; /* PID of the creating process */
uint32 number; /* per-PID identifier */
slock_t mutex; /* mutex protecting the reference count */
int refcnt; /* number of attached backends */
int ntablespaces; /* number of tablespaces to use */
Oid tablespaces[8]; /* OIDs of tablespaces to use. Assumes that
* it's rare that there are more than 8 temp
* tablespaces. */
} SharedFileSet;
/* Set lifecycle: initialize in the creating backend, attach from the others. */
extern void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg);
extern void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg);
/* Operations on individual files in a set, identified by name. */
extern File SharedFileSetCreate(SharedFileSet *fileset, const char *name);
extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name);
extern bool SharedFileSetDelete(SharedFileSet *fileset, const char *name,
bool error_on_failure);
/* Remove the set's directories from every tablespace it uses. */
extern void SharedFileSetDeleteAll(SharedFileSet *fileset);
#endif
......@@ -2026,6 +2026,7 @@ SharedBitmapState
SharedDependencyObjectType
SharedDependencyType
SharedExecutorInstrumentation
SharedFileSet
SharedInvalCatalogMsg
SharedInvalCatcacheMsg
SharedInvalRelcacheMsg
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment