inv_api.c 32.3 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
 * inv_api.c
 *	  routines for manipulating inversion fs large objects. This file
 *	  contains the user-level large object application interface routines.
 *
 *
 * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.69 2000/06/05 07:28:45 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
16
#include <sys/types.h>
17
#include <sys/file.h>
18
#include <sys/stat.h>
Bruce Momjian's avatar
Bruce Momjian committed
19

Marc G. Fournier's avatar
Marc G. Fournier committed
20
#include "postgres.h"
Bruce Momjian's avatar
Bruce Momjian committed
21

22 23 24
#include "access/genam.h"
#include "access/heapam.h"
#include "access/nbtree.h"
Bruce Momjian's avatar
Bruce Momjian committed
25
#include "catalog/catalog.h"
Bruce Momjian's avatar
Bruce Momjian committed
26
#include "catalog/heap.h"
Bruce Momjian's avatar
Bruce Momjian committed
27 28 29
#include "catalog/index.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_type.h"
Bruce Momjian's avatar
Bruce Momjian committed
30 31 32
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/large_object.h"
Bruce Momjian's avatar
Bruce Momjian committed
33
#include "storage/smgr.h"
34
#include "utils/fmgroids.h"
Bruce Momjian's avatar
Bruce Momjian committed
35
#include "utils/relcache.h"
36 37

/*
38 39 40 41 42 43 44 45
 *	Warning, Will Robinson...  In order to pack data into an inversion
 *	file as densely as possible, we violate the class abstraction here.
 *	When we're appending a new tuple to the end of the table, we check
 *	the last page to see how much data we can put on it.  If it's more
 *	than IMINBLK, we write enough to fill the page.  This limits external
 *	fragmentation.	In no case can we write more than IMAXBLK, since
 *	the 8K postgres page size less overhead leaves only this much space
 *	for data.
46 47
 */

48
/*
49 50 51 52 53 54
 *		In order to prevent buffer leak on transaction commit, large object
 *		scan index handling has been modified. Indexes are persistent inside
 *		a transaction but may be closed between two calls to this API (when
 *		transaction is committed while object is opened, or when no
 *		transaction is active). Scan indexes are thus now reinitialized using
 *		the object current offset. [PA]
55
 *
56
 *		Some cleanup has been also done for non freed memory.
57
 *
58
 *		For subsequent notes, [PA] is Pascal André <andre@via.ecp.fr>
59 60
 */

Bruce Momjian's avatar
Done.  
Bruce Momjian committed
61
#define IFREESPC(p)		(PageGetFreeSpace(p) - \
62 63
				 MAXALIGN(offsetof(HeapTupleHeaderData,t_bits)) - \
				 MAXALIGN(sizeof(struct varlena) + sizeof(int32)) - \
Bruce Momjian's avatar
Done.  
Bruce Momjian committed
64
				 sizeof(double))
65 66
#define IMAXBLK			8092
#define IMINBLK			512
67 68

/* non-export function prototypes */
69
static HeapTuple inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer,
70
			 Page page, char *dbuf, int nwrite);
71
static void inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer);
72
static int	inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes);
73
static int inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes,
74 75
		  HeapTuple tuple, Buffer buffer);
static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple);
76
static int	_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
77 78

/*
79
 *	inv_create -- create a new large object.
80
 *
81
 *		Arguments:
Bruce Momjian's avatar
Bruce Momjian committed
82
 *		  flags -- was archive, smgr
83
 *
84 85
 *		Returns:
 *		  large object descriptor, appropriately filled in.
86 87 88 89
 */
LargeObjectDesc *
inv_create(int flags)
{
90
	int			file_oid;
91
	LargeObjectDesc *retval;
92 93 94 95 96 97 98
	Relation	r;
	Relation	indr;
	TupleDesc	tupdesc;
	AttrNumber	attNums[1];
	Oid			classObjectId[1];
	char		objname[NAMEDATALEN];
	char		indname[NAMEDATALEN];
99 100 101 102 103 104 105 106 107

	/*
	 * add one here since the pg_class tuple created will have the next
	 * oid and we want to have the relation name to correspond to the
	 * tuple OID
	 */
	file_oid = newoid() + 1;

	/* come up with some table names */
108 109
	sprintf(objname, "xinv%u", file_oid);
	sprintf(indname, "xinx%u", file_oid);
110

111
	if (RelnameFindRelid(objname) != InvalidOid)
112
	{
113
		elog(ERROR,
114 115 116
		  "internal error: %s already exists -- cannot create large obj",
			 objname);
	}
117
	if (RelnameFindRelid(indname) != InvalidOid)
118
	{
119
		elog(ERROR,
Bruce Momjian's avatar
Bruce Momjian committed
120
		  "internal error: %s already exists -- cannot create large obj",
121 122 123 124 125 126 127
			 indname);
	}

	/* this is pretty painful...  want a tuple descriptor */
	tupdesc = CreateTemplateTupleDesc(2);
	TupleDescInitEntry(tupdesc, (AttrNumber) 1,
					   "olastbye",
128 129
					   INT4OID,
					   -1, 0, false);
130 131
	TupleDescInitEntry(tupdesc, (AttrNumber) 2,
					   "odata",
132 133
					   BYTEAOID,
					   -1, 0, false);
134 135 136 137 138 139

	/*
	 * First create the table to hold the inversion large object.  It will
	 * be located on whatever storage manager the user requested.
	 */

140
	heap_create_with_catalog(objname, tupdesc, RELKIND_LOBJECT, false);
141 142 143 144

	/* make the relation visible in this transaction */
	CommandCounterIncrement();

145 146 147 148 149 150 151 152 153
	/*--------------------
	 * We hold AccessShareLock on any large object we have open
	 * by inv_create or inv_open; it is released by inv_close.
	 * Note this will not conflict with ExclusiveLock or ShareLock
	 * that we acquire when actually reading/writing; it just prevents
	 * deletion of the large object while we have it open.
	 *--------------------
	 */
	r = heap_openr(objname, AccessShareLock);
154 155 156 157 158 159 160 161 162 163 164 165 166 167

	/*
	 * Now create a btree index on the relation's olastbyte attribute to
	 * make seeks go faster.  The hardwired constants are embarassing to
	 * me, and are symptomatic of the pressure under which this code was
	 * written.
	 *
	 * ok, mao, let's put in some symbolic constants - jolly
	 */

	attNums[0] = 1;
	classObjectId[0] = INT4_OPS_OID;
	index_create(objname, indname, NULL, NULL, BTREE_AM_OID,
				 1, &attNums[0], &classObjectId[0],
168
				 0, (Datum) NULL, NULL, FALSE, FALSE, FALSE);
169 170 171 172 173 174 175

	/* make the index visible in this transaction */
	CommandCounterIncrement();
	indr = index_openr(indname);

	if (!RelationIsValid(indr))
	{
176
		elog(ERROR, "cannot create index for large obj on %s under inversion",
177 178
			 DatumGetCString(DirectFunctionCall1(smgrout,
							 Int16GetDatum(DEFAULT_SMGR))));
179 180 181 182 183 184 185
	}

	retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));

	retval->heap_r = r;
	retval->index_r = indr;
	retval->iscan = (IndexScanDesc) NULL;
186 187
	retval->hdesc = RelationGetDescr(r);
	retval->idesc = RelationGetDescr(indr);
188
	retval->offset = retval->lowbyte = retval->highbyte = 0;
189 190 191 192
	ItemPointerSetInvalid(&(retval->htid));

	if (flags & INV_WRITE)
	{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
193
		LockRelation(r, ExclusiveLock);
194 195 196 197
		retval->flags = IFS_WRLOCK | IFS_RDLOCK;
	}
	else if (flags & INV_READ)
	{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
198
		LockRelation(r, ShareLock);
199 200 201 202
		retval->flags = IFS_RDLOCK;
	}
	retval->flags |= IFS_ATEOF;

203
	return retval;
204 205 206 207 208
}

LargeObjectDesc *
inv_open(Oid lobjId, int flags)
{
209
	LargeObjectDesc *retval;
210 211 212
	Relation	r;
	char	   *indname;
	Relation	indrel;
213

214
	r = heap_open(lobjId, AccessShareLock);
215

216
	indname = pstrdup(RelationGetRelationName(r));
217 218 219 220 221 222 223 224 225 226

	/*
	 * hack hack hack...  we know that the fourth character of the
	 * relation name is a 'v', and that the fourth character of the index
	 * name is an 'x', and that they're otherwise identical.
	 */
	indname[3] = 'x';
	indrel = index_openr(indname);

	if (!RelationIsValid(indrel))
227
		return (LargeObjectDesc *) NULL;
228 229 230 231 232 233

	retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));

	retval->heap_r = r;
	retval->index_r = indrel;
	retval->iscan = (IndexScanDesc) NULL;
234 235
	retval->hdesc = RelationGetDescr(r);
	retval->idesc = RelationGetDescr(indrel);
236 237 238 239 240
	retval->offset = retval->lowbyte = retval->highbyte = 0;
	ItemPointerSetInvalid(&(retval->htid));

	if (flags & INV_WRITE)
	{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
241
		LockRelation(r, ExclusiveLock);
242 243 244 245
		retval->flags = IFS_WRLOCK | IFS_RDLOCK;
	}
	else if (flags & INV_READ)
	{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
246
		LockRelation(r, ShareLock);
247 248 249
		retval->flags = IFS_RDLOCK;
	}

250
	return retval;
251 252 253 254 255 256
}

/*
 * Closes an existing large object descriptor.
 */
void
257
inv_close(LargeObjectDesc *obj_desc)
258
{
259
	Assert(PointerIsValid(obj_desc));
260

261 262
	if (obj_desc->iscan != (IndexScanDesc) NULL)
	{
263
		index_endscan(obj_desc->iscan);
264 265
		obj_desc->iscan = NULL;
	}
266

267
	index_close(obj_desc->index_r);
268
	heap_close(obj_desc->heap_r, AccessShareLock);
269

270
	pfree(obj_desc);
271 272 273 274 275 276 277 278
}

/*
 * Destroys an existing large object, and frees its associated pointers.
 *
 * returns -1 if failed
 */
int
279
inv_drop(Oid lobjId)
280
{
281
	Relation	r;
282

283 284
	r = RelationIdGetRelation(lobjId);
	if (!RelationIsValid(r))
285
		return -1;
286

287 288 289 290 291 292 293
	if (r->rd_rel->relkind != RELKIND_LOBJECT)
	{
		/* drop relcache refcount from RelationIdGetRelation */
		RelationDecrementReferenceCount(r);
		return -1;
	}

294 295
	/*
	 * Since heap_drop_with_catalog will destroy the relcache entry,
296 297
	 * there's no need to drop the refcount in this path.
	 */
298
	heap_drop_with_catalog(RelationGetRelationName(r));
299
	return 1;
300 301 302
}

/*
303
 *	inv_stat() -- do a stat on an inversion file.
304
 *
305 306 307 308 309
 *		For the time being, this is an insanely expensive operation.  In
 *		order to find the size of the file, we seek to the last block in
 *		it and compute the size from that.	We scan pg_class to determine
 *		the file's owner and create time.  We don't maintain mod time or
 *		access time, yet.
310
 *
311 312 313
 *		These fields aren't stored in a table anywhere because they're
 *		updated so frequently, and postgres only appends tuples at the
 *		end of relations.  Once clustering works, we should fix this.
314
 */
315
#ifdef NOT_USED
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330

struct pgstat
{								/* just the fields we need from stat
								 * structure */
	int			st_ino;
	int			st_mode;
	unsigned int st_size;
	unsigned int st_sizehigh;	/* high order bits */
/* 2^64 == 1.8 x 10^20 bytes */
	int			st_uid;
	int			st_atime_s;		/* just the seconds */
	int			st_mtime_s;		/* since SysV and the new BSD both have */
	int			st_ctime_s;		/* usec fields.. */
};

331
int
332
inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf)
333
{
334 335 336 337 338 339
	Assert(PointerIsValid(obj_desc));
	Assert(stbuf != NULL);

	/* need read lock for stat */
	if (!(obj_desc->flags & IFS_RDLOCK))
	{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
340
		LockRelation(obj_desc->heap_r, ShareLock);
341 342
		obj_desc->flags |= IFS_RDLOCK;
	}
343

344
	stbuf->st_ino = RelationGetRelid(obj_desc->heap_r);
345
#if 1
346
	stbuf->st_mode = (S_IFREG | 0666);	/* IFREG|rw-rw-rw- */
347
#else
348
	stbuf->st_mode = 100666;	/* IFREG|rw-rw-rw- */
349
#endif
350 351 352
	stbuf->st_size = _inv_getsize(obj_desc->heap_r,
								  obj_desc->hdesc,
								  obj_desc->index_r);
353

354
	stbuf->st_uid = obj_desc->heap_r->rd_rel->relowner;
355

356 357
	/* we have no good way of computing access times right now */
	stbuf->st_atime_s = stbuf->st_mtime_s = stbuf->st_ctime_s = 0;
358

359
	return 0;
360
}
361

362
#endif
363 364

int
365
inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
366
{
367 368 369
	int			oldOffset;
	Datum		d;
	ScanKeyData skey;
370

371
	Assert(PointerIsValid(obj_desc));
372

373 374 375
	if (whence == SEEK_CUR)
	{
		offset += obj_desc->offset;		/* calculate absolute position */
376
		return inv_seek(obj_desc, offset, SEEK_SET);
377
	}
378

379 380 381 382 383 384 385 386 387
	/*
	 * if you seek past the end (offset > 0) I have no clue what happens
	 * :-(				  B.L.	 9/1/93
	 */
	if (whence == SEEK_END)
	{
		/* need read lock for getsize */
		if (!(obj_desc->flags & IFS_RDLOCK))
		{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
388
			LockRelation(obj_desc->heap_r, ShareLock);
389 390 391 392 393
			obj_desc->flags |= IFS_RDLOCK;
		}
		offset += _inv_getsize(obj_desc->heap_r,
							   obj_desc->hdesc,
							   obj_desc->index_r);
394
		return inv_seek(obj_desc, offset, SEEK_SET);
395
	}
396

397 398 399 400
	/*
	 * Whenever we do a seek, we turn off the EOF flag bit to force
	 * ourselves to check for real on the next read.
	 */
401

402 403 404
	obj_desc->flags &= ~IFS_ATEOF;
	oldOffset = obj_desc->offset;
	obj_desc->offset = offset;
405

406 407 408 409 410
	/* try to avoid doing any work, if we can manage it */
	if (offset >= obj_desc->lowbyte
		&& offset <= obj_desc->highbyte
		&& oldOffset <= obj_desc->highbyte
		&& obj_desc->iscan != (IndexScanDesc) NULL)
411
		return offset;
412 413 414 415 416 417 418

	/*
	 * To do a seek on an inversion file, we start an index scan that will
	 * bring us to the right place.  Each tuple in an inversion file
	 * stores the offset of the last byte that appears on it, and we have
	 * an index on this.
	 */
419 420


421 422 423 424 425
	/* right now, just assume that the operation is SEEK_SET */
	if (obj_desc->iscan != (IndexScanDesc) NULL)
	{
		d = Int32GetDatum(offset);
		btmovescan(obj_desc->iscan, d);
426
	}
427 428
	else
	{
429

Bruce Momjian's avatar
Bruce Momjian committed
430
		ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
431
							   Int32GetDatum(offset));
432

433 434 435 436
		obj_desc->iscan = index_beginscan(obj_desc->index_r,
										  (bool) 0, (uint16) 1,
										  &skey);
	}
437

438
	return offset;
439
}
440

441
int
442
inv_tell(LargeObjectDesc *obj_desc)
443 444
{
	Assert(PointerIsValid(obj_desc));
445

446
	return obj_desc->offset;
447 448 449
}

int
450
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
451
{
Bruce Momjian's avatar
Bruce Momjian committed
452 453 454 455 456
	HeapTupleData tuple;
	int			nread;
	int			off;
	int			ncopy;
	Datum		d;
457
	struct varlena *fsblock;
Bruce Momjian's avatar
Bruce Momjian committed
458
	bool		isNull;
459 460 461 462 463 464

	Assert(PointerIsValid(obj_desc));
	Assert(buf != NULL);

	/* if we're already at EOF, we don't need to do any work here */
	if (obj_desc->flags & IFS_ATEOF)
465
		return 0;
466 467 468 469

	/* make sure we obey two-phase locking */
	if (!(obj_desc->flags & IFS_RDLOCK))
	{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
470
		LockRelation(obj_desc->heap_r, ShareLock);
471 472 473 474 475 476 477 478
		obj_desc->flags |= IFS_RDLOCK;
	}

	nread = 0;

	/* fetch a block at a time */
	while (nread < nbytes)
	{
479 480
		Buffer		buffer;

481
		/* fetch an inversion file system block */
482
		inv_fetchtup(obj_desc, &tuple, &buffer);
483

484
		if (tuple.t_data == NULL)
485 486 487 488
		{
			obj_desc->flags |= IFS_ATEOF;
			break;
		}
489

490
		/* copy the data from this block into the buffer */
491
		d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull);
492
		ReleaseBuffer(buffer);
493

494
		fsblock = (struct varlena *) DatumGetPointer(d);
495

496 497 498 499 500
		off = obj_desc->offset - obj_desc->lowbyte;
		ncopy = obj_desc->highbyte - obj_desc->offset + 1;
		if (ncopy > (nbytes - nread))
			ncopy = (nbytes - nread);
		memmove(buf, &(fsblock->vl_dat[off]), ncopy);
501

502 503 504 505 506 507
		/* move pointers past the amount we just read */
		buf += ncopy;
		nread += ncopy;
		obj_desc->offset += ncopy;
	}

508
	return nread;
509 510 511
}

int
512
inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
513
{
Bruce Momjian's avatar
Bruce Momjian committed
514 515 516
	HeapTupleData tuple;
	int			nwritten;
	int			tuplen;
517 518 519

	Assert(PointerIsValid(obj_desc));
	Assert(buf != NULL);
520 521

	/*
522 523
	 * Make sure we obey two-phase locking.  A write lock entitles you to
	 * read the relation, as well.
524 525
	 */

526 527
	if (!(obj_desc->flags & IFS_WRLOCK))
	{
528
		LockRelation(obj_desc->heap_r, ExclusiveLock);
529
		obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK);
530 531
	}

532 533 534 535 536
	nwritten = 0;

	/* write a block at a time */
	while (nwritten < nbytes)
	{
537 538
		Buffer		buffer;

539 540 541 542 543 544 545 546 547
		/*
		 * Fetch the current inversion file system block.  If the class
		 * storing the inversion file is empty, we don't want to do an
		 * index lookup, since index lookups choke on empty files (should
		 * be fixed someday).
		 */

		if ((obj_desc->flags & IFS_ATEOF)
			|| obj_desc->heap_r->rd_nblocks == 0)
548
			tuple.t_data = NULL;
549
		else
550
			inv_fetchtup(obj_desc, &tuple, &buffer);
551 552

		/* either append or replace a block, as required */
553
		if (tuple.t_data == NULL)
554 555 556 557
			tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
		else
		{
			if (obj_desc->offset > obj_desc->highbyte)
558
			{
559
				tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
560 561
				ReleaseBuffer(buffer);
			}
562
			else
563
				tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, &tuple, buffer);
Bruce Momjian's avatar
Bruce Momjian committed
564 565 566 567 568 569

			/*
			 * inv_wrold() has already issued WriteBuffer() which has
			 * decremented local reference counter (LocalRefCount). So we
			 * should not call ReleaseBuffer() here. -- Tatsuo 99/2/4
			 */
570
		}
571

572 573 574 575 576
		/* move pointers past the amount we just wrote */
		buf += tuplen;
		nwritten += tuplen;
		obj_desc->offset += tuplen;
	}
577

578
	/* that's it */
579
	return nwritten;
580 581
}

582
/*
583
 * inv_cleanindex
584 585 586 587 588
 *		 Clean opened indexes for large objects, and clears current result.
 *		 This is necessary on transaction commit in order to prevent buffer
 *		 leak.
 *		 This function must be called for each opened large object.
 *		 [ PA, 7/17/98 ]
589
 */
590
void
591 592
inv_cleanindex(LargeObjectDesc *obj_desc)
{
593
	Assert(PointerIsValid(obj_desc));
594

595 596
	if (obj_desc->iscan == (IndexScanDesc) NULL)
		return;
597 598 599

	index_endscan(obj_desc->iscan);
	obj_desc->iscan = (IndexScanDesc) NULL;
600

601 602 603
	ItemPointerSetInvalid(&(obj_desc->htid));
}

604
/*
605
 *	inv_fetchtup -- Fetch an inversion file system block.
606
 *
607 608 609 610
 *		This routine finds the file system block containing the offset
 *		recorded in the obj_desc structure.  Later, we need to think about
 *		the effects of non-functional updates (can you rewrite the same
 *		block twice in a single transaction?), but for now, we won't bother.
611
 *
612 613 614 615
 *		Parameters:
 *				obj_desc -- the object descriptor.
 *				bufP -- pointer to a buffer in the buffer cache; caller
 *						must free this.
616
 *
617 618 619
 *		Returns:
 *				A heap tuple containing the desired block, or NULL if no
 *				such tuple exists.
620
 */
621 622
static void
inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer)
623
{
624
	RetrieveIndexResult res;
625 626 627
	Datum		d;
	int			firstbyte,
				lastbyte;
628
	struct varlena *fsblock;
629
	bool		isNull;
630

631 632 633 634 635 636 637 638 639 640 641
	/*
	 * If we've exhausted the current block, we need to get the next one.
	 * When we support time travel and non-functional updates, we will
	 * need to loop over the blocks, rather than just have an 'if', in
	 * order to find the one we're really interested in.
	 */

	if (obj_desc->offset > obj_desc->highbyte
		|| obj_desc->offset < obj_desc->lowbyte
		|| !ItemPointerIsValid(&(obj_desc->htid)))
	{
Bruce Momjian's avatar
Bruce Momjian committed
642
		ScanKeyData skey;
Bruce Momjian's avatar
Done.  
Bruce Momjian committed
643 644

		ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
Bruce Momjian's avatar
Bruce Momjian committed
645
							   Int32GetDatum(obj_desc->offset));
646

647 648 649
		/* initialize scan key if not done */
		if (obj_desc->iscan == (IndexScanDesc) NULL)
		{
Bruce Momjian's avatar
Bruce Momjian committed
650

651 652 653 654
			/*
			 * As scan index may be prematurely closed (on commit), we
			 * must use object current offset (was 0) to reinitialize the
			 * entry [ PA ].
655
			 */
656
			obj_desc->iscan = index_beginscan(obj_desc->index_r,
Bruce Momjian's avatar
Bruce Momjian committed
657 658 659 660 661
											  (bool) 0, (uint16) 1,
											  &skey);
		}
		else
			index_rescan(obj_desc->iscan, false, &skey);
662 663 664
		do
		{
			res = index_getnext(obj_desc->iscan, ForwardScanDirection);
665

666 667 668
			if (res == (RetrieveIndexResult) NULL)
			{
				ItemPointerSetInvalid(&(obj_desc->htid));
669
				tuple->t_datamcxt = NULL;
670 671
				tuple->t_data = NULL;
				return;
672
			}
673

674 675 676 677
			/*
			 * For time travel, we need to use the actual time qual here,
			 * rather that NowTimeQual.  We currently have no way to pass
			 * a time qual in.
678
			 *
679 680 681
			 * This is now valid for snapshot !!! And should be fixed in some
			 * way...	- vadim 07/28/98
			 *
682
			 */
683 684
			tuple->t_self = res->heap_iptr;
			heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
Bruce Momjian's avatar
Bruce Momjian committed
685
			pfree(res);
686
		} while (tuple->t_data == NULL);
687 688

		/* remember this tid -- we may need it for later reads/writes */
Bruce Momjian's avatar
Bruce Momjian committed
689
		ItemPointerCopy(&(tuple->t_self), &obj_desc->htid);
690 691 692
	}
	else
	{
693 694
		tuple->t_self = obj_desc->htid;
		heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
Bruce Momjian's avatar
Bruce Momjian committed
695
		if (tuple->t_data == NULL)
Bruce Momjian's avatar
Done.  
Bruce Momjian committed
696
			elog(ERROR, "inv_fetchtup: heap_fetch failed");
697 698 699 700 701 702 703
	}

	/*
	 * By here, we have the heap tuple we're interested in.  We cache the
	 * upper and lower bounds for this block in the object descriptor and
	 * return the tuple.
	 */
704

705
	d = heap_getattr(tuple, 1, obj_desc->hdesc, &isNull);
706
	lastbyte = (int32) DatumGetInt32(d);
707
	d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
708
	fsblock = (struct varlena *) DatumGetPointer(d);
709

710 711 712 713 714
	/*
	 * order of + and - is important -- these are unsigned quantites near
	 * 0
	 */
	firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
715

716 717
	obj_desc->lowbyte = firstbyte;
	obj_desc->highbyte = lastbyte;
718

719
	return;
720 721 722
}

/*
723 724
 *	inv_wrnew() -- append a new filesystem block tuple to the inversion
 *					file.
725
 *
726 727 728 729 730 731 732
 *		In response to an inv_write, we append one or more file system
 *		blocks to the class containing the large object.  We violate the
 *		class abstraction here in order to pack things as densely as we
 *		are able.  We examine the last page in the relation, and write
 *		just enough to fill it, assuming that it has above a certain
 *		threshold of space available.  If the space available is less than
 *		the threshold, we allocate a new page by writing a big tuple.
733
 *
734 735 736
 *		By the time we get here, we know all the parameters passed in
 *		are valid, and that we hold the appropriate lock on the heap
 *		relation.
737
 *
738 739 740 741
 *		Parameters:
 *				obj_desc: large object descriptor for which to append block.
 *				buf: buffer containing data to write.
 *				nbytes: amount to write
742
 *
743 744
 *		Returns:
 *				number of bytes actually written to the new tuple.
745 746
 */
static int
747
inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes)
748
{
749 750 751 752 753 754
	Relation	hr;
	HeapTuple	ntup;
	Buffer		buffer;
	Page		page;
	int			nblocks;
	int			nwritten;
755 756 757 758 759 760 761 762 763 764 765 766 767

	hr = obj_desc->heap_r;

	/*
	 * Get the last block in the relation.	If there's no data in the
	 * relation at all, then we just get a new block.  Otherwise, we check
	 * the last block to see whether it has room to accept some or all of
	 * the data that the user wants to write.  If it doesn't, then we
	 * allocate a new block.
	 */

	nblocks = RelationGetNumberOfBlocks(hr);

Bruce Momjian's avatar
Bruce Momjian committed
768 769
	if (nblocks > 0)
	{
770
		buffer = ReadBuffer(hr, nblocks - 1);
Bruce Momjian's avatar
Done.  
Bruce Momjian committed
771 772
		page = BufferGetPage(buffer);
	}
Bruce Momjian's avatar
Bruce Momjian committed
773 774
	else
	{
775
		buffer = ReadBuffer(hr, P_NEW);
Bruce Momjian's avatar
Done.  
Bruce Momjian committed
776 777 778
		page = BufferGetPage(buffer);
		PageInit(page, BufferGetPageSize(buffer), 0);
	}
779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804

	/*
	 * If the last page is too small to hold all the data, and it's too
	 * small to hold IMINBLK, then we allocate a new page.	If it will
	 * hold at least IMINBLK, but less than all the data requested, then
	 * we write IMINBLK here.  The caller is responsible for noticing that
	 * less than the requested number of bytes were written, and calling
	 * this routine again.
	 */

	nwritten = IFREESPC(page);
	if (nwritten < nbytes)
	{
		if (nwritten < IMINBLK)
		{
			ReleaseBuffer(buffer);
			buffer = ReadBuffer(hr, P_NEW);
			page = BufferGetPage(buffer);
			PageInit(page, BufferGetPageSize(buffer), 0);
			if (nbytes > IMAXBLK)
				nwritten = IMAXBLK;
			else
				nwritten = nbytes;
		}
	}
	else
805 806
		nwritten = nbytes;

807 808 809
	/*
	 * Insert a new file system block tuple, index it, and write it out.
	 */
810

811 812
	ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten);
	inv_indextup(obj_desc, ntup);
813
	heap_freetuple(ntup);
814

815 816
	/* new tuple is inserted */
	WriteBuffer(buffer);
817

818
	return nwritten;
819 820 821
}

static int
822
inv_wrold(LargeObjectDesc *obj_desc,
823 824
		  char *dbuf,
		  int nbytes,
825
		  HeapTuple tuple,
826
		  Buffer buffer)
827
{
828 829 830 831 832 833 834
	Relation	hr;
	HeapTuple	ntup;
	Buffer		newbuf;
	Page		page;
	Page		newpage;
	int			tupbytes;
	Datum		d;
835
	struct varlena *fsblock;
836 837 838 839 840
	int			nwritten,
				nblocks,
				freespc;
	bool		isNull;
	int			keep_offset;
841
	RetrieveIndexResult res;
842 843

	/*
844 845 846 847
	 * Since we're using a no-overwrite storage manager, the way we
	 * overwrite blocks is to mark the old block invalid and append a new
	 * block.  First mark the old block invalid.  This violates the tuple
	 * abstraction.
848 849
	 */

850 851 852
	TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
	tuple->t_data->t_cmax = GetCurrentCommandId();
	tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
853

854 855 856 857
	/*
	 * If we're overwriting the entire block, we're lucky.	All we need to
	 * do is to insert a new block.
	 */
858

859 860 861 862
	if (obj_desc->offset == obj_desc->lowbyte
		&& obj_desc->lowbyte + nbytes >= obj_desc->highbyte)
	{
		WriteBuffer(buffer);
863
		return inv_wrnew(obj_desc, dbuf, nbytes);
864
	}
865 866

	/*
867 868 869 870 871
	 * By here, we need to overwrite part of the data in the current
	 * tuple.  In order to reduce the degree to which we fragment blocks,
	 * we guarantee that no block will be broken up due to an overwrite.
	 * This means that we need to allocate a tuple on a new page, if
	 * there's not room for the replacement on this one.
872 873
	 */

874 875 876 877 878
	newbuf = buffer;
	page = BufferGetPage(buffer);
	newpage = BufferGetPage(newbuf);
	hr = obj_desc->heap_r;
	freespc = IFREESPC(page);
879
	d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
880 881 882 883 884 885 886 887 888 889 890 891 892
	fsblock = (struct varlena *) DatumGetPointer(d);
	tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);

	if (freespc < tupbytes)
	{

		/*
		 * First see if there's enough space on the last page of the table
		 * to put this tuple.
		 */

		nblocks = RelationGetNumberOfBlocks(hr);

Bruce Momjian's avatar
Bruce Momjian committed
893 894
		if (nblocks > 0)
		{
895
			newbuf = ReadBuffer(hr, nblocks - 1);
Bruce Momjian's avatar
Done.  
Bruce Momjian committed
896 897
			newpage = BufferGetPage(newbuf);
		}
Bruce Momjian's avatar
Bruce Momjian committed
898 899
		else
		{
900
			newbuf = ReadBuffer(hr, P_NEW);
Bruce Momjian's avatar
Done.  
Bruce Momjian committed
901 902 903
			newpage = BufferGetPage(newbuf);
			PageInit(newpage, BufferGetPageSize(newbuf), 0);
		}
904 905 906 907 908 909 910 911 912 913 914 915 916 917 918

		freespc = IFREESPC(newpage);

		/*
		 * If there's no room on the last page, allocate a new last page
		 * for the table, and put it there.
		 */

		if (freespc < tupbytes)
		{
			ReleaseBuffer(newbuf);
			newbuf = ReadBuffer(hr, P_NEW);
			newpage = BufferGetPage(newbuf);
			PageInit(newpage, BufferGetPageSize(newbuf), 0);
		}
919 920
	}

921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948
	nwritten = nbytes;
	if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
		nwritten = obj_desc->highbyte - obj_desc->offset + 1;
	memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte),
			dbuf, nwritten);

	/*
	 * we are rewriting the entire old block, therefore we reset offset to
	 * the lowbyte of the original block before jumping into
	 * inv_newtuple()
	 */
	keep_offset = obj_desc->offset;
	obj_desc->offset = obj_desc->lowbyte;
	ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
						tupbytes);
	/* after we are done, we restore to the true offset */
	obj_desc->offset = keep_offset;

	/*
	 * By here, we have a page (newpage) that's guaranteed to have enough
	 * space on it to put the new tuple.  Call inv_newtuple to do the
	 * work.  Passing NULL as a buffer to inv_newtuple() keeps it from
	 * copying any data into the new tuple.  When it returns, the tuple is
	 * ready to receive data from the old tuple and the user's data
	 * buffer.
	 */
/*
	ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
949
	dptr = ((char *) ntup) + ntup->t_hoff -
Bruce Momjian's avatar
Bruce Momjian committed
950
				(sizeof(HeapTupleData) - offsetof(HeapTupleData, t_bits)) +
951
				sizeof(int4)
952 953 954 955 956 957 958 959
				+ sizeof(fsblock->vl_len);

	if (obj_desc->offset > obj_desc->lowbyte) {
		memmove(dptr,
				&(fsblock->vl_dat[0]),
				obj_desc->offset - obj_desc->lowbyte);
		dptr += obj_desc->offset - obj_desc->lowbyte;
	}
960 961


962 963 964
	nwritten = nbytes;
	if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
		nwritten = obj_desc->highbyte - obj_desc->offset + 1;
965

966 967
	memmove(dptr, dbuf, nwritten);
	dptr += nwritten;
968

969
	if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
970 971
*/
/*
972 973 974
		loc = (obj_desc->highbyte - obj_desc->offset)
				+ nwritten;
		sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
975

976
		what's going on here?? - jolly
977 978
*/
/*
979 980 981
		sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
		memmove(&(fsblock->vl_dat[0]), dptr, sz);
	}
982 983 984
*/


985 986
	/* index the new tuple */
	inv_indextup(obj_desc, ntup);
987
	heap_freetuple(ntup);
988

989 990 991 992
	/*
	 * move the scandesc forward so we don't reread the newly inserted
	 * tuple on the next index scan
	 */
993
	res = NULL;
994
	if (obj_desc->iscan)
995 996 997 998
		res = index_getnext(obj_desc->iscan, ForwardScanDirection);

	if (res)
		pfree(res);
999

1000 1001 1002 1003
	/*
	 * Okay, by here, a tuple for the new block is correctly placed,
	 * indexed, and filled.  Write the changed pages out.
	 */
1004

1005 1006 1007
	WriteBuffer(buffer);
	if (newbuf != buffer)
		WriteBuffer(newbuf);
1008

Bruce Momjian's avatar
Done.  
Bruce Momjian committed
1009 1010 1011
	/* Tuple id is no longer valid */
	ItemPointerSetInvalid(&(obj_desc->htid));

1012
	/* done */
1013
	return nwritten;
1014 1015
}

1016
/*
 * inv_newtuple -- allocate and fill a new large-object data tuple
 *	directly on the given page, bypassing the normal heap_insert path.
 *
 *	obj_desc: descriptor for the open large object (offset/lowbyte/highbyte
 *			  are updated to reflect the new tuple's byte range)
 *	buffer:	  buffer holding 'page' (used only for its block number)
 *	page:	  page on which to allocate the tuple
 *	dbuf:	  user data to copy into the tuple, or NULL if the caller
 *			  (e.g. inv_wrold) will fill the data area itself
 *	nwrite:	  number of data bytes the tuple will hold
 *
 *	Returns a palloc'd HeapTuple whose t_data points into 'page'.
 *	The caller is responsible for writing the buffer out and for
 *	pfree'ing the returned tuple.
 *
 *	This routine deliberately violates both the page and tuple
 *	abstractions in order to pack data densely (see file header).
 */
static HeapTuple
inv_newtuple(LargeObjectDesc *obj_desc,
			 Buffer buffer,
			 Page page,
			 char *dbuf,
			 int nwrite)
{
	HeapTuple	ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
	PageHeader	ph;
	int			tupsize;
	int			hoff;
	Offset		lower;
	Offset		upper;
	ItemId		itemId;
	OffsetNumber off;
	OffsetNumber limit;
	char	   *attptr;

	/* compute tuple size -- no nulls, so header ends at t_bits */
	hoff = offsetof(HeapTupleHeaderData, t_bits);
	hoff = MAXALIGN(hoff);

	/* add in olastbyte (int32), varlena.vl_len (int32), varlena.vl_dat */
	tupsize = hoff + (2 * sizeof(int32)) + nwrite;
	tupsize = MAXALIGN(tupsize);

	/*
	 * Allocate the tuple on the page, violating the page abstraction.
	 * This code was swiped from PageAddItem().
	 */

	ph = (PageHeader) page;
	limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));

	/* look for "recyclable" (unused & deallocated) ItemId */
	for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off))
	{
		itemId = &ph->pd_linp[off - 1];
		if ((((*itemId).lp_flags & LP_USED) == 0) &&
			((*itemId).lp_len == 0))
			break;
	}

	/*
	 * Decide how far pd_lower must advance: not at all if we recycled an
	 * existing ItemId, one ItemIdData if we are appending a new one.
	 * NOTE(review): the loop above can only exit with off <= limit, so
	 * the off > limit branch looks unreachable -- kept verbatim from the
	 * PageAddItem copy it was swiped from.
	 */
	if (off > limit)
		lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
	else if (off == limit)
		lower = ph->pd_lower + sizeof(ItemIdData);
	else
		lower = ph->pd_lower;

	/* tuple data grows down from pd_upper */
	upper = ph->pd_upper - tupsize;

	itemId = &ph->pd_linp[off - 1];
	(*itemId).lp_off = upper;
	(*itemId).lp_len = tupsize;
	(*itemId).lp_flags = LP_USED;
	ph->pd_lower = lower;
	ph->pd_upper = upper;

	ntup->t_datamcxt = NULL;
	ntup->t_data = (HeapTupleHeader) ((char *) page + upper);

	/*
	 * Tuple is now allocated on the page.	Next, fill in the tuple
	 * header.	This block of code violates the tuple abstraction.
	 */

	ntup->t_len = tupsize;
	ItemPointerSet(&ntup->t_self, BufferGetBlockNumber(buffer), off);
	LastOidProcessed = ntup->t_data->t_oid = newoid();
	TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_data->t_xmin));
	ntup->t_data->t_cmin = GetCurrentCommandId();
	StoreInvalidTransactionId(&(ntup->t_data->t_xmax));
	ntup->t_data->t_cmax = 0;
	ntup->t_data->t_infomask = HEAP_XMAX_INVALID;
	ntup->t_data->t_natts = 2;	/* olastbyte + data varlena */
	ntup->t_data->t_hoff = hoff;

	/*
	 * If a NULL is passed in, avoid the calculations below.
	 * NOTE(review): this also skips writing the olastbyte and vl_len
	 * fields -- presumably the caller fills those in too; confirm
	 * against inv_wrold before relying on it.
	 */
	if (dbuf == NULL)
		return ntup;

	/*
	 * Finally, copy the user's data buffer into the tuple.  This violates
	 * the tuple and class abstractions.
	 */

	/* first attribute: olastbyte = offset of last byte in this tuple */
	attptr = ((char *) ntup->t_data) + hoff;
	*((int32 *) attptr) = obj_desc->offset + nwrite - 1;
	attptr += sizeof(int32);

	/*
	 * *  mer fixed disk layout of varlenas to get rid of the need for
	 * this. *
	 *
	 * ((int32 *) attptr) = nwrite + sizeof(int32); *  attptr +=
	 * sizeof(int32);
	 */

	/* second attribute: varlena length word (includes its own size) */
	*((int32 *) attptr) = nwrite + sizeof(int32);
	attptr += sizeof(int32);

	/*
	 * If a data buffer was passed in, then copy the data from the buffer
	 * to the tuple.  Some callers (eg, inv_wrold()) may not pass in a
	 * buffer, since they have to copy part of the old tuple data and part
	 * of the user's new data into the new tuple.
	 * NOTE(review): dbuf cannot be NULL here because of the early return
	 * above, so this test is redundant.
	 */

	if (dbuf != (char *) NULL)
		memmove(attptr, dbuf, nwrite);

	/* keep track of boundary of current tuple */
	obj_desc->lowbyte = obj_desc->offset;
	obj_desc->highbyte = obj_desc->offset + nwrite - 1;

	/* new tuple is filled -- return it */
	return ntup;
}

static void
1137
inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple)
1138
{
1139
	InsertIndexResult res;
1140 1141
	Datum		v[1];
	char		n[1];
1142

1143 1144 1145
	n[0] = ' ';
	v[0] = Int32GetDatum(obj_desc->highbyte);
	res = index_insert(obj_desc->index_r, &v[0], &n[0],
1146
					   &(tuple->t_self), obj_desc->heap_r);
1147

1148 1149
	if (res)
		pfree(res);
1150 1151 1152 1153 1154 1155
}

/*
 * Disabled debugging aids: DumpPage() prints a page's item pointers and
 * tuple headers; ItemPointerFormExternal() formats an ItemPointer for
 * display.  Kept commented out for manual debugging only.
static void
DumpPage(Page page, int blkno)
{
1156 1157 1158 1159 1160 1161 1162 1163
		ItemId			lp;
		HeapTuple		tup;
		int				flags, i, nline;
		ItemPointerData pointerData;

		printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
				((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
				((PageHeader)page)->pd_special);
1164

1165 1166
		printf("\t:MaxOffsetNumber=%d\n",
			   (int16) PageGetMaxOffsetNumber(page));
1167

1168
		nline = (int16) PageGetMaxOffsetNumber(page);
1169 1170

{
1171 1172
		int		i;
		char	*cp;
1173

1174 1175
		i = PageGetSpecialSize(page);
		cp = PageGetSpecialPointer(page);
1176

1177
		printf("\t:SpecialData=");
1178

1179 1180 1181 1182 1183 1184
		while (i > 0) {
				printf(" 0x%02x", *cp);
				cp += 1;
				i -= 1;
		}
		printf("\n");
1185
}
1186 1187 1188 1189 1190 1191 1192
		for (i = 0; i < nline; i++) {
				lp = ((PageHeader)page)->pd_linp + i;
				flags = (*lp).lp_flags;
				ItemPointerSet(&pointerData, blkno, 1 + i);
				printf("%s:off=%d:flags=0x%x:len=%d",
						ItemPointerFormExternal(&pointerData), (*lp).lp_off,
						flags, (*lp).lp_len);
1193

1194 1195
				if (flags & LP_USED) {
						HeapTupleData	htdata;
1196

1197
						printf(":USED");
1198

1199 1200 1201
						memmove((char *) &htdata,
								(char *) &((char *)page)[(*lp).lp_off],
								sizeof(htdata));
1202

1203
						tup = &htdata;
1204

1205 1206 1207
						printf("\n\t:ctid=%s:oid=%d",
								ItemPointerFormExternal(&tup->t_ctid),
								tup->t_oid);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1208
						printf(":natts=%d:thoff=%d:",
1209
								tup->t_natts,
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1210
								tup->t_hoff);
1211

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1212 1213
						printf("\n\t:cmin=%u:",
								tup->t_cmin);
1214

1215
						printf("xmin=%u:", tup->t_xmin);
1216

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1217 1218
						printf("\n\t:cmax=%u:",
								tup->t_cmax);
1219

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1220
						printf("xmax=%u:\n", tup->t_xmax);
1221

1222 1223 1224
				} else
						putchar('\n');
		}
1225 1226 1227 1228 1229
}

static char*
ItemPointerFormExternal(ItemPointer pointer)
{
1230 1231 1232 1233 1234 1235 1236 1237 1238
		static char		itemPointerString[32];

		if (!ItemPointerIsValid(pointer)) {
			memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
		} else {
			sprintf(itemPointerString, "<%u,%u>",
					ItemPointerGetBlockNumber(pointer),
					ItemPointerGetOffsetNumber(pointer));
		}
1239

1240
		return itemPointerString;
1241 1242 1243 1244 1245 1246
}
*/

/*
 * _inv_getsize -- compute the current size (in bytes) of a large object.
 *
 *	hreln: the LO's heap relation
 *	hdesc: tuple descriptor for hreln
 *	ireln: the LO's btree index on olastbyte
 *
 *	Scans the index backwards from the end to find the last visible data
 *	tuple; the object's size is that tuple's olastbyte attribute + 1.
 *	Returns 0 if the relation contains no (visible) tuples.
 */
static int
_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
{
	IndexScanDesc iscan;
	RetrieveIndexResult res;
	HeapTupleData tuple;
	Datum		d;
	long		size;
	bool		isNull;
	Buffer		buffer;

	/* scan backwards from end */
	iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);

	do
	{
		res = index_getnext(iscan, BackwardScanDirection);

		/*
		 * If there are no more index tuples, then the relation is empty,
		 * so the file's size is zero.
		 */

		if (res == (RetrieveIndexResult) NULL)
		{
			index_endscan(iscan);
			return 0;
		}

		/*
		 * For time travel, we need to use the actual time qual here,
		 * rather that NowTimeQual.  We currently have no way to pass a
		 * time qual in.
		 */
		tuple.t_self = res->heap_iptr;
		heap_fetch(hreln, SnapshotNow, &tuple, &buffer);
		pfree(res);
		/* loop until we fetch a tuple that is visible (t_data != NULL) */
	} while (tuple.t_data == NULL);

	/* don't need the index scan anymore */
	index_endscan(iscan);

	/*
	 * Get olastbyte attribute (attnum 1); size is last byte offset + 1.
	 * NOTE(review): isNull is never checked -- presumably olastbyte can
	 * never be null in a valid LO tuple; confirm before relying on it.
	 */
	d = heap_getattr(&tuple, 1, hdesc, &isNull);
	size = DatumGetInt32(d) + 1;
	ReleaseBuffer(buffer);

	return size;
}