Commit fb0919fb authored by Tom Lane

Don't assume that max offset number stays fixed on a page when we're
not holding a pin on the page.  Use double instead of long to count
rows in relation, so that code still works for > LONG_MAX rows in rel.
parent 6497a7fd
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.21 2001/06/22 19:16:21 wieck Exp $ * $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.22 2001/07/05 19:33:35 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -97,8 +97,8 @@ typedef struct ...@@ -97,8 +97,8 @@ typedef struct
} ScalarMCVItem; } ScalarMCVItem;
#define swapInt(a,b) {int _tmp; _tmp=a; a=b; b=_tmp;} #define swapInt(a,b) do {int _tmp; _tmp=a; a=b; b=_tmp;} while(0)
#define swapDatum(a,b) {Datum _tmp; _tmp=a; a=b; b=_tmp;} #define swapDatum(a,b) do {Datum _tmp; _tmp=a; a=b; b=_tmp;} while(0)
static int MESSAGE_LEVEL; static int MESSAGE_LEVEL;
...@@ -111,20 +111,18 @@ static int *datumCmpTupnoLink; ...@@ -111,20 +111,18 @@ static int *datumCmpTupnoLink;
static VacAttrStats *examine_attribute(Relation onerel, int attnum); static VacAttrStats *examine_attribute(Relation onerel, int attnum);
static int acquire_sample_rows(Relation onerel, HeapTuple *rows, static int acquire_sample_rows(Relation onerel, HeapTuple *rows,
int targrows, long *totalrows); int targrows, double *totalrows);
static double random_fract(void); static double random_fract(void);
static double init_selection_state(int n); static double init_selection_state(int n);
static long select_next_random_record(long t, int n, double *stateptr); static double select_next_random_record(double t, int n, double *stateptr);
static int compare_rows(const void *a, const void *b); static int compare_rows(const void *a, const void *b);
static int compare_scalars(const void *a, const void *b); static int compare_scalars(const void *a, const void *b);
static int compare_mcvs(const void *a, const void *b); static int compare_mcvs(const void *a, const void *b);
static OffsetNumber get_page_max_offset(Relation relation,
BlockNumber blocknumber);
static void compute_minimal_stats(VacAttrStats *stats, static void compute_minimal_stats(VacAttrStats *stats,
TupleDesc tupDesc, long totalrows, TupleDesc tupDesc, double totalrows,
HeapTuple *rows, int numrows); HeapTuple *rows, int numrows);
static void compute_scalar_stats(VacAttrStats *stats, static void compute_scalar_stats(VacAttrStats *stats,
TupleDesc tupDesc, long totalrows, TupleDesc tupDesc, double totalrows,
HeapTuple *rows, int numrows); HeapTuple *rows, int numrows);
static void update_attstats(Oid relid, int natts, VacAttrStats **vacattrstats); static void update_attstats(Oid relid, int natts, VacAttrStats **vacattrstats);
...@@ -143,7 +141,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt) ...@@ -143,7 +141,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt)
VacAttrStats **vacattrstats; VacAttrStats **vacattrstats;
int targrows, int targrows,
numrows; numrows;
long totalrows; double totalrows;
HeapTuple *rows; HeapTuple *rows;
HeapTuple tuple; HeapTuple tuple;
...@@ -298,7 +296,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt) ...@@ -298,7 +296,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt)
if (!vacstmt->vacuum) if (!vacstmt->vacuum)
vac_update_relstats(RelationGetRelid(onerel), vac_update_relstats(RelationGetRelid(onerel),
onerel->rd_nblocks, onerel->rd_nblocks,
(double) totalrows, totalrows,
RelationGetForm(onerel)->relhasindex); RelationGetForm(onerel)->relhasindex);
/* /*
...@@ -488,7 +486,7 @@ examine_attribute(Relation onerel, int attnum) ...@@ -488,7 +486,7 @@ examine_attribute(Relation onerel, int attnum)
*/ */
static int static int
acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows, acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
long *totalrows) double *totalrows)
{ {
int numrows = 0; int numrows = 0;
HeapScanDesc scan; HeapScanDesc scan;
...@@ -499,7 +497,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows, ...@@ -499,7 +497,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
OffsetNumber lastoffset; OffsetNumber lastoffset;
int numest; int numest;
double tuplesperpage; double tuplesperpage;
long t; double t;
double rstate; double rstate;
Assert(targrows > 1); Assert(targrows > 1);
...@@ -520,7 +518,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows, ...@@ -520,7 +518,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
*/ */
if (!HeapTupleIsValid(tuple)) if (!HeapTupleIsValid(tuple))
{ {
*totalrows = numrows; *totalrows = (double) numrows;
return numrows; return numrows;
} }
/* /*
...@@ -565,20 +563,22 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows, ...@@ -565,20 +563,22 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
} }
tuplesperpage = (double) numest / (double) estblock; tuplesperpage = (double) numest / (double) estblock;
t = numrows; /* t is the # of records processed so far */ t = (double) numrows; /* t is the # of records processed so far */
rstate = init_selection_state(targrows); rstate = init_selection_state(targrows);
for (;;) for (;;)
{ {
double targpos; double targpos;
BlockNumber targblock; BlockNumber targblock;
Buffer targbuffer;
Page targpage;
OffsetNumber targoffset, OffsetNumber targoffset,
maxoffset; maxoffset;
t = select_next_random_record(t, targrows, &rstate); t = select_next_random_record(t, targrows, &rstate);
/* Try to read the t'th record in the table */ /* Try to read the t'th record in the table */
targpos = (double) t / tuplesperpage; targpos = t / tuplesperpage;
targblock = (BlockNumber) targpos; targblock = (BlockNumber) targpos;
targoffset = ((int) (targpos - targblock) * tuplesperpage) + targoffset = ((int) ((targpos - targblock) * tuplesperpage)) +
FirstOffsetNumber; FirstOffsetNumber;
/* Make sure we are past the last selected record */ /* Make sure we are past the last selected record */
if (targblock <= lastblock) if (targblock <= lastblock)
...@@ -595,21 +595,37 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows, ...@@ -595,21 +595,37 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
*/ */
if (targblock >= onerel->rd_nblocks) if (targblock >= onerel->rd_nblocks)
break; break;
maxoffset = get_page_max_offset(onerel, targblock); /*
* We must maintain a pin on the target page's buffer to ensure that
* the maxoffset value stays good (else concurrent VACUUM might
* delete tuples out from under us). Hence, pin the page until we
* are done looking at it. We don't maintain a lock on the page,
* so tuples could get added to it, but we ignore such tuples.
*/
targbuffer = ReadBuffer(onerel, targblock);
if (!BufferIsValid(targbuffer))
elog(ERROR, "acquire_sample_rows: ReadBuffer(%s,%u) failed",
RelationGetRelationName(onerel), targblock);
LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
targpage = BufferGetPage(targbuffer);
maxoffset = PageGetMaxOffsetNumber(targpage);
LockBuffer(targbuffer, BUFFER_LOCK_UNLOCK);
for (;;) for (;;)
{ {
HeapTupleData targtuple; HeapTupleData targtuple;
Buffer targbuffer; Buffer tupbuffer;
if (targoffset > maxoffset) if (targoffset > maxoffset)
{ {
/* Fell off end of this page, try next */ /* Fell off end of this page, try next */
ReleaseBuffer(targbuffer);
targblock++; targblock++;
targoffset = FirstOffsetNumber; targoffset = FirstOffsetNumber;
goto pageloop; goto pageloop;
} }
ItemPointerSet(&targtuple.t_self, targblock, targoffset); ItemPointerSet(&targtuple.t_self, targblock, targoffset);
heap_fetch(onerel, SnapshotNow, &targtuple, &targbuffer, NULL); heap_fetch(onerel, SnapshotNow, &targtuple, &tupbuffer, NULL);
if (targtuple.t_data != NULL) if (targtuple.t_data != NULL)
{ {
/* /*
...@@ -621,6 +637,9 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows, ...@@ -621,6 +637,9 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
Assert(k >= 0 && k < targrows); Assert(k >= 0 && k < targrows);
heap_freetuple(rows[k]); heap_freetuple(rows[k]);
rows[k] = heap_copytuple(&targtuple); rows[k] = heap_copytuple(&targtuple);
/* this releases the second pin acquired by heap_fetch: */
ReleaseBuffer(tupbuffer);
/* this releases the initial pin: */
ReleaseBuffer(targbuffer); ReleaseBuffer(targbuffer);
lastblock = targblock; lastblock = targblock;
lastoffset = targoffset; lastoffset = targoffset;
...@@ -639,7 +658,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows, ...@@ -639,7 +658,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
/* /*
* Estimate total number of valid rows in relation. * Estimate total number of valid rows in relation.
*/ */
*totalrows = (long) (onerel->rd_nblocks * tuplesperpage + 0.5); *totalrows = floor((double) onerel->rd_nblocks * tuplesperpage + 0.5);
return numrows; return numrows;
} }
...@@ -667,6 +686,12 @@ random_fract(void) ...@@ -667,6 +686,12 @@ random_fract(void)
* of the last record processed and next record to process. The only extra * of the last record processed and next record to process. The only extra
* state needed between calls is W, a random state variable. * state needed between calls is W, a random state variable.
* *
* Note: the original algorithm defines t, S, numer, and denom as integers.
* Here we express them as doubles to avoid overflow if the number of rows
* in the table exceeds INT_MAX. The algorithm should work as long as the
* row count does not become so large that it is not represented accurately
* in a double (on IEEE-math machines this would be around 2^52 rows).
*
* init_selection_state computes the initial W value. * init_selection_state computes the initial W value.
* *
* Given that we've already processed t records (t >= n), * Given that we've already processed t records (t >= n),
...@@ -680,36 +705,36 @@ init_selection_state(int n) ...@@ -680,36 +705,36 @@ init_selection_state(int n)
return exp(- log(random_fract())/n); return exp(- log(random_fract())/n);
} }
static long static double
select_next_random_record(long t, int n, double *stateptr) select_next_random_record(double t, int n, double *stateptr)
{ {
/* The magic constant here is T from Vitter's paper */ /* The magic constant here is T from Vitter's paper */
if (t <= (22 * n)) if (t <= (22.0 * n))
{ {
/* Process records using Algorithm X until t is large enough */ /* Process records using Algorithm X until t is large enough */
double V, double V,
quot; quot;
V = random_fract(); /* Generate V */ V = random_fract(); /* Generate V */
t++; t += 1;
quot = (double) (t - n) / (double) t; quot = (t - (double) n) / t;
/* Find min S satisfying (4.1) */ /* Find min S satisfying (4.1) */
while (quot > V) while (quot > V)
{ {
t++; t += 1;
quot *= (double) (t - n) / (double) t; quot *= (t - (double) n) / t;
} }
} }
else else
{ {
/* Now apply Algorithm Z */ /* Now apply Algorithm Z */
double W = *stateptr; double W = *stateptr;
long term = t - n + 1; double term = t - (double) n + 1;
int S; double S;
for (;;) for (;;)
{ {
long numer, double numer,
numer_lim, numer_lim,
denom; denom;
double U, double U,
...@@ -722,9 +747,9 @@ select_next_random_record(long t, int n, double *stateptr) ...@@ -722,9 +747,9 @@ select_next_random_record(long t, int n, double *stateptr)
/* Generate U and X */ /* Generate U and X */
U = random_fract(); U = random_fract();
X = t * (W - 1.0); X = t * (W - 1.0);
S = X; /* S is tentatively set to floor(X) */ S = floor(X); /* S is tentatively set to floor(X) */
/* Test if U <= h(S)/cg(X) in the manner of (6.3) */ /* Test if U <= h(S)/cg(X) in the manner of (6.3) */
tmp = (double) (t + 1) / (double) term; tmp = (t + 1) / term;
lhs = exp(log(((U * tmp * tmp) * (term + S))/(t + X))/n); lhs = exp(log(((U * tmp * tmp) * (term + S))/(t + X))/n);
rhs = (((t + X)/(term + S)) * term)/t; rhs = (((t + X)/(term + S)) * term)/t;
if (lhs <= rhs) if (lhs <= rhs)
...@@ -734,20 +759,20 @@ select_next_random_record(long t, int n, double *stateptr) ...@@ -734,20 +759,20 @@ select_next_random_record(long t, int n, double *stateptr)
} }
/* Test if U <= f(S)/cg(X) */ /* Test if U <= f(S)/cg(X) */
y = (((U * (t + 1))/term) * (t + S + 1))/(t + X); y = (((U * (t + 1))/term) * (t + S + 1))/(t + X);
if (n < S) if ((double) n < S)
{ {
denom = t; denom = t;
numer_lim = term + S; numer_lim = term + S;
} }
else else
{ {
denom = t - n + S; denom = t - (double) n + S;
numer_lim = t + 1; numer_lim = t + 1;
} }
for (numer = t + S; numer >= numer_lim; numer--) for (numer = t + S; numer >= numer_lim; numer -= 1)
{ {
y *= (double) numer / (double) denom; y *= numer / denom;
denom--; denom -= 1;
} }
W = exp(- log(random_fract())/n); /* Generate W in advance */ W = exp(- log(random_fract())/n); /* Generate W in advance */
if (exp(log(y)/n) <= (t + X)/t) if (exp(log(y)/n) <= (t + X)/t)
...@@ -783,30 +808,6 @@ compare_rows(const void *a, const void *b) ...@@ -783,30 +808,6 @@ compare_rows(const void *a, const void *b)
return 0; return 0;
} }
/*
* Discover the largest valid tuple offset number on the given page
*
* Reads block `blocknumber` of `relation` with ReadBuffer, raises
* elog(ERROR) if the returned buffer is not valid, and returns the value
* of PageGetMaxOffsetNumber() for that page.
*
* NOTE: both the share lock and the buffer pin are dropped before
* returning, so the result is only a snapshot taken at the time of the
* call; the caller holds nothing that keeps it valid afterwards.
*
* This code probably ought to live in some other module.
*/
static OffsetNumber
get_page_max_offset(Relation relation, BlockNumber blocknumber)
{
Buffer buffer;
Page p;
OffsetNumber offnum;
buffer = ReadBuffer(relation, blocknumber);
if (!BufferIsValid(buffer))
elog(ERROR, "get_page_max_offset: %s relation: ReadBuffer(%ld) failed",
RelationGetRelationName(relation), (long) blocknumber);
/* Hold a share lock only while reading the max offset from the page. */
LockBuffer(buffer, BUFFER_LOCK_SHARE);
p = BufferGetPage(buffer);
offnum = PageGetMaxOffsetNumber(p);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* Drop the pin taken by ReadBuffer; offnum is not protected past here. */
ReleaseBuffer(buffer);
return offnum;
}
/* /*
* compute_minimal_stats() -- compute minimal column statistics * compute_minimal_stats() -- compute minimal column statistics
...@@ -825,7 +826,7 @@ get_page_max_offset(Relation relation, BlockNumber blocknumber) ...@@ -825,7 +826,7 @@ get_page_max_offset(Relation relation, BlockNumber blocknumber)
*/ */
static void static void
compute_minimal_stats(VacAttrStats *stats, compute_minimal_stats(VacAttrStats *stats,
TupleDesc tupDesc, long totalrows, TupleDesc tupDesc, double totalrows,
HeapTuple *rows, int numrows) HeapTuple *rows, int numrows)
{ {
int i; int i;
...@@ -1002,7 +1003,7 @@ compute_minimal_stats(VacAttrStats *stats, ...@@ -1002,7 +1003,7 @@ compute_minimal_stats(VacAttrStats *stats,
if (f1 < 1) if (f1 < 1)
f1 = 1; f1 = 1;
term1 = sqrt((double) totalrows / (double) numrows) * f1; term1 = sqrt(totalrows / (double) numrows) * f1;
stats->stadistinct = floor(term1 + nmultiple + 0.5); stats->stadistinct = floor(term1 + nmultiple + 0.5);
} }
...@@ -1104,7 +1105,7 @@ compute_minimal_stats(VacAttrStats *stats, ...@@ -1104,7 +1105,7 @@ compute_minimal_stats(VacAttrStats *stats,
*/ */
static void static void
compute_scalar_stats(VacAttrStats *stats, compute_scalar_stats(VacAttrStats *stats,
TupleDesc tupDesc, long totalrows, TupleDesc tupDesc, double totalrows,
HeapTuple *rows, int numrows) HeapTuple *rows, int numrows)
{ {
int i; int i;
...@@ -1298,7 +1299,7 @@ compute_scalar_stats(VacAttrStats *stats, ...@@ -1298,7 +1299,7 @@ compute_scalar_stats(VacAttrStats *stats,
if (f1 < 1) if (f1 < 1)
f1 = 1; f1 = 1;
term1 = sqrt((double) totalrows / (double) numrows) * f1; term1 = sqrt(totalrows / (double) numrows) * f1;
stats->stadistinct = floor(term1 + nmultiple + 0.5); stats->stadistinct = floor(term1 + nmultiple + 0.5);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment