Commit a62d8393 authored by Tom Lane's avatar Tom Lane

Repair Large Object bugs demonstrated by Ian Grant's example. inv_write

was inappropriately relying on rel->rd_nblocks to tell if the LO is
empty (apparently a hack to get around a long-dead index bug), causing
misbehavior on a written-but-never-vacuumed LO.  Also, inv_read failed
to cope gracefully with 'holes' (unwritten regions) in the object.
parent 946e80c4
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.69 2000/06/05 07:28:45 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.70 2000/06/15 06:07:34 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -187,6 +187,7 @@ inv_create(int flags) ...@@ -187,6 +187,7 @@ inv_create(int flags)
retval->idesc = RelationGetDescr(indr); retval->idesc = RelationGetDescr(indr);
retval->offset = retval->lowbyte = retval->highbyte = 0; retval->offset = retval->lowbyte = retval->highbyte = 0;
ItemPointerSetInvalid(&(retval->htid)); ItemPointerSetInvalid(&(retval->htid));
retval->flags = 0;
if (flags & INV_WRITE) if (flags & INV_WRITE)
{ {
...@@ -198,7 +199,7 @@ inv_create(int flags) ...@@ -198,7 +199,7 @@ inv_create(int flags)
LockRelation(r, ShareLock); LockRelation(r, ShareLock);
retval->flags = IFS_RDLOCK; retval->flags = IFS_RDLOCK;
} }
retval->flags |= IFS_ATEOF; retval->flags |= IFS_ATEOF; /* since we know the object is empty */
return retval; return retval;
} }
...@@ -235,6 +236,7 @@ inv_open(Oid lobjId, int flags) ...@@ -235,6 +236,7 @@ inv_open(Oid lobjId, int flags)
retval->idesc = RelationGetDescr(indrel); retval->idesc = RelationGetDescr(indrel);
retval->offset = retval->lowbyte = retval->highbyte = 0; retval->offset = retval->lowbyte = retval->highbyte = 0;
ItemPointerSetInvalid(&(retval->htid)); ItemPointerSetInvalid(&(retval->htid));
retval->flags = 0;
if (flags & INV_WRITE) if (flags & INV_WRITE)
{ {
...@@ -373,14 +375,8 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence) ...@@ -373,14 +375,8 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
if (whence == SEEK_CUR) if (whence == SEEK_CUR)
{ {
offset += obj_desc->offset; /* calculate absolute position */ offset += obj_desc->offset; /* calculate absolute position */
return inv_seek(obj_desc, offset, SEEK_SET);
} }
else if (whence == SEEK_END)
/*
* if you seek past the end (offset > 0) I have no clue what happens
* :-( B.L. 9/1/93
*/
if (whence == SEEK_END)
{ {
/* need read lock for getsize */ /* need read lock for getsize */
if (!(obj_desc->flags & IFS_RDLOCK)) if (!(obj_desc->flags & IFS_RDLOCK))
...@@ -391,8 +387,8 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence) ...@@ -391,8 +387,8 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
offset += _inv_getsize(obj_desc->heap_r, offset += _inv_getsize(obj_desc->heap_r,
obj_desc->hdesc, obj_desc->hdesc,
obj_desc->index_r); obj_desc->index_r);
return inv_seek(obj_desc, offset, SEEK_SET);
} }
/* now we can assume that the operation is SEEK_SET */
/* /*
* Whenever we do a seek, we turn off the EOF flag bit to force * Whenever we do a seek, we turn off the EOF flag bit to force
...@@ -416,9 +412,6 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence) ...@@ -416,9 +412,6 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
* stores the offset of the last byte that appears on it, and we have * stores the offset of the last byte that appears on it, and we have
* an index on this. * an index on this.
*/ */
/* right now, just assume that the operation is SEEK_SET */
if (obj_desc->iscan != (IndexScanDesc) NULL) if (obj_desc->iscan != (IndexScanDesc) NULL)
{ {
d = Int32GetDatum(offset); d = Int32GetDatum(offset);
...@@ -426,7 +419,6 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence) ...@@ -426,7 +419,6 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
} }
else else
{ {
ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE, ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
Int32GetDatum(offset)); Int32GetDatum(offset));
...@@ -489,9 +481,27 @@ inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes) ...@@ -489,9 +481,27 @@ inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
/* copy the data from this block into the buffer */ /* copy the data from this block into the buffer */
d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull); d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull);
fsblock = (struct varlena *) DatumGetPointer(d);
ReleaseBuffer(buffer); ReleaseBuffer(buffer);
fsblock = (struct varlena *) DatumGetPointer(d); /*
* If block starts beyond current seek point, then we are looking
* at a "hole" (unwritten area) in the object. Return zeroes for
* the "hole".
*/
if (obj_desc->offset < obj_desc->lowbyte)
{
int nzeroes = obj_desc->lowbyte - obj_desc->offset;
if (nzeroes > (nbytes - nread))
nzeroes = (nbytes - nread);
MemSet(buf, 0, nzeroes);
buf += nzeroes;
nread += nzeroes;
obj_desc->offset += nzeroes;
if (nread >= nbytes)
break;
}
off = obj_desc->offset - obj_desc->lowbyte; off = obj_desc->offset - obj_desc->lowbyte;
ncopy = obj_desc->highbyte - obj_desc->offset + 1; ncopy = obj_desc->highbyte - obj_desc->offset + 1;
...@@ -537,14 +547,11 @@ inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes) ...@@ -537,14 +547,11 @@ inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Buffer buffer; Buffer buffer;
/* /*
* Fetch the current inversion file system block. If the class * Fetch the current inversion file system block. We can skip
* storing the inversion file is empty, we don't want to do an * the work if we already know we are at EOF.
* index lookup, since index lookups choke on empty files (should
* be fixed someday).
*/ */
if ((obj_desc->flags & IFS_ATEOF) if (obj_desc->flags & IFS_ATEOF)
|| obj_desc->heap_r->rd_nblocks == 0)
tuple.t_data = NULL; tuple.t_data = NULL;
else else
inv_fetchtup(obj_desc, &tuple, &buffer); inv_fetchtup(obj_desc, &tuple, &buffer);
...@@ -659,6 +666,7 @@ inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer) ...@@ -659,6 +666,7 @@ inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer)
} }
else else
index_rescan(obj_desc->iscan, false, &skey); index_rescan(obj_desc->iscan, false, &skey);
do do
{ {
res = index_getnext(obj_desc->iscan, ForwardScanDirection); res = index_getnext(obj_desc->iscan, ForwardScanDirection);
...@@ -1149,7 +1157,8 @@ inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple) ...@@ -1149,7 +1157,8 @@ inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple)
pfree(res); pfree(res);
} }
/* #ifdef NOT_USED
static void static void
DumpPage(Page page, int blkno) DumpPage(Page page, int blkno)
{ {
...@@ -1239,7 +1248,8 @@ ItemPointerFormExternal(ItemPointer pointer) ...@@ -1239,7 +1248,8 @@ ItemPointerFormExternal(ItemPointer pointer)
return itemPointerString; return itemPointerString;
} }
*/
#endif
static int static int
_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln) _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment