Commit df6bbe73 authored by Michael Paquier's avatar Michael Paquier

pageinspect: Fix handling of all-zero pages

Getting from get_raw_page() an all-zero page is considered as a valid
case by the buffer manager and it can happen for example when finding a
corrupted page with zero_damaged_pages enabled (using zero_damaged_pages
to look at corrupted pages happens), or after a crash when a relation
file is extended before any WAL for its new data is generated (before a
vacuum or autovacuum job comes in to do some cleanup).

However, all the functions of pageinspect, as of the index AMs (except
hash that has its own idea of new pages), heap, the FSM or the page
header have never worked with all-zero pages, causing various crashes
when going through the page internals.

This commit changes all the pageinspect functions to be compliant with
all-zero pages, where the choice is made to return NULL or no rows for
SRFs when finding a new page.  get_raw_page() still works the same way,
returning a batch of zeros in the bytea of the page retrieved.  A hard
error could be used but NULL, while more invasive, is useful when
scanning relation files in full to get a batch of results for a single
relation in one query.  Tests are added for all the code paths
impacted.

Reported-by: Daria Lepikhova
Author: Michael Paquier
Discussion: https://postgr.es/m/561e187b-3549-c8d5-03f5-525c14e65bd0@postgrespro.ru
Backpatch-through: 10
parent c590e514
...@@ -58,6 +58,9 @@ brin_page_type(PG_FUNCTION_ARGS) ...@@ -58,6 +58,9 @@ brin_page_type(PG_FUNCTION_ARGS)
page = get_page_from_raw(raw_page); page = get_page_from_raw(raw_page);
if (PageIsNew(page))
PG_RETURN_NULL();
/* verify the special space has the expected size */ /* verify the special space has the expected size */
if (PageGetSpecialSize(page) != MAXALIGN(sizeof(BrinSpecialSpace))) if (PageGetSpecialSize(page) != MAXALIGN(sizeof(BrinSpecialSpace)))
ereport(ERROR, ereport(ERROR,
...@@ -95,6 +98,9 @@ verify_brin_page(bytea *raw_page, uint16 type, const char *strtype) ...@@ -95,6 +98,9 @@ verify_brin_page(bytea *raw_page, uint16 type, const char *strtype)
{ {
Page page = get_page_from_raw(raw_page); Page page = get_page_from_raw(raw_page);
if (PageIsNew(page))
return page;
/* verify the special space has the expected size */ /* verify the special space has the expected size */
if (PageGetSpecialSize(page) != MAXALIGN(sizeof(BrinSpecialSpace))) if (PageGetSpecialSize(page) != MAXALIGN(sizeof(BrinSpecialSpace)))
ereport(ERROR, ereport(ERROR,
...@@ -182,6 +188,13 @@ brin_page_items(PG_FUNCTION_ARGS) ...@@ -182,6 +188,13 @@ brin_page_items(PG_FUNCTION_ARGS)
/* minimally verify the page we got */ /* minimally verify the page we got */
page = verify_brin_page(raw_page, BRIN_PAGETYPE_REGULAR, "regular"); page = verify_brin_page(raw_page, BRIN_PAGETYPE_REGULAR, "regular");
if (PageIsNew(page))
{
brin_free_desc(bdesc);
index_close(indexRel, AccessShareLock);
PG_RETURN_NULL();
}
/* /*
* Initialize output functions for all indexed datatypes; simplifies * Initialize output functions for all indexed datatypes; simplifies
* calling them later. * calling them later.
...@@ -359,6 +372,9 @@ brin_metapage_info(PG_FUNCTION_ARGS) ...@@ -359,6 +372,9 @@ brin_metapage_info(PG_FUNCTION_ARGS)
page = verify_brin_page(raw_page, BRIN_PAGETYPE_META, "metapage"); page = verify_brin_page(raw_page, BRIN_PAGETYPE_META, "metapage");
if (PageIsNew(page))
PG_RETURN_NULL();
/* Build a tuple descriptor for our result type */ /* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type"); elog(ERROR, "return type must be a row type");
...@@ -410,6 +426,12 @@ brin_revmap_data(PG_FUNCTION_ARGS) ...@@ -410,6 +426,12 @@ brin_revmap_data(PG_FUNCTION_ARGS)
/* minimally verify the page we got */ /* minimally verify the page we got */
page = verify_brin_page(raw_page, BRIN_PAGETYPE_REVMAP, "revmap"); page = verify_brin_page(raw_page, BRIN_PAGETYPE_REVMAP, "revmap");
if (PageIsNew(page))
{
MemoryContextSwitchTo(mctx);
PG_RETURN_NULL();
}
state = palloc(sizeof(*state)); state = palloc(sizeof(*state));
state->tids = ((RevmapContents *) PageGetContents(page))->rm_tids; state->tids = ((RevmapContents *) PageGetContents(page))->rm_tids;
state->idx = 0; state->idx = 0;
......
...@@ -611,6 +611,12 @@ bt_page_items_bytea(PG_FUNCTION_ARGS) ...@@ -611,6 +611,12 @@ bt_page_items_bytea(PG_FUNCTION_ARGS)
uargs->page = get_page_from_raw(raw_page); uargs->page = get_page_from_raw(raw_page);
if (PageIsNew(uargs->page))
{
MemoryContextSwitchTo(mctx);
PG_RETURN_NULL();
}
uargs->offset = FirstOffsetNumber; uargs->offset = FirstOffsetNumber;
/* verify the special space has the expected size */ /* verify the special space has the expected size */
......
...@@ -62,4 +62,29 @@ ERROR: input page is not a valid BRIN page ...@@ -62,4 +62,29 @@ ERROR: input page is not a valid BRIN page
SELECT * FROM brin_revmap_data(get_raw_page('test1', 0)); SELECT * FROM brin_revmap_data(get_raw_page('test1', 0));
ERROR: input page is not a valid BRIN page ERROR: input page is not a valid BRIN page
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT brin_page_type(decode(repeat('00', :block_size), 'hex'));
brin_page_type
----------------
(1 row)
SELECT brin_page_items(decode(repeat('00', :block_size), 'hex'), 'test1_a_idx');
brin_page_items
-----------------
(0 rows)
SELECT brin_metapage_info(decode(repeat('00', :block_size), 'hex'));
brin_metapage_info
--------------------
(1 row)
SELECT brin_revmap_data(decode(repeat('00', :block_size), 'hex'));
brin_revmap_data
------------------
(1 row)
DROP TABLE test1; DROP TABLE test1;
...@@ -99,4 +99,10 @@ ERROR: input page is not a valid btree page ...@@ -99,4 +99,10 @@ ERROR: input page is not a valid btree page
SELECT bt_page_items(get_raw_page('test1_a_brin', 0)); SELECT bt_page_items(get_raw_page('test1_a_brin', 0));
ERROR: input page is not a valid btree page ERROR: input page is not a valid btree page
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT bt_page_items(decode(repeat('00', :block_size), 'hex'));
-[ RECORD 1 ]-+-
bt_page_items |
DROP TABLE test1; DROP TABLE test1;
...@@ -54,4 +54,18 @@ ERROR: input page is not a valid GIN data leaf page ...@@ -54,4 +54,18 @@ ERROR: input page is not a valid GIN data leaf page
SELECT * FROM gin_leafpage_items(get_raw_page('test1', 0)); SELECT * FROM gin_leafpage_items(get_raw_page('test1', 0));
ERROR: input page is not a valid GIN data leaf page ERROR: input page is not a valid GIN data leaf page
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT gin_leafpage_items(decode(repeat('00', :block_size), 'hex'));
-[ RECORD 1 ]------+-
gin_leafpage_items |
SELECT gin_metapage_info(decode(repeat('00', :block_size), 'hex'));
-[ RECORD 1 ]-----+-
gin_metapage_info |
SELECT gin_page_opaque_info(decode(repeat('00', :block_size), 'hex'));
-[ RECORD 1 ]--------+-
gin_page_opaque_info |
DROP TABLE test1; DROP TABLE test1;
...@@ -89,4 +89,22 @@ ERROR: input page is not a valid GiST page ...@@ -89,4 +89,22 @@ ERROR: input page is not a valid GiST page
SELECT gist_page_items_bytea(get_raw_page('test_gist_btree', 0)); SELECT gist_page_items_bytea(get_raw_page('test_gist_btree', 0));
ERROR: input page is not a valid GiST page ERROR: input page is not a valid GiST page
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT gist_page_items_bytea(decode(repeat('00', :block_size), 'hex'));
gist_page_items_bytea
-----------------------
(0 rows)
SELECT gist_page_items(decode(repeat('00', :block_size), 'hex'), 'test_gist_idx'::regclass);
gist_page_items
-----------------
(0 rows)
SELECT gist_page_opaque_info(decode(repeat('00', :block_size), 'hex'));
gist_page_opaque_info
-----------------------
(1 row)
DROP TABLE test_gist; DROP TABLE test_gist;
...@@ -190,4 +190,16 @@ ERROR: input page is not a valid hash page ...@@ -190,4 +190,16 @@ ERROR: input page is not a valid hash page
SELECT hash_page_type(get_raw_page('test_hash', 0)); SELECT hash_page_type(get_raw_page('test_hash', 0));
ERROR: input page is not a valid hash page ERROR: input page is not a valid hash page
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT hash_metapage_info(decode(repeat('00', :block_size), 'hex'));
ERROR: page is not a hash meta page
SELECT hash_page_items(decode(repeat('00', :block_size), 'hex'));
ERROR: page is not a hash bucket or overflow page
SELECT hash_page_stats(decode(repeat('00', :block_size), 'hex'));
ERROR: page is not a hash bucket or overflow page
SELECT hash_page_type(decode(repeat('00', :block_size), 'hex'));
-[ RECORD 1 ]--+-------
hash_page_type | unused
DROP TABLE test_hash; DROP TABLE test_hash;
...@@ -216,3 +216,23 @@ ERROR: invalid page size ...@@ -216,3 +216,23 @@ ERROR: invalid page size
SELECT page_header('ccc'::bytea); SELECT page_header('ccc'::bytea);
ERROR: invalid page size ERROR: invalid page size
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT fsm_page_contents(decode(repeat('00', :block_size), 'hex'));
fsm_page_contents
-------------------
(1 row)
SELECT page_header(decode(repeat('00', :block_size), 'hex'));
page_header
-----------------------
(0/0,0,0,0,0,0,0,0,0)
(1 row)
SELECT page_checksum(decode(repeat('00', :block_size), 'hex'), 1);
page_checksum
---------------
(1 row)
...@@ -46,6 +46,10 @@ fsm_page_contents(PG_FUNCTION_ARGS) ...@@ -46,6 +46,10 @@ fsm_page_contents(PG_FUNCTION_ARGS)
errmsg("must be superuser to use raw page functions"))); errmsg("must be superuser to use raw page functions")));
page = get_page_from_raw(raw_page); page = get_page_from_raw(raw_page);
if (PageIsNew(page))
PG_RETURN_NULL();
fsmpage = (FSMPage) PageGetContents(page); fsmpage = (FSMPage) PageGetContents(page);
initStringInfo(&sinfo); initStringInfo(&sinfo);
......
...@@ -49,6 +49,9 @@ gin_metapage_info(PG_FUNCTION_ARGS) ...@@ -49,6 +49,9 @@ gin_metapage_info(PG_FUNCTION_ARGS)
page = get_page_from_raw(raw_page); page = get_page_from_raw(raw_page);
if (PageIsNew(page))
PG_RETURN_NULL();
if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GinPageOpaqueData))) if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GinPageOpaqueData)))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
...@@ -115,6 +118,9 @@ gin_page_opaque_info(PG_FUNCTION_ARGS) ...@@ -115,6 +118,9 @@ gin_page_opaque_info(PG_FUNCTION_ARGS)
page = get_page_from_raw(raw_page); page = get_page_from_raw(raw_page);
if (PageIsNew(page))
PG_RETURN_NULL();
if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GinPageOpaqueData))) if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GinPageOpaqueData)))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
...@@ -200,6 +206,12 @@ gin_leafpage_items(PG_FUNCTION_ARGS) ...@@ -200,6 +206,12 @@ gin_leafpage_items(PG_FUNCTION_ARGS)
page = get_page_from_raw(raw_page); page = get_page_from_raw(raw_page);
if (PageIsNew(page))
{
MemoryContextSwitchTo(mctx);
PG_RETURN_NULL();
}
if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GinPageOpaqueData))) if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GinPageOpaqueData)))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
......
...@@ -55,6 +55,9 @@ gist_page_opaque_info(PG_FUNCTION_ARGS) ...@@ -55,6 +55,9 @@ gist_page_opaque_info(PG_FUNCTION_ARGS)
page = get_page_from_raw(raw_page); page = get_page_from_raw(raw_page);
if (PageIsNew(page))
PG_RETURN_NULL();
/* verify the special space has the expected size */ /* verify the special space has the expected size */
if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData))) if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
ereport(ERROR, ereport(ERROR,
...@@ -156,6 +159,9 @@ gist_page_items_bytea(PG_FUNCTION_ARGS) ...@@ -156,6 +159,9 @@ gist_page_items_bytea(PG_FUNCTION_ARGS)
page = get_page_from_raw(raw_page); page = get_page_from_raw(raw_page);
if (PageIsNew(page))
PG_RETURN_NULL();
/* verify the special space has the expected size */ /* verify the special space has the expected size */
if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData))) if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
ereport(ERROR, ereport(ERROR,
...@@ -272,6 +278,12 @@ gist_page_items(PG_FUNCTION_ARGS) ...@@ -272,6 +278,12 @@ gist_page_items(PG_FUNCTION_ARGS)
page = get_page_from_raw(raw_page); page = get_page_from_raw(raw_page);
if (PageIsNew(page))
{
index_close(indexRel, AccessShareLock);
PG_RETURN_NULL();
}
/* Avoid bogus PageGetMaxOffsetNumber() call with deleted pages */ /* Avoid bogus PageGetMaxOffsetNumber() call with deleted pages */
if (GistPageIsDeleted(page)) if (GistPageIsDeleted(page))
elog(NOTICE, "page is deleted"); elog(NOTICE, "page is deleted");
......
...@@ -349,6 +349,9 @@ page_checksum_internal(PG_FUNCTION_ARGS, enum pageinspect_version ext_version) ...@@ -349,6 +349,9 @@ page_checksum_internal(PG_FUNCTION_ARGS, enum pageinspect_version ext_version)
page = get_page_from_raw(raw_page); page = get_page_from_raw(raw_page);
if (PageIsNew(page))
PG_RETURN_NULL();
PG_RETURN_INT16(pg_checksum_page((char *) page, blkno)); PG_RETURN_INT16(pg_checksum_page((char *) page, blkno));
} }
......
...@@ -27,4 +27,11 @@ SELECT * FROM brin_metapage_info(get_raw_page('test1', 0)); ...@@ -27,4 +27,11 @@ SELECT * FROM brin_metapage_info(get_raw_page('test1', 0));
SELECT * FROM brin_revmap_data(get_raw_page('test1', 0)); SELECT * FROM brin_revmap_data(get_raw_page('test1', 0));
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT brin_page_type(decode(repeat('00', :block_size), 'hex'));
SELECT brin_page_items(decode(repeat('00', :block_size), 'hex'), 'test1_a_idx');
SELECT brin_metapage_info(decode(repeat('00', :block_size), 'hex'));
SELECT brin_revmap_data(decode(repeat('00', :block_size), 'hex'));
DROP TABLE test1; DROP TABLE test1;
...@@ -44,4 +44,8 @@ SELECT bt_page_items(get_raw_page('test1', 0)); ...@@ -44,4 +44,8 @@ SELECT bt_page_items(get_raw_page('test1', 0));
SELECT bt_page_items(get_raw_page('test1_a_brin', 0)); SELECT bt_page_items(get_raw_page('test1_a_brin', 0));
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT bt_page_items(decode(repeat('00', :block_size), 'hex'));
DROP TABLE test1; DROP TABLE test1;
...@@ -32,4 +32,10 @@ SELECT * FROM gin_page_opaque_info(get_raw_page('test1', 0)); ...@@ -32,4 +32,10 @@ SELECT * FROM gin_page_opaque_info(get_raw_page('test1', 0));
SELECT * FROM gin_leafpage_items(get_raw_page('test1', 0)); SELECT * FROM gin_leafpage_items(get_raw_page('test1', 0));
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT gin_leafpage_items(decode(repeat('00', :block_size), 'hex'));
SELECT gin_metapage_info(decode(repeat('00', :block_size), 'hex'));
SELECT gin_page_opaque_info(decode(repeat('00', :block_size), 'hex'));
DROP TABLE test1; DROP TABLE test1;
...@@ -44,4 +44,10 @@ SELECT gist_page_items_bytea(get_raw_page('test_gist', 0)); ...@@ -44,4 +44,10 @@ SELECT gist_page_items_bytea(get_raw_page('test_gist', 0));
SELECT gist_page_items_bytea(get_raw_page('test_gist_btree', 0)); SELECT gist_page_items_bytea(get_raw_page('test_gist_btree', 0));
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT gist_page_items_bytea(decode(repeat('00', :block_size), 'hex'));
SELECT gist_page_items(decode(repeat('00', :block_size), 'hex'), 'test_gist_idx'::regclass);
SELECT gist_page_opaque_info(decode(repeat('00', :block_size), 'hex'));
DROP TABLE test_gist; DROP TABLE test_gist;
...@@ -98,4 +98,11 @@ SELECT hash_page_stats(get_raw_page('test_hash', 0)); ...@@ -98,4 +98,11 @@ SELECT hash_page_stats(get_raw_page('test_hash', 0));
SELECT hash_page_type(get_raw_page('test_hash', 0)); SELECT hash_page_type(get_raw_page('test_hash', 0));
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT hash_metapage_info(decode(repeat('00', :block_size), 'hex'));
SELECT hash_page_items(decode(repeat('00', :block_size), 'hex'));
SELECT hash_page_stats(decode(repeat('00', :block_size), 'hex'));
SELECT hash_page_type(decode(repeat('00', :block_size), 'hex'));
DROP TABLE test_hash; DROP TABLE test_hash;
...@@ -91,3 +91,9 @@ SELECT fsm_page_contents('aaa'::bytea); ...@@ -91,3 +91,9 @@ SELECT fsm_page_contents('aaa'::bytea);
SELECT page_checksum('bbb'::bytea, 0); SELECT page_checksum('bbb'::bytea, 0);
SELECT page_header('ccc'::bytea); SELECT page_header('ccc'::bytea);
\set VERBOSITY default \set VERBOSITY default
-- Tests with all-zero pages.
SHOW block_size \gset
SELECT fsm_page_contents(decode(repeat('00', :block_size), 'hex'));
SELECT page_header(decode(repeat('00', :block_size), 'hex'));
SELECT page_checksum(decode(repeat('00', :block_size), 'hex'), 1);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment