heapam.c 44.8 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * heapam.c
4
 *	  heap access method code
5
 *
Bruce Momjian's avatar
Add:  
Bruce Momjian committed
6 7
 * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
 * Portions Copyright (c) 1994, Regents of the University of California
8 9 10
 *
 *
 * IDENTIFICATION
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
11
 *	  $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.70 2000/06/02 10:20:24 vadim Exp $
12 13 14
 *
 *
 * INTERFACE ROUTINES
15 16 17 18 19 20 21 22 23 24 25
 *		heapgettup		- fetch next heap tuple from a scan
 *		heap_open		- open a heap relation by relationId
 *		heap_openr		- open a heap relation by name
 *		heap_close		- close a heap relation
 *		heap_beginscan	- begin relation scan
 *		heap_rescan		- restart a relation scan
 *		heap_endscan	- end relation scan
 *		heap_getnext	- retrieve next tuple in scan
 *		heap_fetch		- retrieve tuple with tid
 *		heap_insert		- insert tuple into a relation
 *		heap_delete		- delete a tuple from a relation
26
 *		heap_update - replace a tuple in a relation with another tuple
27 28 29
 *		heap_markpos	- mark scan position
 *		heap_restrpos	- restore position to marked location
 *
30
 * NOTES
31 32 33
 *	  This file contains the heap_ routines which implement
 *	  the POSTGRES heap access method used for all POSTGRES
 *	  relations.
34 35
 *
 * OLD COMMENTS
36
 *		struct relscan hints:  (struct should be made AM independent?)
37
 *
38 39 40 41
 *		rs_ctid is the tid of the last tuple returned by getnext.
 *		rs_ptid and rs_ntid are the tids of the previous and next tuples
 *		returned by getnext, respectively.	NULL indicates an end of
 *		scan (either direction); NON indicates an unknown value.
42
 *
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
 *		possible combinations:
 *		rs_p	rs_c	rs_n			interpretation
 *		NULL	NULL	NULL			empty scan
 *		NULL	NULL	NON				at beginning of scan
 *		NULL	NULL	t1				at beginning of scan (with cached tid)
 *		NON		NULL	NULL			at end of scan
 *		t1		NULL	NULL			at end of scan (with cached tid)
 *		NULL	t1		NULL			just returned only tuple
 *		NULL	t1		NON				just returned first tuple
 *		NULL	t1		t2				returned first tuple (with cached tid)
 *		NON		t1		NULL			just returned last tuple
 *		t2		t1		NULL			returned last tuple (with cached tid)
 *		t1		t2		NON				in the middle of a forward scan
 *		NON		t2		t1				in the middle of a reverse scan
 *		ti		tj		tk				in the middle of a scan (w cached tid)
58
 *
59 60
 *		Here NULL is ...tup == NULL && ...buf == InvalidBuffer,
 *		and NON is ...tup == NULL && ...buf == UnknownBuffer.
61
 *
62 63 64
 *		Currently, the NONTID values are not cached with their actual
 *		values by getnext.	Values may be cached by markpos since it stores
 *		all three tids.
65
 *
66 67
 *		NOTE:  the calls to elog() must stop.  Should decide on an interface
 *		between the general and specific AM calls.
68
 *
69 70 71 72
 *		XXX probably do not need a free tuple routine for heaps.
 *		Huh?  Free tuple is not necessary for tuples returned by scans, but
 *		is necessary for tuples which are returned by
 *		RelationGetTupleByItemPointer. -hirohama
73 74 75 76
 *
 *-------------------------------------------------------------------------
 */

77
#include "postgres.h"
78

79 80
#include "access/heapam.h"
#include "access/hio.h"
Bruce Momjian's avatar
Bruce Momjian committed
81
#include "access/valid.h"
82
#include "catalog/catalog.h"
Bruce Momjian's avatar
Bruce Momjian committed
83
#include "miscadmin.h"
84
#include "utils/builtins.h"
Bruce Momjian's avatar
Bruce Momjian committed
85 86
#include "utils/inval.h"
#include "utils/relcache.h"
87

Marc G. Fournier's avatar
Marc G. Fournier committed
88

89
/* ----------------------------------------------------------------
90
 *						 heap support routines
91 92 93 94
 * ----------------------------------------------------------------
 */

/* ----------------
95
 *		initscan - scan code common to heap_beginscan and heap_rescan
96 97 98
 * ----------------
 */
static void
99
initscan(HeapScanDesc scan,
100 101 102 103
		 Relation relation,
		 int atend,
		 unsigned nkeys,
		 ScanKey key)
104
{
105 106 107 108 109 110 111 112 113 114
	/* ----------------
	 *	Make sure we have up-to-date idea of number of blocks in relation.
	 *	It is sufficient to do this once at scan start, since any tuples
	 *	added while the scan is in progress will be invisible to my
	 *	transaction anyway...
	 * ----------------
	 */
	relation->rd_nblocks = RelationGetNumberOfBlocks(relation);

	if (relation->rd_nblocks == 0)
115 116 117 118 119
	{
		/* ----------------
		 *	relation is empty
		 * ----------------
		 */
120
		scan->rs_ntup.t_datamcxt = scan->rs_ctup.t_datamcxt =
121
			scan->rs_ptup.t_datamcxt = NULL;
Bruce Momjian's avatar
Bruce Momjian committed
122
		scan->rs_ntup.t_data = scan->rs_ctup.t_data =
123
			scan->rs_ptup.t_data = NULL;
124
		scan->rs_nbuf = scan->rs_cbuf = scan->rs_pbuf = InvalidBuffer;
125 126 127 128 129 130 131
	}
	else if (atend)
	{
		/* ----------------
		 *	reverse scan
		 * ----------------
		 */
132
		scan->rs_ntup.t_datamcxt = scan->rs_ctup.t_datamcxt = NULL;
133
		scan->rs_ntup.t_data = scan->rs_ctup.t_data = NULL;
134
		scan->rs_nbuf = scan->rs_cbuf = InvalidBuffer;
135
		scan->rs_ptup.t_datamcxt = NULL;
136
		scan->rs_ptup.t_data = NULL;
137
		scan->rs_pbuf = UnknownBuffer;
138 139 140 141 142 143 144
	}
	else
	{
		/* ----------------
		 *	forward scan
		 * ----------------
		 */
145
		scan->rs_ctup.t_datamcxt = scan->rs_ptup.t_datamcxt = NULL;
146
		scan->rs_ctup.t_data = scan->rs_ptup.t_data = NULL;
147
		scan->rs_cbuf = scan->rs_pbuf = InvalidBuffer;
148
		scan->rs_ntup.t_datamcxt = NULL;
149
		scan->rs_ntup.t_data = NULL;
150
		scan->rs_nbuf = UnknownBuffer;
151 152 153
	}							/* invalid too */

	/* we don't have a marked position... */
154 155 156 157
	ItemPointerSetInvalid(&(scan->rs_mptid));
	ItemPointerSetInvalid(&(scan->rs_mctid));
	ItemPointerSetInvalid(&(scan->rs_mntid));
	ItemPointerSetInvalid(&(scan->rs_mcd));
158

159
	/* ----------------
160
	 *	copy the scan key, if appropriate
161 162
	 * ----------------
	 */
163
	if (key != NULL)
164
		memmove(scan->rs_key, key, nkeys * sizeof(ScanKeyData));
165 166 167
}

/* ----------------
168
 *		unpinscan - code common to heap_rescan and heap_endscan
169 170 171
 * ----------------
 */
static void
172
unpinscan(HeapScanDesc scan)
173
{
174 175
	if (BufferIsValid(scan->rs_pbuf))
		ReleaseBuffer(scan->rs_pbuf);
176 177

	/* ------------------------------------
178
	 *	Scan will pin buffer once for each non-NULL tuple pointer
179 180 181 182
	 *	(ptup, ctup, ntup), so they have to be unpinned multiple
	 *	times.
	 * ------------------------------------
	 */
183 184
	if (BufferIsValid(scan->rs_cbuf))
		ReleaseBuffer(scan->rs_cbuf);
185

186 187
	if (BufferIsValid(scan->rs_nbuf))
		ReleaseBuffer(scan->rs_nbuf);
188

189 190 191
	/*
	 * we don't bother to clear rs_pbuf etc --- caller must reinitialize
	 * them if scan descriptor is not being deleted.
192
	 */
193 194 195
}

/* ------------------------------------------
196
 *		nextpage
197
 *
198 199 200
 *		figure out the next page to scan after the current page
 *		taking into account of possible adjustment of degrees of
 *		parallelism
201 202 203 204 205
 * ------------------------------------------
 */
static int
nextpage(int page, int dir)
{
206
	return (dir < 0) ? page - 1 : page + 1;
207 208 209
}

/* ----------------
210
 *		heapgettup - fetch next heap tuple
211
 *
212 213
 *		routine used by heap_getnext() which does most of the
 *		real work in scanning tuples.
214 215 216 217 218
 *
 *		The scan routines handle their own buffer lock/unlocking, so
 *		there is no reason to request the buffer number unless
 *		to want to perform some other operation with the result,
 *		like pass it to another function.
219 220
 * ----------------
 */
221
static void
222
heapgettup(Relation relation,
223
		   HeapTuple tuple,
224
		   int dir,
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
225
		   Buffer *buffer,
226
		   Snapshot snapshot,
227 228
		   int nkeys,
		   ScanKey key)
229
{
Bruce Momjian's avatar
Bruce Momjian committed
230 231 232 233 234 235 236 237 238
	ItemId		lpp;
	Page		dp;
	int			page;
	int			pages;
	int			lines;
	OffsetNumber lineoff;
	int			linesleft;
	ItemPointer tid = (tuple->t_data == NULL) ?
	(ItemPointer) NULL : &(tuple->t_self);
239

240
	/* ----------------
241
	 *	increment access statistics
242 243
	 * ----------------
	 */
244 245 246
	IncrHeapAccessStat(local_heapgettup);
	IncrHeapAccessStat(global_heapgettup);

247
	/* ----------------
248 249 250 251
	 *	debugging stuff
	 *
	 * check validity of arguments, here and for other functions too
	 * Note: no locking manipulations needed--this is a local function
252 253
	 * ----------------
	 */
254 255 256
#ifdef	HEAPDEBUGALL
	if (ItemPointerIsValid(tid))
	{
257
		elog(DEBUG, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
258 259
			 RelationGetRelationName(relation), tid, tid->ip_blkid,
			 tid->ip_posid, dir);
260
	}
261 262
	else
	{
263
		elog(DEBUG, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
264
			 RelationGetRelationName(relation), tid, dir);
265
	}
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
266
	elog(DEBUG, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);
267

268
	elog(DEBUG, "heapgettup: relation(%c)=`%s', %p",
269
		 relation->rd_rel->relkind, RelationGetRelationName(relation),
270
		 snapshot);
271
#endif	 /* !defined(HEAPDEBUGALL) */
272 273 274

	if (!ItemPointerIsValid(tid))
		Assert(!PointerIsValid(tid));
275 276

	/* ----------------
277
	 *	return null immediately if relation is empty
278 279
	 * ----------------
	 */
280
	if (!(pages = relation->rd_nblocks))
281
	{
282
		tuple->t_datamcxt = NULL;
283 284 285
		tuple->t_data = NULL;
		return;
	}
286 287 288 289 290 291 292 293 294 295 296 297 298 299

	/* ----------------
	 *	calculate next starting lineoff, given scan direction
	 * ----------------
	 */
	if (!dir)
	{
		/* ----------------
		 * ``no movement'' scan direction
		 * ----------------
		 */
		/* assume it is a valid TID XXX */
		if (ItemPointerIsValid(tid) == false)
		{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
300
			*buffer = InvalidBuffer;
301
			tuple->t_datamcxt = NULL;
302 303
			tuple->t_data = NULL;
			return;
304
		}
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
305
		*buffer = RelationGetBufferWithBuffer(relation,
Bruce Momjian's avatar
Bruce Momjian committed
306 307
										  ItemPointerGetBlockNumber(tid),
											  *buffer);
308

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
309
		if (!BufferIsValid(*buffer))
310
			elog(ERROR, "heapgettup: failed ReadBuffer");
311

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
312 313 314
		LockBuffer(*buffer, BUFFER_LOCK_SHARE);

		dp = (Page) BufferGetPage(*buffer);
315 316 317
		lineoff = ItemPointerGetOffsetNumber(tid);
		lpp = PageGetItemId(dp, lineoff);

318
		tuple->t_datamcxt = NULL;
319 320
		tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
		tuple->t_len = ItemIdGetLength(lpp);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
321
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
322
		return;
323

324
	}
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
	else if (dir < 0)
	{
		/* ----------------
		 *	reverse scan direction
		 * ----------------
		 */
		if (ItemPointerIsValid(tid) == false)
			tid = NULL;
		if (tid == NULL)
		{
			page = pages - 1;	/* final page */
		}
		else
		{
			page = ItemPointerGetBlockNumber(tid);		/* current page */
		}
		if (page < 0)
		{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
343
			*buffer = InvalidBuffer;
344 345
			tuple->t_data = NULL;
			return;
346 347
		}

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
348 349
		*buffer = RelationGetBufferWithBuffer(relation, page, *buffer);
		if (!BufferIsValid(*buffer))
350
			elog(ERROR, "heapgettup: failed ReadBuffer");
351

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
352 353 354
		LockBuffer(*buffer, BUFFER_LOCK_SHARE);

		dp = (Page) BufferGetPage(*buffer);
355 356 357 358
		lines = PageGetMaxOffsetNumber(dp);
		if (tid == NULL)
		{
			lineoff = lines;	/* final offnum */
359
		}
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387
		else
		{
			lineoff =			/* previous offnum */
				OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
		}
		/* page and lineoff now reference the physically previous tid */

	}
	else
	{
		/* ----------------
		 *	forward scan direction
		 * ----------------
		 */
		if (ItemPointerIsValid(tid) == false)
		{
			page = 0;			/* first page */
			lineoff = FirstOffsetNumber;		/* first offnum */
		}
		else
		{
			page = ItemPointerGetBlockNumber(tid);		/* current page */
			lineoff =			/* next offnum */
				OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
		}

		if (page >= pages)
		{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
388
			*buffer = InvalidBuffer;
389
			tuple->t_datamcxt = NULL;
390 391
			tuple->t_data = NULL;
			return;
392 393 394
		}
		/* page and lineoff now reference the physically next tid */

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
395 396
		*buffer = RelationGetBufferWithBuffer(relation, page, *buffer);
		if (!BufferIsValid(*buffer))
397
			elog(ERROR, "heapgettup: failed ReadBuffer");
398

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
399 400 401
		LockBuffer(*buffer, BUFFER_LOCK_SHARE);

		dp = (Page) BufferGetPage(*buffer);
402
		lines = PageGetMaxOffsetNumber(dp);
403
	}
404 405 406

	/* 'dir' is now non-zero */

407
	/* ----------------
408 409
	 *	calculate line pointer and number of remaining items
	 *	to check on this page.
410 411
	 * ----------------
	 */
412 413 414 415 416 417
	lpp = PageGetItemId(dp, lineoff);
	if (dir < 0)
		linesleft = lineoff - 1;
	else
		linesleft = lines - lineoff;

418
	/* ----------------
419 420
	 *	advance the scan until we find a qualifying tuple or
	 *	run out of stuff to scan
421 422
	 * ----------------
	 */
423 424 425 426
	for (;;)
	{
		while (linesleft >= 0)
		{
427
			if (ItemIdIsUsed(lpp))
428
			{
429
				tuple->t_datamcxt = NULL;
430 431 432 433 434 435 436
				tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
				tuple->t_len = ItemIdGetLength(lpp);
				ItemPointerSet(&(tuple->t_self), page, lineoff);
				/* ----------------
				 *	if current tuple qualifies, return it.
				 * ----------------
				 */
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
437
				HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
438 439
								   snapshot, nkeys, key);
				if (tuple->t_data != NULL)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
440 441
				{
					LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
442
					return;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
443
				}
444 445 446 447 448 449 450 451 452 453
			}

			/* ----------------
			 *	otherwise move to the next item on the page
			 * ----------------
			 */
			--linesleft;
			if (dir < 0)
			{
				--lpp;			/* move back in this page's ItemId array */
454
				--lineoff;
455 456 457
			}
			else
			{
Bruce Momjian's avatar
Bruce Momjian committed
458 459
				++lpp;			/* move forward in this page's ItemId
								 * array */
460
				++lineoff;
461 462 463 464 465 466 467 468
			}
		}

		/* ----------------
		 *	if we get here, it means we've exhausted the items on
		 *	this page and it's time to move to the next..
		 * ----------------
		 */
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
469
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
470 471 472 473 474 475 476 477
		page = nextpage(page, dir);

		/* ----------------
		 *	return NULL if we've exhausted all the pages..
		 * ----------------
		 */
		if (page < 0 || page >= pages)
		{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
478 479 480
			if (BufferIsValid(*buffer))
				ReleaseBuffer(*buffer);
			*buffer = InvalidBuffer;
481
			tuple->t_datamcxt = NULL;
482 483
			tuple->t_data = NULL;
			return;
484 485
		}

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
486
		*buffer = ReleaseAndReadBuffer(*buffer, relation, page);
487

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
488
		if (!BufferIsValid(*buffer))
489
			elog(ERROR, "heapgettup: failed ReadBuffer");
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
490 491
		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
		dp = (Page) BufferGetPage(*buffer);
492
		lines = PageGetMaxOffsetNumber((Page) dp);
493 494
		linesleft = lines - 1;
		if (dir < 0)
495 496 497 498
		{
			lineoff = lines;
			lpp = PageGetItemId(dp, lines);
		}
499
		else
500 501
		{
			lineoff = FirstOffsetNumber;
502
			lpp = PageGetItemId(dp, FirstOffsetNumber);
503
		}
504 505 506 507 508
	}
}


/* ----------------------------------------------------------------
509
 *					 heap access method interface
510 511 512
 * ----------------------------------------------------------------
 */
/* ----------------
513
 *		heap_open - open a heap relation by relationId
514
 *
515 516 517 518 519
 *		If lockmode is "NoLock", no lock is obtained on the relation,
 *		and the caller must check for a NULL return value indicating
 *		that no such relation exists.
 *		Otherwise, an error is raised if the relation does not exist,
 *		and the specified kind of lock is obtained on the relation.
520 521 522
 * ----------------
 */
Relation
523
heap_open(Oid relationId, LOCKMODE lockmode)
524
{
525
	Relation	r;
526

527 528
	Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

529 530 531 532 533 534 535
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_open);
	IncrHeapAccessStat(global_open);

536 537
	/* The relcache does all the real work... */
	r = RelationIdGetRelation(relationId);
538

539
	/* Under no circumstances will we return an index as a relation. */
540
	if (RelationIsValid(r) && r->rd_rel->relkind == RELKIND_INDEX)
541
		elog(ERROR, "%s is an index relation", RelationGetRelationName(r));
542

543 544 545
	if (lockmode == NoLock)
		return r;				/* caller must check RelationIsValid! */

546
	if (!RelationIsValid(r))
547 548 549 550
		elog(ERROR, "Relation %u does not exist", relationId);

	LockRelation(r, lockmode);

551
	return r;
552 553 554
}

/* ----------------
555
 *		heap_openr - open a heap relation by name
556
 *
557 558 559 560 561
 *		If lockmode is "NoLock", no lock is obtained on the relation,
 *		and the caller must check for a NULL return value indicating
 *		that no such relation exists.
 *		Otherwise, an error is raised if the relation does not exist,
 *		and the specified kind of lock is obtained on the relation.
562 563 564
 * ----------------
 */
Relation
565
heap_openr(const char *relationName, LOCKMODE lockmode)
566
{
567
	Relation	r;
568

569 570
	Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

571 572 573 574 575 576 577
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_openr);
	IncrHeapAccessStat(global_openr);

578
	/* The relcache does all the real work... */
579 580
	r = RelationNameGetRelation(relationName);

581
	/* Under no circumstances will we return an index as a relation. */
582
	if (RelationIsValid(r) && r->rd_rel->relkind == RELKIND_INDEX)
583
		elog(ERROR, "%s is an index relation", RelationGetRelationName(r));
584

585 586 587
	if (lockmode == NoLock)
		return r;				/* caller must check RelationIsValid! */

588
	if (!RelationIsValid(r))
589 590 591 592
		elog(ERROR, "Relation '%s' does not exist", relationName);

	LockRelation(r, lockmode);

593
	return r;
594 595 596
}

/* ----------------
597
 *		heap_close - close a heap relation
598
 *
599 600 601
 *		If lockmode is not "NoLock", we first release the specified lock.
 *		Note that it is often sensible to hold a lock beyond heap_close;
 *		in that case, the lock is released automatically at xact end.
602 603 604
 * ----------------
 */
void
605
heap_close(Relation relation, LOCKMODE lockmode)
606
{
607 608
	Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

609 610 611 612 613 614 615
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_close);
	IncrHeapAccessStat(global_close);

616 617 618 619
	if (lockmode != NoLock)
		UnlockRelation(relation, lockmode);

	/* The relcache does the real work... */
620
	RelationClose(relation);
621 622 623 624
}


/* ----------------
625
 *		heap_beginscan	- begin relation scan
626 627 628 629
 * ----------------
 */
HeapScanDesc
heap_beginscan(Relation relation,
630
			   int atend,
631
			   Snapshot snapshot,
632 633
			   unsigned nkeys,
			   ScanKey key)
634
{
635
	HeapScanDesc scan;
636 637 638 639 640 641 642 643 644 645 646

	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_beginscan);
	IncrHeapAccessStat(global_beginscan);

	/* ----------------
	 *	sanity checks
	 * ----------------
647
	 */
648
	if (!RelationIsValid(relation))
649
		elog(ERROR, "heap_beginscan: !RelationIsValid(relation)");
650 651 652 653 654 655 656

	/* ----------------
	 *	increment relation ref count while scanning relation
	 * ----------------
	 */
	RelationIncrementReferenceCount(relation);

657 658 659 660
	/* ----------------
	 *	Acquire AccessShareLock for the duration of the scan
	 *
	 *	Note: we could get an SI inval message here and consequently have
661
	 *	to rebuild the relcache entry.	The refcount increment above
662 663 664 665 666 667 668 669 670
	 *	ensures that we will rebuild it and not just flush it...
	 * ----------------
	 */
	LockRelation(relation, AccessShareLock);

	/* XXX someday assert SelfTimeQual if relkind == RELKIND_UNCATALOGED */
	if (relation->rd_rel->relkind == RELKIND_UNCATALOGED)
		snapshot = SnapshotSelf;

671 672 673 674
	/* ----------------
	 *	allocate and initialize scan descriptor
	 * ----------------
	 */
675
	scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));
676

677
	scan->rs_rd = relation;
678 679 680
	scan->rs_atend = atend;
	scan->rs_snapshot = snapshot;
	scan->rs_nkeys = (short) nkeys;
681 682

	if (nkeys)
683

684
		/*
685 686
		 * we do this here instead of in initscan() because heap_rescan
		 * also calls initscan() and we don't want to allocate memory
687 688
		 * again
		 */
689
		scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
690
	else
691
		scan->rs_key = NULL;
692

693
	initscan(scan, relation, atend, nkeys, key);
694

695
	return scan;
696 697 698
}

/* ----------------
699
 *		heap_rescan		- restart a relation scan
700 701 702
 * ----------------
 */
void
703
heap_rescan(HeapScanDesc scan,
704 705
			bool scanFromEnd,
			ScanKey key)
706
{
707 708 709 710 711 712 713 714 715 716 717 718 719
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_rescan);
	IncrHeapAccessStat(global_rescan);

	/* Note: set relation level read lock is still set */

	/* ----------------
	 *	unpin scan buffers
	 * ----------------
	 */
720
	unpinscan(scan);
721 722 723 724 725

	/* ----------------
	 *	reinitialize scan descriptor
	 * ----------------
	 */
726
	scan->rs_atend = (bool) scanFromEnd;
727
	initscan(scan, scan->rs_rd, scanFromEnd, scan->rs_nkeys, key);
728 729 730
}

/* ----------------
731
 *		heap_endscan	- end relation scan
732
 *
733 734
 *		See how to integrate with index scans.
 *		Check handling if reldesc caching.
735 736 737
 * ----------------
 */
void
738
heap_endscan(HeapScanDesc scan)
739
{
740 741 742 743 744 745 746 747 748 749 750 751 752
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_endscan);
	IncrHeapAccessStat(global_endscan);

	/* Note: no locking manipulations needed */

	/* ----------------
	 *	unpin scan buffers
	 * ----------------
	 */
753
	unpinscan(scan);
754

755 756 757 758 759 760
	/* ----------------
	 *	Release AccessShareLock acquired by heap_beginscan()
	 * ----------------
	 */
	UnlockRelation(scan->rs_rd, AccessShareLock);

761 762 763 764
	/* ----------------
	 *	decrement relation reference count and free scan descriptor storage
	 * ----------------
	 */
765
	RelationDecrementReferenceCount(scan->rs_rd);
766

767 768 769
	if (scan->rs_key)
		pfree(scan->rs_key);

770
	pfree(scan);
771 772 773
}

/* ----------------
774
 *		heap_getnext	- retrieve next tuple in scan
775
 *
776
 *		Fix to work with index relations.
777 778
 *		We don't return the buffer anymore, but you can get it from the
 *		returned HeapTuple.
779 780 781 782 783
 * ----------------
 */

#ifdef HEAPDEBUGALL
#define HEAPDEBUG_1 \
784
elog(DEBUG, "heap_getnext([%s,nkeys=%d],backw=%d) called", \
785
	 RelationGetRelationName(scan->rs_rd), scan->rs_nkeys, backw)
786

787
#define HEAPDEBUG_2 \
788 789
	 elog(DEBUG, "heap_getnext called with backw (no tracing yet)")

790
#define HEAPDEBUG_3 \
791 792
	 elog(DEBUG, "heap_getnext returns NULL at end")

793
#define HEAPDEBUG_4 \
794 795
	 elog(DEBUG, "heap_getnext valid buffer UNPIN'd")

796
#define HEAPDEBUG_5 \
797 798
	 elog(DEBUG, "heap_getnext next tuple was cached")

799
#define HEAPDEBUG_6 \
800 801
	 elog(DEBUG, "heap_getnext returning EOS")

802
#define HEAPDEBUG_7 \
803
	 elog(DEBUG, "heap_getnext returning tuple");
804 805 806 807 808 809 810 811
#else
#define HEAPDEBUG_1
#define HEAPDEBUG_2
#define HEAPDEBUG_3
#define HEAPDEBUG_4
#define HEAPDEBUG_5
#define HEAPDEBUG_6
#define HEAPDEBUG_7
812
#endif	 /* !defined(HEAPDEBUGALL) */
813 814


815
HeapTuple
816
heap_getnext(HeapScanDesc scandesc, int backw)
817
{
818
	HeapScanDesc scan = scandesc;
819

820
	/* ----------------
821
	 *	increment access statistics
822 823
	 * ----------------
	 */
824 825 826 827 828 829 830 831
	IncrHeapAccessStat(local_getnext);
	IncrHeapAccessStat(global_getnext);

	/* Note: no locking manipulations needed */

	/* ----------------
	 *	argument checks
	 * ----------------
832
	 */
833
	if (scan == NULL)
834
		elog(ERROR, "heap_getnext: NULL relscan");
835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850

	/* ----------------
	 *	initialize return buffer to InvalidBuffer
	 * ----------------
	 */

	HEAPDEBUG_1;				/* heap_getnext( info ) */

	if (backw)
	{
		/* ----------------
		 *	handle reverse scan
		 * ----------------
		 */
		HEAPDEBUG_2;			/* heap_getnext called with backw */

851
		if (scan->rs_ptup.t_data == scan->rs_ctup.t_data &&
852
			BufferIsInvalid(scan->rs_pbuf))
853
			return NULL;
854 855 856 857 858

		/*
		 * Copy the "current" tuple/buffer to "next". Pin/unpin the
		 * buffers accordingly
		 */
859
		if (scan->rs_nbuf != scan->rs_cbuf)
860
		{
861 862 863 864
			if (BufferIsValid(scan->rs_nbuf))
				ReleaseBuffer(scan->rs_nbuf);
			if (BufferIsValid(scan->rs_cbuf))
				IncrBufferRefCount(scan->rs_cbuf);
865
		}
866 867
		scan->rs_ntup = scan->rs_ctup;
		scan->rs_nbuf = scan->rs_cbuf;
868

869
		if (scan->rs_ptup.t_data != NULL)
870
		{
871
			if (scan->rs_cbuf != scan->rs_pbuf)
872
			{
873 874 875 876
				if (BufferIsValid(scan->rs_cbuf))
					ReleaseBuffer(scan->rs_cbuf);
				if (BufferIsValid(scan->rs_pbuf))
					IncrBufferRefCount(scan->rs_pbuf);
877
			}
878 879
			scan->rs_ctup = scan->rs_ptup;
			scan->rs_cbuf = scan->rs_pbuf;
880 881 882
		}
		else
		{						/* NONTUP */
Bruce Momjian's avatar
Bruce Momjian committed
883

884
			/*
885
			 * Don't release scan->rs_cbuf at this point, because
886 887 888 889 890 891 892
			 * heapgettup doesn't increase PrivateRefCount if it is
			 * already set. On a backward scan, both rs_ctup and rs_ntup
			 * usually point to the same buffer page, so
			 * PrivateRefCount[rs_cbuf] should be 2 (or more, if for
			 * instance ctup is stored in a TupleTableSlot).  - 01/09/94
			 */

893 894 895 896 897 898 899
			heapgettup(scan->rs_rd,
					   &(scan->rs_ctup),
					   -1,
					   &(scan->rs_cbuf),
					   scan->rs_snapshot,
					   scan->rs_nkeys,
					   scan->rs_key);
900 901
		}

902
		if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
903
		{
904 905
			if (BufferIsValid(scan->rs_pbuf))
				ReleaseBuffer(scan->rs_pbuf);
906
			scan->rs_ptup.t_datamcxt = NULL;
907
			scan->rs_ptup.t_data = NULL;
908
			scan->rs_pbuf = InvalidBuffer;
909
			return NULL;
910 911
		}

912 913
		if (BufferIsValid(scan->rs_pbuf))
			ReleaseBuffer(scan->rs_pbuf);
914
		scan->rs_ptup.t_datamcxt = NULL;
915
		scan->rs_ptup.t_data = NULL;
916
		scan->rs_pbuf = UnknownBuffer;
917 918 919 920 921 922 923 924

	}
	else
	{
		/* ----------------
		 *	handle forward scan
		 * ----------------
		 */
925
		if (scan->rs_ctup.t_data == scan->rs_ntup.t_data &&
926
			BufferIsInvalid(scan->rs_nbuf))
927 928
		{
			HEAPDEBUG_3;		/* heap_getnext returns NULL at end */
929
			return NULL;
930 931 932 933 934 935
		}

		/*
		 * Copy the "current" tuple/buffer to "previous". Pin/unpin the
		 * buffers accordingly
		 */
936
		if (scan->rs_pbuf != scan->rs_cbuf)
937
		{
938 939 940 941
			if (BufferIsValid(scan->rs_pbuf))
				ReleaseBuffer(scan->rs_pbuf);
			if (BufferIsValid(scan->rs_cbuf))
				IncrBufferRefCount(scan->rs_cbuf);
942
		}
943 944
		scan->rs_ptup = scan->rs_ctup;
		scan->rs_pbuf = scan->rs_cbuf;
945

946
		if (scan->rs_ntup.t_data != NULL)
947
		{
948
			if (scan->rs_cbuf != scan->rs_nbuf)
949
			{
950 951 952 953
				if (BufferIsValid(scan->rs_cbuf))
					ReleaseBuffer(scan->rs_cbuf);
				if (BufferIsValid(scan->rs_nbuf))
					IncrBufferRefCount(scan->rs_nbuf);
954
			}
955 956
			scan->rs_ctup = scan->rs_ntup;
			scan->rs_cbuf = scan->rs_nbuf;
957 958 959 960
			HEAPDEBUG_5;		/* heap_getnext next tuple was cached */
		}
		else
		{						/* NONTUP */
Bruce Momjian's avatar
Bruce Momjian committed
961

962
			/*
963
			 * Don't release scan->rs_cbuf at this point, because
964 965 966 967 968 969 970
			 * heapgettup doesn't increase PrivateRefCount if it is
			 * already set. On a forward scan, both rs_ctup and rs_ptup
			 * usually point to the same buffer page, so
			 * PrivateRefCount[rs_cbuf] should be 2 (or more, if for
			 * instance ctup is stored in a TupleTableSlot).  - 01/09/93
			 */

971 972 973 974 975 976 977
			heapgettup(scan->rs_rd,
					   &(scan->rs_ctup),
					   1,
					   &scan->rs_cbuf,
					   scan->rs_snapshot,
					   scan->rs_nkeys,
					   scan->rs_key);
978 979
		}

980
		if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
981
		{
982 983
			if (BufferIsValid(scan->rs_nbuf))
				ReleaseBuffer(scan->rs_nbuf);
984
			scan->rs_ntup.t_datamcxt = NULL;
985
			scan->rs_ntup.t_data = NULL;
986
			scan->rs_nbuf = InvalidBuffer;
987
			HEAPDEBUG_6;		/* heap_getnext returning EOS */
988
			return NULL;
989 990
		}

991 992
		if (BufferIsValid(scan->rs_nbuf))
			ReleaseBuffer(scan->rs_nbuf);
993
		scan->rs_ntup.t_datamcxt = NULL;
994
		scan->rs_ntup.t_data = NULL;
995
		scan->rs_nbuf = UnknownBuffer;
996 997
	}

998
	/* ----------------
999 1000
	 *	if we get here it means we have a new current scan tuple, so
	 *	point to the proper return buffer and return the tuple.
1001 1002
	 * ----------------
	 */
1003 1004 1005

	HEAPDEBUG_7;				/* heap_getnext returning tuple */

1006
	return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
1007 1008 1009
}

/* ----------------
1010
 *		heap_fetch		- retrieve tuple with tid
1011
 *
1012
 *		Currently ignores LP_IVALID during processing!
1013 1014 1015 1016
 *
 *		Because this is not part of a scan, there is no way to
 *		automatically lock/unlock the shared buffers.
 *		For this reason, we require that the user retrieve the buffer
1017
 *		value, and they are required to BufferRelease() it when they
1018 1019
 *		are done.  If they want to make a copy of it before releasing it,
 *		they can call heap_copytyple().
1020 1021
 * ----------------
 */
1022
void
1023
heap_fetch(Relation relation,
1024
		   Snapshot snapshot,
1025
		   HeapTuple tuple,
1026
		   Buffer *userbuf)
1027
{
Bruce Momjian's avatar
Bruce Momjian committed
1028 1029 1030 1031 1032
	ItemId		lp;
	Buffer		buffer;
	PageHeader	dp;
	ItemPointer tid = &(tuple->t_self);
	OffsetNumber offnum;
1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049

	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_fetch);
	IncrHeapAccessStat(global_fetch);

	/* ----------------
	 *	get the buffer from the relation descriptor
	 *	Note that this does a buffer pin.
	 * ----------------
	 */

	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));

	if (!BufferIsValid(buffer))
1050
		elog(ERROR, "heap_fetch: %s relation: ReadBuffer(%lx) failed",
1051
			 RelationGetRelationName(relation), (long) tid);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1052 1053

	LockBuffer(buffer, BUFFER_LOCK_SHARE);
1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067

	/* ----------------
	 *	get the item line pointer corresponding to the requested tid
	 * ----------------
	 */
	dp = (PageHeader) BufferGetPage(buffer);
	offnum = ItemPointerGetOffsetNumber(tid);
	lp = PageGetItemId(dp, offnum);

	/* ----------------
	 *	more sanity checks
	 * ----------------
	 */

1068 1069 1070 1071
	if (!ItemIdIsUsed(lp))
	{
		ReleaseBuffer(buffer);
		*userbuf = InvalidBuffer;
1072
		tuple->t_datamcxt = NULL;
1073 1074 1075
		tuple->t_data = NULL;
		return;
	}
1076

1077
	tuple->t_datamcxt = NULL;
1078 1079 1080
	tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tuple->t_len = ItemIdGetLength(lp);

1081 1082 1083 1084 1085
	/* ----------------
	 *	check time qualification of tid
	 * ----------------
	 */

1086 1087
	HeapTupleSatisfies(tuple, relation, buffer, dp,
					   snapshot, 0, (ScanKey) NULL);
1088

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1089 1090
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

1091
	if (tuple->t_data == NULL)
1092
	{
1093
		/* Tuple failed time check, so we can release now. */
1094
		ReleaseBuffer(buffer);
1095 1096 1097 1098
		*userbuf = InvalidBuffer;
	}
	else
	{
1099 1100 1101 1102

		/*
		 * All checks passed, so return the tuple as valid. Caller is now
		 * responsible for releasing the buffer.
1103 1104
		 */
		*userbuf = buffer;
1105 1106 1107
	}
}

1108 1109 1110 1111 1112 1113 1114
/* ----------------
 *	heap_get_latest_tid -  get the latest tid of a specified tuple
 *
 *	Follows the t_ctid update chain starting at *tid: if the tuple at
 *	*tid fails the snapshot check and has been updated (committed xmax,
 *	ctid pointing elsewhere), recurses on the successor tid.  Returns
 *	the input tid if that version satisfies the snapshot, or NULL if
 *	the chain ends without a qualifying version (or the block/line
 *	pointer is invalid).
 * ----------------
 */
ItemPointer
heap_get_latest_tid(Relation relation,
					Snapshot snapshot,
					ItemPointer tid)
{
	ItemId		lp = NULL;
	Buffer		buffer;
	PageHeader	dp;
	OffsetNumber offnum;
	HeapTupleData tp;
	HeapTupleHeader t_data;
	ItemPointerData ctid;
	bool		invalidBlock,
				linkend;

	/* ----------------
	 *	get the buffer from the relation descriptor
	 *	Note that this does a buffer pin.
	 * ----------------
	 */
	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));

	if (!BufferIsValid(buffer))
		elog(ERROR, "heap_get_latest_tid: %s relation: ReadBuffer(%lx) failed",
			 RelationGetRelationName(relation), (long) tid);

	LockBuffer(buffer, BUFFER_LOCK_SHARE);

	/* ----------------
	 *	get the item line pointer corresponding to the requested tid;
	 *	a brand-new (uninitialized) page or an unused line pointer means
	 *	there is no tuple here at all.
	 * ----------------
	 */
	dp = (PageHeader) BufferGetPage(buffer);
	offnum = ItemPointerGetOffsetNumber(tid);
	invalidBlock = true;
	if (!PageIsNew(dp))
	{
		lp = PageGetItemId(dp, offnum);
		if (ItemIdIsUsed(lp))
			invalidBlock = false;
	}
	if (invalidBlock)
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		return NULL;
	}

	/* ----------------
	 *	more sanity checks
	 * ----------------
	 */

	/* copy out t_data pointer and ctid before the visibility check,
	 * since HeapTupleSatisfies may clear tp.t_data */
	tp.t_datamcxt = NULL;
	t_data = tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tp.t_len = ItemIdGetLength(lp);
	tp.t_self = *tid;
	ctid = tp.t_data->t_ctid;

	/* ----------------
	 *	check time qualification of tid
	 * ----------------
	 */

	HeapTupleSatisfies(&tp, relation, buffer, dp,
					   snapshot, 0, (ScanKey) NULL);

	/* chain continues only if a committed deleter left a forwarding ctid */
	linkend = true;
	if ((t_data->t_infomask & HEAP_XMAX_COMMITTED) &&
		!ItemPointerEquals(tid, &ctid))
		linkend = false;

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	ReleaseBuffer(buffer);

	if (tp.t_data == NULL)
	{
		/* this version not visible; follow the update chain if any */
		if (linkend)
			return NULL;
		return heap_get_latest_tid(relation, snapshot, &ctid);
	}

	return tid;
}

1199
/* ----------------
 *		heap_insert		- insert tuple
 *
 *		The assignment of t_min (and thus the others) should be
 *		removed eventually.
 *
 *		Currently places the tuple onto the last page.	If there is no room,
 *		it is placed on new pages.	(Heap relations)
 *		Note that concurrent inserts during a scan will probably have
 *		unexpected results, though this will be fixed eventually.
 *
 *		Fix to work with indexes.
 *
 *		Returns the OID assigned to (or already carried by) the tuple.
 * ----------------
 */
Oid
heap_insert(Relation relation, HeapTuple tup)
{
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_insert);
	IncrHeapAccessStat(global_insert);

	/* ----------------
	 *	If the object id of this tuple has already been assigned, trust
	 *	the caller.  There are a couple of ways this can happen.  At initial
	 *	db creation, the backend program sets oids for tuples.	When we
	 *	define an index, we set the oid.  Finally, in the future, we may
	 *	allow users to set their own object ids in order to support a
	 *	persistent object store (objects need to contain pointers to one
	 *	another).
	 * ----------------
	 */
	if (!OidIsValid(tup->t_data->t_oid))
	{
		tup->t_data->t_oid = newoid();
		LastOidProcessed = tup->t_data->t_oid;
	}
	else
		CheckMaxObjectId(tup->t_data->t_oid);

	/*
	 * Stamp the tuple as created by the current xact/command, with no
	 * deleter: clear all transaction-status bits, then mark xmax invalid.
	 */
	TransactionIdStore(GetCurrentTransactionId(), &(tup->t_data->t_xmin));
	tup->t_data->t_cmin = GetCurrentCommandId();
	StoreInvalidTransactionId(&(tup->t_data->t_xmax));
	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;

	/* physically place the tuple; sets tup->t_self */
	RelationPutHeapTupleAtEnd(relation, tup);

#ifdef XLOG
	/* XLOG stuff */
	/*
	 * NOTE(review): this WAL block is an unfinished stub (see the XXX
	 * marker below): `tp` and `dp` are not declared in this function,
	 * and `recptr` is declared after statements, which is not valid
	 * C89.  It will not compile if XLOG is defined -- presumably it
	 * should use tup->t_self and the page returned for the insertion;
	 * needs rework before enabling.
	 */
	{
		xl_heap_insert	xlrec;
		xlrec.itid.dbId = relation->rd_lockInfo.lockRelId.dbId;
		xlrec.itid.relId = relation->rd_lockInfo.lockRelId.relId;
XXX		xlrec.itid.tid = tp.t_self;
		xlrec.t_natts = tup->t_data->t_natts;
		xlrec.t_oid = tup->t_data->t_oid;
		xlrec.t_hoff = tup->t_data->t_hoff;
		xlrec.mask = tup->t_data->t_infomask;
		
		XLogRecPtr recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INSERT,
			(char*) xlrec, sizeof(xlrec), 
			(char*) tup->t_data + offsetof(HeapTupleHeaderData, tbits), 
			tup->t_len - offsetof(HeapTupleHeaderData, tbits));

		dp->pd_lsn = recptr;
	}
#endif

	/* system-catalog inserts must be undone from caches on rollback */
	if (IsSystemRelationName(RelationGetRelationName(relation)))
		RelationMark4RollbackHeapTuple(relation, tup);

	return tup->t_data->t_oid;
}

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1276 1277
/*
 *	heap_delete		- delete a tuple
 *
 *	Marks the tuple at *tid as deleted by the current transaction.
 *	Returns HeapTupleMayBeUpdated on success; on a concurrent-update
 *	conflict returns HeapTupleUpdated/HeapTupleSelfUpdated and, if ctid
 *	is non-NULL, stores the forwarding t_ctid of the conflicting tuple
 *	there.  elog(ERROR)s if the tid is not visible to this transaction.
 */
int
heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid)
{
	ItemId		lp;
	HeapTupleData tp;
	PageHeader	dp;
	Buffer		buffer;
	int			result;

	/* increment access statistics */
	IncrHeapAccessStat(local_delete);
	IncrHeapAccessStat(global_delete);

	Assert(ItemPointerIsValid(tid));

	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));

	if (!BufferIsValid(buffer))
		elog(ERROR, "heap_delete: failed ReadBuffer");

	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	dp = (PageHeader) BufferGetPage(buffer);
	lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
	tp.t_datamcxt = NULL;
	tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tp.t_len = ItemIdGetLength(lp);
	tp.t_self = *tid;

l1:
	result = HeapTupleSatisfiesUpdate(&tp);

	if (result == HeapTupleInvisible)
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		elog(ERROR, "heap_delete: (am)invalid tid");
	}
	else if (result == HeapTupleBeingUpdated)
	{
		TransactionId xwait = tp.t_data->t_xmax;

		/* sleep until concurrent transaction ends */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		XactLockTableWait(xwait);

		/* re-acquire the lock and re-check from scratch if xwait aborted */
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		if (TransactionIdDidAbort(xwait))
			goto l1;

		/*
		 * xwait is committed but if xwait had just marked the tuple for
		 * update then some other xaction could update this tuple before
		 * we got to this point.
		 */
		if (tp.t_data->t_xmax != xwait)
			goto l1;
		/* hint-bit the committed xmax so later visits skip the xact lookup */
		if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED))
		{
			tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
			SetBufferCommitInfoNeedsSave(buffer);
		}
		/* if tuple was marked for update but not updated... */
		if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
			result = HeapTupleMayBeUpdated;
		else
			result = HeapTupleUpdated;
	}
	if (result != HeapTupleMayBeUpdated)
	{
		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
		/* hand back the successor tid so the caller can chase the chain */
		if (ctid != NULL)
			*ctid = tp.t_data->t_ctid;
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		return result;
	}

#ifdef XLOG
	/* XLOG stuff */
	/*
	 * NOTE(review): unfinished stub -- `(char*) xlrec` casts the struct
	 * itself, presumably should be `(char*) &xlrec`; `recptr` is also
	 * declared after statements (not valid C89).  Won't compile if XLOG
	 * is defined.
	 */
	{
		xl_heap_delete	xlrec;
		xlrec.dtid.dbId = relation->rd_lockInfo.lockRelId.dbId;
		xlrec.dtid.relId = relation->rd_lockInfo.lockRelId.relId;
		xlrec.dtid.tid = tp.t_self;
		XLogRecPtr recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE,
			(char*) xlrec, sizeof(xlrec), NULL, 0);

		dp->pd_lsn = recptr;
	}
#endif

	/* store transaction information of xact deleting the tuple */
	TransactionIdStore(GetCurrentTransactionId(), &(tp.t_data->t_xmax));
	tp.t_data->t_cmax = GetCurrentCommandId();
	tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
							 HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

	/* invalidate caches */
	RelationInvalidateHeapTuple(relation, &tp);

	WriteBuffer(buffer);

	return HeapTupleMayBeUpdated;
}

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1387
/*
 *	heap_update - replace a tuple
 *
 *	Logically deletes the tuple at *otid and inserts newtup as its
 *	successor (on the same page if it fits, else at the relation's end),
 *	linking old->t_ctid to the new tuple.  Returns
 *	HeapTupleMayBeUpdated on success; on a concurrent-update conflict
 *	returns HeapTupleUpdated/HeapTupleSelfUpdated and, if ctid is
 *	non-NULL, stores the conflicting tuple's forwarding t_ctid there.
 */
int
heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
			ItemPointer ctid)
{
	ItemId		lp;
	HeapTupleData oldtup;
	PageHeader	dp;
	Buffer		buffer;
	int			result;

	/* increment access statistics */
	IncrHeapAccessStat(local_replace);
	IncrHeapAccessStat(global_replace);

	Assert(ItemPointerIsValid(otid));

	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
	if (!BufferIsValid(buffer))
		elog(ERROR, "amreplace: failed ReadBuffer");
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	dp = (PageHeader) BufferGetPage(buffer);
	lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid));

	oldtup.t_datamcxt = NULL;
	oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
	oldtup.t_len = ItemIdGetLength(lp);
	oldtup.t_self = *otid;

l2:
	result = HeapTupleSatisfiesUpdate(&oldtup);

	if (result == HeapTupleInvisible)
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		elog(ERROR, "heap_update: (am)invalid tid");
	}
	else if (result == HeapTupleBeingUpdated)
	{
		TransactionId xwait = oldtup.t_data->t_xmax;

		/* sleep until concurrent transaction ends */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		XactLockTableWait(xwait);

		/* re-acquire the lock and re-check from scratch if xwait aborted */
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		if (TransactionIdDidAbort(xwait))
			goto l2;

		/*
		 * xwait is committed but if xwait had just marked the tuple for
		 * update then some other xaction could update this tuple before
		 * we got to this point.
		 */
		if (oldtup.t_data->t_xmax != xwait)
			goto l2;
		/* hint-bit the committed xmax so later visits skip the xact lookup */
		if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED))
		{
			oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
			SetBufferCommitInfoNeedsSave(buffer);
		}
		/* if tuple was marked for update but not updated... */
		if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
			result = HeapTupleMayBeUpdated;
		else
			result = HeapTupleUpdated;
	}
	if (result != HeapTupleMayBeUpdated)
	{
		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
		/* hand back the successor tid so the caller can chase the chain */
		if (ctid != NULL)
			*ctid = oldtup.t_data->t_ctid;
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		return result;
	}

	/* XXX order problems if not atomic assignment ??? */
	/* stamp the new version: created by us, no deleter, HEAP_UPDATED set */
	newtup->t_data->t_oid = oldtup.t_data->t_oid;
	TransactionIdStore(GetCurrentTransactionId(), &(newtup->t_data->t_xmin));
	newtup->t_data->t_cmin = GetCurrentCommandId();
	StoreInvalidTransactionId(&(newtup->t_data->t_xmax));
	newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
	newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);

	/* logically delete old item */
	TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax));
	oldtup.t_data->t_cmax = GetCurrentCommandId();
	oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
							 HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);

	/* insert new item */
	if ((unsigned) MAXALIGN(newtup->t_len) <= PageGetFreeSpace((Page) dp))
		RelationPutHeapTuple(relation, buffer, newtup);
	else
	{

		/*
		 * New item won't fit on same page as old item, have to look for a
		 * new place to put it. Note that we have to unlock current buffer
		 * context - not good but RelationPutHeapTupleAtEnd uses extend
		 * lock.
		 */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		RelationPutHeapTupleAtEnd(relation, newtup);
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
	}
	/* mark for rollback caches */
	RelationMark4RollbackHeapTuple(relation, newtup);

	/*
	 * New item in place, now record address of new tuple in t_ctid of old
	 * one.
	 */
	oldtup.t_data->t_ctid = newtup->t_self;

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

	/* invalidate caches */
	RelationInvalidateHeapTuple(relation, &oldtup);

	WriteBuffer(buffer);

	return HeapTupleMayBeUpdated;
}

/*
 *	heap_mark4update		- mark a tuple for update
 *
 *	Locks the tuple at tuple->t_self for update (SELECT ... FOR UPDATE):
 *	sets xmax/cmax to the current xact/command with the
 *	HEAP_MARKED_FOR_UPDATE flag, without deleting the tuple.  On return,
 *	*buffer holds a pin on the tuple's page in ALL cases (including the
 *	conflict return paths, where only the lock is dropped) -- the caller
 *	is responsible for releasing it.  On conflict, tuple->t_self is
 *	overwritten with the forwarding t_ctid.
 */
int
heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer)
{
	ItemPointer tid = &(tuple->t_self);
	ItemId		lp;
	PageHeader	dp;
	int			result;

	/* increment access statistics */
	IncrHeapAccessStat(local_mark4update);
	IncrHeapAccessStat(global_mark4update);

	*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));

	if (!BufferIsValid(*buffer))
		elog(ERROR, "heap_mark4update: failed ReadBuffer");

	LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);

	dp = (PageHeader) BufferGetPage(*buffer);
	lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
	tuple->t_datamcxt = NULL;
	tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tuple->t_len = ItemIdGetLength(lp);

l3:
	result = HeapTupleSatisfiesUpdate(tuple);

	if (result == HeapTupleInvisible)
	{
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(*buffer);
		elog(ERROR, "heap_mark4update: (am)invalid tid");
	}
	else if (result == HeapTupleBeingUpdated)
	{
		TransactionId xwait = tuple->t_data->t_xmax;

		/* sleep until concurrent transaction ends */
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		XactLockTableWait(xwait);

		/* re-acquire the lock and re-check from scratch if xwait aborted */
		LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
		if (TransactionIdDidAbort(xwait))
			goto l3;

		/*
		 * xwait is committed but if xwait had just marked the tuple for
		 * update then some other xaction could update this tuple before
		 * we got to this point.
		 */
		if (tuple->t_data->t_xmax != xwait)
			goto l3;
		/* hint-bit the committed xmax so later visits skip the xact lookup */
		if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED))
		{
			tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
			SetBufferCommitInfoNeedsSave(*buffer);
		}
		/* if tuple was marked for update but not updated... */
		if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
			result = HeapTupleMayBeUpdated;
		else
			result = HeapTupleUpdated;
	}
	if (result != HeapTupleMayBeUpdated)
	{
		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
		/* point the caller at the successor version; buffer stays pinned */
		tuple->t_self = tuple->t_data->t_ctid;
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		return result;
	}

	/* store transaction information of xact marking the tuple */
	TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
	tuple->t_data->t_cmax = GetCurrentCommandId();
	tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
	tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE;

	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);

	/* mark dirty but keep the pin -- caller still holds *buffer */
	WriteNoReleaseBuffer(*buffer);

	return HeapTupleMayBeUpdated;
}

/* ----------------
 *		heap_markpos	- mark scan position
 *
 *		Saves the tids of the scan's previous/current/next tuples in
 *		rs_mptid/rs_mctid/rs_mntid so heap_restrpos can return here.
 *		If the previous or next tuple has not been fetched yet (buffer
 *		still "unknown"), it is fetched first via heapgettup so there
 *		is a tid to remember.
 *
 *		Note:
 *				Should only one mark be maintained per scan at one time.
 *		Check if this can be done generally--say calls to get the
 *		next/previous tuple and NEVER pass struct scandesc to the
 *		user AM's.  Now, the mark is sent to the executor for safekeeping.
 *		Probably can store this info into a GENERAL scan structure.
 *
 *		May be best to change this call to store the marked position
 *		(up to 2?) in the scan structure itself.
 *		Fix to use the proper caching structure.
 * ----------------
 */
void
heap_markpos(HeapScanDesc scan)
{

	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_markpos);
	IncrHeapAccessStat(global_markpos);

	/* Note: no locking manipulations needed */

	/* previous tuple not yet fetched: fetch it (backward, dir = -1) */
	if (scan->rs_ptup.t_data == NULL &&
		BufferIsUnknown(scan->rs_pbuf))
	{							/* == NONTUP */
		scan->rs_ptup = scan->rs_ctup;
		heapgettup(scan->rs_rd,
				   &(scan->rs_ptup),
				   -1,
				   &scan->rs_pbuf,
				   scan->rs_snapshot,
				   scan->rs_nkeys,
				   scan->rs_key);

	}
	/* next tuple not yet fetched: fetch it (forward, dir = 1) */
	else if (scan->rs_ntup.t_data == NULL &&
			 BufferIsUnknown(scan->rs_nbuf))
	{							/* == NONTUP */
		scan->rs_ntup = scan->rs_ctup;
		heapgettup(scan->rs_rd,
				   &(scan->rs_ntup),
				   1,
				   &scan->rs_nbuf,
				   scan->rs_snapshot,
				   scan->rs_nkeys,
				   scan->rs_key);
	}

	/* ----------------
	 * Should not unpin the buffer pages.  They may still be in use.
	 * ----------------
	 */
	if (scan->rs_ptup.t_data != NULL)
		scan->rs_mptid = scan->rs_ptup.t_self;
	else
		ItemPointerSetInvalid(&scan->rs_mptid);
	if (scan->rs_ctup.t_data != NULL)
		scan->rs_mctid = scan->rs_ctup.t_self;
	else
		ItemPointerSetInvalid(&scan->rs_mctid);
	if (scan->rs_ntup.t_data != NULL)
		scan->rs_mntid = scan->rs_ntup.t_self;
	else
		ItemPointerSetInvalid(&scan->rs_mntid);
}

/* ----------------
 *		heap_restrpos	- restore position to marked location
 *
 *		Re-fetches the previous/current/next tuples from the tids saved
 *		by heap_markpos (rs_mptid/rs_mctid/rs_mntid), re-pinning a
 *		buffer for each valid one.
 *
 *		Note:  there are bad side effects here.  If we were past the end
 *		of a relation when heapmarkpos is called, then if the relation is
 *		extended via insert, then the next call to heaprestrpos will set
 *		cause the added tuples to be visible when the scan continues.
 *		Problems also arise if the TID's are rearranged!!!
 *
 *		Now pins buffer once for each valid tuple pointer (rs_ptup,
 *		rs_ctup, rs_ntup) referencing it.
 *		 - 01/13/94
 *
 * XXX	might be better to do direct access instead of
 *		using the generality of heapgettup().
 *
 * XXX It is very possible that when a scan is restored, that a tuple
 * XXX which previously qualified may fail for time range purposes, unless
 * XXX some form of locking exists (ie., portals currently can act funny.
 * ----------------
 */
void
heap_restrpos(HeapScanDesc scan)
{
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_restrpos);
	IncrHeapAccessStat(global_restrpos);

	/* XXX no amrestrpos checking that ammarkpos called */

	/* Note: no locking manipulations needed */

	/* drop the pins currently held by the scan */
	unpinscan(scan);

	/* force heapgettup to pin buffer for each loaded tuple */
	scan->rs_pbuf = InvalidBuffer;
	scan->rs_cbuf = InvalidBuffer;
	scan->rs_nbuf = InvalidBuffer;

	/*
	 * For each of prev/current/next: if no tid was marked, clear the
	 * tuple; otherwise re-fetch it at its saved t_self.  dir = 0 tells
	 * heapgettup to reload the tuple at t_self, and the (HeapTupleHeader)
	 * 0x1 sentinel marks t_data as "set" for it.
	 * NOTE(review): `false` is passed where heapgettup takes a Snapshot;
	 * presumably it acts as a NULL snapshot here -- confirm against
	 * heapgettup's definition.
	 */
	if (!ItemPointerIsValid(&scan->rs_mptid))
	{
		scan->rs_ptup.t_datamcxt = NULL;
		scan->rs_ptup.t_data = NULL;
	}
	else
	{
		scan->rs_ptup.t_self = scan->rs_mptid;
		scan->rs_ptup.t_datamcxt = NULL;
		scan->rs_ptup.t_data = (HeapTupleHeader) 0x1;	/* for heapgettup */
		heapgettup(scan->rs_rd,
				   &(scan->rs_ptup),
				   0,
				   &(scan->rs_pbuf),
				   false,
				   0,
				   (ScanKey) NULL);
	}

	if (!ItemPointerIsValid(&scan->rs_mctid))
	{
		scan->rs_ctup.t_datamcxt = NULL;
		scan->rs_ctup.t_data = NULL;
	}
	else
	{
		scan->rs_ctup.t_self = scan->rs_mctid;
		scan->rs_ctup.t_datamcxt = NULL;
		scan->rs_ctup.t_data = (HeapTupleHeader) 0x1;	/* for heapgettup */
		heapgettup(scan->rs_rd,
				   &(scan->rs_ctup),
				   0,
				   &(scan->rs_cbuf),
				   false,
				   0,
				   (ScanKey) NULL);
	}

	if (!ItemPointerIsValid(&scan->rs_mntid))
	{
		scan->rs_ntup.t_datamcxt = NULL;
		scan->rs_ntup.t_data = NULL;
	}
	else
	{
		scan->rs_ntup.t_datamcxt = NULL;
		scan->rs_ntup.t_self = scan->rs_mntid;
		scan->rs_ntup.t_data = (HeapTupleHeader) 0x1;	/* for heapgettup */
		heapgettup(scan->rs_rd,
				   &(scan->rs_ntup),
				   0,
				   &scan->rs_nbuf,
				   false,
				   0,
				   (ScanKey) NULL);
	}
}