/*-------------------------------------------------------------------------
 *
 * heapam.c
 *	  heap access method code
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.183 2005/02/20 21:46:47 tgl Exp $
 *
 *
 * INTERFACE ROUTINES
 *		relation_open	- open any relation by relation OID
 *		relation_openrv - open any relation specified by a RangeVar
 *		relation_openr	- open a system relation by name
 *		relation_close	- close any relation
 *		heap_open		- open a heap relation by relation OID
 *		heap_openrv		- open a heap relation specified by a RangeVar
 *		heap_openr		- open a system heap relation by name
 *		heap_close		- (now just a macro for relation_close)
 *		heap_beginscan	- begin relation scan
 *		heap_rescan		- restart a relation scan
 *		heap_endscan	- end relation scan
 *		heap_getnext	- retrieve next tuple in scan
 *		heap_fetch		- retrieve tuple with tid
 *		heap_insert		- insert tuple into a relation
 *		heap_delete		- delete a tuple from a relation
 *		heap_update		- replace a tuple in a relation with another tuple
 *		heap_markpos	- mark scan position
 *		heap_restrpos	- restore position to marked location
 *
 * NOTES
 *	  This file contains the heap_ routines which implement
 *	  the POSTGRES heap access method used for all POSTGRES
 *	  relations.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/hio.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "miscadmin.h"
#include "utils/inval.h"
#include "utils/relcache.h"
#include "pgstat.h"


/* comments are in heap_update */
static xl_heaptid _locked_tuple_;
static void _heap_unlock_tuple(void *data);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
	   ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move);


/* ----------------------------------------------------------------
 *						 heap support routines
 * ----------------------------------------------------------------
 */

/* ----------------
 *		initscan - scan code common to heap_beginscan and heap_rescan
 * ----------------
 */
static void
initscan(HeapScanDesc scan, ScanKey key)
{
	/*
	 * Determine the number of blocks we have to scan.
	 *
	 * It is sufficient to do this once at scan start, since any tuples added
	 * while the scan is in progress will be invisible to my transaction
	 * anyway...
	 */
	scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);

	scan->rs_ctup.t_datamcxt = NULL;
	scan->rs_ctup.t_data = NULL;
	scan->rs_cbuf = InvalidBuffer;

	/* we don't have a marked position... */
	ItemPointerSetInvalid(&(scan->rs_mctid));

	/*
	 * copy the scan key, if appropriate
	 */
	if (key != NULL)
		memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData));
}

/* ----------------
 *		heapgettup - fetch next heap tuple
 *
 *		routine used by heap_getnext() which does most of the
 *		real work in scanning tuples.
 *
 *		The passed-in *buffer must be either InvalidBuffer or the pinned
 *		current page of the scan.  If we have to move to another page,
 *		we will unpin this buffer (if valid).  On return, *buffer is either
 *		InvalidBuffer or the ID of a pinned buffer.
 * ----------------
 */
static void
heapgettup(Relation relation,
		   int dir,
		   HeapTuple tuple,
		   Buffer *buffer,
		   Snapshot snapshot,
		   int nkeys,
		   ScanKey key,
		   BlockNumber pages)
{
	ItemId		lpp;
	Page		dp;
	BlockNumber page;
	int			lines;
	OffsetNumber lineoff;
	int			linesleft;
	ItemPointer tid;

	tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self);

	/*
	 * debugging stuff
	 *
	 * check validity of arguments, here and for other functions too.
	 * Note: no locking manipulations needed--this is a local function.
	 */
#ifdef	HEAPDEBUGALL
	if (ItemPointerIsValid(tid))
		elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
			 RelationGetRelationName(relation), tid, tid->ip_blkid,
			 tid->ip_posid, dir);
	else
		elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
			 RelationGetRelationName(relation), tid, dir);

	elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);

	elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p",
		 relation->rd_rel->relkind, RelationGetRelationName(relation),
		 snapshot);
#endif   /* HEAPDEBUGALL */

	if (!ItemPointerIsValid(tid))
	{
		Assert(!PointerIsValid(tid));
		tid = NULL;
	}

	tuple->t_tableOid = relation->rd_id;

	/*
	 * return null immediately if relation is empty
	 */
	if (pages == 0)
	{
		if (BufferIsValid(*buffer))
			ReleaseBuffer(*buffer);
		*buffer = InvalidBuffer;
		tuple->t_datamcxt = NULL;
		tuple->t_data = NULL;
		return;
	}

	/*
	 * calculate next starting lineoff, given scan direction
	 */
	if (dir == 0)
	{
		/*
		 * ``no movement'' scan direction: refetch same tuple
		 */
		if (tid == NULL)
		{
			if (BufferIsValid(*buffer))
				ReleaseBuffer(*buffer);
			*buffer = InvalidBuffer;
			tuple->t_datamcxt = NULL;
			tuple->t_data = NULL;
			return;
		}

		*buffer = ReleaseAndReadBuffer(*buffer,
									   relation,
									   ItemPointerGetBlockNumber(tid));

		LockBuffer(*buffer, BUFFER_LOCK_SHARE);

		dp = (Page) BufferGetPage(*buffer);
		lineoff = ItemPointerGetOffsetNumber(tid);
		lpp = PageGetItemId(dp, lineoff);

		tuple->t_datamcxt = NULL;
		tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
		tuple->t_len = ItemIdGetLength(lpp);
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);

		return;
	}
	else if (dir < 0)
	{
		/*
		 * reverse scan direction
		 */
		if (tid == NULL)
		{
			page = pages - 1;	/* final page */
		}
		else
		{
			page = ItemPointerGetBlockNumber(tid);		/* current page */
		}

		Assert(page < pages);

		*buffer = ReleaseAndReadBuffer(*buffer,
									   relation,
									   page);

		LockBuffer(*buffer, BUFFER_LOCK_SHARE);

		dp = (Page) BufferGetPage(*buffer);
		lines = PageGetMaxOffsetNumber(dp);
		if (tid == NULL)
		{
			lineoff = lines;	/* final offnum */
		}
		else
		{
			lineoff =			/* previous offnum */
				OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
		}
		/* page and lineoff now reference the physically previous tid */
	}
	else
	{
		/*
		 * forward scan direction
		 */
		if (tid == NULL)
		{
			page = 0;			/* first page */
			lineoff = FirstOffsetNumber;		/* first offnum */
		}
		else
		{
			page = ItemPointerGetBlockNumber(tid);		/* current page */
			lineoff =			/* next offnum */
				OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
		}

		Assert(page < pages);

		*buffer = ReleaseAndReadBuffer(*buffer,
									   relation,
									   page);

		LockBuffer(*buffer, BUFFER_LOCK_SHARE);

		dp = (Page) BufferGetPage(*buffer);
		lines = PageGetMaxOffsetNumber(dp);
		/* page and lineoff now reference the physically next tid */
	}

	/* 'dir' is now non-zero */

	/*
	 * calculate line pointer and number of remaining items to check on
	 * this page.
	 */
	lpp = PageGetItemId(dp, lineoff);
	if (dir < 0)
		linesleft = lineoff - 1;
	else
		linesleft = lines - lineoff;

	/*
	 * advance the scan until we find a qualifying tuple or run out of
	 * stuff to scan
	 */
	for (;;)
	{
		while (linesleft >= 0)
		{
			if (ItemIdIsUsed(lpp))
			{
				bool		valid;

				tuple->t_datamcxt = NULL;
				tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
				tuple->t_len = ItemIdGetLength(lpp);
				ItemPointerSet(&(tuple->t_self), page, lineoff);

				/*
				 * if current tuple qualifies, return it.
				 */
				HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
								   snapshot, nkeys, key, valid);
				if (valid)
				{
					LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
					return;
				}
			}

			/*
			 * otherwise move to the next item on the page
			 */
			--linesleft;
			if (dir < 0)
			{
				--lpp;			/* move back in this page's ItemId array */
				--lineoff;
			}
			else
			{
				++lpp;			/* move forward in this page's ItemId
								 * array */
				++lineoff;
			}
		}

		/*
		 * if we get here, it means we've exhausted the items on this page
		 * and it's time to move to the next.
		 */
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);

		/*
		 * return NULL if we've exhausted all the pages
		 */
		if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
		{
			if (BufferIsValid(*buffer))
				ReleaseBuffer(*buffer);
			*buffer = InvalidBuffer;
			tuple->t_datamcxt = NULL;
			tuple->t_data = NULL;
			return;
		}

		page = (dir < 0) ? (page - 1) : (page + 1);

		Assert(page < pages);

		*buffer = ReleaseAndReadBuffer(*buffer,
									   relation,
									   page);

		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
		dp = (Page) BufferGetPage(*buffer);
		lines = PageGetMaxOffsetNumber((Page) dp);
		linesleft = lines - 1;
		if (dir < 0)
		{
			lineoff = lines;
			lpp = PageGetItemId(dp, lines);
		}
		else
		{
			lineoff = FirstOffsetNumber;
			lpp = PageGetItemId(dp, FirstOffsetNumber);
		}
	}
}


#if defined(DISABLE_COMPLEX_MACRO)
/*
 * This is formatted so oddly so that the correspondence to the macro
 * definition in access/heapam.h is maintained.
 */
Datum
fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
			bool *isnull)
{
	return (
			(attnum) > 0 ?
			(
			 ((isnull) ? (*(isnull) = false) : (dummyret) NULL),
			 HeapTupleNoNulls(tup) ?
			 (
			  (tupleDesc)->attrs[(attnum) - 1]->attcacheoff >= 0 ?
			  (
			   fetchatt((tupleDesc)->attrs[(attnum) - 1],
						(char *) (tup)->t_data + (tup)->t_data->t_hoff +
						(tupleDesc)->attrs[(attnum) - 1]->attcacheoff)
			   )
			  :
			  nocachegetattr((tup), (attnum), (tupleDesc), (isnull))
			  )
			 :
			 (
			  att_isnull((attnum) - 1, (tup)->t_data->t_bits) ?
			  (
			   ((isnull) ? (*(isnull) = true) : (dummyret) NULL),
			   (Datum) NULL
			   )
			  :
			  (
			   nocachegetattr((tup), (attnum), (tupleDesc), (isnull))
			   )
			  )
			 )
			:
			(
			 (Datum) NULL
			 )
		);
}
#endif   /* defined(DISABLE_COMPLEX_MACRO) */
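/*
 * Usage sketch (illustrative only, not part of the original file): callers
 * normally reach fastgetattr() through the heap_getattr() macro rather than
 * calling it directly.  Assuming a HeapTuple "tup" and TupleDesc "tupdesc"
 * obtained elsewhere, fetching column 1 of a table whose first attribute is
 * int4 might look like:
 *
 *		bool	isnull;
 *		Datum	d = fastgetattr(tup, 1, tupdesc, &isnull);
 *		int32	ival = isnull ? 0 : DatumGetInt32(d);
 */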


/* ----------------------------------------------------------------
 *					 heap access method interface
 * ----------------------------------------------------------------
 */

/* ----------------
 *		relation_open - open any relation by relation OID
 *
 *		If lockmode is not "NoLock", the specified kind of lock is
 *		obtained on the relation.  (Generally, NoLock should only be
 *		used if the caller knows it has some appropriate lock on the
 *		relation already.)
 *
 *		An error is raised if the relation does not exist.
 *
 *		NB: a "relation" is anything with a pg_class entry.  The caller is
 *		expected to check whether the relkind is something it can handle.
 * ----------------
 */
Relation
relation_open(Oid relationId, LOCKMODE lockmode)
{
	Relation	r;

	Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

	/* The relcache does all the real work... */
	r = RelationIdGetRelation(relationId);

	if (!RelationIsValid(r))
		elog(ERROR, "could not open relation with OID %u", relationId);

	if (lockmode != NoLock)
		LockRelation(r, lockmode);

	return r;
}
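
/*
 * Usage sketch (illustrative only, not part of the original file): a
 * typical caller opens by OID, works with the relation, and keeps the lock
 * until transaction end.  The OID variable "relid" is an assumption here.
 *
 *		Relation	rel = relation_open(relid, AccessShareLock);
 *		... examine rel->rd_rel, scan it, etc ...
 *		relation_close(rel, NoLock);	(lock is released at xact end)
 */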

/* ----------------
 *		conditional_relation_open - open with option not to wait
 *
 *		As above, but if nowait is true, then throw an error rather than
 *		waiting when the lock is not immediately obtainable.
 * ----------------
 */
Relation
conditional_relation_open(Oid relationId, LOCKMODE lockmode, bool nowait)
{
	Relation	r;

	Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

	/* The relcache does all the real work... */
	r = RelationIdGetRelation(relationId);

	if (!RelationIsValid(r))
		elog(ERROR, "could not open relation with OID %u", relationId);

	if (lockmode != NoLock)
	{
		if (nowait)
		{
			if (!ConditionalLockRelation(r, lockmode))
				ereport(ERROR,
						(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
						 errmsg("could not obtain lock on relation \"%s\"",
								RelationGetRelationName(r))));
		}
		else
			LockRelation(r, lockmode);
	}

	return r;
}

/* ----------------
 *		relation_openrv - open any relation specified by a RangeVar
 *
 *		As above, but the relation is specified by a RangeVar.
 * ----------------
 */
Relation
relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
{
	Oid			relOid;

	/*
	 * In bootstrap mode, don't do any namespace processing.
	 */
	if (IsBootstrapProcessingMode())
	{
		Assert(relation->schemaname == NULL);
		return relation_openr(relation->relname, lockmode);
	}

	/*
	 * Check for shared-cache-inval messages before trying to open the
	 * relation.  This is needed to cover the case where the name
	 * identifies a rel that has been dropped and recreated since the
	 * start of our transaction: if we don't flush the old syscache entry
	 * then we'll latch onto that entry and suffer an error when we do
	 * LockRelation. Note that relation_open does not need to do this,
	 * since a relation's OID never changes.
	 *
	 * We skip this if asked for NoLock, on the assumption that the caller
	 * has already ensured some appropriate lock is held.
	 */
	if (lockmode != NoLock)
		AcceptInvalidationMessages();

	/* Look up the appropriate relation using namespace search */
	relOid = RangeVarGetRelid(relation, false);

	/* Let relation_open do the rest */
	return relation_open(relOid, lockmode);
}

/* ----------------
 *		relation_openr - open a system relation specified by name.
 *
 *		As above, but the relation is specified by an unqualified name;
 *		it is assumed to live in the system catalog namespace.
 * ----------------
 */
Relation
relation_openr(const char *sysRelationName, LOCKMODE lockmode)
{
	Relation	r;

	Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

	/*
	 * We assume we should not need to worry about the rel's OID changing,
	 * hence no need for AcceptInvalidationMessages here.
	 */

	/* The relcache does all the real work... */
	r = RelationSysNameGetRelation(sysRelationName);

	if (!RelationIsValid(r))
		elog(ERROR, "could not open relation \"%s\"", sysRelationName);

	if (lockmode != NoLock)
		LockRelation(r, lockmode);

	return r;
}

/* ----------------
 *		relation_close - close any relation
 *
 *		If lockmode is not "NoLock", we first release the specified lock.
 *
 *		Note that it is often sensible to hold a lock beyond relation_close;
 *		in that case, the lock is released automatically at xact end.
 * ----------------
 */
void
relation_close(Relation relation, LOCKMODE lockmode)
{
	Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

	if (lockmode != NoLock)
		UnlockRelation(relation, lockmode);

	/* The relcache does the real work... */
	RelationClose(relation);
}


/* ----------------
 *		heap_open - open a heap relation by relation OID
 *
 *		This is essentially relation_open plus check that the relation
 *		is not an index or special relation.  (The caller should also check
 *		that it's not a view before assuming it has storage.)
 * ----------------
 */
Relation
heap_open(Oid relationId, LOCKMODE lockmode)
{
	Relation	r;

	r = relation_open(relationId, lockmode);

	if (r->rd_rel->relkind == RELKIND_INDEX)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is an index",
						RelationGetRelationName(r))));
	else if (r->rd_rel->relkind == RELKIND_SPECIAL)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is a special relation",
						RelationGetRelationName(r))));
	else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is a composite type",
						RelationGetRelationName(r))));

	pgstat_initstats(&r->pgstat_info, r);

	return r;
}

/* ----------------
 *		heap_openrv - open a heap relation specified
 *		by a RangeVar node
 *
 *		As above, but relation is specified by a RangeVar.
 * ----------------
 */
Relation
heap_openrv(const RangeVar *relation, LOCKMODE lockmode)
{
	Relation	r;

	r = relation_openrv(relation, lockmode);

	if (r->rd_rel->relkind == RELKIND_INDEX)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is an index",
						RelationGetRelationName(r))));
	else if (r->rd_rel->relkind == RELKIND_SPECIAL)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is a special relation",
						RelationGetRelationName(r))));
	else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is a composite type",
						RelationGetRelationName(r))));

	pgstat_initstats(&r->pgstat_info, r);

	return r;
}

/* ----------------
 *		heap_openr - open a system heap relation specified by name.
 *
 *		As above, but the relation is specified by an unqualified name;
 *		it is assumed to live in the system catalog namespace.
 * ----------------
 */
Relation
heap_openr(const char *sysRelationName, LOCKMODE lockmode)
{
	Relation	r;

	r = relation_openr(sysRelationName, lockmode);

	if (r->rd_rel->relkind == RELKIND_INDEX)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is an index",
						RelationGetRelationName(r))));
	else if (r->rd_rel->relkind == RELKIND_SPECIAL)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is a special relation",
						RelationGetRelationName(r))));
	else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is a composite type",
						RelationGetRelationName(r))));

	pgstat_initstats(&r->pgstat_info, r);

	return r;
}


/* ----------------
 *		heap_beginscan	- begin relation scan
 * ----------------
 */
HeapScanDesc
heap_beginscan(Relation relation, Snapshot snapshot,
			   int nkeys, ScanKey key)
{
	HeapScanDesc scan;

	/*
	 * increment relation ref count while scanning relation
	 *
	 * This is just to make really sure the relcache entry won't go away
	 * while the scan has a pointer to it.	Caller should be holding the
	 * rel open anyway, so this is redundant in all normal scenarios...
	 */
	RelationIncrementReferenceCount(relation);

	/*
	 * allocate and initialize scan descriptor
	 */
	scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));

	scan->rs_rd = relation;
	scan->rs_snapshot = snapshot;
	scan->rs_nkeys = nkeys;

	/*
	 * we do this here instead of in initscan() because heap_rescan also
	 * calls initscan() and we don't want to allocate memory again
	 */
	if (nkeys > 0)
		scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
	else
		scan->rs_key = NULL;

	pgstat_initstats(&scan->rs_pgstat_info, relation);

	initscan(scan, key);

	return scan;
}

/* ----------------
 *		heap_rescan		- restart a relation scan
 * ----------------
 */
void
heap_rescan(HeapScanDesc scan,
			ScanKey key)
{
	/*
	 * unpin scan buffers
	 */
	if (BufferIsValid(scan->rs_cbuf))
		ReleaseBuffer(scan->rs_cbuf);

	/*
	 * reinitialize scan descriptor
	 */
	initscan(scan, key);

	pgstat_reset_heap_scan(&scan->rs_pgstat_info);
}

/* ----------------
 *		heap_endscan	- end relation scan
 *
 *		See how to integrate with index scans.
 *		Check handling of reldesc caching.
 * ----------------
 */
void
heap_endscan(HeapScanDesc scan)
{
	/* Note: no locking manipulations needed */

	/*
	 * unpin scan buffers
	 */
	if (BufferIsValid(scan->rs_cbuf))
		ReleaseBuffer(scan->rs_cbuf);

	/*
	 * decrement relation reference count and free scan descriptor storage
	 */
	RelationDecrementReferenceCount(scan->rs_rd);

	if (scan->rs_key)
		pfree(scan->rs_key);

	pfree(scan);
}

/* ----------------
 *		heap_getnext	- retrieve next tuple in scan
 *
 *		Fix to work with index relations.
 *		We don't return the buffer anymore, but you can get it from the
 *		returned HeapTuple.
 * ----------------
 */

#ifdef HEAPDEBUGALL
#define HEAPDEBUG_1 \
	elog(DEBUG2, "heap_getnext([%s,nkeys=%d],dir=%d) called", \
		 RelationGetRelationName(scan->rs_rd), scan->rs_nkeys, (int) direction)
#define HEAPDEBUG_2 \
	elog(DEBUG2, "heap_getnext returning EOS")
#define HEAPDEBUG_3 \
	elog(DEBUG2, "heap_getnext returning tuple")
#else
#define HEAPDEBUG_1
#define HEAPDEBUG_2
#define HEAPDEBUG_3
#endif   /* !defined(HEAPDEBUGALL) */


HeapTuple
heap_getnext(HeapScanDesc scan, ScanDirection direction)
{
	/* Note: no locking manipulations needed */

	HEAPDEBUG_1;				/* heap_getnext( info ) */

	/*
	 * Note: we depend here on the -1/0/1 encoding of ScanDirection.
	 */
	heapgettup(scan->rs_rd,
			   (int) direction,
			   &(scan->rs_ctup),
			   &(scan->rs_cbuf),
			   scan->rs_snapshot,
			   scan->rs_nkeys,
			   scan->rs_key,
			   scan->rs_nblocks);

	if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
	{
		HEAPDEBUG_2;			/* heap_getnext returning EOS */
		return NULL;
	}

	pgstat_count_heap_scan(&scan->rs_pgstat_info);

	/*
	 * if we get here it means we have a new current scan tuple, so point
	 * to the proper return buffer and return the tuple.
	 */

	HEAPDEBUG_3;				/* heap_getnext returning tuple */

	if (scan->rs_ctup.t_data != NULL)
		pgstat_count_heap_getnext(&scan->rs_pgstat_info);

	return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
}
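
/*
 * Illustrative sketch of the scan API above (not part of the original file;
 * compiled out by the #ifdef).  It opens a heap by OID, scans it forward,
 * and cleans up.  The "relid" parameter and the use of SnapshotNow are
 * assumptions for the example only.
 */
#ifdef HEAPAM_USAGE_EXAMPLE
static void
example_heap_seqscan(Oid relid)
{
	Relation	rel = heap_open(relid, AccessShareLock);
	HeapScanDesc scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
	HeapTuple	tuple;

	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		/* tuple (and its buffer pin) are only valid until the next call */
	}

	heap_endscan(scan);
	heap_close(rel, AccessShareLock);
}
#endif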

/*
 *	heap_fetch		- retrieve tuple with given tid
 *
 * On entry, tuple->t_self is the TID to fetch.  We pin the buffer holding
 * the tuple, fill in the remaining fields of *tuple, and check the tuple
 * against the specified snapshot.
 *
 * If successful (tuple found and passes snapshot time qual), then *userbuf
 * is set to the buffer holding the tuple and TRUE is returned.  The caller
 * must unpin the buffer when done with the tuple.
 *
 * If the tuple is not found (ie, item number references a deleted slot),
 * then tuple->t_data is set to NULL and FALSE is returned.
 *
 * If the tuple is found but fails the time qual check, then FALSE is returned
 * but tuple->t_data is left pointing to the tuple.
 *
 * keep_buf determines what is done with the buffer in the FALSE-result cases.
 * When the caller specifies keep_buf = true, we retain the pin on the buffer
 * and return it in *userbuf (so the caller must eventually unpin it); when
 * keep_buf = false, the pin is released and *userbuf is set to InvalidBuffer.
 *
 * It is somewhat inconsistent that we ereport() on invalid block number but
 * return false on invalid item number.  This is historical.  The only
 * justification I can see is that the caller can relatively easily check the
 * block number for validity, but cannot check the item number without reading
 * the page himself.
 */
bool
heap_fetch(Relation relation,
		   Snapshot snapshot,
		   HeapTuple tuple,
		   Buffer *userbuf,
		   bool keep_buf,
		   PgStat_Info *pgstat_info)
{
	/* Assume *userbuf is undefined on entry */
	*userbuf = InvalidBuffer;
	return heap_release_fetch(relation, snapshot, tuple,
							  userbuf, keep_buf, pgstat_info);
}

/*
 *	heap_release_fetch		- retrieve tuple with given tid
 *
 * This has the same API as heap_fetch except that if *userbuf is not
 * InvalidBuffer on entry, that buffer will be released before reading
 * the new page.  This saves a separate ReleaseBuffer step and hence
 * one entry into the bufmgr when looping through multiple fetches.
 * Also, if *userbuf is the same buffer that holds the target tuple,
 * we avoid bufmgr manipulation altogether.
 */
bool
heap_release_fetch(Relation relation,
				   Snapshot snapshot,
				   HeapTuple tuple,
				   Buffer *userbuf,
				   bool keep_buf,
				   PgStat_Info *pgstat_info)
{
	ItemPointer tid = &(tuple->t_self);
	ItemId		lp;
	Buffer		buffer;
	PageHeader	dp;
	OffsetNumber offnum;
	bool		valid;

	/*
	 * get the buffer from the relation descriptor. Note that this does a
	 * buffer pin, and releases the old *userbuf if not InvalidBuffer.
	 */
	buffer = ReleaseAndReadBuffer(*userbuf, relation,
								  ItemPointerGetBlockNumber(tid));

	/*
	 * Need share lock on buffer to examine tuple commit status.
	 */
	LockBuffer(buffer, BUFFER_LOCK_SHARE);

	/*
	 * get the item line pointer corresponding to the requested tid
	 */
	dp = (PageHeader) BufferGetPage(buffer);
	offnum = ItemPointerGetOffsetNumber(tid);
	lp = PageGetItemId(dp, offnum);

	/*
	 * must check for deleted tuple (see for example analyze.c, which is
	 * careful to pass an offnum in range, but doesn't know if the offnum
	 * actually corresponds to an undeleted tuple).
	 */
	if (!ItemIdIsUsed(lp))
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		if (keep_buf)
			*userbuf = buffer;
		else
		{
			ReleaseBuffer(buffer);
			*userbuf = InvalidBuffer;
		}
		tuple->t_datamcxt = NULL;
		tuple->t_data = NULL;
		return false;
	}

	/*
	 * fill in *tuple fields
	 */
	tuple->t_datamcxt = NULL;
	tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tuple->t_len = ItemIdGetLength(lp);
	tuple->t_tableOid = relation->rd_id;

	/*
	 * check time qualification of tuple, then release lock
	 */
	HeapTupleSatisfies(tuple, relation, buffer, dp,
					   snapshot, 0, NULL, valid);

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

	if (valid)
	{
		/*
		 * All checks passed, so return the tuple as valid. Caller is now
		 * responsible for releasing the buffer.
		 */
		*userbuf = buffer;

		/*
		 * Count the successful fetch in *pgstat_info if given, otherwise
		 * in the relation's default statistics area.
		 */
		if (pgstat_info != NULL)
			pgstat_count_heap_fetch(pgstat_info);
		else
			pgstat_count_heap_fetch(&relation->pgstat_info);

		return true;
	}

	/* Tuple failed time qual, but maybe caller wants to see it anyway. */
	if (keep_buf)
		*userbuf = buffer;
	else
	{
		ReleaseBuffer(buffer);
		*userbuf = InvalidBuffer;
	}

	return false;
}
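
/*
 * Usage sketch for heap_fetch (illustrative only; "rel", "tid", and the
 * SnapshotNow choice are assumptions): the caller supplies the TID in
 * tuple.t_self and must release the buffer on success.
 *
 *		HeapTupleData tuple;
 *		Buffer		buf;
 *
 *		tuple.t_self = *tid;
 *		if (heap_fetch(rel, SnapshotNow, &tuple, &buf, false, NULL))
 *		{
 *			... use tuple.t_data ...
 *			ReleaseBuffer(buf);
 *		}
 */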

/*
 *	heap_get_latest_tid -  get the latest tid of a specified tuple
 */
ItemPointer
heap_get_latest_tid(Relation relation,
					Snapshot snapshot,
					ItemPointer tid)
{
	ItemId		lp = NULL;
	Buffer		buffer;
	PageHeader	dp;
	OffsetNumber offnum;
	HeapTupleData tp;
	HeapTupleHeader t_data;
	ItemPointerData ctid;
	bool		invalidBlock,
				linkend,
				valid;

	/*
	 * get the buffer from the relation descriptor.  Note that this does
	 * a buffer pin.
	 */
	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
	LockBuffer(buffer, BUFFER_LOCK_SHARE);

	/*
	 * get the item line pointer corresponding to the requested tid
	 */
	dp = (PageHeader) BufferGetPage(buffer);
	offnum = ItemPointerGetOffsetNumber(tid);
	invalidBlock = true;
	if (!PageIsNew(dp))
	{
		lp = PageGetItemId(dp, offnum);
		if (ItemIdIsUsed(lp))
			invalidBlock = false;
	}
	if (invalidBlock)
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		return NULL;
	}

	/*
	 * more sanity checks
	 */

	tp.t_datamcxt = NULL;
	t_data = tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tp.t_len = ItemIdGetLength(lp);
	tp.t_self = *tid;
	ctid = tp.t_data->t_ctid;

	/*
	 * check time qualification of tid
	 */

	HeapTupleSatisfies(&tp, relation, buffer, dp,
					   snapshot, 0, NULL, valid);

	linkend = true;
	if ((t_data->t_infomask & HEAP_XMIN_COMMITTED) != 0 &&
		!ItemPointerEquals(tid, &ctid))
		linkend = false;

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	ReleaseBuffer(buffer);

	if (!valid)
	{
		if (linkend)
			return NULL;
		heap_get_latest_tid(relation, snapshot, &ctid);
		*tid = ctid;
	}

	return tid;
}
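
/*
 * Illustrative note (not in the original file): starting from some known
 * TID, the function chases t_ctid update links and overwrites *tid with
 * the newest row version visible to the snapshot, e.g.:
 *
 *		ItemPointerData t = *start_tid;		("start_tid" is assumed)
 *		if (heap_get_latest_tid(rel, snapshot, &t) != NULL)
 *			... t now names the latest version of the row ...
 */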

/*
 *	heap_insert		- insert tuple into a heap
 *
 * The new tuple is stamped with current transaction ID and the specified
 * command ID.
 */
Oid
heap_insert(Relation relation, HeapTuple tup, CommandId cid)
{
	TransactionId xid = GetCurrentTransactionId();
	Buffer		buffer;

	if (relation->rd_rel->relhasoids)
	{
#ifdef NOT_USED
		/* this is redundant with an Assert in HeapTupleSetOid */
		Assert(tup->t_data->t_infomask & HEAP_HASOID);
#endif

		/*
		 * If the object id of this tuple has already been assigned, trust
		 * the caller.	There are a couple of ways this can happen.  At
		 * initial db creation, the backend program sets oids for tuples.
		 * When we define an index, we set the oid.  Finally, in the
		 * future, we may allow users to set their own object ids in order
		 * to support a persistent object store (objects need to contain
		 * pointers to one another).
		 */
		if (!OidIsValid(HeapTupleGetOid(tup)))
			HeapTupleSetOid(tup, newoid());
		else
			CheckMaxObjectId(HeapTupleGetOid(tup));
	}
	else
	{
		/* check there is not space for an OID */
		Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
	}

	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
	HeapTupleHeaderSetXmin(tup->t_data, xid);
	HeapTupleHeaderSetCmin(tup->t_data, cid);
	HeapTupleHeaderSetXmax(tup->t_data, 0);		/* zero out Datum fields */
	HeapTupleHeaderSetCmax(tup->t_data, 0);		/* for cleanliness */
	tup->t_tableOid = relation->rd_id;

	/*
	 * If the new tuple is too big for storage or contains already toasted
	 * out-of-line attributes from some other relation, invoke the
	 * toaster.
	 */
	if (HeapTupleHasExternal(tup) ||
		(MAXALIGN(tup->t_len) > TOAST_TUPLE_THRESHOLD))
		heap_tuple_toast_attrs(relation, tup, NULL);

	/* Find buffer to insert this tuple into */
	buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer);

	/* NO EREPORT(ERROR) from here till changes are logged */
	START_CRIT_SECTION();

	RelationPutHeapTuple(relation, buffer, tup);

	pgstat_count_heap_insert(&relation->pgstat_info);

	/* XLOG stuff */
	if (!relation->rd_istemp)
	{
		xl_heap_insert xlrec;
		xl_heap_header xlhdr;
		XLogRecPtr	recptr;
		XLogRecData rdata[3];
		Page		page = BufferGetPage(buffer);
		uint8		info = XLOG_HEAP_INSERT;

		xlrec.target.node = relation->rd_node;
		xlrec.target.tid = tup->t_self;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfHeapInsert;
		rdata[0].next = &(rdata[1]);

		xlhdr.t_natts = tup->t_data->t_natts;
		xlhdr.t_infomask = tup->t_data->t_infomask;
		xlhdr.t_hoff = tup->t_data->t_hoff;

		/*
		 * note we mark rdata[1] as belonging to buffer; if XLogInsert
		 * decides to write the whole page to the xlog, we don't need to
		 * store xl_heap_header in the xlog.
		 */
		rdata[1].buffer = buffer;
		rdata[1].data = (char *) &xlhdr;
		rdata[1].len = SizeOfHeapHeader;
		rdata[1].next = &(rdata[2]);

		rdata[2].buffer = buffer;
		/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
		rdata[2].data = (char *) tup->t_data + offsetof(HeapTupleHeaderData, t_bits);
		rdata[2].len = tup->t_len - offsetof(HeapTupleHeaderData, t_bits);
		rdata[2].next = NULL;

		/*
		 * If this is the single and first tuple on page, we can reinit
		 * the page instead of restoring the whole thing.  Set flag, and
		 * hide buffer references from XLogInsert.
		 */
		if (ItemPointerGetOffsetNumber(&(tup->t_self)) == FirstOffsetNumber &&
			PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
		{
			info |= XLOG_HEAP_INIT_PAGE;
			rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
		}

		recptr = XLogInsert(RM_HEAP_ID, info, rdata);

		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);
	}
	else
	{
		/* No XLOG record, but still need to flag that XID exists on disk */
		MyXactMadeTempRelUpdate = true;
	}

	END_CRIT_SECTION();

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	WriteBuffer(buffer);

	/*
	 * If tuple is cachable, mark it for invalidation from the caches in
	 * case we abort.  Note it is OK to do this after WriteBuffer releases
	 * the buffer, because the "tup" data structure is all in local
	 * memory, not in the shared buffer.
	 */
	CacheInvalidateHeapTuple(relation, tup);

	return HeapTupleGetOid(tup);
}

/*
 *	simple_heap_insert - insert a tuple
 *
 * Currently, this routine differs from heap_insert only in supplying
 * a default command ID.  But it should be used rather than using
 * heap_insert directly in most places where we are modifying system catalogs.
 */
Oid
simple_heap_insert(Relation relation, HeapTuple tup)
{
	return heap_insert(relation, tup, GetCurrentCommandId());
}
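
/*
 * Illustrative sketch (not part of the original file; compiled out by the
 * #ifdef): forming and inserting a tuple through simple_heap_insert.  The
 * tuple descriptor and the values/nulls arrays are assumed to be set up by
 * the caller, and keeping indexes in sync remains the caller's job.
 */
#ifdef HEAPAM_USAGE_EXAMPLE
static Oid
example_heap_insert(Relation rel, TupleDesc tupdesc,
					Datum *values, char *nulls)
{
	HeapTuple	tup = heap_formtuple(tupdesc, values, nulls);
	Oid			oid = simple_heap_insert(rel, tup);

	/* the tuple copy is local memory; free it once it has been stored */
	heap_freetuple(tup);
	return oid;
}
#endif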

/*
 *	heap_delete		- delete a tuple
 *
 * NB: do not call this directly unless you are prepared to deal with
 * concurrent-update conditions.  Use simple_heap_delete instead.
 *
 *	relation - table to be modified
 *	tid - TID of tuple to be deleted
 *	ctid - output parameter, used only for failure case (see below)
 *	cid - delete command ID to use in verifying tuple visibility
 *	crosscheck - if not InvalidSnapshot, also check tuple against this
 *	wait - true if should wait for any conflicting update to commit/abort
 *
 * Normal, successful return value is HeapTupleMayBeUpdated, which
 * actually means we did delete it.  Failure return codes are
 * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
 * (the last only possible if wait == false).  On a failure return,
 * *ctid is set to the ctid link of the target tuple (possibly a later
 * version of the row).
 */
int
heap_delete(Relation relation, ItemPointer tid,
			ItemPointer ctid, CommandId cid,
			Snapshot crosscheck, bool wait)
{
	TransactionId xid = GetCurrentTransactionId();
	ItemId		lp;
	HeapTupleData tp;
	PageHeader	dp;
	Buffer		buffer;
	int			result;

	Assert(ItemPointerIsValid(tid));

	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	dp = (PageHeader) BufferGetPage(buffer);
	lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
	tp.t_datamcxt = NULL;
	tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tp.t_len = ItemIdGetLength(lp);
	tp.t_self = *tid;
	tp.t_tableOid = relation->rd_id;

l1:
	result = HeapTupleSatisfiesUpdate(tp.t_data, cid, buffer);

	if (result == HeapTupleInvisible)
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		elog(ERROR, "attempted to delete invisible tuple");
	}
	else if (result == HeapTupleBeingUpdated && wait)
	{
		TransactionId xwait = HeapTupleHeaderGetXmax(tp.t_data);

		/* sleep until concurrent transaction ends */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		XactLockTableWait(xwait);

		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		if (!TransactionIdDidCommit(xwait))
			goto l1;

		/*
		 * xwait is committed but if xwait had just marked the tuple for
		 * update then some other xaction could update this tuple before
		 * we got to this point.
		 */
		if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data), xwait))
			goto l1;
		if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED))
		{
			tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
			SetBufferCommitInfoNeedsSave(buffer);
		}
		/* if tuple was marked for update but not updated... */
		if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
			result = HeapTupleMayBeUpdated;
		else
			result = HeapTupleUpdated;
	}

	if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
	{
		/* Perform additional check for serializable RI updates */
		if (!HeapTupleSatisfiesSnapshot(tp.t_data, crosscheck, buffer))
			result = HeapTupleUpdated;
	}

	if (result != HeapTupleMayBeUpdated)
	{
		Assert(result == HeapTupleSelfUpdated ||
			   result == HeapTupleUpdated ||
			   result == HeapTupleBeingUpdated);
		*ctid = tp.t_data->t_ctid;
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		return result;
	}

	START_CRIT_SECTION();

	/* store transaction information of xact deleting the tuple */
	tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
							   HEAP_XMAX_INVALID |
							   HEAP_MARKED_FOR_UPDATE |
							   HEAP_MOVED);
	HeapTupleHeaderSetXmax(tp.t_data, xid);
	HeapTupleHeaderSetCmax(tp.t_data, cid);
	/* Make sure there is no forward chain link in t_ctid */
	tp.t_data->t_ctid = tp.t_self;

	/* XLOG stuff */
	if (!relation->rd_istemp)
	{
		xl_heap_delete xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		xlrec.target.node = relation->rd_node;
		xlrec.target.tid = tp.t_self;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfHeapDelete;
		rdata[0].next = &(rdata[1]);

		rdata[1].buffer = buffer;
		rdata[1].data = NULL;
		rdata[1].len = 0;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);

		PageSetLSN(dp, recptr);
		PageSetTLI(dp, ThisTimeLineID);
	}
	else
	{
		/* No XLOG record, but still need to flag that XID exists on disk */
		MyXactMadeTempRelUpdate = true;
	}

	END_CRIT_SECTION();

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

	/*
	 * If the tuple has toasted out-of-line attributes, we need to delete
	 * those items too.  We have to do this before WriteBuffer because we
	 * need to look at the contents of the tuple, but it's OK to release
	 * the context lock on the buffer first.
	 */
	if (HeapTupleHasExternal(&tp))
		heap_tuple_toast_attrs(relation, NULL, &tp);

	pgstat_count_heap_delete(&relation->pgstat_info);

	/*
	 * Mark tuple for invalidation from system caches at next command
	 * boundary. We have to do this before WriteBuffer because we need to
	 * look at the contents of the tuple, so we need to hold our refcount
	 * on the buffer.
	 */
	CacheInvalidateHeapTuple(relation, &tp);

	WriteBuffer(buffer);

	return HeapTupleMayBeUpdated;
}

/*
 *	simple_heap_delete - delete a tuple
 *
 * This routine may be used to delete a tuple when concurrent updates of
 * the target tuple are not expected (for example, because we have a lock
 * on the relation associated with the tuple).	Any failure is reported
 * via ereport().
 */
void
simple_heap_delete(Relation relation, ItemPointer tid)
{
	ItemPointerData ctid;
	int			result;

	result = heap_delete(relation, tid,
						 &ctid,
						 GetCurrentCommandId(), InvalidSnapshot,
						 true /* wait for commit */ );
	switch (result)
	{
		case HeapTupleSelfUpdated:
			/* Tuple was already updated in current command? */
			elog(ERROR, "tuple already updated by self");
			break;

		case HeapTupleMayBeUpdated:
			/* done successfully */
			break;

		case HeapTupleUpdated:
			elog(ERROR, "tuple concurrently updated");
			break;

		default:
			elog(ERROR, "unrecognized heap_delete status: %u", result);
			break;
	}
}
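
/*
 * Usage sketch (illustrative only): deleting a tuple whose TID is already
 * known, where "rel" is assumed to be opened with a sufficient lock and
 * "tup" was fetched earlier:
 *
 *		simple_heap_delete(rel, &tup->t_self);
 *
 * Any concurrent-update failure is raised via elog/ereport, so there is no
 * return value to check.
 */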

/*
 *	heap_update - replace a tuple
 *
 * NB: do not call this directly unless you are prepared to deal with
 * concurrent-update conditions.  Use simple_heap_update instead.
 *
 *	relation - table to be modified
 *	otid - TID of old tuple to be replaced
 *	newtup - newly constructed tuple data to store
 *	ctid - output parameter, used only for failure case (see below)
 *	cid - update command ID to use in verifying old tuple visibility
 *	crosscheck - if not InvalidSnapshot, also check old tuple against this
 *	wait - true if should wait for any conflicting update to commit/abort
 *
 * Normal, successful return value is HeapTupleMayBeUpdated, which
 * actually means we *did* update it.  Failure return codes are
 * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
 * (the last only possible if wait == false).  On a failure return,
 * *ctid is set to the ctid link of the old tuple (possibly a later
 * version of the row).
 * On success, newtup->t_self is set to the TID where the new tuple
 * was inserted.
 */
int
heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
			ItemPointer ctid, CommandId cid,
			Snapshot crosscheck, bool wait)
{
	TransactionId xid = GetCurrentTransactionId();
	ItemId		lp;
	HeapTupleData oldtup;
	PageHeader	dp;
	Buffer		buffer,
				newbuf;
	bool		need_toast,
				already_marked;
	Size		newtupsize,
				pagefree;
	int			result;

	Assert(ItemPointerIsValid(otid));

	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	dp = (PageHeader) BufferGetPage(buffer);
	lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid));

	oldtup.t_datamcxt = NULL;
	oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
	oldtup.t_len = ItemIdGetLength(lp);
	oldtup.t_self = *otid;

	/*
	 * Note: beyond this point, use oldtup not otid to refer to old tuple.
	 * otid may very well point at newtup->t_self, which we will overwrite
	 * with the new tuple's location, so there's great risk of confusion
	 * if we use otid anymore.
	 */

l2:
	result = HeapTupleSatisfiesUpdate(oldtup.t_data, cid, buffer);

	if (result == HeapTupleInvisible)
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		elog(ERROR, "attempted to update invisible tuple");
	}
	else if (result == HeapTupleBeingUpdated && wait)
	{
		TransactionId xwait = HeapTupleHeaderGetXmax(oldtup.t_data);

		/* sleep until concurrent transaction ends */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		XactLockTableWait(xwait);

		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		if (!TransactionIdDidCommit(xwait))
			goto l2;

		/*
		 * xwait is committed but if xwait had just marked the tuple for
		 * update then some other xaction could update this tuple before
		 * we got to this point.
		 */
		if (!TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data), xwait))
			goto l2;
		if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED))
		{
			oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
			SetBufferCommitInfoNeedsSave(buffer);
		}
		/* if tuple was marked for update but not updated... */
		if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
			result = HeapTupleMayBeUpdated;
		else
			result = HeapTupleUpdated;
	}

	if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
	{
		/* Perform additional check for serializable RI updates */
		if (!HeapTupleSatisfiesSnapshot(oldtup.t_data, crosscheck, buffer))
			result = HeapTupleUpdated;
	}

	if (result != HeapTupleMayBeUpdated)
	{
		Assert(result == HeapTupleSelfUpdated ||
			   result == HeapTupleUpdated ||
			   result == HeapTupleBeingUpdated);
		*ctid = oldtup.t_data->t_ctid;
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		return result;
	}

	/* Fill in OID and transaction status data for newtup */
	if (relation->rd_rel->relhasoids)
	{
#ifdef NOT_USED
		/* this is redundant with an Assert in HeapTupleSetOid */
		Assert(newtup->t_data->t_infomask & HEAP_HASOID);
#endif
		HeapTupleSetOid(newtup, HeapTupleGetOid(&oldtup));
	}
	else
	{
		/* check there is not space for an OID */
		Assert(!(newtup->t_data->t_infomask & HEAP_HASOID));
	}

	newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
	newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);
	HeapTupleHeaderSetXmin(newtup->t_data, xid);
	HeapTupleHeaderSetCmin(newtup->t_data, cid);
	HeapTupleHeaderSetXmax(newtup->t_data, 0);	/* zero out Datum fields */
	HeapTupleHeaderSetCmax(newtup->t_data, 0);	/* for cleanliness */

	/*
	 * If the toaster needs to be activated, OR if the new tuple will not
	 * fit on the same page as the old, then we need to release the
	 * context lock (but not the pin!) on the old tuple's buffer while we
	 * are off doing TOAST and/or table-file-extension work.  We must mark
	 * the old tuple to show that it's already being updated, else other
	 * processes may try to update it themselves. To avoid a second XLOG
	 * record, we use a xact mgr hook to unlock the old tuple without
	 * reading the log if the xact will abort before the update is logged.
	 * In the event of a crash prior to logging, TQUAL routines will see
	 * the HEAP_XMAX_UNLOGGED flag...
1611
	 *
1612 1613 1614
	 * NOTE: this trick is useless currently but saved for future when we'll
	 * implement UNDO and will re-use transaction IDs after postmaster
	 * startup.
1615
	 *
1616 1617
	 * We need to invoke the toaster if there are already any out-of-line
	 * toasted values present, or if the new tuple is over-threshold.
1618
	 */
1619 1620
	need_toast = (HeapTupleHasExternal(&oldtup) ||
				  HeapTupleHasExternal(newtup) ||
1621
				  (MAXALIGN(newtup->t_len) > TOAST_TUPLE_THRESHOLD));
1622

1623 1624 1625 1626
	newtupsize = MAXALIGN(newtup->t_len);
	pagefree = PageGetFreeSpace((Page) dp);

	if (need_toast || newtupsize > pagefree)
1627
	{
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1628
		_locked_tuple_.node = relation->rd_node;
1629
		_locked_tuple_.tid = oldtup.t_self;
1630
		XactPushRollback(_heap_unlock_tuple, (void *) &_locked_tuple_);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
1631

1632
		oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
1633
									   HEAP_XMAX_INVALID |
1634 1635
									   HEAP_MARKED_FOR_UPDATE |
									   HEAP_MOVED);
1636
		oldtup.t_data->t_infomask |= HEAP_XMAX_UNLOGGED;
1637
		HeapTupleHeaderSetXmax(oldtup.t_data, xid);
1638
		HeapTupleHeaderSetCmax(oldtup.t_data, cid);
1639
		already_marked = true;
1640
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1641 1642 1643

		/* Let the toaster do its thing */
		if (need_toast)
1644
		{
1645
			heap_tuple_toast_attrs(relation, newtup, &oldtup);
1646 1647
			newtupsize = MAXALIGN(newtup->t_len);
		}

		/*
		 * Now, do we need a new page for the tuple, or not?  This is a
		 * bit tricky since someone else could have added tuples to the
		 * page while we weren't looking.  We have to recheck the
		 * available space after reacquiring the buffer lock.  But don't
		 * bother to do that if the former amount of free space is still
		 * not enough; it's unlikely there's more free now than before.
		 *
		 * What's more, if we need to get a new page, we will need to acquire
		 * buffer locks on both old and new pages.  To avoid deadlock
		 * against some other backend trying to get the same two locks in
		 * the other order, we must be consistent about the order we get
		 * the locks in. We use the rule "lock the lower-numbered page of
		 * the relation first".  To implement this, we must do
		 * RelationGetBufferForTuple while not holding the lock on the old
		 * page, and we must rely on it to get the locks on both pages in
		 * the correct order.
		 */
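		/*
		 * Concrete example of the hazard: backend A holds the lock on
		 * block 1 and wants block 2, while backend B holds block 2 and
		 * wants block 1; neither can proceed.  Taking lower-numbered
		 * blocks first makes such a lock cycle impossible.
		 */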
		if (newtupsize > pagefree)
		{
			/* Assume there's no chance to put newtup on same page. */
			newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
											   buffer);
		}
		else
		{
			/* Re-acquire the lock on the old tuple's page. */
			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
			/* Re-check using the up-to-date free space */
			pagefree = PageGetFreeSpace((Page) dp);
			if (newtupsize > pagefree)
			{
				/*
				 * Rats, it doesn't fit anymore.  We must now unlock and
				 * relock to avoid deadlock.  Fortunately, this path
				 * should seldom be taken.
				 */
				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
				newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
												   buffer);
			}
			else
			{
				/* OK, it fits here, so we're done. */
				newbuf = buffer;
			}
		}
	}
	else
	{
		/* No TOAST work needed, and it'll fit on same page */
		already_marked = false;
		newbuf = buffer;
	}

	pgstat_count_heap_update(&relation->pgstat_info);

	/*
	 * At this point newbuf and buffer are both pinned and locked, and
	 * newbuf has enough space for the new tuple.  If they are the same
	 * buffer, only one pin is held.
	 */

	/* NO EREPORT(ERROR) from here till changes are logged */
	START_CRIT_SECTION();

	RelationPutHeapTuple(relation, newbuf, newtup);		/* insert new tuple */

	if (already_marked)
	{
		oldtup.t_data->t_infomask &= ~HEAP_XMAX_UNLOGGED;
		XactPopRollback();
	}
	else
	{
		oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
									   HEAP_XMAX_INVALID |
									   HEAP_MARKED_FOR_UPDATE |
									   HEAP_MOVED);
		HeapTupleHeaderSetXmax(oldtup.t_data, xid);
		HeapTupleHeaderSetCmax(oldtup.t_data, cid);
	}

	/* record address of new tuple in t_ctid of old one */
	oldtup.t_data->t_ctid = newtup->t_self;

	/* XLOG stuff */
	if (!relation->rd_istemp)
	{
		XLogRecPtr	recptr = log_heap_update(relation, buffer, oldtup.t_self,
											 newbuf, newtup, false);

		if (newbuf != buffer)
		{
			PageSetLSN(BufferGetPage(newbuf), recptr);
			PageSetTLI(BufferGetPage(newbuf), ThisTimeLineID);
		}
		PageSetLSN(BufferGetPage(buffer), recptr);
		PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
	}
	else
	{
		/* No XLOG record, but still need to flag that XID exists on disk */
		MyXactMadeTempRelUpdate = true;
	}

	END_CRIT_SECTION();

	if (newbuf != buffer)
		LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

	/*
	 * Mark old tuple for invalidation from system caches at next command
	 * boundary. We have to do this before WriteBuffer because we need to
	 * look at the contents of the tuple, so we need to hold our refcount.
	 */
	CacheInvalidateHeapTuple(relation, &oldtup);

	if (newbuf != buffer)
		WriteBuffer(newbuf);
	WriteBuffer(buffer);

	/*
	 * If new tuple is cachable, mark it for invalidation from the caches
	 * in case we abort.  Note it is OK to do this after WriteBuffer
	 * releases the buffer, because the "newtup" data structure is all in
	 * local memory, not in the shared buffer.
	 */
	CacheInvalidateHeapTuple(relation, newtup);

	return HeapTupleMayBeUpdated;
}

/*
 *	simple_heap_update - replace a tuple
 *
 * This routine may be used to update a tuple when concurrent updates of
 * the target tuple are not expected (for example, because we have a lock
 * on the relation associated with the tuple).  Any failure is reported
 * via ereport().
 */
void
simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
{
	ItemPointerData ctid;
	int			result;

	result = heap_update(relation, otid, tup,
						 &ctid,
						 GetCurrentCommandId(), InvalidSnapshot,
						 true /* wait for commit */ );
	switch (result)
	{
		case HeapTupleSelfUpdated:
			/* Tuple was already updated in current command? */
			elog(ERROR, "tuple already updated by self");
			break;

		case HeapTupleMayBeUpdated:
			/* done successfully */
			break;

		case HeapTupleUpdated:
			elog(ERROR, "tuple concurrently updated");
			break;

		default:
			elog(ERROR, "unrecognized heap_update status: %u", result);
			break;
	}
}
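
/*
 * Usage sketch (hypothetical caller): the usual catalog-update idiom is
 * to build a modified copy of the tuple, update it in place, then fix
 * the indexes, e.g.
 *
 *		tup = heap_modifytuple(oldtup, rel, values, nulls, replaces);
 *		simple_heap_update(rel, &tup->t_self, tup);
 *		CatalogUpdateIndexes(rel, tup);
 *
 * (heap_modifytuple and CatalogUpdateIndexes live elsewhere; shown here
 * only for orientation.)
 */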

/*
 *	heap_mark4update		- mark a tuple for update
 */
int
heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer,
				 CommandId cid)
{
	TransactionId xid = GetCurrentTransactionId();
	ItemPointer tid = &(tuple->t_self);
	ItemId		lp;
	PageHeader	dp;
	int			result;

	*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
	LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);

	dp = (PageHeader) BufferGetPage(*buffer);
	lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
	tuple->t_datamcxt = NULL;
	tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tuple->t_len = ItemIdGetLength(lp);

l3:
	result = HeapTupleSatisfiesUpdate(tuple->t_data, cid, *buffer);

	if (result == HeapTupleInvisible)
	{
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(*buffer);
		elog(ERROR, "attempted to mark4update invisible tuple");
	}
	else if (result == HeapTupleBeingUpdated)
	{
		TransactionId xwait = HeapTupleHeaderGetXmax(tuple->t_data);

		/* sleep until concurrent transaction ends */
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		XactLockTableWait(xwait);

		LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
		if (!TransactionIdDidCommit(xwait))
			goto l3;

		/*
		 * xwait is committed, but if xwait had merely marked the tuple
		 * for update, some other transaction could have updated it
		 * before we got to this point.
		 */
		if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), xwait))
			goto l3;
		if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED))
		{
			tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
			SetBufferCommitInfoNeedsSave(*buffer);
		}
		/* if tuple was marked for update but not updated... */
		if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
			result = HeapTupleMayBeUpdated;
		else
			result = HeapTupleUpdated;
	}
	if (result != HeapTupleMayBeUpdated)
	{
		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
		tuple->t_self = tuple->t_data->t_ctid;
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		return result;
	}

	/*
	 * XLOG stuff: no logging is required as long as we have no
	 * savepoints.  For savepoints, a private log could be used...
	 */
	PageSetTLI(BufferGetPage(*buffer), ThisTimeLineID);

	/* store transaction information of xact marking the tuple */
	tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
								   HEAP_XMAX_INVALID |
								   HEAP_MOVED);
	tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE;
	HeapTupleHeaderSetXmax(tuple->t_data, xid);
	HeapTupleHeaderSetCmax(tuple->t_data, cid);
	/* Make sure there is no forward chain link in t_ctid */
	tuple->t_data->t_ctid = *tid;

	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);

	WriteNoReleaseBuffer(*buffer);

	return HeapTupleMayBeUpdated;
}

/* ----------------
 *		heap_markpos	- mark scan position
 *
 *		Note:
 *				Only one mark may be maintained per scan at a time.
 *		Check whether this can be done generally--say, calls to get the
 *		next/previous tuple that NEVER pass the struct scandesc to the
 *		user AM's.  Now, the mark is sent to the executor for safekeeping.
 *		Probably this info can be stored in a GENERAL scan structure.
 *
 *		May be best to change this call to store the marked position
 *		(up to 2?) in the scan structure itself.
 *		Fix to use the proper caching structure.
 * ----------------
 */
void
heap_markpos(HeapScanDesc scan)
{
	/* Note: no locking manipulations needed */

	if (scan->rs_ctup.t_data != NULL)
		scan->rs_mctid = scan->rs_ctup.t_self;
	else
		ItemPointerSetInvalid(&scan->rs_mctid);
}
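
/*
 * Usage sketch: a caller (for instance, merge join's mark/restore
 * support) brackets a stretch of the scan like so:
 *
 *		heap_markpos(scan);
 *		... heap_getnext(scan, ForwardScanDirection) ...
 *		heap_restrpos(scan);
 *
 * after which the scan continues from the marked tuple.
 */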

/* ----------------
 *		heap_restrpos	- restore position to marked location
 *
 *		Note:  there are bad side effects here.  If we were past the end
 *		of a relation when heap_markpos was called, and the relation is
 *		then extended via insert, the next call to heap_restrpos will
 *		cause the added tuples to be visible when the scan continues.
 *		Problems also arise if the TIDs are rearranged!!!
 *
 * XXX	might be better to do direct access instead of
 *		using the generality of heapgettup().
 *
 * XXX It is very possible that when a scan is restored, a tuple
 * XXX which previously qualified may fail for time-range purposes, unless
 * XXX some form of locking exists (i.e., portals currently can act funny).
 * ----------------
 */
void
heap_restrpos(HeapScanDesc scan)
{
	/* XXX no amrestrpos checking that ammarkpos called */

	/* Note: no locking manipulations needed */

	/*
	 * unpin scan buffers
	 */
	if (BufferIsValid(scan->rs_cbuf))
		ReleaseBuffer(scan->rs_cbuf);
	scan->rs_cbuf = InvalidBuffer;

	if (!ItemPointerIsValid(&scan->rs_mctid))
	{
		scan->rs_ctup.t_datamcxt = NULL;
		scan->rs_ctup.t_data = NULL;
	}
	else
	{
		scan->rs_ctup.t_self = scan->rs_mctid;
		scan->rs_ctup.t_datamcxt = NULL;
		scan->rs_ctup.t_data = (HeapTupleHeader) 0x1;	/* for heapgettup */
		heapgettup(scan->rs_rd,
				   0,
				   &(scan->rs_ctup),
				   &(scan->rs_cbuf),
				   scan->rs_snapshot,
				   0,
				   NULL,
				   scan->rs_nblocks);
	}
}

XLogRecPtr
log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
{
	xl_heap_clean xlrec;
	XLogRecPtr	recptr;
	XLogRecData rdata[2];

	/* Caller should not call me on a temp relation */
	Assert(!reln->rd_istemp);

	xlrec.node = reln->rd_node;
	xlrec.block = BufferGetBlockNumber(buffer);

	rdata[0].buffer = InvalidBuffer;
	rdata[0].data = (char *) &xlrec;
	rdata[0].len = SizeOfHeapClean;
	rdata[0].next = &(rdata[1]);

	/*
	 * The unused-offsets array is not actually in the buffer, but pretend
	 * that it is.  When XLogInsert stores the whole buffer, the offsets
	 * array need not be stored too.
	 */
	rdata[1].buffer = buffer;
	if (uncnt > 0)
	{
		rdata[1].data = (char *) unused;
		rdata[1].len = uncnt * sizeof(OffsetNumber);
	}
	else
	{
		rdata[1].data = NULL;
		rdata[1].len = 0;
	}
	rdata[1].next = NULL;

	recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CLEAN, rdata);

	return (recptr);
}
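
/*
 * Note on the interplay with redo: if XLogInsert chose to take a full
 * backup image of the page, the offsets array above is omitted from the
 * record, and heap_xlog_clean (below) skips the page entirely when it
 * sees XLR_BKP_BLOCK_1 set.
 */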

static XLogRecPtr
log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
				Buffer newbuf, HeapTuple newtup, bool move)
{
	/*
	 * Note: xlhdr is declared to have adequate size and correct alignment
	 * for an xl_heap_header.  However the two tids, if present at all,
	 * will be packed in with no wasted space after the xl_heap_header;
	 * they aren't necessarily aligned as implied by this struct
	 * declaration.
	 */
	struct
	{
		xl_heap_header hdr;
		TransactionId tid1;
		TransactionId tid2;
	}			xlhdr;
	int			hsize = SizeOfHeapHeader;
	xl_heap_update xlrec;
	XLogRecPtr	recptr;
	XLogRecData rdata[4];
	Page		page = BufferGetPage(newbuf);
	uint8		info = (move) ? XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE;

	/* Caller should not call me on a temp relation */
	Assert(!reln->rd_istemp);

	xlrec.target.node = reln->rd_node;
	xlrec.target.tid = from;
	xlrec.newtid = newtup->t_self;
	rdata[0].buffer = InvalidBuffer;
	rdata[0].data = (char *) &xlrec;
	rdata[0].len = SizeOfHeapUpdate;
	rdata[0].next = &(rdata[1]);

	rdata[1].buffer = oldbuf;
	rdata[1].data = NULL;
	rdata[1].len = 0;
	rdata[1].next = &(rdata[2]);

	xlhdr.hdr.t_natts = newtup->t_data->t_natts;
	xlhdr.hdr.t_infomask = newtup->t_data->t_infomask;
	xlhdr.hdr.t_hoff = newtup->t_data->t_hoff;
	if (move)					/* remember xmax & xmin */
	{
		TransactionId xid[2];	/* xmax, xmin */

		if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID |
										  HEAP_MARKED_FOR_UPDATE))
			xid[0] = InvalidTransactionId;
		else
			xid[0] = HeapTupleHeaderGetXmax(newtup->t_data);
		xid[1] = HeapTupleHeaderGetXmin(newtup->t_data);
		memcpy((char *) &xlhdr + hsize,
			   (char *) xid,
			   2 * sizeof(TransactionId));
		hsize += 2 * sizeof(TransactionId);
	}

	/*
	 * As with insert records, we need not store the rdata[2] segment if
	 * we decide to store the whole buffer instead.
	 */
	rdata[2].buffer = newbuf;
	rdata[2].data = (char *) &xlhdr;
	rdata[2].len = hsize;
	rdata[2].next = &(rdata[3]);

	rdata[3].buffer = newbuf;
	/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
	rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
	rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
	rdata[3].next = NULL;

	/* If new tuple is the single and first tuple on page... */
	if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
		PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
	{
		info |= XLOG_HEAP_INIT_PAGE;
		rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
	}

	recptr = XLogInsert(RM_HEAP_ID, info, rdata);

	return (recptr);
}

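/*
 * A MOVE record is just an UPDATE record built with move = true, so that
 * the original xmin/xmax travel with the tuple and the HEAP_MOVED_OFF /
 * HEAP_MOVED_IN bookkeeping (see heap_xlog_update) can be redone when a
 * tuple is physically moved rather than updated.
 */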
XLogRecPtr
log_heap_move(Relation reln, Buffer oldbuf, ItemPointerData from,
			  Buffer newbuf, HeapTuple newtup)
{
	return (log_heap_update(reln, oldbuf, from, newbuf, newtup, true));
}

static void
heap_xlog_clean(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
	xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
	Relation	reln;
	Buffer		buffer;
	Page		page;

	if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
		return;

	reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->node);
	if (!RelationIsValid(reln))
		return;

	buffer = XLogReadBuffer(false, reln, xlrec->block);
	if (!BufferIsValid(buffer))
		elog(PANIC, "heap_clean_redo: no block");

	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
		elog(PANIC, "heap_clean_redo: uninitialized page");

	if (XLByteLE(lsn, PageGetLSN(page)))
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		return;
	}

	if (record->xl_len > SizeOfHeapClean)
	{
		OffsetNumber *unused;
		OffsetNumber *unend;
		ItemId		lp;

		unused = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
		unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);

		while (unused < unend)
		{
			/* offsets in the record are zero-based, hence the +1 */
			lp = PageGetItemId(page, *unused + 1);
			lp->lp_flags &= ~LP_USED;
			unused++;
		}
	}

	PageRepairFragmentation(page, NULL);

	PageSetLSN(page, lsn);
	PageSetTLI(page, ThisTimeLineID);
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	WriteBuffer(buffer);
}

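/*
 * Note: NEWPAGE redo rewrites the whole page from the image carried in
 * the record, so there is no LSN interlock here as in the other redo
 * routines; replaying the copy is idempotent.
 */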
static void
heap_xlog_newpage(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
	xl_heap_newpage *xlrec = (xl_heap_newpage *) XLogRecGetData(record);
	Relation	reln;
	Buffer		buffer;
	Page		page;

	/*
	 * Note: the NEWPAGE log record is used for both heaps and indexes, so
	 * do not do anything that assumes we are touching a heap.
	 */

	if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
		return;

	reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->node);
	if (!RelationIsValid(reln))
		return;
	buffer = XLogReadBuffer(true, reln, xlrec->blkno);
	if (!BufferIsValid(buffer))
		elog(PANIC, "heap_newpage_redo: no block");
	page = (Page) BufferGetPage(buffer);

	Assert(record->xl_len == SizeOfHeapNewpage + BLCKSZ);
	memcpy(page, (char *) xlrec + SizeOfHeapNewpage, BLCKSZ);

	PageSetLSN(page, lsn);
	PageSetTLI(page, ThisTimeLineID);
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	WriteBuffer(buffer);
}

static void
heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
	xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
	Relation	reln;
	Buffer		buffer;
	Page		page;
	OffsetNumber offnum;
	ItemId		lp = NULL;
	HeapTupleHeader htup;

	if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
		return;

	reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);

	if (!RelationIsValid(reln))
		return;

	buffer = XLogReadBuffer(false, reln,
						ItemPointerGetBlockNumber(&(xlrec->target.tid)));
	if (!BufferIsValid(buffer))
		elog(PANIC, "heap_delete_%sdo: no block", (redo) ? "re" : "un");

	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
		elog(PANIC, "heap_delete_%sdo: uninitialized page", (redo) ? "re" : "un");

	if (redo)
	{
		if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
		{
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buffer);
			return;
		}
	}
	else if (XLByteLT(PageGetLSN(page), lsn))	/* changes are not applied?! */
		elog(PANIC, "heap_delete_undo: bad page LSN");

	offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
	if (PageGetMaxOffsetNumber(page) >= offnum)
		lp = PageGetItemId(page, offnum);

	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
		elog(PANIC, "heap_delete_%sdo: invalid lp", (redo) ? "re" : "un");

	htup = (HeapTupleHeader) PageGetItem(page, lp);

	if (redo)
	{
		htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
							  HEAP_XMAX_INVALID |
							  HEAP_MARKED_FOR_UPDATE |
							  HEAP_MOVED);
		HeapTupleHeaderSetXmax(htup, record->xl_xid);
		HeapTupleHeaderSetCmax(htup, FirstCommandId);
		/* Make sure there is no forward chain link in t_ctid */
		htup->t_ctid = xlrec->target.tid;
		PageSetLSN(page, lsn);
		PageSetTLI(page, ThisTimeLineID);
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		WriteBuffer(buffer);
		return;
	}

	elog(PANIC, "heap_delete_undo: unimplemented");
}

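/*
 * Insert redo reconstructs the tuple in a local, properly aligned staging
 * buffer (tbuf below): header fields come from the xl_heap_header payload,
 * xmin from the record's xl_xid, and the tuple is then re-added at its
 * original offset with PageAddItem in overwrite mode.
 */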
static void
heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
	xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
	Relation	reln;
	Buffer		buffer;
	Page		page;
	OffsetNumber offnum;

	if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
		return;

	reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);

	if (!RelationIsValid(reln))
		return;

	buffer = XLogReadBuffer((redo) ? true : false, reln,
						ItemPointerGetBlockNumber(&(xlrec->target.tid)));
	if (!BufferIsValid(buffer))
		return;

	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page) &&
		(!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE)))
		elog(PANIC, "heap_insert_%sdo: uninitialized page", (redo) ? "re" : "un");

	if (redo)
	{
		struct
		{
			HeapTupleHeaderData hdr;
			char		data[MaxTupleSize];
		}			tbuf;
		HeapTupleHeader htup;
		xl_heap_header xlhdr;
		uint32		newlen;

		if (record->xl_info & XLOG_HEAP_INIT_PAGE)
			PageInit(page, BufferGetPageSize(buffer), 0);

		if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
		{
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buffer);
			return;
		}

		offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
			elog(PANIC, "heap_insert_redo: invalid max offset number");

		newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader;
		Assert(newlen <= MaxTupleSize);
		memcpy((char *) &xlhdr,
			   (char *) xlrec + SizeOfHeapInsert,
			   SizeOfHeapHeader);
		htup = &tbuf.hdr;
		MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
		memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
			   (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader,
			   newlen);
		newlen += offsetof(HeapTupleHeaderData, t_bits);
		htup->t_natts = xlhdr.t_natts;
		htup->t_infomask = xlhdr.t_infomask;
		htup->t_hoff = xlhdr.t_hoff;
		HeapTupleHeaderSetXmin(htup, record->xl_xid);
		HeapTupleHeaderSetCmin(htup, FirstCommandId);
		htup->t_ctid = xlrec->target.tid;

		offnum = PageAddItem(page, (Item) htup, newlen, offnum,
							 LP_USED | OverwritePageMode);
		if (offnum == InvalidOffsetNumber)
			elog(PANIC, "heap_insert_redo: failed to add tuple");
		PageSetLSN(page, lsn);
		PageSetTLI(page, ThisTimeLineID);
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		WriteBuffer(buffer);
		return;
	}

	/* undo insert */
	if (XLByteLT(PageGetLSN(page), lsn))		/* changes are not applied?! */
		elog(PANIC, "heap_insert_undo: bad page LSN");

	elog(PANIC, "heap_insert_undo: unimplemented");
}

/*
 * Handles UPDATE & MOVE
 */
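/*
 * Control flow: the old tuple's page is handled first; "goto newt" skips
 * ahead to the new tuple's page, and "goto newsame" covers the case where
 * old and new versions live on the same page (one buffer, one write).
 */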
static void
heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
{
	xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
	Relation	reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
	Buffer		buffer;
	bool		samepage =
	(ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
	 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
	Page		page;
	OffsetNumber offnum;
	ItemId		lp = NULL;
	HeapTupleHeader htup;

	if (!RelationIsValid(reln))
		return;

	if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
		goto newt;

	/* Deal with old tuple version */

	buffer = XLogReadBuffer(false, reln,
						ItemPointerGetBlockNumber(&(xlrec->target.tid)));
	if (!BufferIsValid(buffer))
		elog(PANIC, "heap_update_%sdo: no block", (redo) ? "re" : "un");

	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
		elog(PANIC, "heap_update_%sdo: uninitialized old page", (redo) ? "re" : "un");

	if (redo)
	{
		if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
		{
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buffer);
			if (samepage)
				return;
			goto newt;
		}
	}
	else if (XLByteLT(PageGetLSN(page), lsn))	/* changes are not applied?! */
		elog(PANIC, "heap_update_undo: bad old tuple page LSN");

	offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
	if (PageGetMaxOffsetNumber(page) >= offnum)
		lp = PageGetItemId(page, offnum);

	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
		elog(PANIC, "heap_update_%sdo: invalid lp", (redo) ? "re" : "un");

	htup = (HeapTupleHeader) PageGetItem(page, lp);

	if (redo)
	{
		if (move)
		{
			htup->t_infomask &= ~(HEAP_XMIN_COMMITTED |
								  HEAP_XMIN_INVALID |
								  HEAP_MOVED_IN);
			htup->t_infomask |= HEAP_MOVED_OFF;
			HeapTupleHeaderSetXvac(htup, record->xl_xid);
			/* Make sure there is no forward chain link in t_ctid */
			htup->t_ctid = xlrec->target.tid;
		}
		else
		{
			htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
								  HEAP_XMAX_INVALID |
								  HEAP_MARKED_FOR_UPDATE |
								  HEAP_MOVED);
			HeapTupleHeaderSetXmax(htup, record->xl_xid);
			HeapTupleHeaderSetCmax(htup, FirstCommandId);
			/* Set forward chain link in t_ctid */
			htup->t_ctid = xlrec->newtid;
		}
		if (samepage)
			goto newsame;
		PageSetLSN(page, lsn);
		PageSetTLI(page, ThisTimeLineID);
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		WriteBuffer(buffer);
		goto newt;
	}

	elog(PANIC, "heap_update_undo: unimplemented");

	/* Deal with new tuple */

newt:;

	if (redo &&
		((record->xl_info & XLR_BKP_BLOCK_2) ||
		 ((record->xl_info & XLR_BKP_BLOCK_1) && samepage)))
		return;

	buffer = XLogReadBuffer((redo) ? true : false, reln,
							ItemPointerGetBlockNumber(&(xlrec->newtid)));
	if (!BufferIsValid(buffer))
		return;

	page = (Page) BufferGetPage(buffer);

newsame:;
	if (PageIsNew((PageHeader) page) &&
		(!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE)))
		elog(PANIC, "heap_update_%sdo: uninitialized page", (redo) ? "re" : "un");

	if (redo)
	{
		struct
		{
			HeapTupleHeaderData hdr;
			char		data[MaxTupleSize];
		}			tbuf;
		xl_heap_header xlhdr;
		int			hsize;
		uint32		newlen;

		if (record->xl_info & XLOG_HEAP_INIT_PAGE)
			PageInit(page, BufferGetPageSize(buffer), 0);

		if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
		{
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buffer);
			return;
		}

		offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
			elog(PANIC, "heap_update_redo: invalid max offset number");

		hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
		if (move)
			hsize += (2 * sizeof(TransactionId));

		newlen = record->xl_len - hsize;
		Assert(newlen <= MaxTupleSize);
		memcpy((char *) &xlhdr,
			   (char *) xlrec + SizeOfHeapUpdate,
			   SizeOfHeapHeader);
		htup = &tbuf.hdr;
		MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
		memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
			   (char *) xlrec + hsize,
			   newlen);
		newlen += offsetof(HeapTupleHeaderData, t_bits);
		htup->t_natts = xlhdr.t_natts;
		htup->t_infomask = xlhdr.t_infomask;
		htup->t_hoff = xlhdr.t_hoff;

		if (move)
		{
			TransactionId xid[2];		/* xmax, xmin */

			memcpy((char *) xid,
				   (char *) xlrec + SizeOfHeapUpdate + SizeOfHeapHeader,
				   2 * sizeof(TransactionId));
			HeapTupleHeaderSetXmin(htup, xid[1]);
			HeapTupleHeaderSetXmax(htup, xid[0]);
			HeapTupleHeaderSetXvac(htup, record->xl_xid);
		}
		else
		{
			HeapTupleHeaderSetXmin(htup, record->xl_xid);
			HeapTupleHeaderSetCmin(htup, FirstCommandId);
		}
		/* Make sure there is no forward chain link in t_ctid */
		htup->t_ctid = xlrec->newtid;

		offnum = PageAddItem(page, (Item) htup, newlen, offnum,
							 LP_USED | OverwritePageMode);
		if (offnum == InvalidOffsetNumber)
			elog(PANIC, "heap_update_redo: failed to add tuple");
		PageSetLSN(page, lsn);
		PageSetTLI(page, ThisTimeLineID);
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		WriteBuffer(buffer);
		return;
	}

	/* undo */
	if (XLByteLT(PageGetLSN(page), lsn))		/* changes not applied?! */
		elog(PANIC, "heap_update_undo: bad new tuple page LSN");

	elog(PANIC, "heap_update_undo: unimplemented");
}

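/*
 * This is the rollback hook registered via XactPushRollback() in
 * heap_update above: if the transaction aborts before the update record
 * reaches XLOG, it clears the HEAP_XMAX_UNLOGGED marking so the old
 * tuple version is usable again.
 */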
static void
_heap_unlock_tuple(void *data)
{
	TransactionId xid = GetCurrentTransactionId();
	xl_heaptid *xltid = (xl_heaptid *) data;
	Relation	reln = XLogOpenRelation(false, RM_HEAP_ID, xltid->node);
	Buffer		buffer;
	Page		page;
	OffsetNumber offnum;
	ItemId		lp;
	HeapTupleHeader htup;

	if (!RelationIsValid(reln))
		elog(PANIC, "_heap_unlock_tuple: can't open relation");

	buffer = XLogReadBuffer(false, reln,
							ItemPointerGetBlockNumber(&(xltid->tid)));
	if (!BufferIsValid(buffer))
		elog(PANIC, "_heap_unlock_tuple: can't read buffer");

	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
		elog(PANIC, "_heap_unlock_tuple: uninitialized page");

	offnum = ItemPointerGetOffsetNumber(&(xltid->tid));
	if (offnum > PageGetMaxOffsetNumber(page))
		elog(PANIC, "_heap_unlock_tuple: invalid itemid");
	lp = PageGetItemId(page, offnum);

	if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
		elog(PANIC, "_heap_unlock_tuple: unused/deleted tuple in rollback");

	htup = (HeapTupleHeader) PageGetItem(page, lp);

	if (!TransactionIdEquals(HeapTupleHeaderGetXmax(htup), xid))
		elog(PANIC, "_heap_unlock_tuple: invalid xmax in rollback");
	htup->t_infomask &= ~HEAP_XMAX_UNLOGGED;
	htup->t_infomask |= HEAP_XMAX_INVALID;
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	WriteBuffer(buffer);
}

void
heap_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8		info = record->xl_info & ~XLR_INFO_MASK;

	info &= XLOG_HEAP_OPMASK;
	if (info == XLOG_HEAP_INSERT)
		heap_xlog_insert(true, lsn, record);
	else if (info == XLOG_HEAP_DELETE)
		heap_xlog_delete(true, lsn, record);
	else if (info == XLOG_HEAP_UPDATE)
		heap_xlog_update(true, lsn, record, false);
	else if (info == XLOG_HEAP_MOVE)
		heap_xlog_update(true, lsn, record, true);
	else if (info == XLOG_HEAP_CLEAN)
		heap_xlog_clean(true, lsn, record);
	else if (info == XLOG_HEAP_NEWPAGE)
		heap_xlog_newpage(true, lsn, record);
	else
		elog(PANIC, "heap_redo: unknown op code %u", info);
}

void
heap_undo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8		info = record->xl_info & ~XLR_INFO_MASK;

	info &= XLOG_HEAP_OPMASK;
	if (info == XLOG_HEAP_INSERT)
		heap_xlog_insert(false, lsn, record);
	else if (info == XLOG_HEAP_DELETE)
		heap_xlog_delete(false, lsn, record);
	else if (info == XLOG_HEAP_UPDATE)
		heap_xlog_update(false, lsn, record, false);
	else if (info == XLOG_HEAP_MOVE)
		heap_xlog_update(false, lsn, record, true);
	else if (info == XLOG_HEAP_CLEAN)
		heap_xlog_clean(false, lsn, record);
	else if (info == XLOG_HEAP_NEWPAGE)
		heap_xlog_newpage(false, lsn, record);
	else
		elog(PANIC, "heap_undo: unknown op code %u", info);
}

static void
out_target(char *buf, xl_heaptid *target)
{
	sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u",
		 target->node.spcNode, target->node.dbNode, target->node.relNode,
			ItemPointerGetBlockNumber(&(target->tid)),
			ItemPointerGetOffsetNumber(&(target->tid)));
}

void
heap_desc(char *buf, uint8 xl_info, char *rec)
{
	uint8		info = xl_info & ~XLR_INFO_MASK;

	info &= XLOG_HEAP_OPMASK;
	if (info == XLOG_HEAP_INSERT)
	{
		xl_heap_insert *xlrec = (xl_heap_insert *) rec;

		strcat(buf, "insert: ");
		out_target(buf, &(xlrec->target));
	}
	else if (info == XLOG_HEAP_DELETE)
	{
		xl_heap_delete *xlrec = (xl_heap_delete *) rec;

		strcat(buf, "delete: ");
		out_target(buf, &(xlrec->target));
	}
	else if (info == XLOG_HEAP_UPDATE || info == XLOG_HEAP_MOVE)
	{
		xl_heap_update *xlrec = (xl_heap_update *) rec;

		if (info == XLOG_HEAP_UPDATE)
			strcat(buf, "update: ");
		else
			strcat(buf, "move: ");
		out_target(buf, &(xlrec->target));
		sprintf(buf + strlen(buf), "; new %u/%u",
				ItemPointerGetBlockNumber(&(xlrec->newtid)),
				ItemPointerGetOffsetNumber(&(xlrec->newtid)));
	}
	else if (info == XLOG_HEAP_CLEAN)
	{
		xl_heap_clean *xlrec = (xl_heap_clean *) rec;

		sprintf(buf + strlen(buf), "clean: rel %u/%u/%u; blk %u",
				xlrec->node.spcNode, xlrec->node.dbNode,
				xlrec->node.relNode, xlrec->block);
	}
	else if (info == XLOG_HEAP_NEWPAGE)
	{
		xl_heap_newpage *xlrec = (xl_heap_newpage *) rec;

		sprintf(buf + strlen(buf), "newpage: rel %u/%u/%u; blk %u",
				xlrec->node.spcNode, xlrec->node.dbNode,
				xlrec->node.relNode, xlrec->blkno);
	}
	else
		strcat(buf, "UNKNOWN");
}