Commit a2597ef1 authored by Tom Lane's avatar Tom Lane

Modify sequence state storage to eliminate dangling-pointer problem

exemplified by bug #671.  Moving the storage to relcache turned out to
be a bad idea because relcache might decide to discard the info.  Instead,
open and close the relcache entry on each sequence operation, and use
a record of the current XID to discover whether we already hold
AccessShareLock on the sequence.
parent b8ffc996
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.123 2002/05/21 22:05:53 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.124 2002/05/22 21:40:55 tgl Exp $
* *
* NOTES * NOTES
* Transaction aborts can now occur two ways: * Transaction aborts can now occur two ways:
...@@ -166,7 +166,6 @@ ...@@ -166,7 +166,6 @@
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "commands/async.h" #include "commands/async.h"
#include "commands/sequence.h"
#include "commands/trigger.h" #include "commands/trigger.h"
#include "executor/spi.h" #include "executor/spi.h"
#include "libpq/be-fsstubs.h" #include "libpq/be-fsstubs.h"
...@@ -947,7 +946,6 @@ CommitTransaction(void) ...@@ -947,7 +946,6 @@ CommitTransaction(void)
/* NOTIFY commit must also come before lower-level cleanup */ /* NOTIFY commit must also come before lower-level cleanup */
AtCommit_Notify(); AtCommit_Notify();
CloseSequences();
AtEOXact_portals(); AtEOXact_portals();
/* Here is where we really truly commit. */ /* Here is where we really truly commit. */
...@@ -1063,7 +1061,6 @@ AbortTransaction(void) ...@@ -1063,7 +1061,6 @@ AbortTransaction(void)
DeferredTriggerAbortXact(); DeferredTriggerAbortXact();
lo_commit(false); /* 'false' means it's abort */ lo_commit(false); /* 'false' means it's abort */
AtAbort_Notify(); AtAbort_Notify();
CloseSequences();
AtEOXact_portals(); AtEOXact_portals();
/* Advertise the fact that we aborted in pg_clog. */ /* Advertise the fact that we aborted in pg_clog. */
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/define.c,v 1.76 2002/04/15 05:22:03 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/commands/define.c,v 1.77 2002/05/22 21:40:55 tgl Exp $
* *
* DESCRIPTION * DESCRIPTION
* The "DefineFoo" routines take the parse tree and pick out the * The "DefineFoo" routines take the parse tree and pick out the
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "commands/defrem.h" #include "commands/defrem.h"
#include "parser/parse_type.h" #include "parser/parse_type.h"
#include "utils/int8.h"
/* /*
...@@ -114,6 +115,34 @@ defGetNumeric(DefElem *def) ...@@ -114,6 +115,34 @@ defGetNumeric(DefElem *def)
return 0; /* keep compiler quiet */ return 0; /* keep compiler quiet */
} }
/*
* Extract an int64 value from a DefElem.
*/
int64
defGetInt64(DefElem *def)
{
if (def->arg == NULL)
elog(ERROR, "Define: \"%s\" requires a numeric value",
def->defname);
switch (nodeTag(def->arg))
{
case T_Integer:
return (int64) intVal(def->arg);
case T_Float:
/*
* Values too large for int4 will be represented as Float
* constants by the lexer. Accept these if they are valid int8
* strings.
*/
return DatumGetInt64(DirectFunctionCall1(int8in,
CStringGetDatum(strVal(def->arg))));
default:
elog(ERROR, "Define: \"%s\" requires a numeric value",
def->defname);
}
return 0; /* keep compiler quiet */
}
/* /*
* Extract a possibly-qualified name (as a List of Strings) from a DefElem. * Extract a possibly-qualified name (as a List of Strings) from a DefElem.
*/ */
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.78 2002/05/21 22:05:54 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.79 2002/05/22 21:40:55 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -17,16 +17,14 @@ ...@@ -17,16 +17,14 @@
#include "access/heapam.h" #include "access/heapam.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h" #include "commands/tablecmds.h"
#include "commands/sequence.h" #include "commands/sequence.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/acl.h" #include "utils/acl.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/int8.h"
#define SEQ_MAGIC 0x1717
#ifndef INT64_IS_BUSTED #ifndef INT64_IS_BUSTED
#ifdef HAVE_LL_CONSTANTS #ifdef HAVE_LL_CONSTANTS
#define SEQ_MAXVALUE ((int64) 0x7FFFFFFFFFFFFFFFLL) #define SEQ_MAXVALUE ((int64) 0x7FFFFFFFFFFFFFFFLL)
...@@ -46,29 +44,47 @@ ...@@ -46,29 +44,47 @@
*/ */
#define SEQ_LOG_VALS 32 #define SEQ_LOG_VALS 32
/*
* The "special area" of a sequence's buffer page looks like this.
*/
#define SEQ_MAGIC 0x1717
typedef struct sequence_magic typedef struct sequence_magic
{ {
uint32 magic; uint32 magic;
} sequence_magic; } sequence_magic;
/*
* We store a SeqTable item for every sequence we have touched in the current
* session. This is needed to hold onto nextval/currval state. (We can't
* rely on the relcache, since it's only, well, a cache, and may decide to
* discard entries.)
*
* XXX We use linear search to find pre-existing SeqTable entries. This is
* good when only a small number of sequences are touched in a session, but
* would suck with many different sequences. Perhaps use a hashtable someday.
*/
typedef struct SeqTableData typedef struct SeqTableData
{ {
Oid relid; struct SeqTableData *next; /* link to next SeqTable object */
Relation rel; /* NULL if rel is not open in cur xact */ Oid relid; /* pg_class OID of this sequence */
int64 cached; TransactionId xid; /* xact in which we last did a seq op */
int64 last; int64 last; /* value last returned by nextval */
int64 increment; int64 cached; /* last value already cached for nextval */
struct SeqTableData *next; /* if last != cached, we have not used up all the cached values */
int64 increment; /* copy of sequence's increment field */
} SeqTableData; } SeqTableData;
typedef SeqTableData *SeqTable; typedef SeqTableData *SeqTable;
static SeqTable seqtab = NULL; static SeqTable seqtab = NULL; /* Head of list of SeqTable items */
static SeqTable init_sequence(char *caller, RangeVar *relation);
static Form_pg_sequence read_info(char *caller, SeqTable elm, Buffer *buf); static void init_sequence(const char *caller, RangeVar *relation,
SeqTable *p_elm, Relation *p_rel);
static Form_pg_sequence read_info(const char *caller, SeqTable elm,
Relation rel, Buffer *buf);
static void init_params(CreateSeqStmt *seq, Form_pg_sequence new); static void init_params(CreateSeqStmt *seq, Form_pg_sequence new);
static int64 get_param(DefElem *def);
static void do_setval(RangeVar *sequence, int64 next, bool iscalled); static void do_setval(RangeVar *sequence, int64 next, bool iscalled);
/* /*
...@@ -281,6 +297,7 @@ nextval(PG_FUNCTION_ARGS) ...@@ -281,6 +297,7 @@ nextval(PG_FUNCTION_ARGS)
text *seqin = PG_GETARG_TEXT_P(0); text *seqin = PG_GETARG_TEXT_P(0);
RangeVar *sequence; RangeVar *sequence;
SeqTable elm; SeqTable elm;
Relation seqrel;
Buffer buf; Buffer buf;
Page page; Page page;
Form_pg_sequence seq; Form_pg_sequence seq;
...@@ -300,7 +317,7 @@ nextval(PG_FUNCTION_ARGS) ...@@ -300,7 +317,7 @@ nextval(PG_FUNCTION_ARGS)
"nextval")); "nextval"));
/* open and AccessShareLock sequence */ /* open and AccessShareLock sequence */
elm = init_sequence("nextval", sequence); init_sequence("nextval", sequence, &elm, &seqrel);
if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK) if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s", elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
...@@ -309,11 +326,12 @@ nextval(PG_FUNCTION_ARGS) ...@@ -309,11 +326,12 @@ nextval(PG_FUNCTION_ARGS)
if (elm->last != elm->cached) /* some numbers were cached */ if (elm->last != elm->cached) /* some numbers were cached */
{ {
elm->last += elm->increment; elm->last += elm->increment;
relation_close(seqrel, NoLock);
PG_RETURN_INT64(elm->last); PG_RETURN_INT64(elm->last);
} }
seq = read_info("nextval", elm, &buf); /* lock page' buffer and /* lock page' buffer and read tuple */
* read tuple */ seq = read_info("nextval", elm, seqrel, &buf);
page = BufferGetPage(buf); page = BufferGetPage(buf);
last = next = result = seq->last_value; last = next = result = seq->last_value;
...@@ -421,7 +439,7 @@ nextval(PG_FUNCTION_ARGS) ...@@ -421,7 +439,7 @@ nextval(PG_FUNCTION_ARGS)
XLogRecPtr recptr; XLogRecPtr recptr;
XLogRecData rdata[2]; XLogRecData rdata[2];
xlrec.node = elm->rel->rd_node; xlrec.node = seqrel->rd_node;
rdata[0].buffer = InvalidBuffer; rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec; rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec); rdata[0].len = sizeof(xl_seq_rec);
...@@ -453,6 +471,8 @@ nextval(PG_FUNCTION_ARGS) ...@@ -453,6 +471,8 @@ nextval(PG_FUNCTION_ARGS)
if (WriteBuffer(buf) == STATUS_ERROR) if (WriteBuffer(buf) == STATUS_ERROR)
elog(ERROR, "%s.nextval: WriteBuffer failed", sequence->relname); elog(ERROR, "%s.nextval: WriteBuffer failed", sequence->relname);
relation_close(seqrel, NoLock);
PG_RETURN_INT64(result); PG_RETURN_INT64(result);
} }
...@@ -462,13 +482,14 @@ currval(PG_FUNCTION_ARGS) ...@@ -462,13 +482,14 @@ currval(PG_FUNCTION_ARGS)
text *seqin = PG_GETARG_TEXT_P(0); text *seqin = PG_GETARG_TEXT_P(0);
RangeVar *sequence; RangeVar *sequence;
SeqTable elm; SeqTable elm;
Relation seqrel;
int64 result; int64 result;
sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin, sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin,
"currval")); "currval"));
/* open and AccessShareLock sequence */ /* open and AccessShareLock sequence */
elm = init_sequence("currval", sequence); init_sequence("currval", sequence, &elm, &seqrel);
if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK) if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK)
elog(ERROR, "%s.currval: you don't have permissions to read sequence %s", elog(ERROR, "%s.currval: you don't have permissions to read sequence %s",
...@@ -480,6 +501,8 @@ currval(PG_FUNCTION_ARGS) ...@@ -480,6 +501,8 @@ currval(PG_FUNCTION_ARGS)
result = elm->last; result = elm->last;
relation_close(seqrel, NoLock);
PG_RETURN_INT64(result); PG_RETURN_INT64(result);
} }
...@@ -500,18 +523,19 @@ static void ...@@ -500,18 +523,19 @@ static void
do_setval(RangeVar *sequence, int64 next, bool iscalled) do_setval(RangeVar *sequence, int64 next, bool iscalled)
{ {
SeqTable elm; SeqTable elm;
Relation seqrel;
Buffer buf; Buffer buf;
Form_pg_sequence seq; Form_pg_sequence seq;
/* open and AccessShareLock sequence */ /* open and AccessShareLock sequence */
elm = init_sequence("setval", sequence); init_sequence("setval", sequence, &elm, &seqrel);
if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK) if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
elog(ERROR, "%s.setval: you don't have permissions to set sequence %s", elog(ERROR, "%s.setval: you don't have permissions to set sequence %s",
sequence->relname, sequence->relname); sequence->relname, sequence->relname);
/* lock page' buffer and read tuple */ /* lock page' buffer and read tuple */
seq = read_info("setval", elm, &buf); seq = read_info("setval", elm, seqrel, &buf);
if ((next < seq->min_value) || (next > seq->max_value)) if ((next < seq->min_value) || (next > seq->max_value))
elog(ERROR, "%s.setval: value " INT64_FORMAT " is out of bounds (" INT64_FORMAT "," INT64_FORMAT ")", elog(ERROR, "%s.setval: value " INT64_FORMAT " is out of bounds (" INT64_FORMAT "," INT64_FORMAT ")",
...@@ -529,7 +553,7 @@ do_setval(RangeVar *sequence, int64 next, bool iscalled) ...@@ -529,7 +553,7 @@ do_setval(RangeVar *sequence, int64 next, bool iscalled)
XLogRecData rdata[2]; XLogRecData rdata[2];
Page page = BufferGetPage(buf); Page page = BufferGetPage(buf);
xlrec.node = elm->rel->rd_node; xlrec.node = seqrel->rd_node;
rdata[0].buffer = InvalidBuffer; rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec; rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec); rdata[0].len = sizeof(xl_seq_rec);
...@@ -559,6 +583,8 @@ do_setval(RangeVar *sequence, int64 next, bool iscalled) ...@@ -559,6 +583,8 @@ do_setval(RangeVar *sequence, int64 next, bool iscalled)
if (WriteBuffer(buf) == STATUS_ERROR) if (WriteBuffer(buf) == STATUS_ERROR)
elog(ERROR, "%s.setval: WriteBuffer failed", sequence->relname); elog(ERROR, "%s.setval: WriteBuffer failed", sequence->relname);
relation_close(seqrel, NoLock);
} }
/* /*
...@@ -600,84 +626,48 @@ setval_and_iscalled(PG_FUNCTION_ARGS) ...@@ -600,84 +626,48 @@ setval_and_iscalled(PG_FUNCTION_ARGS)
PG_RETURN_INT64(next); PG_RETURN_INT64(next);
} }
static Form_pg_sequence
read_info(char *caller, SeqTable elm, Buffer *buf)
{
PageHeader page;
ItemId lp;
HeapTupleData tuple;
sequence_magic *sm;
Form_pg_sequence seq;
if (elm->rel->rd_nblocks > 1) /*
elog(ERROR, "%s.%s: invalid number of blocks in sequence", * Given a relation name, open and lock the sequence. p_elm and p_rel are
RelationGetRelationName(elm->rel), caller); * output parameters.
*/
*buf = ReadBuffer(elm->rel, 0); static void
if (!BufferIsValid(*buf)) init_sequence(const char *caller, RangeVar *relation,
elog(ERROR, "%s.%s: ReadBuffer failed", SeqTable *p_elm, Relation *p_rel)
RelationGetRelationName(elm->rel), caller);
LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
page = (PageHeader) BufferGetPage(*buf);
sm = (sequence_magic *) PageGetSpecialPointer(page);
if (sm->magic != SEQ_MAGIC)
elog(ERROR, "%s.%s: bad magic (%08X)",
RelationGetRelationName(elm->rel), caller, sm->magic);
lp = PageGetItemId(page, FirstOffsetNumber);
Assert(ItemIdIsUsed(lp));
tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
seq = (Form_pg_sequence) GETSTRUCT(&tuple);
elm->increment = seq->increment_by;
return seq;
}
static SeqTable
init_sequence(char *caller, RangeVar *relation)
{ {
Oid relid = RangeVarGetRelid(relation, false); Oid relid = RangeVarGetRelid(relation, false);
SeqTable elm, TransactionId thisxid = GetCurrentTransactionId();
prev = (SeqTable) NULL; SeqTable elm;
Relation seqrel; Relation seqrel;
/* Look to see if we already have a seqtable entry for relation */ /* Look to see if we already have a seqtable entry for relation */
for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next) for (elm = seqtab; elm != NULL; elm = elm->next)
{ {
if (elm->relid == relid) if (elm->relid == relid)
break; break;
prev = elm;
} }
/* If so, and if it's already been opened in this xact, just return it */ /*
if (elm != (SeqTable) NULL && elm->rel != (Relation) NULL) * Open the sequence relation, acquiring AccessShareLock if we don't
return elm; * already have a lock in the current xact.
*/
if (elm == NULL || elm->xid != thisxid)
seqrel = relation_open(relid, AccessShareLock);
else
seqrel = relation_open(relid, NoLock);
/* Else open and check it */
seqrel = heap_open(relid, AccessShareLock);
if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE) if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
elog(ERROR, "%s.%s: %s is not a sequence", elog(ERROR, "%s.%s: %s is not a sequence",
relation->relname, caller, relation->relname); relation->relname, caller, relation->relname);
/* /*
* If elm exists but elm->rel is NULL, the seqtable entry is left over * Allocate new seqtable entry if we didn't find one.
* from a previous xact -- update the entry and reuse it.
* *
* NOTE: seqtable entries remain in the list for the life of a backend. * NOTE: seqtable entries remain in the list for the life of a backend.
* If the sequence itself is deleted then the entry becomes wasted memory, * If the sequence itself is deleted then the entry becomes wasted memory,
* but it's small enough that this should not matter. * but it's small enough that this should not matter.
*/ */
if (elm != (SeqTable) NULL) if (elm == NULL)
{
elm->rel = seqrel;
}
else
{ {
/* /*
* Time to make a new seqtable entry. These entries live as long * Time to make a new seqtable entry. These entries live as long
...@@ -686,40 +676,59 @@ init_sequence(char *caller, RangeVar *relation) ...@@ -686,40 +676,59 @@ init_sequence(char *caller, RangeVar *relation)
elm = (SeqTable) malloc(sizeof(SeqTableData)); elm = (SeqTable) malloc(sizeof(SeqTableData));
if (elm == NULL) if (elm == NULL)
elog(ERROR, "Memory exhausted in init_sequence"); elog(ERROR, "Memory exhausted in init_sequence");
elm->rel = seqrel;
elm->relid = relid; elm->relid = relid;
elm->cached = elm->last = elm->increment = 0; /* increment is set to 0 until we do read_info (see currval) */
elm->next = (SeqTable) NULL; elm->last = elm->cached = elm->increment = 0;
elm->next = seqtab;
if (seqtab == (SeqTable) NULL) seqtab = elm;
seqtab = elm;
else
prev->next = elm;
} }
return elm; /* Flag that we have a lock in the current xact. */
elm->xid = thisxid;
*p_elm = elm;
*p_rel = seqrel;
} }
/* /* Given an opened relation, lock the page buffer and find the tuple */
* CloseSequences static Form_pg_sequence
* is called by xact mgr at commit/abort. read_info(const char *caller, SeqTable elm,
*/ Relation rel, Buffer *buf)
void
CloseSequences(void)
{ {
SeqTable elm; PageHeader page;
Relation rel; ItemId lp;
HeapTupleData tuple;
sequence_magic *sm;
Form_pg_sequence seq;
for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next) if (rel->rd_nblocks > 1)
{ elog(ERROR, "%s.%s: invalid number of blocks in sequence",
rel = elm->rel; RelationGetRelationName(rel), caller);
if (rel != (Relation) NULL) /* opened in current xact */
{ *buf = ReadBuffer(rel, 0);
elm->rel = (Relation) NULL; if (!BufferIsValid(*buf))
heap_close(rel, AccessShareLock); elog(ERROR, "%s.%s: ReadBuffer failed",
} RelationGetRelationName(rel), caller);
}
LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
page = (PageHeader) BufferGetPage(*buf);
sm = (sequence_magic *) PageGetSpecialPointer(page);
if (sm->magic != SEQ_MAGIC)
elog(ERROR, "%s.%s: bad magic (%08X)",
RelationGetRelationName(rel), caller, sm->magic);
lp = PageGetItemId(page, FirstOffsetNumber);
Assert(ItemIdIsUsed(lp));
tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
seq = (Form_pg_sequence) GETSTRUCT(&tuple);
elm->increment = seq->increment_by;
return seq;
} }
...@@ -761,7 +770,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) ...@@ -761,7 +770,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
if (increment_by == (DefElem *) NULL) /* INCREMENT BY */ if (increment_by == (DefElem *) NULL) /* INCREMENT BY */
new->increment_by = 1; new->increment_by = 1;
else if ((new->increment_by = get_param(increment_by)) == 0) else if ((new->increment_by = defGetInt64(increment_by)) == 0)
elog(ERROR, "DefineSequence: can't INCREMENT by 0"); elog(ERROR, "DefineSequence: can't INCREMENT by 0");
if (max_value == (DefElem *) NULL) /* MAXVALUE */ if (max_value == (DefElem *) NULL) /* MAXVALUE */
...@@ -772,7 +781,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) ...@@ -772,7 +781,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
new->max_value = -1; /* descending seq */ new->max_value = -1; /* descending seq */
} }
else else
new->max_value = get_param(max_value); new->max_value = defGetInt64(max_value);
if (min_value == (DefElem *) NULL) /* MINVALUE */ if (min_value == (DefElem *) NULL) /* MINVALUE */
{ {
...@@ -782,7 +791,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) ...@@ -782,7 +791,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
new->min_value = SEQ_MINVALUE; /* descending seq */ new->min_value = SEQ_MINVALUE; /* descending seq */
} }
else else
new->min_value = get_param(min_value); new->min_value = defGetInt64(min_value);
if (new->min_value >= new->max_value) if (new->min_value >= new->max_value)
elog(ERROR, "DefineSequence: MINVALUE (" INT64_FORMAT ") can't be >= MAXVALUE (" INT64_FORMAT ")", elog(ERROR, "DefineSequence: MINVALUE (" INT64_FORMAT ") can't be >= MAXVALUE (" INT64_FORMAT ")",
...@@ -796,7 +805,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) ...@@ -796,7 +805,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
new->last_value = new->max_value; /* descending seq */ new->last_value = new->max_value; /* descending seq */
} }
else else
new->last_value = get_param(last_value); new->last_value = defGetInt64(last_value);
if (new->last_value < new->min_value) if (new->last_value < new->min_value)
elog(ERROR, "DefineSequence: START value (" INT64_FORMAT ") can't be < MINVALUE (" INT64_FORMAT ")", elog(ERROR, "DefineSequence: START value (" INT64_FORMAT ") can't be < MINVALUE (" INT64_FORMAT ")",
...@@ -807,33 +816,12 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) ...@@ -807,33 +816,12 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
if (cache_value == (DefElem *) NULL) /* CACHE */ if (cache_value == (DefElem *) NULL) /* CACHE */
new->cache_value = 1; new->cache_value = 1;
else if ((new->cache_value = get_param(cache_value)) <= 0) else if ((new->cache_value = defGetInt64(cache_value)) <= 0)
elog(ERROR, "DefineSequence: CACHE (" INT64_FORMAT ") can't be <= 0", elog(ERROR, "DefineSequence: CACHE (" INT64_FORMAT ") can't be <= 0",
new->cache_value); new->cache_value);
} }
static int64
get_param(DefElem *def)
{
if (def->arg == (Node *) NULL)
elog(ERROR, "DefineSequence: \"%s\" value unspecified", def->defname);
if (IsA(def->arg, Integer))
return (int64) intVal(def->arg);
/*
* Values too large for int4 will be represented as Float constants by
* the lexer. Accept these if they are valid int8 strings.
*/
if (IsA(def->arg, Float))
return DatumGetInt64(DirectFunctionCall1(int8in,
CStringGetDatum(strVal(def->arg))));
/* Shouldn't get here unless parser messed up */
elog(ERROR, "DefineSequence: \"%s\" value must be integer", def->defname);
return 0; /* not reached; keep compiler quiet */
}
void void
seq_redo(XLogRecPtr lsn, XLogRecord *record) seq_redo(XLogRecPtr lsn, XLogRecord *record)
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: defrem.h,v 1.37 2002/05/17 18:32:52 petere Exp $ * $Id: defrem.h,v 1.38 2002/05/22 21:40:55 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -54,12 +54,13 @@ extern void DefineDomain(CreateDomainStmt *stmt); ...@@ -54,12 +54,13 @@ extern void DefineDomain(CreateDomainStmt *stmt);
extern void RemoveDomain(List *names, int behavior); extern void RemoveDomain(List *names, int behavior);
/* support routines in define.c */ /* support routines in commands/define.c */
extern void case_translate_language_name(const char *input, char *output); extern void case_translate_language_name(const char *input, char *output);
extern char *defGetString(DefElem *def); extern char *defGetString(DefElem *def);
extern double defGetNumeric(DefElem *def); extern double defGetNumeric(DefElem *def);
extern int64 defGetInt64(DefElem *def);
extern List *defGetQualifiedName(DefElem *def); extern List *defGetQualifiedName(DefElem *def);
extern TypeName *defGetTypeName(DefElem *def); extern TypeName *defGetTypeName(DefElem *def);
extern int defGetTypeLength(DefElem *def); extern int defGetTypeLength(DefElem *def);
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: sequence.h,v 1.19 2001/11/05 17:46:33 momjian Exp $ * $Id: sequence.h,v 1.20 2002/05/22 21:40:55 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -84,7 +84,6 @@ extern Datum setval(PG_FUNCTION_ARGS); ...@@ -84,7 +84,6 @@ extern Datum setval(PG_FUNCTION_ARGS);
extern Datum setval_and_iscalled(PG_FUNCTION_ARGS); extern Datum setval_and_iscalled(PG_FUNCTION_ARGS);
extern void DefineSequence(CreateSeqStmt *stmt); extern void DefineSequence(CreateSeqStmt *stmt);
extern void CloseSequences(void);
extern void seq_redo(XLogRecPtr lsn, XLogRecord *rptr); extern void seq_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void seq_undo(XLogRecPtr lsn, XLogRecord *rptr); extern void seq_undo(XLogRecPtr lsn, XLogRecord *rptr);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment