Commit dad4cb62 authored by Tom Lane's avatar Tom Lane

Improve tuplestore.c to support multiple concurrent read positions.

This facility replaces the former mark/restore support but is otherwise
upward-compatible with previous uses.  It's expected to be needed for
single evaluation of CTEs and also for window functions, so I'm committing
it separately instead of waiting for either one of those patches to be
finished.  Per discussion with Greg Stark and Hitoshi Harada.

Note: I removed nodeFunctionscan's mark/restore support, instead of bothering
to update it for this change, because it was dead code anyway.
parent 233f1351
......@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/executor/execAmi.c,v 1.97 2008/08/05 21:28:29 tgl Exp $
* $PostgreSQL: pgsql/src/backend/executor/execAmi.c,v 1.98 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -239,10 +239,6 @@ ExecMarkPos(PlanState *node)
ExecTidMarkPos((TidScanState *) node);
break;
case T_FunctionScanState:
ExecFunctionMarkPos((FunctionScanState *) node);
break;
case T_ValuesScanState:
ExecValuesMarkPos((ValuesScanState *) node);
break;
......@@ -296,10 +292,6 @@ ExecRestrPos(PlanState *node)
ExecTidRestrPos((TidScanState *) node);
break;
case T_FunctionScanState:
ExecFunctionRestrPos((FunctionScanState *) node);
break;
case T_ValuesScanState:
ExecValuesRestrPos((ValuesScanState *) node);
break;
......@@ -332,7 +324,7 @@ ExecRestrPos(PlanState *node)
* (However, since the only present use of mark/restore is in mergejoin,
* there is no need to support mark/restore in any plan type that is not
* capable of generating ordered output. So the seqscan, tidscan,
* functionscan, and valuesscan support is actually useless code at present.)
* and valuesscan support is actually useless code at present.)
*/
bool
ExecSupportsMarkRestore(NodeTag plantype)
......@@ -342,7 +334,6 @@ ExecSupportsMarkRestore(NodeTag plantype)
case T_SeqScan:
case T_IndexScan:
case T_TidScan:
case T_FunctionScan:
case T_ValuesScan:
case T_Material:
case T_Sort:
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/executor/nodeFunctionscan.c,v 1.46 2008/02/29 02:49:39 neilc Exp $
* $PostgreSQL: pgsql/src/backend/executor/nodeFunctionscan.c,v 1.47 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -131,6 +131,9 @@ ExecInitFunctionScan(FunctionScan *node, EState *estate, int eflags)
TypeFuncClass functypclass;
TupleDesc tupdesc = NULL;
/* check for unsupported flags */
Assert(!(eflags & EXEC_FLAG_MARK));
/*
* FunctionScan should not have any children.
*/
......@@ -273,42 +276,6 @@ ExecEndFunctionScan(FunctionScanState *node)
node->tuplestorestate = NULL;
}
/* ----------------------------------------------------------------
* ExecFunctionMarkPos
*
* Calls tuplestore to save the current position in the stored file.
* ----------------------------------------------------------------
*/
void
ExecFunctionMarkPos(FunctionScanState *node)
{
/*
* if we haven't materialized yet, just return.
*/
if (!node->tuplestorestate)
return;
tuplestore_markpos(node->tuplestorestate);
}
/* ----------------------------------------------------------------
* ExecFunctionRestrPos
*
* Calls tuplestore to restore the last saved file position.
* ----------------------------------------------------------------
*/
void
ExecFunctionRestrPos(FunctionScanState *node)
{
/*
* if we haven't materialized yet, just return.
*/
if (!node->tuplestorestate)
return;
tuplestore_restorepos(node->tuplestorestate);
}
/* ----------------------------------------------------------------
* ExecFunctionReScan
*
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/executor/nodeMaterial.c,v 1.62 2008/03/23 00:54:04 tgl Exp $
* $PostgreSQL: pgsql/src/backend/executor/nodeMaterial.c,v 1.63 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -51,7 +51,7 @@ ExecMaterial(MaterialState *node)
estate = node->ss.ps.state;
dir = estate->es_direction;
forward = ScanDirectionIsForward(dir);
tuplestorestate = (Tuplestorestate *) node->tuplestorestate;
tuplestorestate = node->tuplestorestate;
/*
* If first time through, and we need a tuplestore, initialize it.
......@@ -60,7 +60,19 @@ ExecMaterial(MaterialState *node)
{
tuplestorestate = tuplestore_begin_heap(true, false, work_mem);
tuplestore_set_eflags(tuplestorestate, node->eflags);
node->tuplestorestate = (void *) tuplestorestate;
if (node->eflags & EXEC_FLAG_MARK)
{
/*
* Allocate a second read pointer to serve as the mark.
* We know it must have index 1, so needn't store that.
*/
int ptrn;
ptrn = tuplestore_alloc_read_pointer(tuplestorestate,
node->eflags);
Assert(ptrn == 1);
}
node->tuplestorestate = tuplestorestate;
}
/*
......@@ -236,7 +248,7 @@ ExecEndMaterial(MaterialState *node)
* Release tuplestore resources
*/
if (node->tuplestorestate != NULL)
tuplestore_end((Tuplestorestate *) node->tuplestorestate);
tuplestore_end(node->tuplestorestate);
node->tuplestorestate = NULL;
/*
......@@ -262,7 +274,10 @@ ExecMaterialMarkPos(MaterialState *node)
if (!node->tuplestorestate)
return;
tuplestore_markpos((Tuplestorestate *) node->tuplestorestate);
/*
* copy the active read pointer to the mark.
*/
tuplestore_copy_read_pointer(node->tuplestorestate, 0, 1);
}
/* ----------------------------------------------------------------
......@@ -283,9 +298,9 @@ ExecMaterialRestrPos(MaterialState *node)
return;
/*
* restore the scan to the previously marked position
* copy the mark to the active read pointer.
*/
tuplestore_restorepos((Tuplestorestate *) node->tuplestorestate);
tuplestore_copy_read_pointer(node->tuplestorestate, 1, 0);
}
/* ----------------------------------------------------------------
......@@ -322,14 +337,14 @@ ExecMaterialReScan(MaterialState *node, ExprContext *exprCtxt)
if (((PlanState *) node)->lefttree->chgParam != NULL ||
(node->eflags & EXEC_FLAG_REWIND) == 0)
{
tuplestore_end((Tuplestorestate *) node->tuplestorestate);
tuplestore_end(node->tuplestorestate);
node->tuplestorestate = NULL;
if (((PlanState *) node)->lefttree->chgParam == NULL)
ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
node->eof_underlying = false;
}
else
tuplestore_rescan((Tuplestorestate *) node->tuplestorestate);
tuplestore_rescan(node->tuplestorestate);
}
else
{
......
......@@ -11,6 +11,8 @@
* before it has all been written. This is particularly useful for cursors,
* because it allows random access within the already-scanned portion of
* a query without having to process the underlying scan to completion.
* Also, it is possible to support multiple independent read pointers.
*
* A temporary file is used to handle the data if it exceeds the
* space limit specified by the caller.
*
......@@ -20,25 +22,31 @@
* maxKBytes, we dump all the tuples into a temp file and then read from that
* when needed.
*
* Upon creation, a tuplestore supports a single read pointer, numbered 0.
* Additional read pointers can be created using tuplestore_alloc_read_pointer.
* Mark/restore behavior is supported by copying read pointers.
*
* When the caller requests backward-scan capability, we write the temp file
* in a format that allows either forward or backward scan. Otherwise, only
* forward scan is allowed. Rewind and markpos/restorepos are normally allowed
* but can be turned off via tuplestore_set_eflags; turning off both backward
* scan and rewind enables truncation of the tuplestore at the mark point
* (if any) for minimal memory usage.
* forward scan is allowed. A request for backward scan must be made before
* putting any tuples into the tuplestore. Rewind is normally allowed but
* can be turned off via tuplestore_set_eflags; turning off both backward
* scan and rewind for all read pointers enables truncation of the tuplestore
* at the oldest read point for minimal memory usage.
*
* Because we allow reading before writing is complete, there are two
* interesting positions in the temp file: the current read position and
* the current write position. At any given instant, the temp file's seek
* position corresponds to one of these, and the other one is remembered in
* the Tuplestore's state.
* Note: in TSS_WRITEFILE state, the temp file's seek position is the
* current write position, and the write-position variables in the tuplestore
* aren't kept up to date. Similarly, in TSS_READFILE state the temp file's
* seek position is the active read pointer's position, and that read pointer
* isn't kept up to date. We update the appropriate variables using ftell()
* before switching to the other state or activating a different read pointer.
*
*
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.39 2008/05/12 00:00:53 alvherre Exp $
* $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.40 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -63,13 +71,35 @@ typedef enum
TSS_READFILE /* Reading from temp file */
} TupStoreStatus;
/*
* State for a single read pointer. If we are in state INMEM then all the
* read pointers' "current" fields denote the read positions. In state
* WRITEFILE, the file/offset fields denote the read positions. In state
* READFILE, inactive read pointers have valid file/offset, but the active
* read pointer implicitly has position equal to the temp file's seek position.
*
* Special case: if eof_reached is true, then the pointer's read position is
* implicitly equal to the write position, and current/file/offset aren't
* maintained. This way we need not update all the read pointers each time
* we write.
*/
typedef struct
{
int eflags; /* capability flags */
bool eof_reached; /* read reached EOF */
int current; /* next array index to read */
int file; /* temp file# */
off_t offset; /* byte offset in file */
} TSReadPointer;
/*
* Private state of a Tuplestore operation.
*/
struct Tuplestorestate
{
TupStoreStatus status; /* enumerated value as shown above */
int eflags; /* capability flags */
int eflags; /* capability flags (OR of pointers' flags) */
bool backward; /* store extra length words in file? */
bool interXact; /* keep open through transactions? */
long availMem; /* remaining memory available, in bytes */
BufFile *myfile; /* underlying file, or NULL if none */
......@@ -116,31 +146,20 @@ struct Tuplestorestate
int memtupsize; /* allocated length of memtuples array */
/*
* These variables are used to keep track of the current position.
* These variables are used to keep track of the current positions.
*
* In state WRITEFILE, the current file seek position is the write point,
* and the read position is remembered in readpos_xxx; in state READFILE,
* the current file seek position is the read point, and the write
* position is remembered in writepos_xxx. (The write position is the
* same as EOF, but since BufFileSeek doesn't currently implement
* SEEK_END, we have to remember it explicitly.)
*
* Special case: if we are in WRITEFILE state and eof_reached is true,
* then the read position is implicitly equal to the write position (and
* hence to the file seek position); this way we need not update the
* readpos_xxx variables on each write.
* In state WRITEFILE, the current file seek position is the write point;
* in state READFILE, the write position is remembered in writepos_xxx.
* (The write position is the same as EOF, but since BufFileSeek doesn't
* currently implement SEEK_END, we have to remember it explicitly.)
*/
bool eof_reached; /* read reached EOF (always valid) */
int current; /* next array index (valid if INMEM) */
int readpos_file; /* file# (valid if WRITEFILE and not eof) */
off_t readpos_offset; /* offset (valid if WRITEFILE and not eof) */
int writepos_file; /* file# (valid if READFILE) */
off_t writepos_offset; /* offset (valid if READFILE) */
/* markpos_xxx holds marked position for mark and restore */
int markpos_current; /* saved "current" */
int markpos_file; /* saved "readpos_file" */
off_t markpos_offset; /* saved "readpos_offset" */
TSReadPointer *readptrs; /* array of read pointers */
int activeptr; /* index of the active read pointer */
int readptrcount; /* number of pointers currently valid */
int readptrsize; /* allocated length of readptrs array */
int writepos_file; /* file# (valid if READFILE state) */
off_t writepos_offset; /* offset (valid if READFILE state) */
};
#define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
......@@ -160,11 +179,11 @@ struct Tuplestorestate
* may or may not match the in-memory representation of the tuple ---
* any conversion needed is the job of the writetup and readtup routines.
*
* If state->eflags & EXEC_FLAG_BACKWARD, then the stored representation of
* If state->backward is true, then the stored representation of
* the tuple must be followed by another "unsigned int" that is a copy of the
* length --- so the total tape space used is actually sizeof(unsigned int)
* more than the stored length value. This allows read-backwards. When
* EXEC_FLAG_BACKWARD is not set, the write/read routines may omit the extra
* state->backward is not set, the write/read routines may omit the extra
* length word.
*
* writetup is expected to write both length words as well as the tuple
......@@ -184,6 +203,7 @@ struct Tuplestorestate
* We count space allocated for tuples against the maxKBytes limit,
* plus the space used by the variable-size array memtuples.
* Fixed-size space (primarily the BufFile I/O buffer) is not counted.
* We don't worry about the size of the read pointer array, either.
*
* Note that we count actual space used (as shown by GetMemoryChunkSpace)
* rather than the originally-requested size. This is important since
......@@ -200,7 +220,7 @@ static Tuplestorestate *tuplestore_begin_common(int eflags,
int maxKBytes);
static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
static void dumptuples(Tuplestorestate *state);
static void tuplestore_trim(Tuplestorestate *state, int ntuples);
static void tuplestore_trim(Tuplestorestate *state);
static unsigned int getlen(Tuplestorestate *state, bool eofOK);
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
......@@ -231,8 +251,15 @@ tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
state->eof_reached = false;
state->current = 0;
state->activeptr = 0;
state->readptrcount = 1;
state->readptrsize = 8; /* arbitrary */
state->readptrs = (TSReadPointer *)
palloc(state->readptrsize * sizeof(TSReadPointer));
state->readptrs[0].eflags = eflags;
state->readptrs[0].eof_reached = false;
state->readptrs[0].current = 0;
return state;
}
......@@ -267,8 +294,8 @@ tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
* the pre-8.3 behavior of tuplestores.
*/
eflags = randomAccess ?
(EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND | EXEC_FLAG_MARK) :
(EXEC_FLAG_REWIND | EXEC_FLAG_MARK);
(EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
(EXEC_FLAG_REWIND);
state = tuplestore_begin_common(eflags, interXact, maxKBytes);
......@@ -282,27 +309,70 @@ tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
/*
* tuplestore_set_eflags
*
* Set capability flags at a finer grain than is allowed by
* tuplestore_begin_xxx. This must be called before inserting any data
* into the tuplestore.
* Set the capability flags for read pointer 0 at a finer grain than is
* allowed by tuplestore_begin_xxx. This must be called before inserting
* any data into the tuplestore.
*
* eflags is a bitmask following the meanings used for executor node
* startup flags (see executor.h). tuplestore pays attention to these bits:
* EXEC_FLAG_REWIND need rewind to start
* EXEC_FLAG_BACKWARD need backward fetch
* EXEC_FLAG_MARK need mark/restore
* If tuplestore_set_eflags is not called, REWIND and MARK are allowed,
* and BACKWARD is set per "randomAccess" in the tuplestore_begin_xxx call.
* If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
* is set per "randomAccess" in the tuplestore_begin_xxx call.
*/
void
tuplestore_set_eflags(Tuplestorestate *state, int eflags)
{
Assert(state->status == TSS_INMEM);
Assert(state->memtupcount == 0);
int i;
if (state->status != TSS_INMEM || state->memtupcount != 0)
elog(ERROR, "too late to call tuplestore_set_eflags");
state->readptrs[0].eflags = eflags;
for (i = 1; i < state->readptrcount; i++)
eflags |= state->readptrs[i].eflags;
state->eflags = eflags;
}
/*
* tuplestore_alloc_read_pointer - allocate another read pointer.
*
* Returns the pointer's index.
*
* The new pointer initially copies the position of read pointer 0.
* It can have its own eflags, but if any data has been inserted into
* the tuplestore, these eflags must not represent an increase in
* requirements.
*/
int
tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
{
/* Check for possible increase of requirements */
if (state->status != TSS_INMEM || state->memtupcount != 0)
{
if ((state->eflags | eflags) != state->eflags)
elog(ERROR, "too late to require new tuplestore eflags");
}
/* Make room for another read pointer if needed */
if (state->readptrcount >= state->readptrsize)
{
int newcnt = state->readptrsize * 2;
state->readptrs = (TSReadPointer *)
repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
state->readptrsize = newcnt;
}
/* And set it up */
state->readptrs[state->readptrcount] = state->readptrs[0];
state->readptrs[state->readptrcount].eflags = eflags;
state->eflags |= eflags;
return state->readptrcount++;
}
/*
* tuplestore_end
*
......@@ -321,18 +391,71 @@ tuplestore_end(Tuplestorestate *state)
pfree(state->memtuples[i]);
pfree(state->memtuples);
}
pfree(state->readptrs);
pfree(state);
}
/*
* tuplestore_select_read_pointer - make the specified read pointer active
*/
void
tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
{
TSReadPointer *readptr = &state->readptrs[ptr];
Assert(ptr >= 0 && ptr < state->readptrcount);
/* No work if already active */
if (ptr == state->activeptr)
return;
switch (state->status)
{
case TSS_INMEM:
case TSS_WRITEFILE:
/* no work */
break;
case TSS_READFILE:
/*
* We have to make the temp file's seek position equal to the
* logical position of the read pointer. In eof_reached state,
* that's the EOF, which we have available from the saved
* write position.
*/
if (readptr->eof_reached)
{
if (BufFileSeek(state->myfile,
state->writepos_file,
state->writepos_offset,
SEEK_SET) != 0)
elog(ERROR, "tuplestore seek failed");
}
else
{
if (BufFileSeek(state->myfile,
readptr->file,
readptr->offset,
SEEK_SET) != 0)
elog(ERROR, "tuplestore seek failed");
}
break;
default:
elog(ERROR, "invalid tuplestore state");
break;
}
state->activeptr = ptr;
}
/*
* tuplestore_ateof
*
* Returns the current eof_reached state.
* Returns the active read pointer's eof_reached state.
*/
bool
tuplestore_ateof(Tuplestorestate *state)
{
return state->eof_reached;
return state->readptrs[state->activeptr].eof_reached;
}
/*
......@@ -340,8 +463,8 @@ tuplestore_ateof(Tuplestorestate *state)
*
* Note that the input tuple is always copied; the caller need not save it.
*
* If the read status is currently "AT EOF" then it remains so (the read
* pointer advances along with the write pointer); otherwise the read
* Any read pointer that is currently "AT EOF" remains so (the read pointer
* implicitly advances along with the write pointer); otherwise the read
* pointer is unchanged. This is for the convenience of nodeMaterial.c.
*
* tuplestore_puttupleslot() is a convenience routine to collect data from
......@@ -427,10 +550,6 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
/* Stash the tuple in the in-memory array */
state->memtuples[state->memtupcount++] = tuple;
/* If eof_reached, keep read position in sync */
if (state->eof_reached)
state->current = state->memtupcount;
/*
* Done if we still fit in available memory and have array slots.
*/
......@@ -443,6 +562,12 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
*/
PrepareTempTablespaces();
state->myfile = BufFileCreateTemp(state->interXact);
/*
* Freeze the decision about whether trailing length words
* will be used. We can't change this choice once data is on
* tape, even though callers might drop the requirement.
*/
state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
state->status = TSS_WRITEFILE;
dumptuples(state);
break;
......@@ -454,13 +579,14 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
/*
* Switch from reading to writing.
*/
if (!state->eof_reached)
if (!state->readptrs[state->activeptr].eof_reached)
BufFileTell(state->myfile,
&state->readpos_file, &state->readpos_offset);
&state->readptrs[state->activeptr].file,
&state->readptrs[state->activeptr].offset);
if (BufFileSeek(state->myfile,
state->writepos_file, state->writepos_offset,
SEEK_SET) != 0)
elog(ERROR, "seek to EOF failed");
elog(ERROR, "tuplestore seek to EOF failed");
state->status = TSS_WRITEFILE;
WRITETUP(state, tuple);
break;
......@@ -482,10 +608,11 @@ static void *
tuplestore_gettuple(Tuplestorestate *state, bool forward,
bool *should_free)
{
TSReadPointer *readptr = &state->readptrs[state->activeptr];
unsigned int tuplen;
void *tup;
Assert(forward || (state->eflags & EXEC_FLAG_BACKWARD));
Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
switch (state->status)
{
......@@ -493,35 +620,47 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
*should_free = false;
if (forward)
{
if (state->current < state->memtupcount)
return state->memtuples[state->current++];
state->eof_reached = true;
if (readptr->eof_reached)
return NULL;
if (readptr->current < state->memtupcount)
{
/*
* We have another tuple, so return it. Note: in
* principle we could try tuplestore_trim() here after
* advancing current, but this would cost cycles with
* little chance of success, so we don't bother.
*/
return state->memtuples[readptr->current++];
}
readptr->eof_reached = true;
return NULL;
}
else
{
if (state->current <= 0)
return NULL;
/*
* if all tuples are fetched already then we return last
* tuple, else - tuple before last returned.
*/
if (state->eof_reached)
state->eof_reached = false;
if (readptr->eof_reached)
{
readptr->current = state->memtupcount;
readptr->eof_reached = false;
}
else
{
state->current--; /* last returned tuple */
if (state->current <= 0)
if (readptr->current <= 0)
return NULL;
readptr->current--; /* last returned tuple */
}
return state->memtuples[state->current - 1];
if (readptr->current <= 0)
return NULL;
return state->memtuples[readptr->current - 1];
}
break;
case TSS_WRITEFILE:
/* Skip state change if we'll just return NULL */
if (state->eof_reached && forward)
if (readptr->eof_reached && forward)
return NULL;
/*
......@@ -529,11 +668,11 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
*/
BufFileTell(state->myfile,
&state->writepos_file, &state->writepos_offset);
if (!state->eof_reached)
if (!readptr->eof_reached)
if (BufFileSeek(state->myfile,
state->readpos_file, state->readpos_offset,
readptr->file, readptr->offset,
SEEK_SET) != 0)
elog(ERROR, "seek failed");
elog(ERROR, "tuplestore seek failed");
state->status = TSS_READFILE;
/* FALL THRU into READFILE case */
......@@ -548,7 +687,7 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
}
else
{
state->eof_reached = true;
readptr->eof_reached = true;
return NULL;
}
}
......@@ -564,12 +703,16 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
*/
if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
SEEK_CUR) != 0)
{
/* even a failed backwards fetch gets you out of eof state */
readptr->eof_reached = false;
return NULL;
}
tuplen = getlen(state, false);
if (state->eof_reached)
if (readptr->eof_reached)
{
state->eof_reached = false;
readptr->eof_reached = false;
/* We will return the tuple returned before returning NULL */
}
else
......@@ -670,9 +813,9 @@ tuplestore_advance(Tuplestorestate *state, bool forward)
/*
* dumptuples - remove tuples from memory and write to tape
*
* As a side effect, we must set readpos and markpos to the value
* corresponding to "current"; otherwise, a dump would lose the current read
* position.
* As a side effect, we must convert each read pointer's position from
* "current" to file/offset format. But eof_reached pointers don't
* need to change state.
*/
static void
dumptuples(Tuplestorestate *state)
......@@ -681,12 +824,15 @@ dumptuples(Tuplestorestate *state)
for (i = 0;; i++)
{
if (i == state->current)
BufFileTell(state->myfile,
&state->readpos_file, &state->readpos_offset);
if (i == state->markpos_current)
BufFileTell(state->myfile,
&state->markpos_file, &state->markpos_offset);
TSReadPointer *readptr = state->readptrs;
int j;
for (j = 0; j < state->readptrcount; readptr++, j++)
{
if (i == readptr->current && !readptr->eof_reached)
BufFileTell(state->myfile,
&readptr->file, &readptr->offset);
}
if (i >= state->memtupcount)
break;
WRITETUP(state, state->memtuples[i]);
......@@ -695,28 +841,30 @@ dumptuples(Tuplestorestate *state)
}
/*
* tuplestore_rescan - rewind and replay the scan
* tuplestore_rescan - rewind the active read pointer to start
*/
void
tuplestore_rescan(Tuplestorestate *state)
{
Assert(state->eflags & EXEC_FLAG_REWIND);
TSReadPointer *readptr = &state->readptrs[state->activeptr];
Assert(readptr->eflags & EXEC_FLAG_REWIND);
switch (state->status)
{
case TSS_INMEM:
state->eof_reached = false;
state->current = 0;
readptr->eof_reached = false;
readptr->current = 0;
break;
case TSS_WRITEFILE:
state->eof_reached = false;
state->readpos_file = 0;
state->readpos_offset = 0L;
readptr->eof_reached = false;
readptr->file = 0;
readptr->offset = 0L;
break;
case TSS_READFILE:
state->eof_reached = false;
readptr->eof_reached = false;
if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
elog(ERROR, "seek to start failed");
elog(ERROR, "tuplestore seek to start failed");
break;
default:
elog(ERROR, "invalid tuplestore state");
......@@ -725,85 +873,78 @@ tuplestore_rescan(Tuplestorestate *state)
}
/*
* tuplestore_markpos - saves current position in the tuple sequence
* tuplestore_copy_read_pointer - copy a read pointer's state to another
*/
void
tuplestore_markpos(Tuplestorestate *state)
tuplestore_copy_read_pointer(Tuplestorestate *state,
int srcptr, int destptr)
{
Assert(state->eflags & EXEC_FLAG_MARK);
TSReadPointer *sptr = &state->readptrs[srcptr];
TSReadPointer *dptr = &state->readptrs[destptr];
switch (state->status)
{
case TSS_INMEM:
state->markpos_current = state->current;
Assert(srcptr >= 0 && srcptr < state->readptrcount);
Assert(destptr >= 0 && destptr < state->readptrcount);
/*
* We can truncate the tuplestore if neither backward scan nor
* rewind capability are required by the caller. There will never
* be a need to back up past the mark point.
*
* Note: you might think we could remove all the tuples before
* "current", since that one is the next to be returned. However,
* since tuplestore_gettuple returns a direct pointer to our
* internal copy of the tuple, it's likely that the caller has
* still got the tuple just before "current" referenced in a slot.
* Don't free it yet.
*/
if (!(state->eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND)))
tuplestore_trim(state, 1);
break;
case TSS_WRITEFILE:
if (state->eof_reached)
{
/* Need to record the implicit read position */
BufFileTell(state->myfile,
&state->markpos_file,
&state->markpos_offset);
}
else
{
state->markpos_file = state->readpos_file;
state->markpos_offset = state->readpos_offset;
}
break;
case TSS_READFILE:
BufFileTell(state->myfile,
&state->markpos_file,
&state->markpos_offset);
break;
default:
elog(ERROR, "invalid tuplestore state");
break;
}
}
/* Assigning to self is a no-op */
if (srcptr == destptr)
return;
/*
* tuplestore_restorepos - restores current position in tuple sequence to
* last saved position
*/
void
tuplestore_restorepos(Tuplestorestate *state)
{
Assert(state->eflags & EXEC_FLAG_MARK);
if (dptr->eflags != sptr->eflags)
{
/* Possible change of overall eflags, so copy and then recompute */
int eflags;
int i;
*dptr = *sptr;
eflags = state->readptrs[0].eflags;
for (i = 1; i < state->readptrcount; i++)
eflags |= state->readptrs[i].eflags;
state->eflags = eflags;
}
else
*dptr = *sptr;
switch (state->status)
{
case TSS_INMEM:
state->eof_reached = false;
state->current = state->markpos_current;
/* We might be able to truncate the tuplestore */
tuplestore_trim(state);
break;
case TSS_WRITEFILE:
state->eof_reached = false;
state->readpos_file = state->markpos_file;
state->readpos_offset = state->markpos_offset;
break;
case TSS_READFILE:
state->eof_reached = false;
if (BufFileSeek(state->myfile,
state->markpos_file,
state->markpos_offset,
SEEK_SET) != 0)
elog(ERROR, "tuplestore_restorepos failed");
/*
* This case is a bit tricky since the active read pointer's
* position corresponds to the seek point, not what is in its
* variables. Assigning to the active requires a seek, and
* assigning from the active requires a tell, except when
* eof_reached.
*/
if (destptr == state->activeptr)
{
if (dptr->eof_reached)
{
if (BufFileSeek(state->myfile,
state->writepos_file,
state->writepos_offset,
SEEK_SET) != 0)
elog(ERROR, "tuplestore seek failed");
}
else
{
if (BufFileSeek(state->myfile,
dptr->file, dptr->offset,
SEEK_SET) != 0)
elog(ERROR, "tuplestore seek failed");
}
}
else if (srcptr == state->activeptr)
{
if (!dptr->eof_reached)
BufFileTell(state->myfile,
&dptr->file,
&dptr->offset);
}
break;
default:
elog(ERROR, "invalid tuplestore state");
......@@ -812,14 +953,22 @@ tuplestore_restorepos(Tuplestorestate *state)
}
/*
* tuplestore_trim - remove all but ntuples tuples before current
* tuplestore_trim - remove all no-longer-needed tuples
*/
static void
tuplestore_trim(Tuplestorestate *state, int ntuples)
tuplestore_trim(Tuplestorestate *state)
{
int oldest;
int nremove;
int i;
/*
* We can truncate the tuplestore if neither backward scan nor
* rewind capability are required by any read pointer.
*/
if (state->eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND))
return;
/*
* We don't bother trimming temp files since it usually would mean more
* work than just letting them sit in kernel buffers until they age out.
......@@ -827,7 +976,23 @@ tuplestore_trim(Tuplestorestate *state, int ntuples)
if (state->status != TSS_INMEM)
return;
nremove = state->current - ntuples;
/* Find the oldest read pointer */
oldest = state->memtupcount;
for (i = 0; i < state->readptrcount; i++)
{
if (!state->readptrs[i].eof_reached)
oldest = Min(oldest, state->readptrs[i].current);
}
/*
* Note: you might think we could remove all the tuples before the oldest
* "current", since that one is the next to be returned. However,
* since tuplestore_gettuple returns a direct pointer to our
* internal copy of the tuple, it's likely that the caller has
* still got the tuple just before "current" referenced in a slot.
* So we keep one extra tuple before the oldest "current".
*/
nremove = oldest - 1;
if (nremove <= 0)
return; /* nothing to do */
Assert(nremove <= state->memtupcount);
......@@ -856,8 +1021,11 @@ tuplestore_trim(Tuplestorestate *state, int ntuples)
(state->memtupcount - nremove) * sizeof(void *));
state->memtupcount -= nremove;
state->current -= nremove;
state->markpos_current -= nremove;
for (i = 0; i < state->readptrcount; i++)
{
if (!state->readptrs[i].eof_reached)
state->readptrs[i].current -= nremove;
}
}
......@@ -910,7 +1078,7 @@ writetup_heap(Tuplestorestate *state, void *tup)
if (BufFileWrite(state->myfile, (void *) tuple, tuplen) != (size_t) tuplen)
elog(ERROR, "write failed");
if (state->eflags & EXEC_FLAG_BACKWARD) /* need trailing length word? */
if (state->backward) /* need trailing length word? */
if (BufFileWrite(state->myfile, (void *) &tuplen,
sizeof(tuplen)) != sizeof(tuplen))
elog(ERROR, "write failed");
......@@ -931,7 +1099,7 @@ readtup_heap(Tuplestorestate *state, unsigned int len)
if (BufFileRead(state->myfile, (void *) ((char *) tuple + sizeof(int)),
len - sizeof(int)) != (size_t) (len - sizeof(int)))
elog(ERROR, "unexpected end of data");
if (state->eflags & EXEC_FLAG_BACKWARD) /* need trailing length word? */
if (state->backward) /* need trailing length word? */
if (BufFileRead(state->myfile, (void *) &tuplen,
sizeof(tuplen)) != sizeof(tuplen))
elog(ERROR, "unexpected end of data");
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/executor/nodeFunctionscan.h,v 1.11 2008/01/01 19:45:57 momjian Exp $
* $PostgreSQL: pgsql/src/include/executor/nodeFunctionscan.h,v 1.12 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -20,8 +20,6 @@ extern int ExecCountSlotsFunctionScan(FunctionScan *node);
extern FunctionScanState *ExecInitFunctionScan(FunctionScan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecFunctionScan(FunctionScanState *node);
extern void ExecEndFunctionScan(FunctionScanState *node);
extern void ExecFunctionMarkPos(FunctionScanState *node);
extern void ExecFunctionRestrPos(FunctionScanState *node);
extern void ExecFunctionReScan(FunctionScanState *node, ExprContext *exprCtxt);
#endif /* NODEFUNCTIONSCAN_H */
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.187 2008/08/22 00:16:04 tgl Exp $
* $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.188 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -1324,7 +1324,7 @@ typedef struct MaterialState
ScanState ss; /* its first field is NodeTag */
int eflags; /* capability flags to pass to tuplestore */
bool eof_underlying; /* reached end of underlying plan? */
void *tuplestorestate; /* private state of tuplestore.c */
Tuplestorestate *tuplestorestate;
} MaterialState;
/* ----------------
......
......@@ -11,6 +11,8 @@
* before it has all been written. This is particularly useful for cursors,
* because it allows random access within the already-scanned portion of
* a query without having to process the underlying scan to completion.
* Also, it is possible to support multiple independent read pointers.
*
* A temporary file is used to handle the data if it exceeds the
* space limit specified by the caller.
*
......@@ -22,7 +24,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/utils/tuplestore.h,v 1.23 2008/03/25 19:26:53 neilc Exp $
* $PostgreSQL: pgsql/src/include/utils/tuplestore.h,v 1.24 2008/10/01 19:51:50 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -57,16 +59,21 @@ extern void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
/* tuplestore_donestoring() used to be required, but is no longer used */
#define tuplestore_donestoring(state) ((void) 0)
extern int tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags);
extern void tuplestore_select_read_pointer(Tuplestorestate *state, int ptr);
extern void tuplestore_copy_read_pointer(Tuplestorestate *state,
int srcptr, int destptr);
extern bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
TupleTableSlot *slot);
extern bool tuplestore_advance(Tuplestorestate *state, bool forward);
extern void tuplestore_end(Tuplestorestate *state);
extern bool tuplestore_ateof(Tuplestorestate *state);
extern void tuplestore_rescan(Tuplestorestate *state);
extern void tuplestore_markpos(Tuplestorestate *state);
extern void tuplestore_restorepos(Tuplestorestate *state);
extern void tuplestore_end(Tuplestorestate *state);
#endif /* TUPLESTORE_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment