/*-------------------------------------------------------------------------
 *
 * tuptoaster.c
 *	  Support routines for external and compressed storage of
 *	  variable size attributes.
 *
 * Copyright (c) 2000-2005, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/access/heap/tuptoaster.c,v 1.58 2006/01/11 08:43:11 neilc Exp $
 *
 *
 * INTERFACE ROUTINES
 *		toast_insert_or_update -
 *			Try to make a given tuple fit into one page by compressing
 *			or moving off attributes
 *
 *		toast_delete -
 *			Reclaim toast storage when a tuple is deleted
 *
 *		heap_tuple_untoast_attr -
 *			Fetch back a given value from the "secondary" relation
 *
 *-------------------------------------------------------------------------
 */

28 29
#include "postgres.h"

Jan Wieck's avatar
TOAST  
Jan Wieck committed
30 31
#include <unistd.h>
#include <fcntl.h>
32

Jan Wieck's avatar
TOAST  
Jan Wieck committed
33 34 35 36 37 38 39 40
#include "access/heapam.h"
#include "access/genam.h"
#include "access/tuptoaster.h"
#include "catalog/catalog.h"
#include "utils/rel.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lzcompress.h"
41
#include "utils/typcache.h"
42 43


Jan Wieck's avatar
TOAST  
Jan Wieck committed
44 45
#undef TOAST_DEBUG

46
static void toast_delete_datum(Relation rel, Datum value);
47
static Datum toast_save_datum(Relation rel, Datum value);
48
static varattrib *toast_fetch_datum(varattrib *attr);
49
static varattrib *toast_fetch_datum_slice(varattrib *attr,
Bruce Momjian's avatar
Bruce Momjian committed
50
						int32 sliceoffset, int32 length);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
51

52

53 54 55
/* ----------
 * heap_tuple_fetch_attr -
 *
56
 *	Public entry point to get back a toasted value
57 58 59
 *	external storage (possibly still in compressed format).
 * ----------
 */
60
varattrib *
61
heap_tuple_fetch_attr(varattrib *attr)
62
{
63
	varattrib  *result;
64 65 66

	if (VARATT_IS_EXTERNAL(attr))
	{
67
		/*
68 69 70 71
		 * This is an external stored plain value
		 */
		result = toast_fetch_datum(attr);
	}
72
	else
73
	{
74
		/*
75
		 * This is a plain value inside of the main tuple - why am I called?
76 77
		 */
		result = attr;
78
	}
79 80

	return result;
81
}
Jan Wieck's avatar
TOAST  
Jan Wieck committed
82 83 84 85 86 87 88 89 90


/* ----------
 * heap_tuple_untoast_attr -
 *
 *	Public entry point to get back a toasted value from compression
 *	or external storage.
 * ----------
 */
91
varattrib *
Jan Wieck's avatar
TOAST  
Jan Wieck committed
92 93
heap_tuple_untoast_attr(varattrib *attr)
{
94
	varattrib  *result;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
95 96 97 98 99 100 101 102 103 104

	if (VARATT_IS_EXTERNAL(attr))
	{
		if (VARATT_IS_COMPRESSED(attr))
		{
			/* ----------
			 * This is an external stored compressed value
			 * Fetch it from the toast heap and decompress.
			 * ----------
			 */
105
			varattrib  *tmp;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
106 107

			tmp = toast_fetch_datum(attr);
108 109
			result = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
										  + VARHDRSZ);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
110
			VARATT_SIZEP(result) = attr->va_content.va_external.va_rawsize
111 112
				+ VARHDRSZ;
			pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(result));
Jan Wieck's avatar
TOAST  
Jan Wieck committed
113 114 115 116 117

			pfree(tmp);
		}
		else
		{
118
			/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
119 120 121 122 123 124 125
			 * This is an external stored plain value
			 */
			result = toast_fetch_datum(attr);
		}
	}
	else if (VARATT_IS_COMPRESSED(attr))
	{
126
		/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
127 128
		 * This is a compressed value inside of the main tuple
		 */
129 130
		result = (varattrib *) palloc(attr->va_content.va_compressed.va_rawsize
									  + VARHDRSZ);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
131
		VARATT_SIZEP(result) = attr->va_content.va_compressed.va_rawsize
132 133
			+ VARHDRSZ;
		pglz_decompress((PGLZ_Header *) attr, VARATT_DATA(result));
Jan Wieck's avatar
TOAST  
Jan Wieck committed
134 135
	}
	else
136 137

		/*
138
		 * This is a plain value inside of the main tuple - why am I called?
Jan Wieck's avatar
TOAST  
Jan Wieck committed
139 140 141 142 143 144 145
		 */
		return attr;

	return result;
}


146 147 148
/* ----------
 * heap_tuple_untoast_attr_slice -
 *
Bruce Momjian's avatar
Bruce Momjian committed
149 150
 *		Public entry point to get back part of a toasted value
 *		from compression or external storage.
151 152
 * ----------
 */
Bruce Momjian's avatar
Bruce Momjian committed
153
varattrib *
154 155 156 157
heap_tuple_untoast_attr_slice(varattrib *attr, int32 sliceoffset, int32 slicelength)
{
	varattrib  *preslice;
	varattrib  *result;
Bruce Momjian's avatar
Bruce Momjian committed
158 159
	int32		attrsize;

160 161
	if (VARATT_IS_COMPRESSED(attr))
	{
Bruce Momjian's avatar
Bruce Momjian committed
162 163
		varattrib  *tmp;

164 165 166 167
		if (VARATT_IS_EXTERNAL(attr))
			tmp = toast_fetch_datum(attr);
		else
		{
Bruce Momjian's avatar
Bruce Momjian committed
168
			tmp = attr;			/* compressed in main tuple */
169
		}
Bruce Momjian's avatar
Bruce Momjian committed
170

171 172 173 174
		preslice = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
										+ VARHDRSZ);
		VARATT_SIZEP(preslice) = attr->va_content.va_external.va_rawsize + VARHDRSZ;
		pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(preslice));
Bruce Momjian's avatar
Bruce Momjian committed
175 176

		if (tmp != attr)
177 178
			pfree(tmp);
	}
Bruce Momjian's avatar
Bruce Momjian committed
179
	else
180 181 182
	{
		/* Plain value */
		if (VARATT_IS_EXTERNAL(attr))
Bruce Momjian's avatar
Bruce Momjian committed
183
		{
184
			/* fast path */
185
			return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
186 187 188 189
		}
		else
			preslice = attr;
	}
Bruce Momjian's avatar
Bruce Momjian committed
190

191
	/* slicing of datum for compressed cases and plain value */
Bruce Momjian's avatar
Bruce Momjian committed
192

193
	attrsize = VARSIZE(preslice) - VARHDRSZ;
Bruce Momjian's avatar
Bruce Momjian committed
194
	if (sliceoffset >= attrsize)
195 196 197 198
	{
		sliceoffset = 0;
		slicelength = 0;
	}
Bruce Momjian's avatar
Bruce Momjian committed
199

200 201
	if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
		slicelength = attrsize - sliceoffset;
Bruce Momjian's avatar
Bruce Momjian committed
202

203 204
	result = (varattrib *) palloc(slicelength + VARHDRSZ);
	VARATT_SIZEP(result) = slicelength + VARHDRSZ;
Bruce Momjian's avatar
Bruce Momjian committed
205

206
	memcpy(VARDATA(result), VARDATA(preslice) + sliceoffset, slicelength);
Bruce Momjian's avatar
Bruce Momjian committed
207 208 209 210

	if (preslice != attr)
		pfree(preslice);

211 212 213 214
	return result;
}


215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
/* ----------
 * toast_raw_datum_size -
 *
 *	Return the raw (detoasted) size of a varlena datum, header included.
 *	Only the datum's header fields are inspected to determine it.
 * ----------
 */
Size
toast_raw_datum_size(Datum value)
{
	varattrib  *attr = (varattrib *) DatumGetPointer(value);

	/*
	 * For compressed data va_rawsize is the original data size, no matter
	 * whether the datum is also external; the header must be added.
	 */
	if (VARATT_IS_COMPRESSED(attr))
		return attr->va_content.va_compressed.va_rawsize + VARHDRSZ;

	/*
	 * For an uncompressed external attribute the stored rawsize already
	 * includes the header (not too consistent!).
	 */
	if (VARATT_IS_EXTERNAL(attr))
		return attr->va_content.va_external.va_rawsize;

	/* plain untoasted datum */
	return VARSIZE(attr);
}

251 252 253 254 255 256
/* ----------
 * toast_datum_size
 *
 *	Return the physical storage size (possibly compressed) of a varlena datum
 * ----------
 */
257
Size
258 259
toast_datum_size(Datum value)
{
260
	varattrib  *attr = (varattrib *) DatumGetPointer(value);
261 262 263 264 265 266
	Size		result;

	if (VARATT_IS_EXTERNAL(attr))
	{
		/*
		 * Attribute is stored externally - return the extsize whether
267 268
		 * compressed or not.  We do not count the size of the toast pointer
		 * ... should we?
269 270 271 272 273 274
		 */
		result = attr->va_content.va_external.va_extsize;
	}
	else
	{
		/*
275 276
		 * Attribute is stored inline either compressed or not, just calculate
		 * the size of the datum in either case.
277 278 279 280 281 282
		 */
		result = VARSIZE(attr);
	}
	return result;
}

283

Jan Wieck's avatar
TOAST  
Jan Wieck committed
284 285 286 287 288 289
/* ----------
 * toast_delete -
 *
 *	Cascaded delete toast-entries on DELETE
 * ----------
 */
290
void
Jan Wieck's avatar
TOAST  
Jan Wieck committed
291 292
toast_delete(Relation rel, HeapTuple oldtup)
{
293 294 295 296
	TupleDesc	tupleDesc;
	Form_pg_attribute *att;
	int			numAttrs;
	int			i;
297
	Datum		toast_values[MaxHeapAttributeNumber];
298
	bool		toast_isnull[MaxHeapAttributeNumber];
Jan Wieck's avatar
TOAST  
Jan Wieck committed
299

300
	/*
301 302
	 * Get the tuple descriptor and break down the tuple into fields.
	 *
Bruce Momjian's avatar
Bruce Momjian committed
303
	 * NOTE: it's debatable whether to use heap_deformtuple() here or just
304 305 306 307 308 309
	 * heap_getattr() only the varlena columns.  The latter could win if there
	 * are few varlena columns and many non-varlena ones. However,
	 * heap_deformtuple costs only O(N) while the heap_getattr way would cost
	 * O(N^2) if there are many varlena columns, so it seems better to err on
	 * the side of linear cost.  (We won't even be here unless there's at
	 * least one varlena column, by the way.)
Jan Wieck's avatar
TOAST  
Jan Wieck committed
310
	 */
311 312
	tupleDesc = rel->rd_att;
	att = tupleDesc->attrs;
313 314 315
	numAttrs = tupleDesc->natts;

	Assert(numAttrs <= MaxHeapAttributeNumber);
316
	heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
317

318
	/*
319 320
	 * Check for external stored attributes and delete them from the secondary
	 * relation.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
321 322 323
	 */
	for (i = 0; i < numAttrs; i++)
	{
324 325
		if (att[i]->attlen == -1)
		{
Bruce Momjian's avatar
Bruce Momjian committed
326
			Datum		value = toast_values[i];
327

328
			if (!toast_isnull[i] && VARATT_IS_EXTERNAL(value))
Jan Wieck's avatar
TOAST  
Jan Wieck committed
329
				toast_delete_datum(rel, value);
330
		}
Jan Wieck's avatar
TOAST  
Jan Wieck committed
331 332 333 334 335 336 337
	}
}


/* ----------
 * toast_insert_or_update -
 *
338
 *	Delete no-longer-used toast-entries and create new ones to
Jan Wieck's avatar
TOAST  
Jan Wieck committed
339
 *	make the new tuple fit on INSERT or UPDATE
340 341 342 343 344 345 346 347 348 349
 *
 * Inputs:
 *	newtup: the candidate new tuple to be inserted
 *	oldtup: the old row version for UPDATE, or NULL for INSERT
 * Result:
 *	either newtup if no toasting is needed, or a palloc'd modified tuple
 *	that is what should actually get stored
 *
 * NOTE: neither newtup nor oldtup will be modified.  This is a change
 * from the pre-8.1 API of this routine.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
350 351
 * ----------
 */
352
HeapTuple
Jan Wieck's avatar
TOAST  
Jan Wieck committed
353 354
toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup)
{
355
	HeapTuple	result_tuple;
356 357 358 359 360 361 362 363 364 365 366 367 368
	TupleDesc	tupleDesc;
	Form_pg_attribute *att;
	int			numAttrs;
	int			i;

	bool		need_change = false;
	bool		need_free = false;
	bool		need_delold = false;
	bool		has_nulls = false;

	Size		maxDataLen;

	char		toast_action[MaxHeapAttributeNumber];
369 370
	bool		toast_isnull[MaxHeapAttributeNumber];
	bool		toast_oldisnull[MaxHeapAttributeNumber];
371
	Datum		toast_values[MaxHeapAttributeNumber];
372
	Datum		toast_oldvalues[MaxHeapAttributeNumber];
373 374 375
	int32		toast_sizes[MaxHeapAttributeNumber];
	bool		toast_free[MaxHeapAttributeNumber];
	bool		toast_delold[MaxHeapAttributeNumber];
Jan Wieck's avatar
TOAST  
Jan Wieck committed
376

377
	/*
378
	 * Get the tuple descriptor and break down the tuple(s) into fields.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
379
	 */
380 381
	tupleDesc = rel->rd_att;
	att = tupleDesc->attrs;
382 383 384
	numAttrs = tupleDesc->natts;

	Assert(numAttrs <= MaxHeapAttributeNumber);
385
	heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
386
	if (oldtup != NULL)
387
		heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
388 389 390

	/* ----------
	 * Then collect information about the values given
391 392 393 394 395
	 *
	 * NOTE: toast_action[i] can have these values:
	 *		' '		default handling
	 *		'p'		already processed --- don't touch it
	 *		'x'		incompressible, but OK to move off
396 397 398
	 *
	 * NOTE: toast_sizes[i] is only made valid for varlena attributes with
	 *		toast_action[i] different from 'p'.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
399 400
	 * ----------
	 */
401 402 403
	memset(toast_action, ' ', numAttrs * sizeof(char));
	memset(toast_free, 0, numAttrs * sizeof(bool));
	memset(toast_delold, 0, numAttrs * sizeof(bool));
404

Jan Wieck's avatar
TOAST  
Jan Wieck committed
405 406
	for (i = 0; i < numAttrs; i++)
	{
407 408
		varattrib  *old_value;
		varattrib  *new_value;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
409 410 411

		if (oldtup != NULL)
		{
412
			/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
413 414
			 * For UPDATE get the old and new values of this attribute
			 */
415
			old_value = (varattrib *) DatumGetPointer(toast_oldvalues[i]);
416
			new_value = (varattrib *) DatumGetPointer(toast_values[i]);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
417

418 419 420
			/*
			 * If the old value is an external stored one, check if it has
			 * changed so we have to delete it later.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
421
			 */
422
			if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
423
				VARATT_IS_EXTERNAL(old_value))
Jan Wieck's avatar
TOAST  
Jan Wieck committed
424
			{
425
				if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
426 427 428 429
					old_value->va_content.va_external.va_valueid !=
					new_value->va_content.va_external.va_valueid ||
					old_value->va_content.va_external.va_toastrelid !=
					new_value->va_content.va_external.va_toastrelid)
Jan Wieck's avatar
TOAST  
Jan Wieck committed
430
				{
431
					/*
432
					 * The old external stored value isn't needed any more
433
					 * after the update
Jan Wieck's avatar
TOAST  
Jan Wieck committed
434 435 436 437 438 439
					 */
					toast_delold[i] = true;
					need_delold = true;
				}
				else
				{
440
					/*
441 442 443
					 * This attribute isn't changed by this update so we reuse
					 * the original reference to the old value in the new
					 * tuple.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
444 445 446 447 448 449 450 451 452
					 */
					toast_action[i] = 'p';
					toast_sizes[i] = VARATT_SIZE(toast_values[i]);
					continue;
				}
			}
		}
		else
		{
453
			/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
454 455
			 * For INSERT simply get the new value
			 */
456
			new_value = (varattrib *) DatumGetPointer(toast_values[i]);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
457 458
		}

459
		/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
460 461
		 * Handle NULL attributes
		 */
462
		if (toast_isnull[i])
Jan Wieck's avatar
TOAST  
Jan Wieck committed
463 464 465 466 467 468
		{
			toast_action[i] = 'p';
			has_nulls = true;
			continue;
		}

469
		/*
470
		 * Now look at varlena attributes
Jan Wieck's avatar
TOAST  
Jan Wieck committed
471 472 473
		 */
		if (att[i]->attlen == -1)
		{
474
			/*
475
			 * If the table's attribute says PLAIN always, force it so.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
476 477 478 479
			 */
			if (att[i]->attstorage == 'p')
				toast_action[i] = 'p';

480
			/*
481
			 * We took care of UPDATE above, so any external value we find
482 483
			 * still in the tuple must be someone else's we cannot reuse.
			 * Expand it to plain (and, probably, toast it again below).
Jan Wieck's avatar
TOAST  
Jan Wieck committed
484
			 */
485
			if (VARATT_IS_EXTERNAL(new_value))
Jan Wieck's avatar
TOAST  
Jan Wieck committed
486
			{
487 488
				new_value = heap_tuple_untoast_attr(new_value);
				toast_values[i] = PointerGetDatum(new_value);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
489 490 491 492 493
				toast_free[i] = true;
				need_change = true;
				need_free = true;
			}

494
			/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
495 496
			 * Remember the size of this attribute
			 */
497
			toast_sizes[i] = VARATT_SIZE(new_value);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
498 499 500
		}
		else
		{
501
			/*
502
			 * Not a varlena attribute, plain storage always
Jan Wieck's avatar
TOAST  
Jan Wieck committed
503 504 505 506 507 508
			 */
			toast_action[i] = 'p';
		}
	}

	/* ----------
509
	 * Compress and/or save external until data fits into target length
Jan Wieck's avatar
TOAST  
Jan Wieck committed
510 511 512
	 *
	 *	1: Inline compress attributes with attstorage 'x'
	 *	2: Store attributes with attstorage 'x' or 'e' external
513
	 *	3: Inline compress attributes with attstorage 'm'
Jan Wieck's avatar
TOAST  
Jan Wieck committed
514 515 516 517 518 519
	 *	4: Store attributes with attstorage 'm' external
	 * ----------
	 */
	maxDataLen = offsetof(HeapTupleHeaderData, t_bits);
	if (has_nulls)
		maxDataLen += BITMAPLEN(numAttrs);
520
	maxDataLen = TOAST_TUPLE_TARGET - MAXALIGN(maxDataLen);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
521

522
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
523 524
	 * Look for attributes with attstorage 'x' to compress
	 */
525 526
	while (MAXALIGN(heap_compute_data_size(tupleDesc,
										   toast_values, toast_isnull)) >
527
		   maxDataLen)
Jan Wieck's avatar
TOAST  
Jan Wieck committed
528
	{
529 530 531 532
		int			biggest_attno = -1;
		int32		biggest_size = MAXALIGN(sizeof(varattrib));
		Datum		old_value;
		Datum		new_value;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
533

534
		/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
535 536 537 538
		 * Search for the biggest yet uncompressed internal attribute
		 */
		for (i = 0; i < numAttrs; i++)
		{
539
			if (toast_action[i] != ' ')
Jan Wieck's avatar
TOAST  
Jan Wieck committed
540 541 542 543 544 545 546 547
				continue;
			if (VARATT_IS_EXTENDED(toast_values[i]))
				continue;
			if (att[i]->attstorage != 'x')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
548
				biggest_size = toast_sizes[i];
Jan Wieck's avatar
TOAST  
Jan Wieck committed
549 550 551 552 553 554
			}
		}

		if (biggest_attno < 0)
			break;

555
		/*
556
		 * Attempt to compress it inline
Jan Wieck's avatar
TOAST  
Jan Wieck committed
557
		 */
558 559 560
		i = biggest_attno;
		old_value = toast_values[i];
		new_value = toast_compress_datum(old_value);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
561

562 563 564 565 566
		if (DatumGetPointer(new_value) != NULL)
		{
			/* successful compression */
			if (toast_free[i])
				pfree(DatumGetPointer(old_value));
567 568 569 570 571
			toast_values[i] = new_value;
			toast_free[i] = true;
			toast_sizes[i] = VARATT_SIZE(toast_values[i]);
			need_change = true;
			need_free = true;
572 573 574
		}
		else
		{
575
			/*
576
			 * incompressible data, ignore on subsequent compression passes
577
			 */
578 579
			toast_action[i] = 'x';
		}
Jan Wieck's avatar
TOAST  
Jan Wieck committed
580 581
	}

582
	/*
583 584
	 * Second we look for attributes of attstorage 'x' or 'e' that are still
	 * inline.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
585
	 */
586 587
	while (MAXALIGN(heap_compute_data_size(tupleDesc,
										   toast_values, toast_isnull)) >
588
		   maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
Jan Wieck's avatar
TOAST  
Jan Wieck committed
589
	{
590 591 592
		int			biggest_attno = -1;
		int32		biggest_size = MAXALIGN(sizeof(varattrib));
		Datum		old_value;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
593

594 595 596 597
		/*------
		 * Search for the biggest yet inlined attribute with
		 * attstorage equals 'x' or 'e'
		 *------
Jan Wieck's avatar
TOAST  
Jan Wieck committed
598 599 600 601 602 603 604 605 606 607 608 609
		 */
		for (i = 0; i < numAttrs; i++)
		{
			if (toast_action[i] == 'p')
				continue;
			if (VARATT_IS_EXTERNAL(toast_values[i]))
				continue;
			if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
610
				biggest_size = toast_sizes[i];
Jan Wieck's avatar
TOAST  
Jan Wieck committed
611 612 613 614 615 616
			}
		}

		if (biggest_attno < 0)
			break;

617
		/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
618 619
		 * Store this external
		 */
620 621 622
		i = biggest_attno;
		old_value = toast_values[i];
		toast_action[i] = 'p';
623
		toast_values[i] = toast_save_datum(rel, toast_values[i]);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
624 625 626
		if (toast_free[i])
			pfree(DatumGetPointer(old_value));

627 628
		toast_free[i] = true;
		toast_sizes[i] = VARATT_SIZE(toast_values[i]);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
629 630

		need_change = true;
631
		need_free = true;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
632 633
	}

634 635 636
	/*
	 * Round 3 - this time we take attributes with storage 'm' into
	 * compression
Jan Wieck's avatar
TOAST  
Jan Wieck committed
637
	 */
638 639
	while (MAXALIGN(heap_compute_data_size(tupleDesc,
										   toast_values, toast_isnull)) >
640
		   maxDataLen)
Jan Wieck's avatar
TOAST  
Jan Wieck committed
641
	{
642 643 644 645
		int			biggest_attno = -1;
		int32		biggest_size = MAXALIGN(sizeof(varattrib));
		Datum		old_value;
		Datum		new_value;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
646

647
		/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
648 649 650 651
		 * Search for the biggest yet uncompressed internal attribute
		 */
		for (i = 0; i < numAttrs; i++)
		{
652
			if (toast_action[i] != ' ')
Jan Wieck's avatar
TOAST  
Jan Wieck committed
653 654 655 656 657 658 659 660
				continue;
			if (VARATT_IS_EXTENDED(toast_values[i]))
				continue;
			if (att[i]->attstorage != 'm')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
661
				biggest_size = toast_sizes[i];
Jan Wieck's avatar
TOAST  
Jan Wieck committed
662 663 664 665 666 667
			}
		}

		if (biggest_attno < 0)
			break;

668
		/*
669
		 * Attempt to compress it inline
Jan Wieck's avatar
TOAST  
Jan Wieck committed
670
		 */
671 672 673
		i = biggest_attno;
		old_value = toast_values[i];
		new_value = toast_compress_datum(old_value);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
674

675 676 677 678 679
		if (DatumGetPointer(new_value) != NULL)
		{
			/* successful compression */
			if (toast_free[i])
				pfree(DatumGetPointer(old_value));
680 681 682 683 684
			toast_values[i] = new_value;
			toast_free[i] = true;
			toast_sizes[i] = VARATT_SIZE(toast_values[i]);
			need_change = true;
			need_free = true;
685 686 687
		}
		else
		{
688
			/*
689
			 * incompressible data, ignore on subsequent compression passes
690
			 */
691 692
			toast_action[i] = 'x';
		}
Jan Wieck's avatar
TOAST  
Jan Wieck committed
693 694
	}

695
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
696 697
	 * Finally we store attributes of type 'm' external
	 */
698 699
	while (MAXALIGN(heap_compute_data_size(tupleDesc,
										   toast_values, toast_isnull)) >
700
		   maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
Jan Wieck's avatar
TOAST  
Jan Wieck committed
701
	{
702 703 704
		int			biggest_attno = -1;
		int32		biggest_size = MAXALIGN(sizeof(varattrib));
		Datum		old_value;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
705

706 707 708 709
		/*--------
		 * Search for the biggest yet inlined attribute with
		 * attstorage = 'm'
		 *--------
Jan Wieck's avatar
TOAST  
Jan Wieck committed
710 711 712 713 714 715 716 717 718 719 720 721
		 */
		for (i = 0; i < numAttrs; i++)
		{
			if (toast_action[i] == 'p')
				continue;
			if (VARATT_IS_EXTERNAL(toast_values[i]))
				continue;
			if (att[i]->attstorage != 'm')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
722
				biggest_size = toast_sizes[i];
Jan Wieck's avatar
TOAST  
Jan Wieck committed
723 724 725 726 727 728
			}
		}

		if (biggest_attno < 0)
			break;

729
		/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
730 731
		 * Store this external
		 */
732 733 734
		i = biggest_attno;
		old_value = toast_values[i];
		toast_action[i] = 'p';
735
		toast_values[i] = toast_save_datum(rel, toast_values[i]);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
736 737 738
		if (toast_free[i])
			pfree(DatumGetPointer(old_value));

739 740
		toast_free[i] = true;
		toast_sizes[i] = VARATT_SIZE(toast_values[i]);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
741 742

		need_change = true;
743
		need_free = true;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
744 745
	}

746
	/*
747 748
	 * In the case we toasted any values, we need to build a new heap tuple
	 * with the changed values.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
749 750 751
	 */
	if (need_change)
	{
752
		HeapTupleHeader olddata = newtup->t_data;
753
		HeapTupleHeader new_data;
754
		int32		new_len;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
755

756
		/*
757 758
		 * Calculate the new size of the tuple.  Header size should not
		 * change, but data size might.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
759 760 761 762
		 */
		new_len = offsetof(HeapTupleHeaderData, t_bits);
		if (has_nulls)
			new_len += BITMAPLEN(numAttrs);
763
		if (olddata->t_infomask & HEAP_HASOID)
764
			new_len += sizeof(Oid);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
765
		new_len = MAXALIGN(new_len);
766
		Assert(new_len == olddata->t_hoff);
767 768
		new_len += heap_compute_data_size(tupleDesc,
										  toast_values, toast_isnull);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
769

770
		/*
771
		 * Allocate and zero the space needed, and fill HeapTupleData fields.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
772
		 */
773 774 775 776 777 778
		result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_len);
		result_tuple->t_len = new_len;
		result_tuple->t_self = newtup->t_self;
		result_tuple->t_tableOid = newtup->t_tableOid;
		new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
		result_tuple->t_data = new_data;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
779

780
		/*
781
		 * Put the existing tuple header and the changed values into place
Jan Wieck's avatar
TOAST  
Jan Wieck committed
782
		 */
783
		memcpy(new_data, olddata, olddata->t_hoff);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
784

785 786 787 788
		heap_fill_tuple(tupleDesc,
						toast_values,
						toast_isnull,
						(char *) new_data + olddata->t_hoff,
789 790
						&(new_data->t_infomask),
						has_nulls ? new_data->t_bits : NULL);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
791
	}
792 793
	else
		result_tuple = newtup;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
794

795
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
796 797 798 799 800 801 802
	 * Free allocated temp values
	 */
	if (need_free)
		for (i = 0; i < numAttrs; i++)
			if (toast_free[i])
				pfree(DatumGetPointer(toast_values[i]));

803
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
804 805 806 807 808
	 * Delete external values from the old tuple
	 */
	if (need_delold)
		for (i = 0; i < numAttrs; i++)
			if (toast_delold[i])
809
				toast_delete_datum(rel, toast_oldvalues[i]);
810 811

	return result_tuple;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
812 813 814
}


815 816 817 818 819
/* ----------
 * toast_flatten_tuple_attribute -
 *
 *	If a Datum is of composite type, "flatten" it to contain no toasted fields.
 *	This must be invoked on any potentially-composite field that is to be
 *	inserted into a tuple.  Doing this preserves the invariant that toasting
 *	goes only one level deep in a tuple.
 *
 *	The input Datum itself is returned when typeId/typeMod do not name a
 *	composite type, or when none of its fields is compressed or external.
 *	Otherwise the result is a palloc'd copy of the tuple in which every
 *	such field has been replaced by its untoasted value.
 * ----------
 */
Datum
toast_flatten_tuple_attribute(Datum value,
							  Oid typeId, int32 typeMod)
{
	TupleDesc	rowDesc;
	HeapTupleHeader srcHeader;
	HeapTupleHeader dstHeader;
	HeapTupleData srcTuple;
	Form_pg_attribute *attrs;
	int32		dstLen;
	int			natts;
	int			attno;
	bool		anyDetoasted = false;
	bool		anyNull = false;
	Datum		fieldValues[MaxTupleAttributeNumber];
	bool		fieldIsNull[MaxTupleAttributeNumber];
	bool		fieldOwned[MaxTupleAttributeNumber];

	/*
	 * A type without a row descriptor is not composite: nothing to flatten.
	 */
	rowDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
	if (rowDesc == NULL)
		return value;

	attrs = rowDesc->attrs;
	natts = rowDesc->natts;

	/*
	 * Wrap the datum in a temporary HeapTuple control structure so that it
	 * can be broken down into its fields.
	 */
	srcHeader = DatumGetHeapTupleHeader(value);
	Assert(typeId == HeapTupleHeaderGetTypeId(srcHeader));
	Assert(typeMod == HeapTupleHeaderGetTypMod(srcHeader));
	srcTuple.t_len = HeapTupleHeaderGetDatumLength(srcHeader);
	ItemPointerSetInvalid(&(srcTuple.t_self));
	srcTuple.t_tableOid = InvalidOid;
	srcTuple.t_data = srcHeader;

	Assert(natts <= MaxTupleAttributeNumber);
	heap_deform_tuple(&srcTuple, rowDesc, fieldValues, fieldIsNull);

	memset(fieldOwned, 0, natts * sizeof(bool));

	/*
	 * Untoast every non-null varlena field that is compressed or external,
	 * remembering which replacement values we have to free afterwards.
	 */
	for (attno = 0; attno < natts; attno++)
	{
		varattrib  *field;

		if (fieldIsNull[attno])
		{
			anyNull = true;
			continue;
		}
		if (attrs[attno]->attlen != -1)
			continue;

		field = (varattrib *) DatumGetPointer(fieldValues[attno]);
		if (!VARATT_IS_EXTENDED(field))
			continue;

		field = heap_tuple_untoast_attr(field);
		fieldValues[attno] = PointerGetDatum(field);
		fieldOwned[attno] = true;
		anyDetoasted = true;
	}

	/*
	 * Nothing was toasted: the original datum is already flat.
	 */
	if (!anyDetoasted)
		return value;

	/*
	 * Work out the size of the rebuilt tuple.  The header part is expected
	 * to come out exactly as in the source (asserted below); only the data
	 * part may differ.
	 */
	dstLen = offsetof(HeapTupleHeaderData, t_bits);
	if (anyNull)
		dstLen += BITMAPLEN(natts);
	if (srcHeader->t_infomask & HEAP_HASOID)
		dstLen += sizeof(Oid);
	dstLen = MAXALIGN(dstLen);
	Assert(dstLen == srcHeader->t_hoff);
	dstLen += heap_compute_data_size(rowDesc, fieldValues, fieldIsNull);

	dstHeader = (HeapTupleHeader) palloc0(dstLen);

	/*
	 * Copy the old header, record the new total length, then lay down the
	 * flattened field values behind the header.
	 */
	memcpy(dstHeader, srcHeader, srcHeader->t_hoff);

	HeapTupleHeaderSetDatumLength(dstHeader, dstLen);

	heap_fill_tuple(rowDesc,
					fieldValues,
					fieldIsNull,
					(char *) dstHeader + srcHeader->t_hoff,
					&(dstHeader->t_infomask),
					anyNull ? dstHeader->t_bits : NULL);

	/*
	 * Release the untoasted copies; their contents now live in dstHeader.
	 */
	for (attno = 0; attno < natts; attno++)
		if (fieldOwned[attno])
			pfree(DatumGetPointer(fieldValues[attno]));

	return PointerGetDatum(dstHeader);
}


Jan Wieck's avatar
TOAST  
Jan Wieck committed
937 938 939
/* ----------
 * toast_compress_datum -
 *
940
 *	Create a compressed version of a varlena datum
941 942 943 944
 *
 *	If we fail (ie, compressed result is actually bigger than original)
 *	then return NULL.  We must not use compressed data if it'd expand
 *	the tuple!
Jan Wieck's avatar
TOAST  
Jan Wieck committed
945 946
 * ----------
 */
947
Datum
Jan Wieck's avatar
TOAST  
Jan Wieck committed
948 949
toast_compress_datum(Datum value)
{
950
	varattrib  *tmp;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
951

952
	tmp = (varattrib *) palloc(sizeof(PGLZ_Header) + VARATT_SIZE(value));
Jan Wieck's avatar
TOAST  
Jan Wieck committed
953
	pglz_compress(VARATT_DATA(value), VARATT_SIZE(value) - VARHDRSZ,
954 955 956 957 958 959 960 961 962 963 964 965 966 967
				  (PGLZ_Header *) tmp,
				  PGLZ_strategy_default);
	if (VARATT_SIZE(tmp) < VARATT_SIZE(value))
	{
		/* successful compression */
		VARATT_SIZEP(tmp) |= VARATT_FLAG_COMPRESSED;
		return PointerGetDatum(tmp);
	}
	else
	{
		/* incompressible data */
		pfree(tmp);
		return PointerGetDatum(NULL);
	}
Jan Wieck's avatar
TOAST  
Jan Wieck committed
968 969 970 971 972 973 974 975 976 977 978
}


/* ----------
 * toast_save_datum -
 *
 *	Save one single datum into the secondary relation and return
 *	a varattrib reference for it.
 * ----------
 */
static Datum
979
toast_save_datum(Relation rel, Datum value)
Jan Wieck's avatar
TOAST  
Jan Wieck committed
980
{
981 982 983 984 985
	Relation	toastrel;
	Relation	toastidx;
	HeapTuple	toasttup;
	TupleDesc	toasttupDesc;
	Datum		t_values[3];
986
	bool		t_isnull[3];
987
	varattrib  *result;
988 989 990 991
	struct
	{
		struct varlena hdr;
		char		data[TOAST_MAX_CHUNK_SIZE];
992
	}			chunk_data;
993 994 995 996
	int32		chunk_size;
	int32		chunk_seq = 0;
	char	   *data_p;
	int32		data_todo;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
997

998
	/*
999 1000 1001
	 * Open the toast relation and its index.  We can use the index to check
	 * uniqueness of the OID we assign to the toasted item, even though it has
	 * additional columns besides OID.
1002 1003 1004 1005 1006
	 */
	toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
	toasttupDesc = toastrel->rd_att;
	toastidx = index_open(toastrel->rd_rel->reltoastidxid);

1007
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1008 1009
	 * Create the varattrib reference
	 */
1010
	result = (varattrib *) palloc(sizeof(varattrib));
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1011

1012
	result->va_header = sizeof(varattrib) | VARATT_FLAG_EXTERNAL;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1013 1014 1015
	if (VARATT_IS_COMPRESSED(value))
	{
		result->va_header |= VARATT_FLAG_COMPRESSED;
1016 1017
		result->va_content.va_external.va_rawsize =
			((varattrib *) value)->va_content.va_compressed.va_rawsize;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1018 1019 1020
	}
	else
		result->va_content.va_external.va_rawsize = VARATT_SIZE(value);
1021 1022 1023

	result->va_content.va_external.va_extsize =
		VARATT_SIZE(value) - VARHDRSZ;
1024 1025
	result->va_content.va_external.va_valueid =
		GetNewOidWithIndex(toastrel, toastidx);
1026 1027
	result->va_content.va_external.va_toastrelid =
		rel->rd_rel->reltoastrelid;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1028

1029
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1030 1031 1032
	 * Initialize constant parts of the tuple data
	 */
	t_values[0] = ObjectIdGetDatum(result->va_content.va_external.va_valueid);
1033
	t_values[2] = PointerGetDatum(&chunk_data);
1034 1035 1036
	t_isnull[0] = false;
	t_isnull[1] = false;
	t_isnull[2] = false;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1037

1038
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1039 1040
	 * Get the data to process
	 */
1041 1042
	data_p = VARATT_DATA(value);
	data_todo = VARATT_SIZE(value) - VARHDRSZ;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1043

1044
	/*
1045 1046
	 * We must explicitly lock the toast index because we aren't using an
	 * index scan here.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1047
	 */
1048
	LockRelation(toastidx, RowExclusiveLock);
1049

1050
	/*
1051
	 * Split up the item into chunks
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1052 1053 1054
	 */
	while (data_todo > 0)
	{
1055
		/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1056 1057
		 * Calculate the size of this chunk
		 */
1058
		chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1059

1060
		/*
1061
		 * Build a tuple and store it
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1062
		 */
1063
		t_values[1] = Int32GetDatum(chunk_seq++);
1064 1065
		VARATT_SIZEP(&chunk_data) = chunk_size + VARHDRSZ;
		memcpy(VARATT_DATA(&chunk_data), data_p, chunk_size);
1066
		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1067
		if (!HeapTupleIsValid(toasttup))
1068
			elog(ERROR, "failed to build TOAST tuple");
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1069

1070
		simple_heap_insert(toastrel, toasttup);
1071

1072
		/*
1073
		 * Create the index entry.	We cheat a little here by not using
1074 1075
		 * FormIndexDatum: this relies on the knowledge that the index columns
		 * are the same as the initial columns of the table.
1076
		 *
1077 1078
		 * Note also that there had better not be any user-created index on
		 * the TOAST table, since we don't bother to update anything else.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1079
		 */
1080 1081 1082
		index_insert(toastidx, t_values, t_isnull,
					 &(toasttup->t_self),
					 toastrel, toastidx->rd_index->indisunique);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1083

1084
		/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1085 1086
		 * Free memory
		 */
1087
		heap_freetuple(toasttup);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1088

1089
		/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1090 1091 1092 1093 1094 1095
		 * Move on to next chunk
		 */
		data_todo -= chunk_size;
		data_p += chunk_size;
	}

1096
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1097 1098
	 * Done - close toast relation and return the reference
	 */
1099
	UnlockRelation(toastidx, RowExclusiveLock);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115
	index_close(toastidx);
	heap_close(toastrel, RowExclusiveLock);

	return PointerGetDatum(result);
}


/* ----------
 * toast_delete_datum -
 *
 *	Delete every chunk of a single externally stored value.
 *
 *	value: datum pointing at a varattrib; if it is not flagged as external
 *	the function returns without doing anything.
 *	rel: not referenced by the body; the toast relation is taken from the
 *	reference stored in the value itself.
 * ----------
 */
static void
toast_delete_datum(Relation rel, Datum value)
{
	varattrib  *attr = (varattrib *) DatumGetPointer(value);
	Relation	toast_heap;
	Relation	toast_index;
	ScanKeyData key;
	IndexScanDesc scan;
	HeapTuple	chunk_tup;

	/* Nothing is stored out of line for a non-external value */
	if (!VARATT_IS_EXTERNAL(attr))
		return;

	/*
	 * Open the toast relation and its index
	 */
	toast_heap = heap_open(attr->va_content.va_external.va_toastrelid,
						   RowExclusiveLock);
	toast_index = index_open(toast_heap->rd_rel->reltoastidxid);

	/*
	 * Set up a scan key to fetch from the index by va_valueid (we don't
	 * particularly care whether we see the chunks in sequence or not)
	 */
	ScanKeyInit(&key,
				(AttrNumber) 1,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(attr->va_content.va_external.va_valueid));

	/*
	 * Walk the matching chunks through the index and delete each one
	 */
	scan = index_beginscan(toast_heap, toast_index, true,
						   SnapshotToast, 1, &key);
	for (chunk_tup = index_getnext(scan, ForwardScanDirection);
		 chunk_tup != NULL;
		 chunk_tup = index_getnext(scan, ForwardScanDirection))
		simple_heap_delete(toast_heap, &chunk_tup->t_self);

	/*
	 * End scan and close relations
	 */
	index_endscan(scan);
	index_close(toast_index);
	heap_close(toast_heap, RowExclusiveLock);
}


Jan Wieck's avatar
TOAST  
Jan Wieck committed
1164 1165 1166 1167 1168 1169 1170 1171 1172
/* ----------
 * toast_fetch_datum -
 *
 *	Reconstruct an in memory varattrib from the chunks saved
 *	in the toast relation
 * ----------
 */
static varattrib *
toast_fetch_datum(varattrib *attr)
1173
{
1174 1175 1176 1177 1178 1179 1180 1181
	Relation	toastrel;
	Relation	toastidx;
	ScanKeyData toastkey;
	IndexScanDesc toastscan;
	HeapTuple	ttup;
	TupleDesc	toasttupDesc;
	varattrib  *result;
	int32		ressize;
1182 1183 1184
	int32		residx,
				nextidx;
	int32		numchunks;
1185 1186 1187 1188
	Pointer		chunk;
	bool		isnull;
	int32		chunksize;

Jan Wieck's avatar
TOAST  
Jan Wieck committed
1189
	ressize = attr->va_content.va_external.va_extsize;
1190
	numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1191

1192
	result = (varattrib *) palloc(ressize + VARHDRSZ);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1193 1194 1195 1196
	VARATT_SIZEP(result) = ressize + VARHDRSZ;
	if (VARATT_IS_COMPRESSED(attr))
		VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;

1197
	/*
1198
	 * Open the toast relation and its index
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1199
	 */
1200 1201
	toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
						 AccessShareLock);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1202
	toasttupDesc = toastrel->rd_att;
1203
	toastidx = index_open(toastrel->rd_rel->reltoastidxid);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1204

1205
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1206 1207
	 * Setup a scan key to fetch from the index by va_valueid
	 */
1208 1209 1210
	ScanKeyInit(&toastkey,
				(AttrNumber) 1,
				BTEqualStrategyNumber, F_OIDEQ,
1211
				ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1212

1213
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1214
	 * Read the chunks by index
1215
	 *
1216 1217 1218
	 * Note that because the index is actually on (valueid, chunkidx) we will
	 * see the chunks in chunkidx order, even though we didn't explicitly ask
	 * for it.
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1219
	 */
1220 1221
	nextidx = 0;

1222 1223
	toastscan = index_beginscan(toastrel, toastidx, true,
								SnapshotToast, 1, &toastkey);
1224
	while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1225
	{
1226
		/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1227 1228
		 * Have a chunk, extract the sequence number and the data
		 */
1229
		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1230
		Assert(!isnull);
1231
		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1232 1233
		Assert(!isnull);
		chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1234

1235
		/*
1236 1237
		 * Some checks on the data we've found
		 */
1238 1239 1240
		if (residx != nextidx)
			elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
				 residx, nextidx,
1241
				 attr->va_content.va_external.va_valueid);
1242
		if (residx < numchunks - 1)
1243 1244
		{
			if (chunksize != TOAST_MAX_CHUNK_SIZE)
1245
				elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1246 1247 1248
					 chunksize, residx,
					 attr->va_content.va_external.va_valueid);
		}
1249
		else if (residx < numchunks)
1250 1251
		{
			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1252
				elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1253 1254 1255
					 chunksize, residx,
					 attr->va_content.va_external.va_valueid);
		}
1256 1257
		else
			elog(ERROR, "unexpected chunk number %d for toast value %u",
1258 1259
				 residx,
				 attr->va_content.va_external.va_valueid);
1260

1261
		/*
1262
		 * Copy the data into proper place in our result
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1263
		 */
1264
		memcpy(((char *) VARATT_DATA(result)) + residx * TOAST_MAX_CHUNK_SIZE,
1265 1266
			   VARATT_DATA(chunk),
			   chunksize);
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1267

1268
		nextidx++;
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1269 1270
	}

1271
	/*
1272 1273
	 * Final checks that we successfully fetched the datum
	 */
1274 1275 1276
	if (nextidx != numchunks)
		elog(ERROR, "missing chunk number %d for toast value %u",
			 nextidx,
1277
			 attr->va_content.va_external.va_valueid);
1278

1279
	/*
Jan Wieck's avatar
TOAST  
Jan Wieck committed
1280 1281 1282 1283 1284 1285 1286
	 * End scan and close relations
	 */
	index_endscan(toastscan);
	index_close(toastidx);
	heap_close(toastrel, AccessShareLock);

	return result;
1287 1288
}

1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301
/* ----------
 * toast_fetch_datum_slice -
 *
 *	Reconstruct a segment of a varattrib from the chunks saved
 *	in the toast relation
 * ----------
 */
static varattrib *
toast_fetch_datum_slice(varattrib *attr, int32 sliceoffset, int32 length)
{
	Relation	toastrel;
	Relation	toastidx;
	ScanKeyData toastkey[3];
1302
	int			nscankeys;
1303 1304 1305 1306 1307 1308
	IndexScanDesc toastscan;
	HeapTuple	ttup;
	TupleDesc	toasttupDesc;
	varattrib  *result;
	int32		attrsize;
	int32		residx;
Bruce Momjian's avatar
Bruce Momjian committed
1309 1310 1311 1312
	int32		nextidx;
	int			numchunks;
	int			startchunk;
	int			endchunk;
1313 1314
	int32		startoffset;
	int32		endoffset;
Bruce Momjian's avatar
Bruce Momjian committed
1315
	int			totalchunks;
1316 1317 1318
	Pointer		chunk;
	bool		isnull;
	int32		chunksize;
Bruce Momjian's avatar
Bruce Momjian committed
1319 1320
	int32		chcpystrt;
	int32		chcpyend;
1321 1322 1323 1324

	attrsize = attr->va_content.va_external.va_extsize;
	totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;

Bruce Momjian's avatar
Bruce Momjian committed
1325
	if (sliceoffset >= attrsize)
1326
	{
Bruce Momjian's avatar
Bruce Momjian committed
1327 1328
		sliceoffset = 0;
		length = 0;
1329
	}
1330 1331

	if (((sliceoffset + length) > attrsize) || length < 0)
Bruce Momjian's avatar
Bruce Momjian committed
1332
		length = attrsize - sliceoffset;
1333 1334 1335 1336 1337 1338

	result = (varattrib *) palloc(length + VARHDRSZ);
	VARATT_SIZEP(result) = length + VARHDRSZ;

	if (VARATT_IS_COMPRESSED(attr))
		VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
Bruce Momjian's avatar
Bruce Momjian committed
1339 1340

	if (length == 0)
1341
		return result;		/* Can save a lot of work at this point! */
1342 1343 1344

	startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
	endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
Bruce Momjian's avatar
Bruce Momjian committed
1345 1346
	numchunks = (endchunk - startchunk) + 1;

1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358
	startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
	endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;

	/*
	 * Open the toast relation and it's index
	 */
	toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
						 AccessShareLock);
	toasttupDesc = toastrel->rd_att;
	toastidx = index_open(toastrel->rd_rel->reltoastidxid);

	/*
1359 1360
	 * Setup a scan key to fetch from the index. This is either two keys or
	 * three depending on the number of chunks.
1361
	 */
1362 1363 1364
	ScanKeyInit(&toastkey[0],
				(AttrNumber) 1,
				BTEqualStrategyNumber, F_OIDEQ,
1365
				ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
Bruce Momjian's avatar
Bruce Momjian committed
1366

1367
	/*
1368
	 * Use equality condition for one chunk, a range condition otherwise:
1369
	 */
Bruce Momjian's avatar
Bruce Momjian committed
1370
	if (numchunks == 1)
1371
	{
1372 1373 1374 1375
		ScanKeyInit(&toastkey[1],
					(AttrNumber) 2,
					BTEqualStrategyNumber, F_INT4EQ,
					Int32GetDatum(startchunk));
Bruce Momjian's avatar
Bruce Momjian committed
1376
		nscankeys = 2;
1377 1378 1379
	}
	else
	{
1380 1381 1382 1383 1384 1385 1386 1387
		ScanKeyInit(&toastkey[1],
					(AttrNumber) 2,
					BTGreaterEqualStrategyNumber, F_INT4GE,
					Int32GetDatum(startchunk));
		ScanKeyInit(&toastkey[2],
					(AttrNumber) 2,
					BTLessEqualStrategyNumber, F_INT4LE,
					Int32GetDatum(endchunk));
Bruce Momjian's avatar
Bruce Momjian committed
1388
		nscankeys = 3;
1389 1390 1391 1392 1393 1394 1395 1396
	}

	/*
	 * Read the chunks by index
	 *
	 * The index is on (valueid, chunkidx) so they will come in order
	 */
	nextidx = startchunk;
1397 1398
	toastscan = index_beginscan(toastrel, toastidx, true,
								SnapshotToast, nscankeys, toastkey);
1399
	while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1400 1401 1402 1403
	{
		/*
		 * Have a chunk, extract the sequence number and the data
		 */
1404
		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1405
		Assert(!isnull);
1406
		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436
		Assert(!isnull);
		chunksize = VARATT_SIZE(chunk) - VARHDRSZ;

		/*
		 * Some checks on the data we've found
		 */
		if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
			elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
				 residx, nextidx,
				 attr->va_content.va_external.va_valueid);
		if (residx < totalchunks - 1)
		{
			if (chunksize != TOAST_MAX_CHUNK_SIZE)
				elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
					 chunksize, residx,
					 attr->va_content.va_external.va_valueid);
		}
		else
		{
			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
				elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
					 chunksize, residx,
					 attr->va_content.va_external.va_valueid);
		}

		/*
		 * Copy the data into proper place in our result
		 */
		chcpystrt = 0;
		chcpyend = chunksize - 1;
Bruce Momjian's avatar
Bruce Momjian committed
1437 1438 1439 1440 1441 1442 1443
		if (residx == startchunk)
			chcpystrt = startoffset;
		if (residx == endchunk)
			chcpyend = endoffset;

		memcpy(((char *) VARATT_DATA(result)) +
			   (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1444 1445
			   VARATT_DATA(chunk) + chcpystrt,
			   (chcpyend - chcpystrt) + 1);
Bruce Momjian's avatar
Bruce Momjian committed
1446

1447 1448 1449 1450 1451 1452
		nextidx++;
	}

	/*
	 * Final checks that we successfully fetched the datum
	 */
Bruce Momjian's avatar
Bruce Momjian committed
1453
	if (nextidx != (endchunk + 1))
1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466
		elog(ERROR, "missing chunk number %d for toast value %u",
			 nextidx,
			 attr->va_content.va_external.va_valueid);

	/*
	 * End scan and close relations
	 */
	index_endscan(toastscan);
	index_close(toastidx);
	heap_close(toastrel, AccessShareLock);

	return result;
}