Commit 293d1e5f authored by Bruce Momjian's avatar Bruce Momjian

here it is as requested by Bruce.

I tested it by restoring my database with > 100000 BLOBs and dumping it out.
But unfortunately I cannot restore it back due to problems in pg_dump.

--
Sincerely Yours,
Denis Perchine
parent 33581195
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# #
# Makefile for catalog # Makefile for catalog
# #
# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.28 2000/10/20 21:03:42 petere Exp $ # $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.29 2000/10/21 15:55:21 momjian Exp $
# #
#------------------------------------------------------------------------- #-------------------------------------------------------------------------
...@@ -11,7 +11,8 @@ top_builddir = ../../.. ...@@ -11,7 +11,8 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global include $(top_builddir)/src/Makefile.global
OBJS = catalog.o heap.o index.o indexing.o aclchk.o \ OBJS = catalog.o heap.o index.o indexing.o aclchk.o \
pg_aggregate.o pg_operator.o pg_proc.o pg_type.o pg_aggregate.o pg_largeobject.o pg_operator.o pg_proc.o \
pg_type.o
BKIFILES = global.bki template1.bki global.description template1.description BKIFILES = global.bki template1.bki global.description template1.description
...@@ -29,7 +30,7 @@ TEMPLATE1_BKI_SRCS := $(addprefix $(top_srcdir)/src/include/catalog/,\ ...@@ -29,7 +30,7 @@ TEMPLATE1_BKI_SRCS := $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_proc.h pg_type.h pg_attribute.h pg_class.h \ pg_proc.h pg_type.h pg_attribute.h pg_class.h \
pg_inherits.h pg_index.h pg_statistic.h \ pg_inherits.h pg_index.h pg_statistic.h \
pg_operator.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \ pg_operator.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
pg_language.h \ pg_language.h pg_largeobject.h \
pg_aggregate.h pg_ipl.h pg_inheritproc.h \ pg_aggregate.h pg_ipl.h pg_inheritproc.h \
pg_rewrite.h pg_listener.h pg_description.h indexing.h \ pg_rewrite.h pg_listener.h pg_description.h indexing.h \
) )
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.69 2000/10/08 03:53:13 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.70 2000/10/21 15:55:21 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -51,6 +51,8 @@ char *Name_pg_inherits_indices[Num_pg_inherits_indices] = ...@@ -51,6 +51,8 @@ char *Name_pg_inherits_indices[Num_pg_inherits_indices] =
{InheritsRelidSeqnoIndex}; {InheritsRelidSeqnoIndex};
char *Name_pg_language_indices[Num_pg_language_indices] = char *Name_pg_language_indices[Num_pg_language_indices] =
{LanguageOidIndex, LanguageNameIndex}; {LanguageOidIndex, LanguageNameIndex};
char *Name_pg_largeobject_indices[Num_pg_largeobject_indices] =
{LargeobjectLOIdIndex, LargeobjectLOIdPNIndex};
char *Name_pg_listener_indices[Num_pg_listener_indices] = char *Name_pg_listener_indices[Num_pg_listener_indices] =
{ListenerPidRelnameIndex}; {ListenerPidRelnameIndex};
char *Name_pg_opclass_indices[Num_pg_opclass_indices] = char *Name_pg_opclass_indices[Num_pg_opclass_indices] =
......
/*-------------------------------------------------------------------------
*
* pg_largeobject.c
* routines to support manipulation of the pg_largeobject relation
*
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/catalog/pg_largeobject.c,v 1.3 2000/10/21 15:55:21 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/indexing.h"
#include "catalog/pg_largeobject.h"
#include "miscadmin.h"
#include "utils/fmgroids.h"
/*
 * _byteain -- build a bytea value from a raw byte buffer.
 *
 * Allocates size + VARHDRSZ bytes with palloc, stores that total in the
 * length word, and copies `size` bytes from `data` into the payload.
 * `data` is only read when size > 0, so callers may pass NULL together
 * with a size of 0 (as LargeobjectCreate does) to get an empty value.
 *
 * The result is palloc'd; the caller owns it.
 */
bytea *_byteain(const char *data, int32 size);

bytea *
_byteain(const char *data, int32 size)
{
	bytea	   *result = (bytea *) palloc(size + VARHDRSZ);

	/* length word counts the header as well as the payload */
	result->vl_len = size + VARHDRSZ;

	/* nothing to copy: leave the payload empty and never touch `data` */
	if (size <= 0)
		return result;

	memcpy(result->vl_dat, data, size);
	return result;
}
/*
 * LargeobjectCreate -- insert the initial row for a new large object.
 *
 * Forms a pg_largeobject tuple whose attributes are, in order, the given
 * loid, the integer 0 and an empty bytea, inserts it into the heap and,
 * unless system indexes are being ignored, adds the matching entries to
 * the pg_largeobject indexes.  CommandCounterIncrement() is called before
 * returning.
 *
 * Returns whatever heap_insert() returned for the new tuple.
 */
Oid
LargeobjectCreate(Oid loid)
{
	Oid			retval;
	Relation	pg_largeobject;
	HeapTuple	ntup;		/* set by heap_formtuple(); no pre-allocation
							 * is needed (the old palloc here was simply
							 * overwritten and never freed) */
	Relation	idescs[Num_pg_largeobject_indices];
	Datum		values[Natts_pg_largeobject];
	char		nulls[Natts_pg_largeobject];
	int			i;

	/* start with every attribute flagged ' ' and holding a zero Datum */
	for (i = 0; i < Natts_pg_largeobject; i++)
	{
		nulls[i] = ' ';
		values[i] = (Datum) NULL;
	}

	/* fill the attributes in catalog order */
	i = 0;
	values[i++] = ObjectIdGetDatum(loid);
	values[i++] = Int32GetDatum(0);
	values[i++] = (Datum) _byteain(NULL, 0);

	pg_largeobject = heap_openr(LargeobjectRelationName, RowExclusiveLock);

	ntup = heap_formtuple(pg_largeobject->rd_att, values, nulls);
	retval = heap_insert(pg_largeobject, ntup);

	/* keep the catalog indexes in step with the heap */
	if (!IsIgnoringSystemIndexes())
	{
		CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
		CatalogIndexInsert(idescs, Num_pg_largeobject_indices, pg_largeobject, ntup);
		CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
	}

	heap_close(pg_largeobject, RowExclusiveLock);
	heap_freetuple(ntup);

	CommandCounterIncrement();

	return retval;
}
/*
 * LargeobjectDrop -- delete every pg_largeobject row belonging to loid.
 *
 * Scans the LargeobjectLOIdIndex index for entries whose first key column
 * equals loid and calls heap_delete() on each heap tuple the index points
 * at.  Raises elog(ERROR) if the scan returned no entries at all.
 */
void
LargeobjectDrop(Oid loid)
{
	Relation	pg_largeobject;
	Relation	pg_lo_id;
	ScanKeyData skey;
	IndexScanDesc sd = (IndexScanDesc) NULL;
	RetrieveIndexResult indexRes;
	int			found = 0;

	/* match index attribute 1 against loid using OID equality */
	ScanKeyEntryInitialize(&skey,
						   (bits16) 0x0,
						   (AttrNumber) 1,
						   (RegProcedure) F_OIDEQ,
						   ObjectIdGetDatum(loid));

	/*
	 * NOTE(review): the heap is opened with RowShareLock although tuples are
	 * deleted below -- confirm this lock level is intended.
	 */
	pg_largeobject = heap_openr(LargeobjectRelationName, RowShareLock);
	pg_lo_id = index_openr(LargeobjectLOIdIndex);

	sd = index_beginscan(pg_lo_id, false, 1, &skey);

	while ((indexRes = index_getnext(sd, ForwardScanDirection)))
	{
		found++;
		heap_delete(pg_largeobject, &indexRes->heap_iptr, NULL);
		pfree(indexRes);
	}

	index_endscan(sd);

	index_close(pg_lo_id);
	heap_close(pg_largeobject, RowShareLock);

	/* Oid is unsigned, so report it with %u rather than %d */
	if (found == 0)
		elog(ERROR, "LargeobjectDrop: large object %u not found", loid);
}
/*
 * LargeobjectFind -- report whether any index entry exists for loid.
 *
 * Opens the LargeobjectLOIdIndex index, starts a scan for entries whose
 * first key column equals loid and looks at the first result only.  Only
 * the index is consulted here; the heap tuple itself is not fetched.
 *
 * Returns 1 if the scan produced an entry, 0 otherwise.
 */
int
LargeobjectFind(Oid loid)
{
	Relation	lo_index;
	IndexScanDesc scan = (IndexScanDesc) NULL;
	RetrieveIndexResult hit;
	ScanKeyData key;
	int			exists = 0;

	/* match index attribute 1 against loid using OID equality */
	ScanKeyEntryInitialize(&key,
						   (bits16) 0x0,
						   (AttrNumber) 1,
						   (RegProcedure) F_OIDEQ,
						   ObjectIdGetDatum(loid));

	lo_index = index_openr(LargeobjectLOIdIndex);
	scan = index_beginscan(lo_index, false, 1, &key);

	/* one hit is enough to answer the question */
	hit = index_getnext(scan, ForwardScanDirection);
	if (hit != NULL)
	{
		exists = 1;
		pfree(hit);
	}

	index_endscan(scan);
	index_close(lo_index);

	return exists;
}
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.52 2000/10/08 03:53:13 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.53 2000/10/21 15:55:22 momjian Exp $
* *
* NOTES * NOTES
* This should be moved to a more appropriate place. It is here * This should be moved to a more appropriate place. It is here
...@@ -267,7 +267,7 @@ lo_creat(PG_FUNCTION_ARGS) ...@@ -267,7 +267,7 @@ lo_creat(PG_FUNCTION_ARGS)
PG_RETURN_OID(InvalidOid); PG_RETURN_OID(InvalidOid);
} }
lobjId = RelationGetRelid(lobjDesc->heap_r); lobjId = lobjDesc->id;
inv_close(lobjDesc); inv_close(lobjDesc);
...@@ -512,8 +512,10 @@ lo_commit(bool isCommit) ...@@ -512,8 +512,10 @@ lo_commit(bool isCommit)
{ {
if (cookies[i] != NULL) if (cookies[i] != NULL)
{ {
/*
if (isCommit) if (isCommit)
inv_cleanindex(cookies[i]); inv_cleanindex(cookies[i]);
*/
cookies[i] = NULL; cookies[i] = NULL;
} }
} }
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.76 2000/10/08 03:53:14 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.77 2000/10/21 15:55:24 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -22,58 +22,34 @@ ...@@ -22,58 +22,34 @@
#include "access/genam.h" #include "access/genam.h"
#include "access/heapam.h" #include "access/heapam.h"
#include "access/nbtree.h" #include "access/nbtree.h"
#include "access/htup.h"
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/heap.h" #include "catalog/heap.h"
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/pg_opclass.h" #include "catalog/pg_opclass.h"
#include "catalog/pg_largeobject.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "libpq/libpq-fs.h" #include "libpq/libpq-fs.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/large_object.h" #include "storage/large_object.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/relcache.h" #include "utils/builtins.h"
/* #include <errno.h>
* Warning, Will Robinson... In order to pack data into an inversion
* file as densely as possible, we violate the class abstraction here.
* When we're appending a new tuple to the end of the table, we check
* the last page to see how much data we can put on it. If it's more
* than IMINBLK, we write enough to fill the page. This limits external
* fragmentation. In no case can we write more than IMAXBLK, since
* the 8K postgres page size less overhead leaves only this much space
* for data.
*/
/* #define IBLKSIZE (MaxTupleSize - MinHeapTupleBitmapSize - sizeof(int32) * 3)
* In order to prevent buffer leak on transaction commit, large object
* scan index handling has been modified. Indexes are persistant inside
* a transaction but may be closed between two calls to this API (when
* transaction is committed while object is opened, or when no
* transaction is active). Scan indexes are thus now reinitialized using
* the object current offset. [PA]
*
* Some cleanup has been also done for non freed memory.
*
* For subsequent notes, [PA] is Pascal Andr <andre@via.ecp.fr>
*/
#define IFREESPC(p) (PageGetFreeSpace(p) - \ /* Defined in backend/catalog/pg_largeobject.c */
MAXALIGN(offsetof(HeapTupleHeaderData,t_bits)) - \ bytea *_byteain(const char *data, int32 size);
MAXALIGN(sizeof(struct varlena) + sizeof(int32)) - \
sizeof(double)) static int32 getbytealen(bytea *data) {
#define IMAXBLK 8092 if (VARSIZE(data) < VARHDRSZ)
#define IMINBLK 512 elog(ERROR, "getbytealen: VARSIZE(data) < VARHDRSZ. This is internal error.");
return (VARSIZE(data) - VARHDRSZ);
/* non-export function prototypes */ }
static HeapTuple inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer,
Page page, char *dbuf, int nwrite);
static void inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer);
static int inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes);
static int inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes,
HeapTuple tuple, Buffer buffer);
static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple);
static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
/* /*
* inv_create -- create a new large object. * inv_create -- create a new large object.
...@@ -84,18 +60,12 @@ static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln); ...@@ -84,18 +60,12 @@ static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
* Returns: * Returns:
* large object descriptor, appropriately filled in. * large object descriptor, appropriately filled in.
*/ */
LargeObjectDesc * LargeObjectDesc *
inv_create(int flags) inv_create(int flags)
{ {
int file_oid;
LargeObjectDesc *retval; LargeObjectDesc *retval;
Oid file_oid;
Relation r;
Relation indr;
TupleDesc tupdesc;
IndexInfo *indexInfo;
Oid classObjectId[1];
char objname[NAMEDATALEN];
char indname[NAMEDATALEN];
/* /*
* add one here since the pg_class tuple created will have the next * add one here since the pg_class tuple created will have the next
...@@ -104,104 +74,25 @@ inv_create(int flags) ...@@ -104,104 +74,25 @@ inv_create(int flags)
*/ */
file_oid = newoid() + 1; file_oid = newoid() + 1;
/* come up with some table names */ if (LargeobjectFind(file_oid) == 1)
sprintf(objname, "xinv%u", file_oid); elog(ERROR, "inv_create: large object %d already exists. This is internal error.", file_oid);
sprintf(indname, "xinx%u", file_oid);
if (RelnameFindRelid(objname) != InvalidOid)
elog(ERROR,
"internal error: %s already exists -- cannot create large obj",
objname);
if (RelnameFindRelid(indname) != InvalidOid)
elog(ERROR,
"internal error: %s already exists -- cannot create large obj",
indname);
/* this is pretty painful... want a tuple descriptor */
tupdesc = CreateTemplateTupleDesc(2);
TupleDescInitEntry(tupdesc, (AttrNumber) 1,
"olastbye",
INT4OID,
-1, 0, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 2,
"odata",
BYTEAOID,
-1, 0, false);
/*
* First create the table to hold the inversion large object. It will
* be located on whatever storage manager the user requested.
*/
heap_create_with_catalog(objname, tupdesc, RELKIND_LOBJECT,
false, false);
/* make the relation visible in this transaction */
CommandCounterIncrement();
/*--------------------
* We hold AccessShareLock on any large object we have open
* by inv_create or inv_open; it is released by inv_close.
* Note this will not conflict with ExclusiveLock or ShareLock
* that we acquire when actually reading/writing; it just prevents
* deletion of the large object while we have it open.
*--------------------
*/
r = heap_openr(objname, AccessShareLock);
/*
* Now create a btree index on the relation's olastbyte attribute to
* make seeks go faster.
*/
indexInfo = makeNode(IndexInfo);
indexInfo->ii_NumIndexAttrs = 1;
indexInfo->ii_NumKeyAttrs = 1;
indexInfo->ii_KeyAttrNumbers[0] = 1;
indexInfo->ii_Predicate = NULL;
indexInfo->ii_FuncOid = InvalidOid;
indexInfo->ii_Unique = false;
classObjectId[0] = INT4_OPS_OID;
index_create(objname, indname, indexInfo,
BTREE_AM_OID, classObjectId,
false, false, false);
/* make the index visible in this transaction */
CommandCounterIncrement();
indr = index_openr(indname);
if (!RelationIsValid(indr))
{
elog(ERROR, "cannot create index for large obj on %s under inversion",
DatumGetCString(DirectFunctionCall1(smgrout,
Int16GetDatum(DEFAULT_SMGR))));
}
retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc)); retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
retval->heap_r = r; if (flags & INV_WRITE) {
retval->index_r = indr;
retval->iscan = (IndexScanDesc) NULL;
retval->hdesc = RelationGetDescr(r);
retval->idesc = RelationGetDescr(indr);
retval->offset = retval->lowbyte = retval->highbyte = 0;
ItemPointerSetInvalid(&(retval->htid));
retval->flags = 0;
if (flags & INV_WRITE)
{
LockRelation(r, ExclusiveLock);
retval->flags = IFS_WRLOCK | IFS_RDLOCK; retval->flags = IFS_WRLOCK | IFS_RDLOCK;
} retval->heap_r = heap_openr(LargeobjectRelationName, RowExclusiveLock);
else if (flags & INV_READ) } else if (flags & INV_READ) {
{
LockRelation(r, ShareLock);
retval->flags = IFS_RDLOCK; retval->flags = IFS_RDLOCK;
} retval->heap_r = heap_openr(LargeobjectRelationName, AccessShareLock);
retval->flags |= IFS_ATEOF; /* since we know the object is empty */ } else
elog(ERROR, "inv_create: invalid flags: %d", flags);
retval->flags |= IFS_ATEOF;
retval->index_r = index_openr(LargeobjectLOIdPNIndex);
retval->offset = 0;
retval->id = file_oid;
(void)LargeobjectCreate(file_oid);
return retval; return retval;
} }
...@@ -209,46 +100,24 @@ LargeObjectDesc * ...@@ -209,46 +100,24 @@ LargeObjectDesc *
inv_open(Oid lobjId, int flags) inv_open(Oid lobjId, int flags)
{ {
LargeObjectDesc *retval; LargeObjectDesc *retval;
Relation r;
char *indname;
Relation indrel;
r = heap_open(lobjId, AccessShareLock);
indname = pstrdup(RelationGetRelationName(r)); if (LargeobjectFind(lobjId) == 0)
elog(ERROR, "inv_open: large object %d not found", lobjId);
/*
* hack hack hack... we know that the fourth character of the
* relation name is a 'v', and that the fourth character of the index
* name is an 'x', and that they're otherwise identical.
*/
indname[3] = 'x';
indrel = index_openr(indname);
if (!RelationIsValid(indrel)) retval = (LargeObjectDesc *)palloc(sizeof(LargeObjectDesc));
return (LargeObjectDesc *) NULL;
retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc)); if (flags & INV_WRITE) {
retval->heap_r = r;
retval->index_r = indrel;
retval->iscan = (IndexScanDesc) NULL;
retval->hdesc = RelationGetDescr(r);
retval->idesc = RelationGetDescr(indrel);
retval->offset = retval->lowbyte = retval->highbyte = 0;
ItemPointerSetInvalid(&(retval->htid));
retval->flags = 0;
if (flags & INV_WRITE)
{
LockRelation(r, ExclusiveLock);
retval->flags = IFS_WRLOCK | IFS_RDLOCK; retval->flags = IFS_WRLOCK | IFS_RDLOCK;
} retval->heap_r = heap_openr(LargeobjectRelationName, RowExclusiveLock);
else if (flags & INV_READ) } else if (flags & INV_READ) {
{
LockRelation(r, ShareLock);
retval->flags = IFS_RDLOCK; retval->flags = IFS_RDLOCK;
} retval->heap_r = heap_openr(LargeobjectRelationName, AccessShareLock);
} else
elog(ERROR, "inv_open: invalid flags: %d", flags);
retval->index_r = index_openr(LargeobjectLOIdPNIndex);
retval->offset = 0;
retval->id = lobjId;
return retval; return retval;
} }
...@@ -261,15 +130,11 @@ inv_close(LargeObjectDesc *obj_desc) ...@@ -261,15 +130,11 @@ inv_close(LargeObjectDesc *obj_desc)
{ {
Assert(PointerIsValid(obj_desc)); Assert(PointerIsValid(obj_desc));
if (obj_desc->iscan != (IndexScanDesc) NULL) if (obj_desc->flags & IFS_WRLOCK)
{ heap_close(obj_desc->heap_r, RowExclusiveLock);
index_endscan(obj_desc->iscan); else if (obj_desc->flags & IFS_RDLOCK)
obj_desc->iscan = NULL;
}
index_close(obj_desc->index_r);
heap_close(obj_desc->heap_r, AccessShareLock); heap_close(obj_desc->heap_r, AccessShareLock);
index_close(obj_desc->index_r);
pfree(obj_desc); pfree(obj_desc);
} }
...@@ -281,24 +146,7 @@ inv_close(LargeObjectDesc *obj_desc) ...@@ -281,24 +146,7 @@ inv_close(LargeObjectDesc *obj_desc)
int int
inv_drop(Oid lobjId) inv_drop(Oid lobjId)
{ {
Relation r; LargeobjectDrop(lobjId);
r = RelationIdGetRelation(lobjId);
if (!RelationIsValid(r))
return -1;
if (r->rd_rel->relkind != RELKIND_LOBJECT)
{
/* drop relcache refcount from RelationIdGetRelation */
RelationDecrementReferenceCount(r);
return -1;
}
/*
* Since heap_drop_with_catalog will destroy the relcache entry,
* there's no need to drop the refcount in this path.
*/
heap_drop_with_catalog(RelationGetRelationName(r), false);
return 1; return 1;
} }
...@@ -364,71 +212,75 @@ inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf) ...@@ -364,71 +212,75 @@ inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf)
#endif #endif
int static uint32 inv_getsize(LargeObjectDesc *obj_desc) {
inv_seek(LargeObjectDesc *obj_desc, int offset, int whence) uint32 found = 0;
{ uint32 lastbyte = 0;
int oldOffset;
Datum d;
ScanKeyData skey; ScanKeyData skey;
IndexScanDesc sd = (IndexScanDesc) NULL;
RetrieveIndexResult indexRes;
HeapTupleData tuple;
Buffer buffer;
Form_pg_largeobject data;
Assert(PointerIsValid(obj_desc)); Assert(PointerIsValid(obj_desc));
if (whence == SEEK_CUR) ScanKeyEntryInitialize(&skey,
{ (bits16) 0x0,
offset += obj_desc->offset; /* calculate absolute position */ (AttrNumber) 1,
} (RegProcedure) F_OIDEQ,
else if (whence == SEEK_END) ObjectIdGetDatum(obj_desc->id));
{
/* need read lock for getsize */ sd = index_beginscan(obj_desc->index_r, true, 1, &skey);
if (!(obj_desc->flags & IFS_RDLOCK)) tuple.t_datamcxt = CurrentMemoryContext;
{ tuple.t_data = NULL;
LockRelation(obj_desc->heap_r, ShareLock); while ((indexRes = index_getnext(sd, ForwardScanDirection))) {
obj_desc->flags |= IFS_RDLOCK; tuple.t_self = indexRes->heap_iptr;
} heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
offset += _inv_getsize(obj_desc->heap_r, pfree(indexRes);
obj_desc->hdesc, if (tuple.t_data == NULL)
obj_desc->index_r); continue;
found++;
data = (Form_pg_largeobject) GETSTRUCT(&tuple);
lastbyte = data->pageno * IBLKSIZE + getbytealen(&(data->data));
ReleaseBuffer(buffer);
break;
} }
/* now we can assume that the operation is SEEK_SET */
/* index_endscan(sd);
* Whenever we do a seek, we turn off the EOF flag bit to force
* ourselves to check for real on the next read.
*/
obj_desc->flags &= ~IFS_ATEOF; if (found == 0)
oldOffset = obj_desc->offset; elog(ERROR, "inv_getsize: large object %d not found", obj_desc->id);
obj_desc->offset = offset; return lastbyte;
}
/* try to avoid doing any work, if we can manage it */ int
if (offset >= obj_desc->lowbyte inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
&& offset <= obj_desc->highbyte {
&& oldOffset <= obj_desc->highbyte Assert(PointerIsValid(obj_desc));
&& obj_desc->iscan != (IndexScanDesc) NULL)
return offset;
/* switch (whence) {
* To do a seek on an inversion file, we start an index scan that will case SEEK_SET:
* bring us to the right place. Each tuple in an inversion file if (offset < 0)
* stores the offset of the last byte that appears on it, and we have elog(ERROR, "inv_seek: invalid offset: %d", offset);
* an index on this. obj_desc->offset = offset;
*/ break;
if (obj_desc->iscan != (IndexScanDesc) NULL) case SEEK_CUR:
if ((obj_desc->offset + offset) < 0)
elog(ERROR, "inv_seek: invalid offset: %d", offset);
obj_desc->offset += offset;
break;
case SEEK_END:
{ {
d = Int32GetDatum(offset); int4 size = inv_getsize(obj_desc);
btmovescan(obj_desc->iscan, d); if (offset > size)
elog(ERROR, "inv_seek: invalid offset");
obj_desc->offset = size - offset;
} }
else break;
{ default:
ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE, elog(ERROR, "inv_seek: invalid whence: %d", whence);
Int32GetDatum(offset));
obj_desc->iscan = index_beginscan(obj_desc->index_r,
(bool) 0, (uint16) 1,
&skey);
} }
return obj_desc->offset;
return offset;
} }
int int
...@@ -442,862 +294,259 @@ inv_tell(LargeObjectDesc *obj_desc) ...@@ -442,862 +294,259 @@ inv_tell(LargeObjectDesc *obj_desc)
int int
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes) inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
{ {
uint32 nread = 0;
uint32 n;
uint32 off;
uint32 len;
uint32 found = 0;
uint32 pageno = obj_desc->offset / IBLKSIZE;
ScanKeyData skey[2];
IndexScanDesc sd = (IndexScanDesc) NULL;
RetrieveIndexResult indexRes;
HeapTupleData tuple; HeapTupleData tuple;
int nread; Buffer buffer;
int off; Form_pg_largeobject data;
int ncopy;
Datum d;
struct varlena *fsblock;
bool isNull;
Assert(PointerIsValid(obj_desc)); Assert(PointerIsValid(obj_desc));
Assert(buf != NULL); Assert(buf != NULL);
/* if we're already at EOF, we don't need to do any work here */ ScanKeyEntryInitialize(&skey[0],
if (obj_desc->flags & IFS_ATEOF) (bits16) 0x0,
return 0; (AttrNumber) 1,
(RegProcedure) F_OIDEQ,
ObjectIdGetDatum(obj_desc->id));
/* make sure we obey two-phase locking */ ScanKeyEntryInitialize(&skey[1],
if (!(obj_desc->flags & IFS_RDLOCK)) (bits16) 0x0,
{ (AttrNumber) 2,
LockRelation(obj_desc->heap_r, ShareLock); (RegProcedure) F_INT4GE,
obj_desc->flags |= IFS_RDLOCK; Int32GetDatum(pageno));
}
nread = 0; sd = index_beginscan(obj_desc->index_r, false, 2, skey);
tuple.t_datamcxt = CurrentMemoryContext;
tuple.t_data = NULL;
while ((indexRes = index_getnext(sd, ForwardScanDirection))) {
tuple.t_self = indexRes->heap_iptr;
heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
pfree(indexRes);
/* fetch a block at a time */ if (tuple.t_data == NULL)
while (nread < nbytes) continue;
{
Buffer buffer;
/* fetch an inversion file system block */ found++;
inv_fetchtup(obj_desc, &tuple, &buffer); data = (Form_pg_largeobject) GETSTRUCT(&tuple);
if (data->pageno != pageno) {
ReleaseBuffer(buffer);
index_endscan(sd);
return 0;
}
if (tuple.t_data == NULL) len = getbytealen(&(data->data));
{ off = obj_desc->offset % IBLKSIZE;
obj_desc->flags |= IFS_ATEOF; if (off == len) {
ReleaseBuffer(buffer);
break; break;
} }
if (off > len) {
/* copy the data from this block into the buffer */
d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull);
fsblock = (struct varlena *) DatumGetPointer(d);
ReleaseBuffer(buffer); ReleaseBuffer(buffer);
index_endscan(sd);
return 0;
}
n = len - off;
/* n = (n < (nbytes - nread)) ? n : (nbytes - nread);
* If block starts beyond current seek point, then we are looking memcpy(buf + nread, VARDATA(&(data->data)) + off, n);
* at a "hole" (unwritten area) in the object. Return zeroes for nread += n;
* the "hole". obj_desc->offset += n;
*/
if (obj_desc->offset < obj_desc->lowbyte) ReleaseBuffer(buffer);
{ pageno++;
int nzeroes = obj_desc->lowbyte - obj_desc->offset; if (nread == nbytes)
if (nzeroes > (nbytes - nread))
nzeroes = (nbytes - nread);
MemSet(buf, 0, nzeroes);
buf += nzeroes;
nread += nzeroes;
obj_desc->offset += nzeroes;
if (nread >= nbytes)
break; break;
} }
off = obj_desc->offset - obj_desc->lowbyte; index_endscan(sd);
ncopy = obj_desc->highbyte - obj_desc->offset + 1;
if (ncopy > (nbytes - nread))
ncopy = (nbytes - nread);
memmove(buf, &(fsblock->vl_dat[off]), ncopy);
/* move pointers past the amount we just read */ if (found == 0)
buf += ncopy; return 0;
nread += ncopy;
obj_desc->offset += ncopy;
}
return nread; return nread;
} }
int static int inv_write_existing(LargeObjectDesc *obj_desc, char *buf, int nbytes, int *found) {
inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes) uint32 n = 0;
{ uint32 off;
uint32 len;
int i;
HeapTupleData tuple; HeapTupleData tuple;
int nwritten; HeapTuple newtup;
int tuplen; Buffer buffer;
Form_pg_largeobject data;
ScanKeyData skey[2];
IndexScanDesc sd = (IndexScanDesc) NULL;
RetrieveIndexResult indexRes;
Relation idescs[Num_pg_largeobject_indices];
Datum values[Natts_pg_largeobject];
char nulls[Natts_pg_largeobject];
char replace[Natts_pg_largeobject];
Assert(PointerIsValid(obj_desc)); Assert(PointerIsValid(obj_desc));
Assert(buf != NULL); Assert(buf != NULL);
/* ScanKeyEntryInitialize(&skey[0],
* Make sure we obey two-phase locking. A write lock entitles you to (bits16) 0,
* read the relation, as well. (AttrNumber) 1,
*/ (RegProcedure) F_OIDEQ,
ObjectIdGetDatum(obj_desc->id));
if (!(obj_desc->flags & IFS_WRLOCK)) ScanKeyEntryInitialize(&skey[1],
{ (bits16) 0x0,
LockRelation(obj_desc->heap_r, ExclusiveLock); (AttrNumber) 2,
obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK); (RegProcedure) F_INT4EQ,
} Int32GetDatum(obj_desc->offset / IBLKSIZE));
nwritten = 0;
/* write a block at a time */ CommandCounterIncrement();
while (nwritten < nbytes) sd = index_beginscan(obj_desc->index_r, false, 2, skey);
{ tuple.t_datamcxt = CurrentMemoryContext;
Buffer buffer;
/*
* Fetch the current inversion file system block. We can skip
* the work if we already know we are at EOF.
*/
if (obj_desc->flags & IFS_ATEOF)
tuple.t_data = NULL; tuple.t_data = NULL;
else while ((indexRes = index_getnext(sd, ForwardScanDirection))) {
inv_fetchtup(obj_desc, &tuple, &buffer); tuple.t_self = indexRes->heap_iptr;
heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
/* either append or replace a block, as required */ pfree(indexRes);
if (tuple.t_data == NULL) if (tuple.t_data != NULL)
tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten); break;
else
{
if (obj_desc->offset > obj_desc->highbyte)
{
tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
ReleaseBuffer(buffer);
}
else
tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, &tuple, buffer);
/*
* inv_wrold() has already issued WriteBuffer() which has
* decremented local reference counter (LocalRefCount). So we
* should not call ReleaseBuffer() here. -- Tatsuo 99/2/4
*/
}
/* move pointers past the amount we just wrote */
buf += tuplen;
nwritten += tuplen;
obj_desc->offset += tuplen;
}
/* that's it */
return nwritten;
}
/*
* inv_cleanindex
* Clean opened indexes for large objects, and clears current result.
* This is necessary on transaction commit in order to prevent buffer
* leak.
* This function must be called for each opened large object.
* [ PA, 7/17/98 ]
*/
void
inv_cleanindex(LargeObjectDesc *obj_desc)
{
Assert(PointerIsValid(obj_desc));
if (obj_desc->iscan == (IndexScanDesc) NULL)
return;
index_endscan(obj_desc->iscan);
obj_desc->iscan = (IndexScanDesc) NULL;
ItemPointerSetInvalid(&(obj_desc->htid));
}
/*
* inv_fetchtup -- Fetch an inversion file system block.
*
* This routine finds the file system block containing the offset
* recorded in the obj_desc structure. Later, we need to think about
* the effects of non-functional updates (can you rewrite the same
* block twice in a single transaction?), but for now, we won't bother.
*
* Parameters:
* obj_desc -- the object descriptor.
* bufP -- pointer to a buffer in the buffer cache; caller
* must free this.
*
* Returns:
* A heap tuple containing the desired block, or NULL if no
* such tuple exists.
*/
static void
inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer)
{
RetrieveIndexResult res;
Datum d;
int firstbyte,
lastbyte;
struct varlena *fsblock;
bool isNull;
/*
* If we've exhausted the current block, we need to get the next one.
* When we support time travel and non-functional updates, we will
* need to loop over the blocks, rather than just have an 'if', in
* order to find the one we're really interested in.
*/
if (obj_desc->offset > obj_desc->highbyte
|| obj_desc->offset < obj_desc->lowbyte
|| !ItemPointerIsValid(&(obj_desc->htid)))
{
ScanKeyData skey;
ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
Int32GetDatum(obj_desc->offset));
/* initialize scan key if not done */
if (obj_desc->iscan == (IndexScanDesc) NULL)
{
/*
* As scan index may be prematurely closed (on commit), we
* must use object current offset (was 0) to reinitialize the
* entry [ PA ].
*/
obj_desc->iscan = index_beginscan(obj_desc->index_r,
(bool) 0, (uint16) 1,
&skey);
}
else
index_rescan(obj_desc->iscan, false, &skey);
do
{
res = index_getnext(obj_desc->iscan, ForwardScanDirection);
if (res == (RetrieveIndexResult) NULL)
{
ItemPointerSetInvalid(&(obj_desc->htid));
tuple->t_datamcxt = NULL;
tuple->t_data = NULL;
return;
}
/*
* For time travel, we need to use the actual time qual here,
* rather that NowTimeQual. We currently have no way to pass
* a time qual in.
*
* This is now valid for snapshot !!! And should be fixed in some
* way... - vadim 07/28/98
*
*/
tuple->t_self = res->heap_iptr;
heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
pfree(res);
} while (tuple->t_data == NULL);
/* remember this tid -- we may need it for later reads/writes */
ItemPointerCopy(&(tuple->t_self), &obj_desc->htid);
}
else
{
tuple->t_self = obj_desc->htid;
heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
if (tuple->t_data == NULL)
elog(ERROR, "inv_fetchtup: heap_fetch failed");
} }
/* index_endscan(sd);
* By here, we have the heap tuple we're interested in. We cache the if (tuple.t_data == NULL)
* upper and lower bounds for this block in the object descriptor and return 0;
* return the tuple.
*/
d = heap_getattr(tuple, 1, obj_desc->hdesc, &isNull);
lastbyte = (int32) DatumGetInt32(d);
d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
fsblock = (struct varlena *) DatumGetPointer(d);
/*
* order of + and - is important -- these are unsigned quantites near
* 0
*/
firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
obj_desc->lowbyte = firstbyte;
obj_desc->highbyte = lastbyte;
return;
}
/*
* inv_wrnew() -- append a new filesystem block tuple to the inversion
* file.
*
* In response to an inv_write, we append one or more file system
* blocks to the class containing the large object. We violate the
* class abstraction here in order to pack things as densely as we
* are able. We examine the last page in the relation, and write
* just enough to fill it, assuming that it has above a certain
* threshold of space available. If the space available is less than
* the threshold, we allocate a new page by writing a big tuple.
*
* By the time we get here, we know all the parameters passed in
* are valid, and that we hold the appropriate lock on the heap
* relation.
*
* Parameters:
* obj_desc: large object descriptor for which to append block.
* buf: buffer containing data to write.
* nbytes: amount to write
*
* Returns:
* number of bytes actually written to the new tuple.
*/
static int
inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes)
{
Relation hr;
HeapTuple ntup;
Buffer buffer;
Page page;
int nblocks;
int nwritten;
hr = obj_desc->heap_r;
/*
* Get the last block in the relation. If there's no data in the
* relation at all, then we just get a new block. Otherwise, we check
* the last block to see whether it has room to accept some or all of
* the data that the user wants to write. If it doesn't, then we
* allocate a new block.
*/
nblocks = RelationGetNumberOfBlocks(hr);
if (nblocks > 0)
{
buffer = ReadBuffer(hr, nblocks - 1);
page = BufferGetPage(buffer);
}
else
{
buffer = ReadBuffer(hr, P_NEW);
page = BufferGetPage(buffer);
PageInit(page, BufferGetPageSize(buffer), 0);
}
/* (*found)++;
* If the last page is too small to hold all the data, and it's too data = (Form_pg_largeobject) GETSTRUCT(&tuple);
* small to hold IMINBLK, then we allocate a new page. If it will off = obj_desc->offset % IBLKSIZE;
* hold at least IMINBLK, but less than all the data requested, then len = getbytealen(&(data->data));
* we write IMINBLK here. The caller is responsible for noticing that
* less than the requested number of bytes were written, and calling
* this routine again.
*/
nwritten = IFREESPC(page); if (len > IBLKSIZE) {
if (nwritten < nbytes)
{
if (nwritten < IMINBLK)
{
ReleaseBuffer(buffer); ReleaseBuffer(buffer);
buffer = ReadBuffer(hr, P_NEW); elog(FATAL, "Internal error: len > IBLKSIZE");
page = BufferGetPage(buffer);
PageInit(page, BufferGetPageSize(buffer), 0);
if (nbytes > IMAXBLK)
nwritten = IMAXBLK;
else
nwritten = nbytes;
}
} }
else
nwritten = nbytes;
/*
* Insert a new file system block tuple, index it, and write it out.
*/
ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten); for (i=0; i<Natts_pg_largeobject; i++) {
inv_indextup(obj_desc, ntup); nulls[i] = ' ';
heap_freetuple(ntup); replace[i] = ' ';
values[i] = (Datum)NULL;
/* new tuple is inserted */
WriteBuffer(buffer);
return nwritten;
}
static int
inv_wrold(LargeObjectDesc *obj_desc,
char *dbuf,
int nbytes,
HeapTuple tuple,
Buffer buffer)
{
Relation hr;
HeapTuple ntup;
Buffer newbuf;
Page page;
Page newpage;
int tupbytes;
Datum d;
struct varlena *fsblock;
int nwritten,
nblocks,
freespc;
bool isNull;
int keep_offset;
RetrieveIndexResult res;
/*
* Since we're using a no-overwrite storage manager, the way we
* overwrite blocks is to mark the old block invalid and append a new
* block. First mark the old block invalid. This violates the tuple
* abstraction.
*/
TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
tuple->t_data->t_cmax = GetCurrentCommandId();
tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
/*
* If we're overwriting the entire block, we're lucky. All we need to
* do is to insert a new block.
*/
if (obj_desc->offset == obj_desc->lowbyte
&& obj_desc->lowbyte + nbytes >= obj_desc->highbyte)
{
WriteBuffer(buffer);
return inv_wrnew(obj_desc, dbuf, nbytes);
} }
/* i = 0;
* By here, we need to overwrite part of the data in the current
* tuple. In order to reduce the degree to which we fragment blocks,
* we guarantee that no block will be broken up due to an overwrite.
* This means that we need to allocate a tuple on a new page, if
* there's not room for the replacement on this one.
*/
newbuf = buffer;
page = BufferGetPage(buffer);
newpage = BufferGetPage(newbuf);
hr = obj_desc->heap_r;
freespc = IFREESPC(page);
d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
fsblock = (struct varlena *) DatumGetPointer(d);
tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);
if (freespc < tupbytes)
{ {
char b[IBLKSIZE];
int4 rest = len - off;
/* memset(b, 0, IBLKSIZE); /* Can optimize later */
* First see if there's enough space on the last page of the table if ((off > 0) && (len > 0)) /* We start in the middle of the tuple */
* to put this tuple. memcpy(b, VARDATA(&(data->data)), (off > len) ? len : off);
*/
nblocks = RelationGetNumberOfBlocks(hr);
if (nblocks > 0) if ((nbytes <= rest) || (len == IBLKSIZE)) {
{ /* We will update inside existing tuple size */
newbuf = ReadBuffer(hr, nblocks - 1); if (nbytes < rest)
newpage = BufferGetPage(newbuf); n = rest;
}
else else
{ n = nbytes;
newbuf = ReadBuffer(hr, P_NEW); memcpy(b + off, buf, n);
newpage = BufferGetPage(newbuf); if (n < rest) /* There's a rest of the tuple left */
PageInit(newpage, BufferGetPageSize(newbuf), 0); memcpy(b + off + n, VARDATA(&(data->data)) + off + n, rest - n);
/* Update data only */
replace[2] = 'r';
values[2] = (Datum) _byteain(b, len);
} else {
/* We will extend tuple */
/* Do we fit into max tuple size */
if (nbytes <= (IBLKSIZE - off))
len = off + nbytes;
else
len = IBLKSIZE;
n = len - off;
memcpy(b + off, buf, n);
/* Update data */
replace[2] = 'r';
values[2] = (Datum) _byteain(b, len);
} }
freespc = IFREESPC(newpage); newtup = heap_modifytuple(&tuple, obj_desc->heap_r,
values, nulls, replace);
/* heap_update(obj_desc->heap_r, &newtup->t_self, newtup, NULL);
* If there's no room on the last page, allocate a new last page if (!IsIgnoringSystemIndexes()) {
* for the table, and put it there. CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
*/ CatalogIndexInsert(idescs, Num_pg_largeobject_indices, obj_desc->heap_r, newtup);
CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
if (freespc < tupbytes)
{
ReleaseBuffer(newbuf);
newbuf = ReadBuffer(hr, P_NEW);
newpage = BufferGetPage(newbuf);
PageInit(newpage, BufferGetPageSize(newbuf), 0);
} }
heap_freetuple(newtup);
} }
ReleaseBuffer(buffer);
nwritten = nbytes; return n;
if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
nwritten = obj_desc->highbyte - obj_desc->offset + 1;
memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte),
dbuf, nwritten);
/*
* we are rewriting the entire old block, therefore we reset offset to
* the lowbyte of the original block before jumping into
* inv_newtuple()
*/
keep_offset = obj_desc->offset;
obj_desc->offset = obj_desc->lowbyte;
ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
tupbytes);
/* after we are done, we restore to the true offset */
obj_desc->offset = keep_offset;
/*
* By here, we have a page (newpage) that's guaranteed to have enough
* space on it to put the new tuple. Call inv_newtuple to do the
* work. Passing NULL as a buffer to inv_newtuple() keeps it from
* copying any data into the new tuple. When it returns, the tuple is
* ready to receive data from the old tuple and the user's data
* buffer.
*/
/*
ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
dptr = ((char *) ntup) + ntup->t_hoff -
(sizeof(HeapTupleData) - offsetof(HeapTupleData, t_bits)) +
sizeof(int4)
+ sizeof(fsblock->vl_len);
if (obj_desc->offset > obj_desc->lowbyte) {
memmove(dptr,
&(fsblock->vl_dat[0]),
obj_desc->offset - obj_desc->lowbyte);
dptr += obj_desc->offset - obj_desc->lowbyte;
}
nwritten = nbytes;
if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
nwritten = obj_desc->highbyte - obj_desc->offset + 1;
memmove(dptr, dbuf, nwritten);
dptr += nwritten;
if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
*/
/*
loc = (obj_desc->highbyte - obj_desc->offset)
+ nwritten;
sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
what's going on here?? - jolly
*/
/*
sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
memmove(&(fsblock->vl_dat[0]), dptr, sz);
}
*/
/* index the new tuple */
inv_indextup(obj_desc, ntup);
heap_freetuple(ntup);
/*
* move the scandesc forward so we don't reread the newly inserted
* tuple on the next index scan
*/
res = NULL;
if (obj_desc->iscan)
res = index_getnext(obj_desc->iscan, ForwardScanDirection);
if (res)
pfree(res);
/*
* Okay, by here, a tuple for the new block is correctly placed,
* indexed, and filled. Write the changed pages out.
*/
WriteBuffer(buffer);
if (newbuf != buffer)
WriteBuffer(newbuf);
/* Tuple id is no longer valid */
ItemPointerSetInvalid(&(obj_desc->htid));
/* done */
return nwritten;
} }
static HeapTuple static int inv_write_append(LargeObjectDesc *obj_desc, char *buf, int nbytes) {
inv_newtuple(LargeObjectDesc *obj_desc,
Buffer buffer,
Page page,
char *dbuf,
int nwrite)
{
HeapTuple ntup = (HeapTuple) palloc(sizeof(HeapTupleData)); HeapTuple ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
PageHeader ph; Relation idescs[Num_pg_largeobject_indices];
int tupsize; Datum values[Natts_pg_largeobject];
int hoff; char nulls[Natts_pg_largeobject];
Offset lower;
Offset upper;
ItemId itemId;
OffsetNumber off;
OffsetNumber limit;
char *attptr;
/* compute tuple size -- no nulls */
hoff = offsetof(HeapTupleHeaderData, t_bits);
hoff = MAXALIGN(hoff);
/* add in olastbyte, varlena.vl_len, varlena.vl_dat */
tupsize = hoff + (2 * sizeof(int32)) + nwrite;
tupsize = MAXALIGN(tupsize);
/*
* Allocate the tuple on the page, violating the page abstraction.
* This code was swiped from PageAddItem().
*/
ph = (PageHeader) page;
limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
/* look for "recyclable" (unused & deallocated) ItemId */
for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off))
{
itemId = &ph->pd_linp[off - 1];
if ((((*itemId).lp_flags & LP_USED) == 0) &&
((*itemId).lp_len == 0))
break;
}
if (off > limit)
lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
else if (off == limit)
lower = ph->pd_lower + sizeof(ItemIdData);
else
lower = ph->pd_lower;
upper = ph->pd_upper - tupsize;
itemId = &ph->pd_linp[off - 1];
(*itemId).lp_off = upper;
(*itemId).lp_len = tupsize;
(*itemId).lp_flags = LP_USED;
ph->pd_lower = lower;
ph->pd_upper = upper;
ntup->t_datamcxt = NULL;
ntup->t_data = (HeapTupleHeader) ((char *) page + upper);
/*
* Tuple is now allocated on the page. Next, fill in the tuple
* header. This block of code violates the tuple abstraction.
*/
ntup->t_len = tupsize;
ItemPointerSet(&ntup->t_self, BufferGetBlockNumber(buffer), off);
ntup->t_data->t_oid = newoid();
TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_data->t_xmin));
ntup->t_data->t_cmin = GetCurrentCommandId();
StoreInvalidTransactionId(&(ntup->t_data->t_xmax));
ntup->t_data->t_cmax = 0;
ntup->t_data->t_infomask = HEAP_XMAX_INVALID;
ntup->t_data->t_natts = 2;
ntup->t_data->t_hoff = hoff;
/* if a NULL is passed in, avoid the calculations below */
if (dbuf == NULL)
return ntup;
/*
* Finally, copy the user's data buffer into the tuple. This violates
* the tuple and class abstractions.
*/
attptr = ((char *) ntup->t_data) + hoff;
*((int32 *) attptr) = obj_desc->offset + nwrite - 1;
attptr += sizeof(int32);
/*
* * mer fixed disk layout of varlenas to get rid of the need for
* this. *
*
* ((int32 *) attptr) = nwrite + sizeof(int32); * attptr +=
* sizeof(int32);
*/
*((int32 *) attptr) = nwrite + sizeof(int32);
attptr += sizeof(int32);
/*
* If a data buffer was passed in, then copy the data from the buffer
* to the tuple. Some callers (eg, inv_wrold()) may not pass in a
* buffer, since they have to copy part of the old tuple data and part
* of the user's new data into the new tuple.
*/
if (dbuf != (char *) NULL)
memmove(attptr, dbuf, nwrite);
/* keep track of boundary of current tuple */
obj_desc->lowbyte = obj_desc->offset;
obj_desc->highbyte = obj_desc->offset + nwrite - 1;
/* new tuple is filled -- return it */
return ntup;
}
/*
 * inv_indextup() -- make an index entry for a block tuple.
 *
 * Inserts a single-key entry into obj_desc->index_r that points at the
 * heap location tuple->t_self in obj_desc->heap_r.  The key is the value
 * currently held in obj_desc->highbyte, so the caller must have the
 * descriptor's byte bounds set for this tuple before calling.
 *
 * Parameters:
 *	obj_desc: large object descriptor supplying the index relation,
 *		the heap relation and the highbyte key value.
 *	tuple: heap tuple whose t_self is recorded in the index.
 *
 * Returns nothing; a non-NULL result from index_insert() is pfree'd here.
 */
static void
inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple)
{
	Datum		key_value = Int32GetDatum(obj_desc->highbyte);
	char		key_null = ' ';	/* presumably marks the key as not null */
	InsertIndexResult insert_result;

	insert_result = index_insert(obj_desc->index_r, &key_value, &key_null,
								 &(tuple->t_self), obj_desc->heap_r);

	if (insert_result != NULL)
		pfree(insert_result);
}
#ifdef NOT_USED
static void
DumpPage(Page page, int blkno)
{
ItemId lp;
HeapTuple tup;
int flags, i, nline;
ItemPointerData pointerData;
printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
((PageHeader)page)->pd_special);
printf("\t:MaxOffsetNumber=%d\n",
(int16) PageGetMaxOffsetNumber(page));
nline = (int16) PageGetMaxOffsetNumber(page);
{
int i; int i;
char *cp; uint32 len;
i = PageGetSpecialSize(page);
cp = PageGetSpecialPointer(page);
printf("\t:SpecialData=");
while (i > 0) { for (i=0; i<Natts_pg_largeobject; i++) {
printf(" 0x%02x", *cp); nulls[i] = ' ';
cp += 1; values[i] = (Datum)NULL;
i -= 1;
} }
printf("\n");
}
for (i = 0; i < nline; i++) {
lp = ((PageHeader)page)->pd_linp + i;
flags = (*lp).lp_flags;
ItemPointerSet(&pointerData, blkno, 1 + i);
printf("%s:off=%d:flags=0x%x:len=%d",
ItemPointerFormExternal(&pointerData), (*lp).lp_off,
flags, (*lp).lp_len);
if (flags & LP_USED) {
HeapTupleData htdata;
printf(":USED");
memmove((char *) &htdata,
(char *) &((char *)page)[(*lp).lp_off],
sizeof(htdata));
tup = &htdata;
printf("\n\t:ctid=%s:oid=%d",
ItemPointerFormExternal(&tup->t_ctid),
tup->t_oid);
printf(":natts=%d:thoff=%d:",
tup->t_natts,
tup->t_hoff);
printf("\n\t:cmin=%u:", i = 0;
tup->t_cmin); values[i++] = ObjectIdGetDatum(obj_desc->id);
len = (nbytes > IBLKSIZE) ? IBLKSIZE : nbytes;
printf("xmin=%u:", tup->t_xmin); values[i++] = Int32GetDatum(obj_desc->offset / IBLKSIZE);
values[i++] = (Datum) _byteain(buf, len);
printf("\n\t:cmax=%u:", ntup = heap_formtuple(obj_desc->heap_r->rd_att, values, nulls);
tup->t_cmax); heap_insert(obj_desc->heap_r, ntup);
printf("xmax=%u:\n", tup->t_xmax); if (!IsIgnoringSystemIndexes()) {
CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
} else CatalogIndexInsert(idescs, Num_pg_largeobject_indices, obj_desc->heap_r, ntup);
putchar('\n'); CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
} }
}
static char* heap_freetuple(ntup);
ItemPointerFormExternal(ItemPointer pointer)
{
static char itemPointerString[32];
if (!ItemPointerIsValid(pointer)) {
memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
} else {
sprintf(itemPointerString, "<%u,%u>",
ItemPointerGetBlockNumber(pointer),
ItemPointerGetOffsetNumber(pointer));
}
return itemPointerString; return len;
} }
#endif static int inv_write_int(LargeObjectDesc *obj_desc, char *buf, int nbytes) {
int nwritten = 0;
static int int found = 0;
_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
{
IndexScanDesc iscan;
RetrieveIndexResult res;
HeapTupleData tuple;
Datum d;
long size;
bool isNull;
Buffer buffer;
/* scan backwards from end */
iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);
do
{
res = index_getnext(iscan, BackwardScanDirection);
/*
* If there are no more index tuples, then the relation is empty,
* so the file's size is zero.
*/
if (res == (RetrieveIndexResult) NULL) if (nbytes == 0)
{
index_endscan(iscan);
return 0; return 0;
}
/* nwritten = inv_write_existing(obj_desc, buf, nbytes, &found);
* For time travel, we need to use the actual time qual here, if (found > 0) {
* rather that NowTimeQual. We currently have no way to pass a obj_desc->offset += nwritten;
* time qual in. return nwritten;
*/ }
tuple.t_self = res->heap_iptr; /* Looks like we are beyond the end of the file */
heap_fetch(hreln, SnapshotNow, &tuple, &buffer); nwritten = inv_write_append(obj_desc, buf, nbytes);
pfree(res); obj_desc->offset += nwritten;
} while (tuple.t_data == NULL); return nwritten;
}
/* don't need the index scan anymore */ static int count = 0;
index_endscan(iscan);
/* get olastbyte attribute */ int
d = heap_getattr(&tuple, 1, hdesc, &isNull); inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes) {
size = DatumGetInt32(d) + 1; int nwritten = 0;
ReleaseBuffer(buffer); while (nwritten < nbytes)
nwritten += inv_write_int(obj_desc, buf + nwritten, nbytes - nwritten);
return size; return nwritten;
} }
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.170 2000/10/13 00:43:31 pjw Exp $ * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.171 2000/10/21 15:55:26 momjian Exp $
* *
* Modifications - 6/10/96 - dave@bensoft.com - version 1.13.dhb * Modifications - 6/10/96 - dave@bensoft.com - version 1.13.dhb
* *
...@@ -1104,7 +1104,7 @@ dumpBlobs(Archive *AH, char* junkOid, void *junkVal) ...@@ -1104,7 +1104,7 @@ dumpBlobs(Archive *AH, char* junkOid, void *junkVal)
fprintf(stderr, "%s saving BLOBs\n", g_comment_start); fprintf(stderr, "%s saving BLOBs\n", g_comment_start);
/* Cursor to get all BLOB tables */ /* Cursor to get all BLOB tables */
appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT oid from pg_class where relkind = '%c'", RELKIND_LOBJECT); appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT loid from pg_largeobject");
res = PQexec(g_conn, oidQry->data); res = PQexec(g_conn, oidQry->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: catname.h,v 1.14 2000/10/08 03:53:15 momjian Exp $ * $Id: catname.h,v 1.15 2000/10/21 15:55:28 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#define InheritsRelationName "pg_inherits" #define InheritsRelationName "pg_inherits"
#define InheritancePrecidenceListRelationName "pg_ipl" #define InheritancePrecidenceListRelationName "pg_ipl"
#define LanguageRelationName "pg_language" #define LanguageRelationName "pg_language"
#define LargeobjectRelationName "pg_largeobject"
#define ListenerRelationName "pg_listener" #define ListenerRelationName "pg_listener"
#define LogRelationName "pg_log" #define LogRelationName "pg_log"
#define OperatorClassRelationName "pg_opclass" #define OperatorClassRelationName "pg_opclass"
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: indexing.h,v 1.42 2000/10/08 03:53:15 momjian Exp $ * $Id: indexing.h,v 1.43 2000/10/21 15:55:28 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#define Num_pg_index_indices 2 #define Num_pg_index_indices 2
#define Num_pg_inherits_indices 1 #define Num_pg_inherits_indices 1
#define Num_pg_language_indices 2 #define Num_pg_language_indices 2
#define Num_pg_largeobject_indices 2
#define Num_pg_listener_indices 1 #define Num_pg_listener_indices 1
#define Num_pg_opclass_indices 2 #define Num_pg_opclass_indices 2
#define Num_pg_operator_indices 2 #define Num_pg_operator_indices 2
...@@ -62,6 +63,8 @@ ...@@ -62,6 +63,8 @@
#define InheritsRelidSeqnoIndex "pg_inherits_relid_seqno_index" #define InheritsRelidSeqnoIndex "pg_inherits_relid_seqno_index"
#define LanguageNameIndex "pg_language_name_index" #define LanguageNameIndex "pg_language_name_index"
#define LanguageOidIndex "pg_language_oid_index" #define LanguageOidIndex "pg_language_oid_index"
#define LargeobjectLOIdIndex "pg_largeobject_loid_index"
#define LargeobjectLOIdPNIndex "pg_largeobject_loid_pn_index"
#define ListenerPidRelnameIndex "pg_listener_pid_relname_index" #define ListenerPidRelnameIndex "pg_listener_pid_relname_index"
#define OpclassDeftypeIndex "pg_opclass_deftype_index" #define OpclassDeftypeIndex "pg_opclass_deftype_index"
#define OpclassNameIndex "pg_opclass_name_index" #define OpclassNameIndex "pg_opclass_name_index"
...@@ -92,6 +95,7 @@ extern char *Name_pg_group_indices[]; ...@@ -92,6 +95,7 @@ extern char *Name_pg_group_indices[];
extern char *Name_pg_index_indices[]; extern char *Name_pg_index_indices[];
extern char *Name_pg_inherits_indices[]; extern char *Name_pg_inherits_indices[];
extern char *Name_pg_language_indices[]; extern char *Name_pg_language_indices[];
extern char *Name_pg_largeobject_indices[];
extern char *Name_pg_listener_indices[]; extern char *Name_pg_listener_indices[];
extern char *Name_pg_opclass_indices[]; extern char *Name_pg_opclass_indices[];
extern char *Name_pg_operator_indices[]; extern char *Name_pg_operator_indices[];
...@@ -191,6 +195,8 @@ DECLARE_UNIQUE_INDEX(pg_index_indexrelid_index on pg_index using btree(indexreli ...@@ -191,6 +195,8 @@ DECLARE_UNIQUE_INDEX(pg_index_indexrelid_index on pg_index using btree(indexreli
DECLARE_UNIQUE_INDEX(pg_inherits_relid_seqno_index on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops)); DECLARE_UNIQUE_INDEX(pg_inherits_relid_seqno_index on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops));
DECLARE_UNIQUE_INDEX(pg_language_name_index on pg_language using btree(lanname name_ops)); DECLARE_UNIQUE_INDEX(pg_language_name_index on pg_language using btree(lanname name_ops));
DECLARE_UNIQUE_INDEX(pg_language_oid_index on pg_language using btree(oid oid_ops)); DECLARE_UNIQUE_INDEX(pg_language_oid_index on pg_language using btree(oid oid_ops));
DECLARE_INDEX(pg_largeobject_loid_index on pg_largeobject using btree(loid oid_ops));
DECLARE_UNIQUE_INDEX(pg_largeobject_loid_pn_index on pg_largeobject using btree(loid oid_ops, pageno int4_ops));
DECLARE_UNIQUE_INDEX(pg_listener_pid_relname_index on pg_listener using btree(listenerpid int4_ops, relname name_ops)); DECLARE_UNIQUE_INDEX(pg_listener_pid_relname_index on pg_listener using btree(listenerpid int4_ops, relname name_ops));
/* This column needs to allow multiple zero entries, but is in the cache */ /* This column needs to allow multiple zero entries, but is in the cache */
DECLARE_INDEX(pg_opclass_deftype_index on pg_opclass using btree(opcdeftype oid_ops)); DECLARE_INDEX(pg_opclass_deftype_index on pg_opclass using btree(opcdeftype oid_ops));
......
/*-------------------------------------------------------------------------
*
* pg_largeobject.h
* definition of the system "largeobject" relation (pg_largeobject)
* along with the relation's initial contents.
*
*
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: pg_largeobject.h,v 1.3 2000/10/21 15:55:28 momjian Exp $
*
* NOTES
* the genbki.sh script reads this file and generates .bki
* information from the DATA() statements.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_LARGEOBJECT_H
#define PG_LARGEOBJECT_H
/* ----------------
* postgres.h contains the system type definitions and the
* CATALOG(), BOOTSTRAP and DATA() sugar words so this file
* can be read by both genbki.sh and the C compiler.
* ----------------
*/
/* ----------------
* pg_largeobject definition. cpp turns this into
* typedef struct FormData_pg_largeobject. Large object id
* is stored in loid;
* ----------------
*/
/*
 * One row holds one page of one large object.  The pair (loid, pageno) is
 * covered by the unique index pg_largeobject_loid_pn_index.
 */
CATALOG(pg_largeobject)
{
Oid loid; /* id of the large object this page belongs to */
int4 pageno; /* page number within the object; writers store offset / IBLKSIZE */
bytea data; /* page contents; writers store at most IBLKSIZE bytes per row */
} FormData_pg_largeobject;
/* ----------------
* Form_pg_largeobject corresponds to a pointer to a tuple with
* the format of pg_largeobject relation.
* ----------------
*/
typedef FormData_pg_largeobject *Form_pg_largeobject;
/* ----------------
* compiler constants for pg_largeobject
* ----------------
*/
#define Natts_pg_largeobject 3
#define Anum_pg_largeobject_loid 1
#define Anum_pg_largeobject_pageno 2
#define Anum_pg_largeobject_data 3
/*
 * Catalog-level helpers for large objects; each takes the large object's
 * loid.  Their definitions are not in this header.
 * NOTE(review): presumably implemented in catalog/pg_largeobject.c -- the
 * meaning of the Oid returned by LargeobjectCreate() and of the int returned
 * by LargeobjectFind() is not stated here; confirm against the implementation.
 */
Oid LargeobjectCreate(Oid loid);
void LargeobjectDrop(Oid loid);
int LargeobjectFind(Oid loid);
#endif /* PG_LARGEOBJECT_H */
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: large_object.h,v 1.15 2000/10/08 03:53:15 momjian Exp $ * $Id: large_object.h,v 1.16 2000/10/21 15:55:29 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -22,17 +22,11 @@ ...@@ -22,17 +22,11 @@
/* /*
* This structure will eventually have lots more stuff associated with it. * This structure will eventually have lots more stuff associated with it.
*/ */
typedef struct LargeObjectDesc typedef struct LargeObjectDesc {
{ Relation heap_r;
Relation heap_r; /* heap relation */ Relation index_r;
Relation index_r; /* index relation on seqno attribute */
IndexScanDesc iscan; /* index scan we're using */
TupleDesc hdesc; /* heap relation tuple desc */
TupleDesc idesc; /* index relation tuple desc */
uint32 lowbyte; /* low byte on the current page */
uint32 highbyte; /* high byte on the current page */
uint32 offset; /* current seek pointer */ uint32 offset; /* current seek pointer */
ItemPointerData htid; /* tid of current heap tuple */ Oid id;
#define IFS_RDLOCK (1 << 0) #define IFS_RDLOCK (1 << 0)
#define IFS_WRLOCK (1 << 1) #define IFS_WRLOCK (1 << 1)
...@@ -55,7 +49,4 @@ extern int inv_tell(LargeObjectDesc *obj_desc); ...@@ -55,7 +49,4 @@ extern int inv_tell(LargeObjectDesc *obj_desc);
extern int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes); extern int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes);
extern int inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes); extern int inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes);
/* added for buffer leak prevention [ PA ] */
extern void inv_cleanindex(LargeObjectDesc *obj_desc);
#endif /* LARGE_OBJECT_H */ #endif /* LARGE_OBJECT_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment