Commit 97060928 authored by Robert Haas

Add pg_amcheck, a CLI for contrib/amcheck.

This makes it a lot easier to run the corruption checks that are
implemented by contrib/amcheck against lots of relations and get
the result in an easily understandable format. It has a wide variety
of options for choosing which relations to check and which checks
to perform, and it can run checks in parallel if you want.

Mark Dilger, reviewed by Peter Geoghegan and by me.

Discussion: http://postgr.es/m/12ED3DA8-25F0-4B68-937D-D907CFBF08E7@enterprisedb.com
Discussion: http://postgr.es/m/BA592F2D-F928-46FF-9516-2B827F067F57@enterprisedb.com
parent 48d67fd8
@@ -196,6 +196,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY dropuser SYSTEM "dropuser.sgml">
<!ENTITY ecpgRef SYSTEM "ecpg-ref.sgml">
<!ENTITY initdb SYSTEM "initdb.sgml">
<!ENTITY pgamcheck SYSTEM "pg_amcheck.sgml">
<!ENTITY pgarchivecleanup SYSTEM "pgarchivecleanup.sgml">
<!ENTITY pgBasebackup SYSTEM "pg_basebackup.sgml">
<!ENTITY pgbench SYSTEM "pgbench.sgml">
<!--
doc/src/sgml/ref/pg_amcheck.sgml
PostgreSQL documentation
-->
<refentry id="app-pgamcheck">
<indexterm zone="app-pgamcheck">
<primary>pg_amcheck</primary>
</indexterm>
<refmeta>
<refentrytitle><application>pg_amcheck</application></refentrytitle>
<manvolnum>1</manvolnum>
<refmiscinfo>Application</refmiscinfo>
</refmeta>
<refnamediv>
<refname>pg_amcheck</refname>
<refpurpose>checks for corruption in one or more
<productname>PostgreSQL</productname> databases</refpurpose>
</refnamediv>
<refsynopsisdiv>
<cmdsynopsis>
<command>pg_amcheck</command>
<arg rep="repeat"><replaceable>option</replaceable></arg>
<arg><replaceable>dbname</replaceable></arg>
</cmdsynopsis>
</refsynopsisdiv>
<refsect1>
<title>Description</title>
<para>
<application>pg_amcheck</application> supports running
<xref linkend="amcheck"/>'s corruption checking functions against one or
more databases, with options to select which schemas, tables and indexes to
check, which kinds of checking to perform, and whether to perform the checks
in parallel and, if so, how many parallel connections to establish and use.
</para>
<para>
Only table relations and btree indexes are currently supported. Other
relation types are silently skipped.
</para>
<para>
If <literal>dbname</literal> is specified, it should be the name of a
single database to check, and no other database selection options should
be present. Otherwise, if any database selection options are present,
all matching databases will be checked. If no such options are present,
the default database will be checked. Database selection options include
<option>--all</option>, <option>--database</option> and
<option>--exclude-database</option>. They also include
<option>--relation</option>, <option>--exclude-relation</option>,
<option>--table</option>, <option>--exclude-table</option>,
<option>--index</option>, and <option>--exclude-index</option>,
but only when such options are used with a three-part pattern
(e.g. <literal>mydb*.myschema*.myrel*</literal>). Finally, they include
<option>--schema</option> and <option>--exclude-schema</option>
when such options are used with a two-part pattern
(e.g. <literal>mydb*.myschema*</literal>).
</para>
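The database-selection rules above hinge on how many dotted parts a pattern has and which option it was given to. The following is an illustrative Python sketch only, not pg_amcheck's actual implementation (which compiles psql-style patterns into regular expressions in C, and additionally handles quoting and wildcard characters):

```python
def pattern_parts(option, pattern):
    """Split a psql-style pattern into (db, schema, rel) parts according
    to which option carries it.  Sketch: quoting and wildcard translation
    in real psql patterns are ignored here."""
    parts = pattern.split(".")
    if option in ("--database", "--exclude-database"):
        if len(parts) == 1:
            return (parts[0], None, None)
    elif option in ("--schema", "--exclude-schema"):
        if len(parts) == 1:
            return (None, parts[0], None)
        if len(parts) == 2:                   # database-qualified schema pattern
            return (parts[0], parts[1], None)
    else:  # --relation/--table/--index and their exclude variants
        if len(parts) == 1:
            return (None, None, parts[0])
        if len(parts) == 2:                   # schema-qualified
            return (None, parts[0], parts[1])
        if len(parts) == 3:                   # database- and schema-qualified
            return (parts[0], parts[1], parts[2])
    raise ValueError("unexpected number of dotted parts in " + pattern)

def is_database_selection(option, pattern):
    """True when the option/pattern combination also selects databases,
    i.e. when a database part is present (per the rules above)."""
    return pattern_parts(option, pattern)[0] is not None
```

For example, `--relation=mydb*.myschema*.myrel*` adds matching databases to the list to check, while `--relation=myschema*.myrel*` does not.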
<para>
<replaceable>dbname</replaceable> can also be a
<link linkend="libpq-connstring">connection string</link>.
</para>
</refsect1>
<refsect1>
<title>Options</title>
<para>
<application>pg_amcheck</application> accepts the following command-line arguments:
<variablelist>
<varlistentry>
<term><option>-a</option></term>
<term><option>--all</option></term>
<listitem>
<para>
Check all databases, except for any excluded via
<option>--exclude-database</option>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-d <replaceable class="parameter">pattern</replaceable></option></term>
<term><option>--database=<replaceable class="parameter">pattern</replaceable></option></term>
<listitem>
<para>
Check databases matching the specified
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link>,
except for any excluded by <option>--exclude-database</option>.
This option can be specified more than once.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-D <replaceable class="parameter">pattern</replaceable></option></term>
<term><option>--exclude-database=<replaceable class="parameter">pattern</replaceable></option></term>
<listitem>
<para>
Exclude databases matching the given
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link>.
This option can be specified more than once.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-e</option></term>
<term><option>--echo</option></term>
<listitem>
<para>
Echo to stdout all SQL sent to the server.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--endblock=<replaceable class="parameter">block</replaceable></option></term>
<listitem>
<para>
End checking at the specified block number. An error will occur if the
table relation being checked has fewer than this number of blocks.
This option does not apply to indexes, and is probably only useful when
checking a single table relation. If both a regular table and a toast
table are checked, this option will apply to both, but higher-numbered
toast blocks may still be accessed while validating toast pointers,
unless that is suppressed using
<option>--exclude-toast-pointers</option>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--exclude-toast-pointers</option></term>
<listitem>
<para>
By default, whenever a toast pointer is encountered in a table,
a lookup is performed to ensure that it references apparently-valid
entries in the toast table. These checks can be quite slow, and this
option can be used to skip them.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--heapallindexed</option></term>
<listitem>
<para>
For each index checked, verify the presence of all heap tuples as index
tuples in the index using <xref linkend="amcheck"/>'s
<option>heapallindexed</option> option.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-?</option></term>
<term><option>--help</option></term>
<listitem>
<para>
Show help about <application>pg_amcheck</application> command line
arguments, and exit.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-h <replaceable class="parameter">hostname</replaceable></option></term>
<term><option>--host=<replaceable class="parameter">hostname</replaceable></option></term>
<listitem>
<para>
Specifies the host name of the machine on which the server is running.
If the value begins with a slash, it is used as the directory for the
Unix domain socket.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-i <replaceable class="parameter">pattern</replaceable></option></term>
<term><option>--index=<replaceable class="parameter">pattern</replaceable></option></term>
<listitem>
<para>
Check indexes matching the specified
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link>,
unless they are otherwise excluded.
This option can be specified more than once.
</para>
<para>
This is similar to the <option>--relation</option> option, except that
it applies only to indexes, not tables.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-I <replaceable class="parameter">pattern</replaceable></option></term>
<term><option>--exclude-index=<replaceable class="parameter">pattern</replaceable></option></term>
<listitem>
<para>
Exclude indexes matching the specified
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link>.
This option can be specified more than once.
</para>
<para>
This is similar to the <option>--exclude-relation</option> option,
except that it applies only to indexes, not tables.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-j <replaceable class="parameter">num</replaceable></option></term>
<term><option>--jobs=<replaceable class="parameter">num</replaceable></option></term>
<listitem>
<para>
Use <replaceable>num</replaceable> concurrent connections to the server,
or one per object to be checked, whichever is less.
</para>
<para>
The default is to use a single connection.
</para>
</listitem>
</varlistentry>
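The effective connection count under <option>--jobs</option> can be sketched as a simple cap (an illustrative sketch of the documented behavior, not the actual implementation):

```python
def effective_connections(jobs, objects_to_check):
    """Concurrent server connections pg_amcheck would use: the --jobs
    setting, capped at one connection per object to be checked."""
    if jobs < 1:
        raise ValueError("number of parallel jobs must be at least 1")
    return min(jobs, objects_to_check)
```

So `--jobs=8` against three relations still opens only three connections.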
<varlistentry>
<term><option>--maintenance-db=<replaceable class="parameter">dbname</replaceable></option></term>
<listitem>
<para>
Specifies a database or
<link linkend="libpq-connstring">connection string</link> to be
used to discover the list of databases to be checked. If neither
<option>--all</option> nor any option including a database pattern is
used, no such connection is required and this option does nothing.
Otherwise, any connection string parameters other than
the database name which are included in the value for this option
will also be used when connecting to the databases
being checked. If this option is omitted, the default is
<literal>postgres</literal> or, if that fails,
<literal>template1</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--no-dependent-indexes</option></term>
<listitem>
<para>
By default, if a table is checked, any btree indexes of that table
will also be checked, even if they are not explicitly selected by
an option such as <literal>--index</literal> or
<literal>--relation</literal>. This option suppresses that behavior.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--no-strict-names</option></term>
<listitem>
<para>
By default, if an argument to <literal>--database</literal>,
<literal>--table</literal>, <literal>--index</literal>,
or <literal>--relation</literal> matches no objects, it is a fatal
error. This option downgrades that error to a warning.
If this option is used with <literal>--quiet</literal>, the warning
will be suppressed as well.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--no-dependent-toast</option></term>
<listitem>
<para>
By default, if a table is checked, its toast table, if any, will also
be checked, even if it is not explicitly selected by an option
such as <literal>--table</literal> or <literal>--relation</literal>.
This option suppresses that behavior.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--on-error-stop</option></term>
<listitem>
<para>
After reporting all corruptions on the first page of a table where
corruption is found, stop processing that table relation and move on
to the next table or index.
</para>
<para>
Note that index checking always stops after the first corrupt page.
This option only has meaning relative to table relations.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--parent-check</option></term>
<listitem>
<para>
For each btree index checked, use <xref linkend="amcheck"/>'s
<function>bt_index_parent_check</function> function, which performs
additional checks of parent/child relationships during index checking.
</para>
<para>
The default is to use <application>amcheck</application>'s
<function>bt_index_check</function> function, but note that use of the
<option>--rootdescend</option> option implicitly selects
<function>bt_index_parent_check</function>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-p <replaceable class="parameter">port</replaceable></option></term>
<term><option>--port=<replaceable class="parameter">port</replaceable></option></term>
<listitem>
<para>
Specifies the TCP port or local Unix domain socket file extension on
which the server is listening for connections.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-P</option></term>
<term><option>--progress</option></term>
<listitem>
<para>
Show progress information. Progress information includes the number
of relations for which checking has been completed, and the total
size of those relations. It also includes the total number of relations
that will eventually be checked, and the estimated size of those
relations.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-q</option></term>
<term><option>--quiet</option></term>
<listitem>
<para>
Print fewer messages, and less detail regarding any server errors.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-r <replaceable class="parameter">pattern</replaceable></option></term>
<term><option>--relation=<replaceable class="parameter">pattern</replaceable></option></term>
<listitem>
<para>
Check relations matching the specified
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link>,
unless they are otherwise excluded.
This option can be specified more than once.
</para>
<para>
Patterns may be unqualified, e.g. <literal>myrel*</literal>, or they
may be schema-qualified, e.g. <literal>myschema*.myrel*</literal> or
database-qualified and schema-qualified, e.g.
<literal>mydb*.myschema*.myrel*</literal>. A database-qualified
pattern will add matching databases to the list of databases to be
checked.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-R <replaceable class="parameter">pattern</replaceable></option></term>
<term><option>--exclude-relation=<replaceable class="parameter">pattern</replaceable></option></term>
<listitem>
<para>
Exclude relations matching the specified
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link>.
This option can be specified more than once.
</para>
<para>
As with <option>--relation</option>, the
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link> may be unqualified, schema-qualified,
or database- and schema-qualified.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--rootdescend</option></term>
<listitem>
<para>
For each index checked, re-find tuples on the leaf level by performing a
new search from the root page for each tuple using
<xref linkend="amcheck"/>'s <option>rootdescend</option> option.
</para>
<para>
Use of this option implicitly also selects the
<option>--parent-check</option> option.
</para>
<para>
This form of verification was originally written to help in the
development of btree index features. It may be of limited use or even
of no use in helping detect the kinds of corruption that occur in
practice. It may also cause corruption checking to take considerably
longer and consume considerably more resources on the server.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-s <replaceable class="parameter">pattern</replaceable></option></term>
<term><option>--schema=<replaceable class="parameter">pattern</replaceable></option></term>
<listitem>
<para>
Check tables and indexes in schemas matching the specified
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link>, unless they are otherwise excluded.
This option can be specified more than once.
</para>
<para>
To select only tables in schemas matching a particular pattern,
consider using something like
<literal>--table=SCHEMAPAT.* --no-dependent-indexes</literal>.
To select only indexes, consider using something like
<literal>--index=SCHEMAPAT.*</literal>.
</para>
<para>
A schema pattern may be database-qualified. For example, you may
write <literal>--schema=mydb*.myschema*</literal> to select
schemas matching <literal>myschema*</literal> in databases matching
<literal>mydb*</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-S <replaceable class="parameter">pattern</replaceable></option></term>
<term><option>--exclude-schema=<replaceable class="parameter">pattern</replaceable></option></term>
<listitem>
<para>
Exclude tables and indexes in schemas matching the specified
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link>.
This option can be specified more than once.
</para>
<para>
As with <option>--schema</option>, the pattern may be
database-qualified.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--skip=<replaceable class="parameter">option</replaceable></option></term>
<listitem>
<para>
If <literal>all-frozen</literal> is given, table corruption checks
will skip over pages in all tables that are marked as all frozen.
</para>
<para>
If <literal>all-visible</literal> is given, table corruption checks
will skip over pages in all tables that are marked as all visible.
</para>
<para>
By default, no pages are skipped. This can be specified as
<literal>none</literal>, but since this is the default, it need not be
mentioned.
</para>
</listitem>
</varlistentry>
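The mapping from <option>--skip</option> values to the setting handed to the server-side checks can be sketched as follows. This is an illustrative Python sketch mirroring the case-insensitive option handling in pg_amcheck.c later in this commit (which accepts only the two hyphenated forms, with <literal>none</literal> as the default); <literal>none</literal> is also accepted explicitly here, as the documentation describes:

```python
def parse_skip_option(value):
    """Map a --skip argument to the internal skip setting
    ('all-frozen' -> 'all frozen', 'all-visible' -> 'all visible')."""
    v = value.lower()
    if v == "all-visible":
        return "all visible"
    if v == "all-frozen":
        return "all frozen"
    if v == "none":
        return "none"       # the default: no pages are skipped
    raise ValueError("invalid skip option: " + value)
```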
<varlistentry>
<term><option>--startblock=<replaceable class="parameter">block</replaceable></option></term>
<listitem>
<para>
Start checking at the specified block number. An error will occur if
the table relation being checked has fewer than this number of blocks.
This option does not apply to indexes, and is probably only useful
when checking a single table relation. See <literal>--endblock</literal>
for further caveats.
</para>
</listitem>
</varlistentry>
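The validation applied to <option>--startblock</option> and <option>--endblock</option> can be sketched as below, mirroring the command-line checks in pg_amcheck.c later in this commit (each bound must lie within the valid block-number range, and the end block must not precede the start block); this is a sketch, not the actual implementation:

```python
MAX_BLOCK_NUMBER = 0xFFFFFFFE  # MaxBlockNumber in PostgreSQL's block.h

def validate_block_range(startblock, endblock):
    """Raise ValueError for out-of-bounds or inverted block ranges.
    -1 means the bound was not specified on the command line."""
    for name, value in (("start", startblock), ("end", endblock)):
        if value != -1 and not (0 <= value <= MAX_BLOCK_NUMBER):
            raise ValueError(name + " block out of bounds")
    if endblock >= 0 and endblock < startblock:
        raise ValueError("end block precedes start block")
```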
<varlistentry>
<term><option>-t <replaceable class="parameter">pattern</replaceable></option></term>
<term><option>--table=<replaceable class="parameter">pattern</replaceable></option></term>
<listitem>
<para>
Check tables matching the specified
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link>,
unless they are otherwise excluded.
This option can be specified more than once.
</para>
<para>
This is similar to the <option>--relation</option> option, except that
it applies only to tables, not indexes.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-T <replaceable class="parameter">pattern</replaceable></option></term>
<term><option>--exclude-table=<replaceable class="parameter">pattern</replaceable></option></term>
<listitem>
<para>
Exclude tables matching the specified
<link linkend="app-psql-patterns"><replaceable class="parameter">pattern</replaceable></link>.
This option can be specified more than once.
</para>
<para>
This is similar to the <option>--exclude-relation</option> option,
except that it applies only to tables, not indexes.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-U <replaceable class="parameter">username</replaceable></option></term>
<term><option>--username=<replaceable class="parameter">username</replaceable></option></term>
<listitem>
<para>
User name to connect as.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-v</option></term>
<term><option>--verbose</option></term>
<listitem>
<para>
Print more messages. In particular, this will print a message for
each relation being checked, and will increase the level of detail
shown for server errors.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-V</option></term>
<term><option>--version</option></term>
<listitem>
<para>
Print the <application>pg_amcheck</application> version and exit.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-w</option></term>
<term><option>--no-password</option></term>
<listitem>
<para>
Never issue a password prompt. If the server requires password
authentication and a password is not available by other means such as
a <filename>.pgpass</filename> file, the connection attempt will fail.
This option can be useful in batch jobs and scripts where no user is
present to enter a password.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-W</option></term>
<term><option>--password</option></term>
<listitem>
<para>
Force <application>pg_amcheck</application> to prompt for a password
before connecting to a database.
</para>
<para>
This option is never essential, since
<application>pg_amcheck</application> will automatically prompt for a
password if the server demands password authentication. However,
<application>pg_amcheck</application> will waste a connection attempt
finding out that the server wants a password. In some cases it is
worth typing <option>-W</option> to avoid the extra connection attempt.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</refsect1>
<refsect1>
<title>Notes</title>
<para>
<application>pg_amcheck</application> is designed to work with
<productname>PostgreSQL</productname> 14.0 and later.
</para>
</refsect1>
<refsect1>
<title>See Also</title>
<simplelist type="inline">
<member><xref linkend="amcheck"/></member>
</simplelist>
</refsect1>
</refentry>
@@ -246,6 +246,7 @@
&dropdb;
&dropuser;
&ecpgRef;
&pgamcheck;
&pgBasebackup;
&pgbench;
&pgConfig;
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
SUBDIRS = \
initdb \
pg_amcheck \
pg_archivecleanup \
pg_basebackup \
pg_checksums \
pg_amcheck
/tmp_check/
#-------------------------------------------------------------------------
#
# Makefile for src/bin/pg_amcheck
#
# Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
# Portions Copyright (c) 1994, Regents of the University of California
#
# src/bin/pg_amcheck/Makefile
#
#-------------------------------------------------------------------------
PGFILEDESC = "pg_amcheck - detect corruption within database relations"
PGAPPICON=win32
EXTRA_INSTALL=contrib/amcheck contrib/pageinspect
subdir = src/bin/pg_amcheck
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
OBJS = \
$(WIN32RES) \
pg_amcheck.o
all: pg_amcheck
pg_amcheck: $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
install: all installdirs
$(INSTALL_PROGRAM) pg_amcheck$(X) '$(DESTDIR)$(bindir)/pg_amcheck$(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_amcheck$(X)'
clean distclean maintainer-clean:
rm -f pg_amcheck$(X) $(OBJS)
rm -rf tmp_check
check:
$(prove_check)
installcheck:
$(prove_installcheck)
/*-------------------------------------------------------------------------
*
* pg_amcheck.c
* Detects corruption within database relations.
*
* Copyright (c) 2017-2021, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_amcheck/pg_amcheck.c
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <time.h>
#include "catalog/pg_am_d.h"
#include "catalog/pg_namespace_d.h"
#include "common/logging.h"
#include "common/username.h"
#include "fe_utils/cancel.h"
#include "fe_utils/option_utils.h"
#include "fe_utils/parallel_slot.h"
#include "fe_utils/query_utils.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
#include "getopt_long.h" /* pgrminclude ignore */
#include "pgtime.h"
#include "storage/block.h"
typedef struct PatternInfo
{
const char *pattern; /* Unaltered pattern from the command line */
char *db_regex; /* Database regexp parsed from pattern, or
* NULL */
char *nsp_regex; /* Schema regexp parsed from pattern, or NULL */
char *rel_regex; /* Relation regexp parsed from pattern, or
* NULL */
bool heap_only; /* true if rel_regex should only match heap
* tables */
bool btree_only; /* true if rel_regex should only match btree
* indexes */
bool matched; /* true if the pattern matched in any database */
} PatternInfo;
typedef struct PatternInfoArray
{
PatternInfo *data;
size_t len;
} PatternInfoArray;
/* pg_amcheck command line options controlled by user flags */
typedef struct AmcheckOptions
{
bool dbpattern;
bool alldb;
bool echo;
bool quiet;
bool verbose;
bool strict_names;
bool show_progress;
int jobs;
/* Objects to check or not to check, as lists of PatternInfo structs. */
PatternInfoArray include;
PatternInfoArray exclude;
/*
* As an optimization, if any pattern in the exclude list applies to heap
* tables, or similarly if any such pattern applies to btree indexes, or
* to schemas, then these will be true, otherwise false. These should
* always agree with what you'd conclude by grep'ing through the exclude
* list.
*/
bool excludetbl;
bool excludeidx;
bool excludensp;
/*
* If any inclusion pattern exists, then we should only be checking
* matching relations rather than all relations, so this is true iff
* include is empty.
*/
bool allrel;
/* heap table checking options */
bool no_toast_expansion;
bool reconcile_toast;
bool on_error_stop;
int64 startblock;
int64 endblock;
const char *skip;
/* btree index checking options */
bool parent_check;
bool rootdescend;
bool heapallindexed;
/* heap and btree hybrid option */
bool no_btree_expansion;
} AmcheckOptions;
static AmcheckOptions opts = {
.dbpattern = false,
.alldb = false,
.echo = false,
.quiet = false,
.verbose = false,
.strict_names = true,
.show_progress = false,
.jobs = 1,
.include = {NULL, 0},
.exclude = {NULL, 0},
.excludetbl = false,
.excludeidx = false,
.excludensp = false,
.allrel = true,
.no_toast_expansion = false,
.reconcile_toast = true,
.on_error_stop = false,
.startblock = -1,
.endblock = -1,
.skip = "none",
.parent_check = false,
.rootdescend = false,
.heapallindexed = false,
.no_btree_expansion = false
};
static const char *progname = NULL;
/* Whether all relations have so far passed their corruption checks */
static bool all_checks_pass = true;
/* Time last progress report was displayed */
static pg_time_t last_progress_report = 0;
static bool progress_since_last_stderr = false;
typedef struct DatabaseInfo
{
char *datname;
char *amcheck_schema; /* escaped, quoted literal */
} DatabaseInfo;
typedef struct RelationInfo
{
const DatabaseInfo *datinfo; /* shared by other relinfos */
Oid reloid;
bool is_heap; /* true if heap, false if btree */
char *nspname;
char *relname;
int relpages;
int blocks_to_check;
char *sql; /* set during query run, pg_free'd after */
} RelationInfo;
/*
* Query for determining if contrib's amcheck is installed. If so, selects the
* namespace name where amcheck's functions can be found.
*/
static const char *amcheck_sql =
"SELECT n.nspname, x.extversion FROM pg_catalog.pg_extension x"
"\nJOIN pg_catalog.pg_namespace n ON x.extnamespace = n.oid"
"\nWHERE x.extname = 'amcheck'";
static void prepare_heap_command(PQExpBuffer sql, RelationInfo *rel,
PGconn *conn);
static void prepare_btree_command(PQExpBuffer sql, RelationInfo *rel,
PGconn *conn);
static void run_command(ParallelSlot *slot, const char *sql);
static bool verify_heap_slot_handler(PGresult *res, PGconn *conn,
void *context);
static bool verify_btree_slot_handler(PGresult *res, PGconn *conn, void *context);
static void help(const char *progname);
static void progress_report(uint64 relations_total, uint64 relations_checked,
uint64 relpages_total, uint64 relpages_checked,
const char *datname, bool force, bool finished);
static void append_database_pattern(PatternInfoArray *pia, const char *pattern,
int encoding);
static void append_schema_pattern(PatternInfoArray *pia, const char *pattern,
int encoding);
static void append_relation_pattern(PatternInfoArray *pia, const char *pattern,
int encoding);
static void append_heap_pattern(PatternInfoArray *pia, const char *pattern,
int encoding);
static void append_btree_pattern(PatternInfoArray *pia, const char *pattern,
int encoding);
static void compile_database_list(PGconn *conn, SimplePtrList *databases,
const char *initial_dbname);
static void compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations,
const DatabaseInfo *datinfo,
uint64 *pagecount);
#define log_no_match(...) do { \
if (opts.strict_names) \
pg_log_generic(PG_LOG_ERROR, __VA_ARGS__); \
else \
pg_log_generic(PG_LOG_WARNING, __VA_ARGS__); \
} while(0)
#define FREE_AND_SET_NULL(x) do { \
pg_free(x); \
(x) = NULL; \
} while (0)
int
main(int argc, char *argv[])
{
PGconn *conn = NULL;
SimplePtrListCell *cell;
SimplePtrList databases = {NULL, NULL};
SimplePtrList relations = {NULL, NULL};
bool failed = false;
const char *latest_datname;
int parallel_workers;
ParallelSlotArray *sa;
PQExpBufferData sql;
uint64 reltotal = 0;
uint64 pageschecked = 0;
uint64 pagestotal = 0;
uint64 relprogress = 0;
int pattern_id;
static struct option long_options[] = {
/* Connection options */
{"host", required_argument, NULL, 'h'},
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"maintenance-db", required_argument, NULL, 1},
/* check options */
{"all", no_argument, NULL, 'a'},
{"database", required_argument, NULL, 'd'},
{"exclude-database", required_argument, NULL, 'D'},
{"echo", no_argument, NULL, 'e'},
{"index", required_argument, NULL, 'i'},
{"exclude-index", required_argument, NULL, 'I'},
{"jobs", required_argument, NULL, 'j'},
{"progress", no_argument, NULL, 'P'},
{"quiet", no_argument, NULL, 'q'},
{"relation", required_argument, NULL, 'r'},
{"exclude-relation", required_argument, NULL, 'R'},
{"schema", required_argument, NULL, 's'},
{"exclude-schema", required_argument, NULL, 'S'},
{"table", required_argument, NULL, 't'},
{"exclude-table", required_argument, NULL, 'T'},
{"verbose", no_argument, NULL, 'v'},
{"no-dependent-indexes", no_argument, NULL, 2},
{"no-dependent-toast", no_argument, NULL, 3},
{"exclude-toast-pointers", no_argument, NULL, 4},
{"on-error-stop", no_argument, NULL, 5},
{"skip", required_argument, NULL, 6},
{"startblock", required_argument, NULL, 7},
{"endblock", required_argument, NULL, 8},
{"rootdescend", no_argument, NULL, 9},
{"no-strict-names", no_argument, NULL, 10},
{"heapallindexed", no_argument, NULL, 11},
{"parent-check", no_argument, NULL, 12},
{NULL, 0, NULL, 0}
};
int optindex;
int c;
const char *db = NULL;
const char *maintenance_db = NULL;
const char *host = NULL;
const char *port = NULL;
const char *username = NULL;
enum trivalue prompt_password = TRI_DEFAULT;
int encoding = pg_get_encoding_from_locale(NULL, false);
ConnParams cparams;
pg_logging_init(argv[0]);
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_amcheck"));
handle_help_version_opts(argc, argv, progname, help);
/* process command-line options */
while ((c = getopt_long(argc, argv, "ad:D:eh:i:I:j:p:Pqr:R:s:S:t:T:U:wWv",
long_options, &optindex)) != -1)
{
char *endptr;
switch (c)
{
case 'a':
opts.alldb = true;
break;
case 'd':
opts.dbpattern = true;
append_database_pattern(&opts.include, optarg, encoding);
break;
case 'D':
opts.dbpattern = true;
append_database_pattern(&opts.exclude, optarg, encoding);
break;
case 'e':
opts.echo = true;
break;
case 'h':
host = pg_strdup(optarg);
break;
case 'i':
opts.allrel = false;
append_btree_pattern(&opts.include, optarg, encoding);
break;
case 'I':
opts.excludeidx = true;
append_btree_pattern(&opts.exclude, optarg, encoding);
break;
case 'j':
opts.jobs = atoi(optarg);
if (opts.jobs < 1)
{
fprintf(stderr,
"number of parallel jobs must be at least 1\n");
exit(1);
}
break;
case 'p':
port = pg_strdup(optarg);
break;
case 'P':
opts.show_progress = true;
break;
case 'q':
opts.quiet = true;
break;
case 'r':
opts.allrel = false;
append_relation_pattern(&opts.include, optarg, encoding);
break;
case 'R':
opts.excludeidx = true;
opts.excludetbl = true;
append_relation_pattern(&opts.exclude, optarg, encoding);
break;
case 's':
opts.allrel = false;
append_schema_pattern(&opts.include, optarg, encoding);
break;
case 'S':
opts.excludensp = true;
append_schema_pattern(&opts.exclude, optarg, encoding);
break;
case 't':
opts.allrel = false;
append_heap_pattern(&opts.include, optarg, encoding);
break;
case 'T':
opts.excludetbl = true;
append_heap_pattern(&opts.exclude, optarg, encoding);
break;
case 'U':
username = pg_strdup(optarg);
break;
case 'w':
prompt_password = TRI_NO;
break;
case 'W':
prompt_password = TRI_YES;
break;
case 'v':
opts.verbose = true;
pg_logging_increase_verbosity();
break;
case 1:
maintenance_db = pg_strdup(optarg);
break;
case 2:
opts.no_btree_expansion = true;
break;
case 3:
opts.no_toast_expansion = true;
break;
case 4:
opts.reconcile_toast = false;
break;
case 5:
opts.on_error_stop = true;
break;
case 6:
if (pg_strcasecmp(optarg, "all-visible") == 0)
opts.skip = "all visible";
else if (pg_strcasecmp(optarg, "all-frozen") == 0)
opts.skip = "all frozen";
else
{
fprintf(stderr, "invalid skip option\n");
exit(1);
}
break;
case 7:
opts.startblock = strtol(optarg, &endptr, 10);
if (*endptr != '\0')
{
fprintf(stderr,
"invalid start block\n");
exit(1);
}
if (opts.startblock > MaxBlockNumber || opts.startblock < 0)
{
fprintf(stderr,
"start block out of bounds\n");
exit(1);
}
break;
case 8:
opts.endblock = strtol(optarg, &endptr, 10);
if (*endptr != '\0')
{
fprintf(stderr,
"invalid end block\n");
exit(1);
}
if (opts.endblock > MaxBlockNumber || opts.endblock < 0)
{
fprintf(stderr,
"end block out of bounds\n");
exit(1);
}
break;
case 9:
opts.rootdescend = true;
opts.parent_check = true;
break;
case 10:
opts.strict_names = false;
break;
case 11:
opts.heapallindexed = true;
break;
case 12:
opts.parent_check = true;
break;
default:
fprintf(stderr,
"Try \"%s --help\" for more information.\n",
progname);
exit(1);
}
}
if (opts.endblock >= 0 && opts.endblock < opts.startblock)
{
fprintf(stderr,
"end block precedes start block\n");
exit(1);
}
/*
* A single non-option argument specifies a database name or connection
* string.
*/
if (optind < argc)
{
db = argv[optind];
optind++;
}
if (optind < argc)
{
pg_log_error("too many command-line arguments (first is \"%s\")",
argv[optind]);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1);
}
/* fill cparams except for dbname, which is set below */
cparams.pghost = host;
cparams.pgport = port;
cparams.pguser = username;
cparams.prompt_password = prompt_password;
cparams.dbname = NULL;
cparams.override_dbname = NULL;
setup_cancel_handler(NULL);
/* choose the database for our initial connection */
if (opts.alldb)
{
if (db != NULL)
{
pg_log_error("cannot specify a database name with --all");
exit(1);
}
cparams.dbname = maintenance_db;
}
else if (db != NULL)
{
if (opts.dbpattern)
{
pg_log_error("cannot specify both a database name and database patterns");
exit(1);
}
cparams.dbname = db;
}
if (opts.alldb || opts.dbpattern)
{
conn = connectMaintenanceDatabase(&cparams, progname, opts.echo);
compile_database_list(conn, &databases, NULL);
}
else
{
if (cparams.dbname == NULL)
{
if (getenv("PGDATABASE"))
cparams.dbname = getenv("PGDATABASE");
else if (getenv("PGUSER"))
cparams.dbname = getenv("PGUSER");
else
cparams.dbname = get_user_name_or_exit(progname);
}
conn = connectDatabase(&cparams, progname, opts.echo, false, true);
compile_database_list(conn, &databases, PQdb(conn));
}
if (databases.head == NULL)
{
if (conn != NULL)
disconnectDatabase(conn);
pg_log_error("no databases to check");
exit(0);
}
/*
* Compile a list of all relations spanning all databases to be checked.
*/
for (cell = databases.head; cell; cell = cell->next)
{
PGresult *result;
int ntups;
const char *amcheck_schema = NULL;
DatabaseInfo *dat = (DatabaseInfo *) cell->ptr;
cparams.override_dbname = dat->datname;
if (conn == NULL || strcmp(PQdb(conn), dat->datname) != 0)
{
if (conn != NULL)
disconnectDatabase(conn);
conn = connectDatabase(&cparams, progname, opts.echo, false, true);
}
/*
* Verify that amcheck is installed in this database. User error could
* leave a database that should have amcheck without it, and we could
* also be iterating over multiple databases where not all of them have
* amcheck installed (for example, 'template1').
*/
result = executeQuery(conn, amcheck_sql, opts.echo);
if (PQresultStatus(result) != PGRES_TUPLES_OK)
{
/* Querying the catalog failed. */
pg_log_error("database \"%s\": %s",
PQdb(conn), PQerrorMessage(conn));
pg_log_info("query was: %s", amcheck_sql);
PQclear(result);
disconnectDatabase(conn);
exit(1);
}
ntups = PQntuples(result);
if (ntups == 0)
{
/* Querying the catalog succeeded, but amcheck is missing. */
pg_log_warning("skipping database \"%s\": amcheck is not installed",
PQdb(conn));
disconnectDatabase(conn);
conn = NULL;
continue;
}
amcheck_schema = PQgetvalue(result, 0, 0);
if (opts.verbose)
pg_log_info("in database \"%s\": using amcheck version \"%s\" in schema \"%s\"",
PQdb(conn), PQgetvalue(result, 0, 1), amcheck_schema);
dat->amcheck_schema = PQescapeIdentifier(conn, amcheck_schema,
strlen(amcheck_schema));
PQclear(result);
compile_relation_list_one_db(conn, &relations, dat, &pagestotal);
}
/*
* Check that all inclusion patterns matched at least one schema or
* relation that we can check.
*/
for (pattern_id = 0; pattern_id < opts.include.len; pattern_id++)
{
PatternInfo *pat = &opts.include.data[pattern_id];
if (!pat->matched && (pat->nsp_regex != NULL || pat->rel_regex != NULL))
{
failed = opts.strict_names;
if (!opts.quiet || failed)
{
if (pat->heap_only)
log_no_match("no heap tables to check matching \"%s\"",
pat->pattern);
else if (pat->btree_only)
log_no_match("no btree indexes to check matching \"%s\"",
pat->pattern);
else if (pat->rel_regex == NULL)
log_no_match("no relations to check in schemas matching \"%s\"",
pat->pattern);
else
log_no_match("no relations to check matching \"%s\"",
pat->pattern);
}
}
}
if (failed)
{
if (conn != NULL)
disconnectDatabase(conn);
exit(1);
}
/*
* Set parallel_workers to the lesser of opts.jobs and the number of
* relations.
*/
parallel_workers = 0;
for (cell = relations.head; cell; cell = cell->next)
{
reltotal++;
if (parallel_workers < opts.jobs)
parallel_workers++;
}
if (reltotal == 0)
{
if (conn != NULL)
disconnectDatabase(conn);
pg_log_error("no relations to check");
exit(1);
}
progress_report(reltotal, relprogress, pagestotal, pageschecked,
NULL, true, false);
/*
* Main event loop.
*
* We use server-side parallelism to check up to parallel_workers
* relations in parallel. The list of relations was computed in database
* order, which minimizes the number of connects and disconnects as we
* process the list.
*/
latest_datname = NULL;
sa = ParallelSlotsSetup(parallel_workers, &cparams, progname, opts.echo,
NULL);
if (conn != NULL)
{
ParallelSlotsAdoptConn(sa, conn);
conn = NULL;
}
initPQExpBuffer(&sql);
for (relprogress = 0, cell = relations.head; cell; cell = cell->next)
{
ParallelSlot *free_slot;
RelationInfo *rel;
rel = (RelationInfo *) cell->ptr;
if (CancelRequested)
{
failed = true;
break;
}
/*
* The list of relations is in database sorted order. If this next
* relation is in a different database than the last one seen, we are
* about to start checking this database. Note that other slots may
* still be working on relations from prior databases.
*/
latest_datname = rel->datinfo->datname;
progress_report(reltotal, relprogress, pagestotal, pageschecked,
latest_datname, false, false);
relprogress++;
pageschecked += rel->blocks_to_check;
/*
* Get a parallel slot for the next amcheck command, blocking if
* necessary until one is available, or until a previously issued slot
* command fails, indicating that we should abort checking the
* remaining objects.
*/
free_slot = ParallelSlotsGetIdle(sa, rel->datinfo->datname);
if (!free_slot)
{
/*
* Something failed. We don't need to know what it was, because
* the handler should already have emitted the necessary error
* messages.
*/
failed = true;
break;
}
if (opts.verbose)
PQsetErrorVerbosity(free_slot->connection, PQERRORS_VERBOSE);
else if (opts.quiet)
PQsetErrorVerbosity(free_slot->connection, PQERRORS_TERSE);
/*
* Execute the appropriate amcheck command for this relation using our
* slot's database connection. We do not wait for the command to
* complete, nor do we perform any error checking, as that is done by
* the parallel slots and our handler callback functions.
*/
if (rel->is_heap)
{
if (opts.verbose)
{
if (opts.show_progress && progress_since_last_stderr)
fprintf(stderr, "\n");
pg_log_info("checking heap table \"%s\".\"%s\".\"%s\"",
rel->datinfo->datname, rel->nspname, rel->relname);
progress_since_last_stderr = false;
}
prepare_heap_command(&sql, rel, free_slot->connection);
rel->sql = pstrdup(sql.data); /* pg_free'd after command */
ParallelSlotSetHandler(free_slot, verify_heap_slot_handler, rel);
run_command(free_slot, rel->sql);
}
else
{
if (opts.verbose)
{
if (opts.show_progress && progress_since_last_stderr)
fprintf(stderr, "\n");
pg_log_info("checking btree index \"%s\".\"%s\".\"%s\"",
rel->datinfo->datname, rel->nspname, rel->relname);
progress_since_last_stderr = false;
}
prepare_btree_command(&sql, rel, free_slot->connection);
rel->sql = pstrdup(sql.data); /* pg_free'd after command */
ParallelSlotSetHandler(free_slot, verify_btree_slot_handler, rel);
run_command(free_slot, rel->sql);
}
}
termPQExpBuffer(&sql);
if (!failed)
{
/*
* Wait for all slots to complete, or for one to indicate that an
* error occurred. Like above, we rely on the handler emitting the
* necessary error messages.
*/
if (sa && !ParallelSlotsWaitCompletion(sa))
failed = true;
progress_report(reltotal, relprogress, pagestotal, pageschecked, NULL, true, true);
}
if (sa)
{
ParallelSlotsTerminate(sa);
FREE_AND_SET_NULL(sa);
}
if (failed)
exit(1);
if (!all_checks_pass)
exit(2);
}
/*
* prepare_heap_command
*
* Creates a SQL command for running amcheck checking on the given heap
* relation. The command is phrased as a SQL query, with column order and
* names matching the expectations of verify_heap_slot_handler, which will
* receive and handle each row returned from the verify_heapam() function.
*
* sql: buffer into which the heap table checking command will be written
* rel: relation information for the heap table to be checked
* conn: the connection to be used, for string escaping purposes
*/
static void
prepare_heap_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn)
{
resetPQExpBuffer(sql);
appendPQExpBuffer(sql,
"SELECT blkno, offnum, attnum, msg FROM %s.verify_heapam("
"\nrelation := %u, on_error_stop := %s, check_toast := %s, skip := '%s'",
rel->datinfo->amcheck_schema,
rel->reloid,
opts.on_error_stop ? "true" : "false",
opts.reconcile_toast ? "true" : "false",
opts.skip);
if (opts.startblock >= 0)
appendPQExpBuffer(sql, ", startblock := " INT64_FORMAT, opts.startblock);
if (opts.endblock >= 0)
appendPQExpBuffer(sql, ", endblock := " INT64_FORMAT, opts.endblock);
appendPQExpBuffer(sql, ")");
}
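/*
 * For illustration only (assuming a hypothetical relation OID of 16384,
 * amcheck installed in schema "public", and default options), the command
 * built above reads:
 *
 *   SELECT blkno, offnum, attnum, msg FROM public.verify_heapam(
 *   relation := 16384, on_error_stop := false, check_toast := true, skip := 'none')
 */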
/*
* prepare_btree_command
*
* Creates a SQL command for running amcheck checking on the given btree index
* relation. The command does not select any columns, as the btree checking
* functions do not return any; instead they report corruption by raising
* errors, which verify_btree_slot_handler expects.
*
* sql: buffer into which the btree index checking command will be written
* rel: relation information for the index to be checked
* conn: the connection to be used, for string escaping purposes
*/
static void
prepare_btree_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn)
{
resetPQExpBuffer(sql);
/*
* The index is identified by OID rather than by name; if the check throws
* an error, the slot handler (which knows the relation's name) reports
* which relation the error came from.
*/
if (opts.parent_check)
appendPQExpBuffer(sql,
"SELECT * FROM %s.bt_index_parent_check("
"index := '%u'::regclass, heapallindexed := %s, "
"rootdescend := %s)",
rel->datinfo->amcheck_schema,
rel->reloid,
(opts.heapallindexed ? "true" : "false"),
(opts.rootdescend ? "true" : "false"));
else
appendPQExpBuffer(sql,
"SELECT * FROM %s.bt_index_check("
"index := '%u'::regclass, heapallindexed := %s)",
rel->datinfo->amcheck_schema,
rel->reloid,
(opts.heapallindexed ? "true" : "false"));
}
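/*
 * For illustration only (assuming a hypothetical index OID of 16385 and
 * amcheck installed in schema "public"), running with --parent-check,
 * --heapallindexed and --rootdescend builds:
 *
 *   SELECT * FROM public.bt_index_parent_check(
 *   index := '16385'::regclass, heapallindexed := true, rootdescend := true)
 */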
/*
* run_command
*
* Sends a command to the server without waiting for the command to complete.
* Logs an error if the command cannot be sent, but otherwise any errors are
* expected to be handled by a ParallelSlotHandler.
*
* If reconnecting to the database is necessary, the cparams argument may be
* modified.
*
* slot: slot with connection to the server we should use for the command
* sql: query to send
*/
static void
run_command(ParallelSlot *slot, const char *sql)
{
if (opts.echo)
printf("%s\n", sql);
if (PQsendQuery(slot->connection, sql) == 0)
{
pg_log_error("error sending command to database \"%s\": %s",
PQdb(slot->connection),
PQerrorMessage(slot->connection));
pg_log_error("command was: %s", sql);
exit(1);
}
}
/*
* should_processing_continue
*
* Checks a query result returned from a query (presumably issued on a slot's
* connection) to determine if parallel slots should continue issuing further
* commands.
*
* Note: Heap relation corruption is reported by verify_heapam() via the result
* set, rather than an ERROR, but running verify_heapam() on a corrupted heap
* table may still result in an error being returned from the server due to
* missing relation files, bad checksums, etc. The btree corruption checking
* functions always use errors to communicate corruption messages. We can't
* just abort processing because we got a mere ERROR.
*
* res: result from an executed sql query
*/
static bool
should_processing_continue(PGresult *res)
{
const char *severity;
switch (PQresultStatus(res))
{
/* These are expected and ok */
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
case PGRES_NONFATAL_ERROR:
break;
/* This is expected but requires closer scrutiny */
case PGRES_FATAL_ERROR:
severity = PQresultErrorField(res, PG_DIAG_SEVERITY_NONLOCALIZED);
if (strcmp(severity, "FATAL") == 0)
return false;
if (strcmp(severity, "PANIC") == 0)
return false;
break;
/* These are unexpected */
case PGRES_BAD_RESPONSE:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
return false;
}
return true;
}
/*
* Returns a copy of the argument string with all lines indented four spaces.
*
* The caller should pg_free the result when finished with it.
*/
static char *
indent_lines(const char *str)
{
PQExpBufferData buf;
const char *c;
char *result;
initPQExpBuffer(&buf);
appendPQExpBufferStr(&buf, " ");
for (c = str; *c; c++)
{
appendPQExpBufferChar(&buf, *c);
if (c[0] == '\n' && c[1] != '\0')
appendPQExpBufferStr(&buf, " ");
}
result = pstrdup(buf.data);
termPQExpBuffer(&buf);
return result;
}
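/*
 * For example, indent_lines("first\nsecond\n") returns
 * "    first\n    second\n"; the final newline does not receive a
 * trailing indent.
 */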
/*
* verify_heap_slot_handler
*
* ParallelSlotHandler that receives results from a heap table checking command
* created by prepare_heap_command and outputs the results for the user.
*
* res: result from an executed sql query
* conn: connection on which the sql query was executed
* context: the relation being checked, as a RelationInfo pointer
*/
static bool
verify_heap_slot_handler(PGresult *res, PGconn *conn, void *context)
{
RelationInfo *rel = (RelationInfo *) context;
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
int i;
int ntups = PQntuples(res);
if (ntups > 0)
all_checks_pass = false;
for (i = 0; i < ntups; i++)
{
const char *msg;
/* The message string should never be null, but check */
if (PQgetisnull(res, i, 3))
msg = "NO MESSAGE";
else
msg = PQgetvalue(res, i, 3);
if (!PQgetisnull(res, i, 2))
printf("heap table \"%s\".\"%s\".\"%s\", block %s, offset %s, attribute %s:\n %s\n",
rel->datinfo->datname, rel->nspname, rel->relname,
PQgetvalue(res, i, 0), /* blkno */
PQgetvalue(res, i, 1), /* offnum */
PQgetvalue(res, i, 2), /* attnum */
msg);
else if (!PQgetisnull(res, i, 1))
printf("heap table \"%s\".\"%s\".\"%s\", block %s, offset %s:\n %s\n",
rel->datinfo->datname, rel->nspname, rel->relname,
PQgetvalue(res, i, 0), /* blkno */
PQgetvalue(res, i, 1), /* offnum */
msg);
else if (!PQgetisnull(res, i, 0))
printf("heap table \"%s\".\"%s\".\"%s\", block %s:\n %s\n",
rel->datinfo->datname, rel->nspname, rel->relname,
PQgetvalue(res, i, 0), /* blkno */
msg);
else
printf("heap table \"%s\".\"%s\".\"%s\":\n %s\n",
rel->datinfo->datname, rel->nspname, rel->relname, msg);
}
}
else
{
char *msg = indent_lines(PQerrorMessage(conn));
all_checks_pass = false;
printf("heap table \"%s\".\"%s\".\"%s\":\n%s",
rel->datinfo->datname, rel->nspname, rel->relname, msg);
if (opts.verbose)
printf("query was: %s\n", rel->sql);
FREE_AND_SET_NULL(msg);
}
FREE_AND_SET_NULL(rel->sql);
FREE_AND_SET_NULL(rel->nspname);
FREE_AND_SET_NULL(rel->relname);
return should_processing_continue(res);
}
/*
* verify_btree_slot_handler
*
* ParallelSlotHandler that receives results from a btree checking command
* created by prepare_btree_command and outputs them for the user. The btree
* checking command is expected to return a single void row; when it instead
* fails with an error, the useful information about the corruption is found
* in the connection's error message.
*
* res: result from an executed sql query
* conn: connection on which the sql query was executed
* context: the relation being checked, as a RelationInfo pointer
*/
static bool
verify_btree_slot_handler(PGresult *res, PGconn *conn, void *context)
{
RelationInfo *rel = (RelationInfo *) context;
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
int ntups = PQntuples(res);
if (ntups != 1)
{
/*
* We expect the btree checking functions to return one void row
* each, so we should output some sort of warning if we get
* anything else, not because it indicates corruption, but because
* it suggests a mismatch between amcheck and pg_amcheck versions.
*
* In conjunction with --progress, anything written to stderr at
* this time would present strangely to the user without an extra
* newline, so we print one. If we were multithreaded, we'd have
* to avoid splitting this across multiple calls, but we're in an
* event loop, so it doesn't matter.
*/
if (opts.show_progress && progress_since_last_stderr)
fprintf(stderr, "\n");
pg_log_warning("btree index \"%s\".\"%s\".\"%s\": btree checking function returned unexpected number of rows: %d",
rel->datinfo->datname, rel->nspname, rel->relname, ntups);
if (opts.verbose)
pg_log_info("query was: %s", rel->sql);
pg_log_warning("are %s's and amcheck's versions compatible?",
progname);
progress_since_last_stderr = false;
}
}
else
{
char *msg = indent_lines(PQerrorMessage(conn));
all_checks_pass = false;
printf("btree index \"%s\".\"%s\".\"%s\":\n%s",
rel->datinfo->datname, rel->nspname, rel->relname, msg);
if (opts.verbose)
printf("query was: %s\n", rel->sql);
FREE_AND_SET_NULL(msg);
}
FREE_AND_SET_NULL(rel->sql);
FREE_AND_SET_NULL(rel->nspname);
FREE_AND_SET_NULL(rel->relname);
return should_processing_continue(res);
}
/*
* help
*
* Prints help page for the program
*
* progname: the name of the executed program, such as "pg_amcheck"
*/
static void
help(const char *progname)
{
printf("%s uses the amcheck module to check objects in a PostgreSQL database for corruption.\n\n", progname);
printf("Usage:\n");
printf(" %s [OPTION]... [DBNAME]\n", progname);
printf("\nTarget Options:\n");
printf(" -a, --all check all databases\n");
printf(" -d, --database=PATTERN check matching database(s)\n");
printf(" -D, --exclude-database=PATTERN do NOT check matching database(s)\n");
printf(" -i, --index=PATTERN check matching index(es)\n");
printf(" -I, --exclude-index=PATTERN do NOT check matching index(es)\n");
printf(" -r, --relation=PATTERN check matching relation(s)\n");
printf(" -R, --exclude-relation=PATTERN do NOT check matching relation(s)\n");
printf(" -s, --schema=PATTERN check matching schema(s)\n");
printf(" -S, --exclude-schema=PATTERN do NOT check matching schema(s)\n");
printf(" -t, --table=PATTERN check matching table(s)\n");
printf(" -T, --exclude-table=PATTERN do NOT check matching table(s)\n");
printf(" --no-dependent-indexes do NOT expand list of relations to include indexes\n");
printf(" --no-dependent-toast do NOT expand list of relations to include toast\n");
printf(" --no-strict-names do NOT require patterns to match objects\n");
printf("\nTable Checking Options:\n");
printf(" --exclude-toast-pointers do NOT follow relation toast pointers\n");
printf(" --on-error-stop stop checking at end of first corrupt page\n");
printf(" --skip=OPTION do NOT check \"all-frozen\" or \"all-visible\" blocks\n");
printf(" --startblock=BLOCK begin checking table(s) at the given block number\n");
printf(" --endblock=BLOCK check table(s) only up to the given block number\n");
printf("\nBtree Index Checking Options:\n");
printf(" --heapallindexed check all heap tuples are found within indexes\n");
printf(" --parent-check check index parent/child relationships\n");
printf(" --rootdescend search from root page to refind tuples\n");
printf("\nConnection options:\n");
printf(" -h, --host=HOSTNAME database server host or socket directory\n");
printf(" -p, --port=PORT database server port\n");
printf(" -U, --username=USERNAME user name to connect as\n");
printf(" -w, --no-password never prompt for password\n");
printf(" -W, --password force password prompt\n");
printf(" --maintenance-db=DBNAME alternate maintenance database\n");
printf("\nOther Options:\n");
printf(" -e, --echo show the commands being sent to the server\n");
printf(" -j, --jobs=NUM use this many concurrent connections to the server\n");
printf(" -q, --quiet don't write any messages\n");
printf(" -v, --verbose write a lot of output\n");
printf(" -V, --version output version information, then exit\n");
printf(" -P, --progress show progress information\n");
printf(" -?, --help show this help, then exit\n");
printf("\nReport bugs to <%s>.\n", PACKAGE_BUGREPORT);
printf("%s home page: <%s>\n", PACKAGE_NAME, PACKAGE_URL);
}
/*
* Print a progress report based on the global variables.
*
* The progress report is written at most once per second, unless the force
* parameter is set to true.
*
* If finished is set to true, this is the last progress report. The cursor
* is moved to the next line.
*/
static void
progress_report(uint64 relations_total, uint64 relations_checked,
uint64 relpages_total, uint64 relpages_checked,
const char *datname, bool force, bool finished)
{
int percent_rel = 0;
int percent_pages = 0;
char checked_rel[32];
char total_rel[32];
char checked_pages[32];
char total_pages[32];
pg_time_t now;
if (!opts.show_progress)
return;
now = time(NULL);
if (now == last_progress_report && !force && !finished)
return; /* Max once per second */
last_progress_report = now;
if (relations_total)
percent_rel = (int) (relations_checked * 100 / relations_total);
if (relpages_total)
percent_pages = (int) (relpages_checked * 100 / relpages_total);
/*
* Separate step to keep platform-dependent format code out of fprintf
* calls. We only test for INT64_FORMAT availability in snprintf, not
* fprintf.
*/
snprintf(checked_rel, sizeof(checked_rel), INT64_FORMAT, relations_checked);
snprintf(total_rel, sizeof(total_rel), INT64_FORMAT, relations_total);
snprintf(checked_pages, sizeof(checked_pages), INT64_FORMAT, relpages_checked);
snprintf(total_pages, sizeof(total_pages), INT64_FORMAT, relpages_total);
#define VERBOSE_DATNAME_LENGTH 35
if (opts.verbose)
{
if (!datname)
/*
* No datname given, so clear the status line (used for first and
* last call)
*/
fprintf(stderr,
"%*s/%s relations (%d%%) %*s/%s pages (%d%%) %*s",
(int) strlen(total_rel),
checked_rel, total_rel, percent_rel,
(int) strlen(total_pages),
checked_pages, total_pages, percent_pages,
VERBOSE_DATNAME_LENGTH + 2, "");
else
{
bool truncate = (strlen(datname) > VERBOSE_DATNAME_LENGTH);
fprintf(stderr,
"%*s/%s relations (%d%%) %*s/%s pages (%d%%), (%s%-*.*s)",
(int) strlen(total_rel),
checked_rel, total_rel, percent_rel,
(int) strlen(total_pages),
checked_pages, total_pages, percent_pages,
/* Prefix with "..." if we do leading truncation */
truncate ? "..." : "",
truncate ? VERBOSE_DATNAME_LENGTH - 3 : VERBOSE_DATNAME_LENGTH,
truncate ? VERBOSE_DATNAME_LENGTH - 3 : VERBOSE_DATNAME_LENGTH,
/* Truncate datname at beginning if it's too long */
truncate ? datname + strlen(datname) - VERBOSE_DATNAME_LENGTH + 3 : datname);
}
}
else
fprintf(stderr,
"%*s/%s relations (%d%%) %*s/%s pages (%d%%)",
(int) strlen(total_rel),
checked_rel, total_rel, percent_rel,
(int) strlen(total_pages),
checked_pages, total_pages, percent_pages);
/*
* Stay on the same line if reporting to a terminal and we're not done
* yet.
*/
if (!finished && isatty(fileno(stderr)))
{
fputc('\r', stderr);
progress_since_last_stderr = true;
}
else
fputc('\n', stderr);
}
/*
* Extend the pattern info array to hold one additional initialized pattern
* info entry.
*
* Returns a pointer to the new entry.
*/
static PatternInfo *
extend_pattern_info_array(PatternInfoArray *pia)
{
PatternInfo *result;
pia->len++;
pia->data = (PatternInfo *) pg_realloc(pia->data, pia->len * sizeof(PatternInfo));
result = &pia->data[pia->len - 1];
memset(result, 0, sizeof(*result));
return result;
}
/*
* append_database_pattern
*
* Adds the given pattern interpreted as a database name pattern.
*
* pia: the pattern info array to append to
* pattern: the database name pattern
* encoding: client encoding for parsing the pattern
*/
static void
append_database_pattern(PatternInfoArray *pia, const char *pattern, int encoding)
{
PQExpBufferData buf;
PatternInfo *info = extend_pattern_info_array(pia);
initPQExpBuffer(&buf);
patternToSQLRegex(encoding, NULL, NULL, &buf, pattern, false);
info->pattern = pattern;
info->db_regex = pstrdup(buf.data);
termPQExpBuffer(&buf);
}
/*
* append_schema_pattern
*
* Adds the given pattern interpreted as a schema name pattern.
*
* pia: the pattern info array to append to
* pattern: the schema name pattern
* encoding: client encoding for parsing the pattern
*/
static void
append_schema_pattern(PatternInfoArray *pia, const char *pattern, int encoding)
{
PQExpBufferData dbbuf;
PQExpBufferData nspbuf;
PatternInfo *info = extend_pattern_info_array(pia);
initPQExpBuffer(&dbbuf);
initPQExpBuffer(&nspbuf);
patternToSQLRegex(encoding, NULL, &dbbuf, &nspbuf, pattern, false);
info->pattern = pattern;
if (dbbuf.data[0])
{
opts.dbpattern = true;
info->db_regex = pstrdup(dbbuf.data);
}
if (nspbuf.data[0])
info->nsp_regex = pstrdup(nspbuf.data);
termPQExpBuffer(&dbbuf);
termPQExpBuffer(&nspbuf);
}
/*
* append_relation_pattern_helper
*
* Adds the given pattern, interpreted as a relation pattern, to the list.
*
* pia: the pattern info array to append to
* pattern: the relation name pattern
* encoding: client encoding for parsing the pattern
* heap_only: whether the pattern should only be matched against heap tables
* btree_only: whether the pattern should only be matched against btree indexes
*/
static void
append_relation_pattern_helper(PatternInfoArray *pia, const char *pattern,
int encoding, bool heap_only, bool btree_only)
{
PQExpBufferData dbbuf;
PQExpBufferData nspbuf;
PQExpBufferData relbuf;
PatternInfo *info = extend_pattern_info_array(pia);
initPQExpBuffer(&dbbuf);
initPQExpBuffer(&nspbuf);
initPQExpBuffer(&relbuf);
patternToSQLRegex(encoding, &dbbuf, &nspbuf, &relbuf, pattern, false);
info->pattern = pattern;
if (dbbuf.data[0])
{
opts.dbpattern = true;
info->db_regex = pstrdup(dbbuf.data);
}
if (nspbuf.data[0])
info->nsp_regex = pstrdup(nspbuf.data);
if (relbuf.data[0])
info->rel_regex = pstrdup(relbuf.data);
termPQExpBuffer(&dbbuf);
termPQExpBuffer(&nspbuf);
termPQExpBuffer(&relbuf);
info->heap_only = heap_only;
info->btree_only = btree_only;
}
/*
* append_relation_pattern
*
* Adds the given pattern interpreted as a relation pattern, to be matched
* against both heap tables and btree indexes.
*
* pia: the pattern info array to append to
* pattern: the relation name pattern
* encoding: client encoding for parsing the pattern
*/
static void
append_relation_pattern(PatternInfoArray *pia, const char *pattern, int encoding)
{
append_relation_pattern_helper(pia, pattern, encoding, false, false);
}
/*
* append_heap_pattern
*
* Adds the given pattern interpreted as a relation pattern, to be matched only
* against heap tables.
*
* pia: the pattern info array to append to
* pattern: the relation name pattern
* encoding: client encoding for parsing the pattern
*/
static void
append_heap_pattern(PatternInfoArray *pia, const char *pattern, int encoding)
{
append_relation_pattern_helper(pia, pattern, encoding, true, false);
}
/*
* append_btree_pattern
*
* Adds the given pattern interpreted as a relation pattern, to be matched only
* against btree indexes.
*
* pia: the pattern info array to append to
* pattern: the relation name pattern
* encoding: client encoding for parsing the pattern
*/
static void
append_btree_pattern(PatternInfoArray *pia, const char *pattern, int encoding)
{
append_relation_pattern_helper(pia, pattern, encoding, false, true);
}
/*
* append_db_pattern_cte
*
* Appends to the buffer the body of a Common Table Expression (CTE) containing
* the database portions filtered from the list of patterns expressed as two
* columns:
*
* pattern_id: the index of this pattern in pia->data[]
* rgx: the database regular expression parsed from the pattern
*
* Patterns without a database portion are skipped. Patterns with more than
* just a database portion are optionally skipped, depending on argument
* 'inclusive'.
*
* buf: the buffer to append to
* pia: the array of patterns to be inserted into the CTE
* conn: the database connection
* inclusive: whether to include patterns with schema and/or relation parts
*
* Returns whether any database patterns were appended.
*/
static bool
append_db_pattern_cte(PQExpBuffer buf, const PatternInfoArray *pia,
PGconn *conn, bool inclusive)
{
int pattern_id;
const char *comma;
bool have_values;
comma = "";
have_values = false;
for (pattern_id = 0; pattern_id < pia->len; pattern_id++)
{
PatternInfo *info = &pia->data[pattern_id];
if (info->db_regex != NULL &&
(inclusive || (info->nsp_regex == NULL && info->rel_regex == NULL)))
{
if (!have_values)
appendPQExpBufferStr(buf, "\nVALUES");
have_values = true;
appendPQExpBuffer(buf, "%s\n(%d, ", comma, pattern_id);
appendStringLiteralConn(buf, info->db_regex, conn);
appendPQExpBufferStr(buf, ")");
comma = ",";
}
}
if (!have_values)
appendPQExpBufferStr(buf, "\nSELECT NULL, NULL, NULL WHERE false");
return have_values;
}
/*
* compile_database_list
*
* If any database patterns exist, or if --all was given, compiles a distinct
* list of databases to check using a SQL query based on the patterns plus the
* literal initial database name, if given. If no database patterns exist and
* --all was not given, the query is not necessary, and only the initial
* database name (if any) is added to the list.
*
* conn: connection to the initial database
* databases: the list onto which databases should be appended
* initial_dbname: an optional extra database name to include in the list
*/
static void
compile_database_list(PGconn *conn, SimplePtrList *databases,
const char *initial_dbname)
{
PGresult *res;
PQExpBufferData sql;
int ntups;
int i;
bool fatal;
if (initial_dbname)
{
DatabaseInfo *dat = (DatabaseInfo *) pg_malloc0(sizeof(DatabaseInfo));
/* This database is included. Add to list */
if (opts.verbose)
pg_log_info("including database: \"%s\"", initial_dbname);
dat->datname = pstrdup(initial_dbname);
simple_ptr_list_append(databases, dat);
}
initPQExpBuffer(&sql);
/* Append the include patterns CTE. */
appendPQExpBufferStr(&sql, "WITH include_raw (pattern_id, rgx) AS (");
if (!append_db_pattern_cte(&sql, &opts.include, conn, true) &&
!opts.alldb)
{
/*
* None of the inclusion patterns (if any) contain database portions,
* so there is no need to query the database to resolve database
* patterns.
*
* Since we're also not operating under --all, we don't need to query
* the exhaustive list of connectable databases, either.
*/
termPQExpBuffer(&sql);
return;
}
/* Append the exclude patterns CTE. */
appendPQExpBufferStr(&sql, "),\nexclude_raw (pattern_id, rgx) AS (");
append_db_pattern_cte(&sql, &opts.exclude, conn, false);
appendPQExpBufferStr(&sql, "),");
/*
* Append the database CTE, which includes whether each database is
* connectable and also joins against exclude_raw to determine whether
* each database is excluded.
*/
appendPQExpBufferStr(&sql,
"\ndatabase (datname) AS ("
"\nSELECT d.datname "
"FROM pg_catalog.pg_database d "
"LEFT OUTER JOIN exclude_raw e "
"ON d.datname ~ e.rgx "
"\nWHERE d.datallowconn "
"AND e.pattern_id IS NULL"
"),"
/*
* Append the include_pat CTE, which joins the include_raw CTE against the
* database CTE to determine if all the inclusion patterns had matches,
* and whether each matched pattern had the misfortune of only matching
* excluded or unconnectable databases.
*/
"\ninclude_pat (pattern_id, checkable) AS ("
"\nSELECT i.pattern_id, "
"COUNT(*) FILTER ("
"WHERE d IS NOT NULL"
") AS checkable"
"\nFROM include_raw i "
"LEFT OUTER JOIN database d "
"ON d.datname ~ i.rgx"
"\nGROUP BY i.pattern_id"
"),"
/*
* Append the filtered_databases CTE, which selects from the database CTE
* optionally joined against the include_raw CTE to only select databases
* that match an inclusion pattern. This appears to duplicate what the
* include_pat CTE already did above, but here we want only databases, and
* there we wanted patterns.
*/
"\nfiltered_databases (datname) AS ("
"\nSELECT DISTINCT d.datname "
"FROM database d");
if (!opts.alldb)
appendPQExpBufferStr(&sql,
" INNER JOIN include_raw i "
"ON d.datname ~ i.rgx");
appendPQExpBufferStr(&sql,
")"
/*
* Select the checkable databases and the unmatched inclusion patterns.
*/
"\nSELECT pattern_id, datname FROM ("
"\nSELECT pattern_id, NULL::TEXT AS datname "
"FROM include_pat "
"WHERE checkable = 0 "
"UNION ALL"
"\nSELECT NULL, datname "
"FROM filtered_databases"
") AS combined_records"
"\nORDER BY pattern_id NULLS LAST, datname");
res = executeQuery(conn, sql.data, opts.echo);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
pg_log_error("query failed: %s", PQerrorMessage(conn));
pg_log_info("query was: %s", sql.data);
disconnectDatabase(conn);
exit(1);
}
termPQExpBuffer(&sql);
ntups = PQntuples(res);
for (fatal = false, i = 0; i < ntups; i++)
{
int pattern_id = -1;
const char *datname = NULL;
if (!PQgetisnull(res, i, 0))
pattern_id = atoi(PQgetvalue(res, i, 0));
if (!PQgetisnull(res, i, 1))
datname = PQgetvalue(res, i, 1);
if (pattern_id >= 0)
{
/*
* Current record pertains to an inclusion pattern that matched no
* checkable databases.
*/
fatal = opts.strict_names;
if (pattern_id >= opts.include.len)
{
pg_log_error("internal error: received unexpected database pattern_id %d",
pattern_id);
exit(1);
}
log_no_match("no connectable databases to check matching \"%s\"",
opts.include.data[pattern_id].pattern);
}
else
{
DatabaseInfo *dat;
/* Current record pertains to a database */
Assert(datname != NULL);
/* Avoid entering a duplicate entry matching the initial_dbname */
if (initial_dbname != NULL && strcmp(initial_dbname, datname) == 0)
continue;
/* This database is included. Add to list */
if (opts.verbose)
pg_log_info("including database: \"%s\"", datname);
dat = (DatabaseInfo *) pg_malloc0(sizeof(DatabaseInfo));
dat->datname = pstrdup(datname);
simple_ptr_list_append(databases, dat);
}
}
PQclear(res);
if (fatal)
{
if (conn != NULL)
disconnectDatabase(conn);
exit(1);
}
}
/*
* append_rel_pattern_raw_cte
*
* Appends to the buffer the body of a Common Table Expression (CTE) containing
* the given patterns as six columns:
*
* pattern_id: the index of this pattern in pia->data[]
* db_regex: the database regexp parsed from the pattern, or NULL if the
* pattern had no database part
* nsp_regex: the namespace regexp parsed from the pattern, or NULL if the
* pattern had no namespace part
* rel_regex: the relname regexp parsed from the pattern, or NULL if the
* pattern had no relname part
* heap_only: true if the pattern applies only to heap tables (not indexes)
* btree_only: true if the pattern applies only to btree indexes (not tables)
*
* buf: the buffer to be appended
* pia: the array of patterns to be inserted into the CTE
* conn: the database connection
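*
* For illustration only (a sketch, not generated verbatim): a single pattern
* with all three parts present and neither heap_only nor btree_only set
* contributes a row of the form
*
* VALUES (0::INTEGER, <db_regex>::TEXT, <nsp_regex>::TEXT,
* <rel_regex>::TEXT, false::BOOLEAN, false::BOOLEAN)
*
* whereas an empty pattern array instead emits a SELECT of six NULLs with
* "WHERE false", so the CTE is well-formed but produces no rows.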
*/
static void
append_rel_pattern_raw_cte(PQExpBuffer buf, const PatternInfoArray *pia,
PGconn *conn)
{
int pattern_id;
const char *comma;
bool have_values;
comma = "";
have_values = false;
for (pattern_id = 0; pattern_id < pia->len; pattern_id++)
{
PatternInfo *info = &pia->data[pattern_id];
if (!have_values)
appendPQExpBufferStr(buf, "\nVALUES");
have_values = true;
appendPQExpBuffer(buf, "%s\n(%d::INTEGER, ", comma, pattern_id);
if (info->db_regex == NULL)
appendPQExpBufferStr(buf, "NULL");
else
appendStringLiteralConn(buf, info->db_regex, conn);
appendPQExpBufferStr(buf, "::TEXT, ");
if (info->nsp_regex == NULL)
appendPQExpBufferStr(buf, "NULL");
else
appendStringLiteralConn(buf, info->nsp_regex, conn);
appendPQExpBufferStr(buf, "::TEXT, ");
if (info->rel_regex == NULL)
appendPQExpBufferStr(buf, "NULL");
else
appendStringLiteralConn(buf, info->rel_regex, conn);
if (info->heap_only)
appendPQExpBufferStr(buf, "::TEXT, true::BOOLEAN");
else
appendPQExpBufferStr(buf, "::TEXT, false::BOOLEAN");
if (info->btree_only)
appendPQExpBufferStr(buf, ", true::BOOLEAN");
else
appendPQExpBufferStr(buf, ", false::BOOLEAN");
appendPQExpBufferStr(buf, ")");
comma = ",";
}
if (!have_values)
appendPQExpBufferStr(buf,
"\nSELECT NULL::INTEGER, NULL::TEXT, NULL::TEXT, "
"NULL::TEXT, NULL::BOOLEAN, NULL::BOOLEAN "
"WHERE false");
}
/*
* append_rel_pattern_filtered_cte
*
* Appends to the buffer a Common Table Expression (CTE) which selects
* all patterns from the named raw CTE, filtered by database. All patterns
* which have no database portion or whose database portion matches our
* connection's database name are selected, with other patterns excluded.
*
* The basic idea here is that if we're connected to database "foo" and we have
* patterns "foo.bar.baz", "alpha.beta" and "one.two.three", we only want to
* use the first two while processing relations in this database, as the third
* one is not relevant.
*
* buf: the buffer to be appended
* raw: the name of the CTE to select from
* filtered: the name of the CTE to create
* conn: the database connection
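*
* For illustration only (a sketch): with raw = "include_raw", filtered =
* "include_pat", and a connection to database "foo", the appended SQL reads
*
* include_pat (pattern_id, nsp_regex, rel_regex, heap_only, btree_only) AS (
* SELECT pattern_id, nsp_regex, rel_regex, heap_only, btree_only
* FROM include_raw r
* WHERE (r.db_regex IS NULL OR 'foo' ~ r.db_regex)
* AND (r.nsp_regex IS NOT NULL OR r.rel_regex IS NOT NULL)),
*
* Note that patterns consisting solely of a database part are dropped here;
* they matter for database selection, not relation selection.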
*/
static void
append_rel_pattern_filtered_cte(PQExpBuffer buf, const char *raw,
const char *filtered, PGconn *conn)
{
appendPQExpBuffer(buf,
"\n%s (pattern_id, nsp_regex, rel_regex, heap_only, btree_only) AS ("
"\nSELECT pattern_id, nsp_regex, rel_regex, heap_only, btree_only "
"FROM %s r"
"\nWHERE (r.db_regex IS NULL "
"OR ",
filtered, raw);
appendStringLiteralConn(buf, PQdb(conn), conn);
appendPQExpBufferStr(buf, " ~ r.db_regex)");
appendPQExpBufferStr(buf,
" AND (r.nsp_regex IS NOT NULL"
" OR r.rel_regex IS NOT NULL)"
"),");
}
/*
* compile_relation_list_one_db
*
* Compiles a list of relations to check within the currently connected
* database based on the user supplied options, sorted by descending size,
* and appends them to the given list of relations.
*
* The cells of the constructed list contain all information about the relation
* necessary to connect to the database and check the object, including which
* database to connect to, where contrib/amcheck is installed, and the Oid and
* type of object (heap table vs. btree index). Rather than duplicating the
* database details per relation, the relation structs use references to the
* same database object, provided by the caller.
*
* conn: connection to the database to be checked, which should match 'dat'
* relations: list onto which the relations information should be appended
* dat: the database info struct for use by each relation
* pagecount: gets incremented by the number of blocks to check in all
* relations added
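*
* In outline, the query constructed below looks like this (a sketch; the
* exact text varies with the options given):
*
* WITH include_raw (...) AS (...), include_pat (...) AS (...),
* exclude_raw (...) AS (...), exclude_pat (...) AS (...),
* relation (...) AS (...), -- primary heap tables and btree indexes
* toast (...) AS (...), -- toast tables for those heap tables
* index (...) AS (...), -- btree indexes on those heap tables
* toast_index (...) AS (...) -- btree indexes on those toast tables
* SELECT ... FROM (... UNION ...) AS combined_records
* ORDER BY relpages DESC NULLS FIRST, oid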
*/
static void
compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations,
const DatabaseInfo *dat,
uint64 *pagecount)
{
PGresult *res;
PQExpBufferData sql;
int ntups;
int i;
initPQExpBuffer(&sql);
appendPQExpBufferStr(&sql, "WITH");
/* Append CTEs for the relation inclusion patterns, if any */
if (!opts.allrel)
{
appendPQExpBufferStr(&sql,
" include_raw (pattern_id, db_regex, nsp_regex, rel_regex, heap_only, btree_only) AS (");
append_rel_pattern_raw_cte(&sql, &opts.include, conn);
appendPQExpBufferStr(&sql, "\n),");
append_rel_pattern_filtered_cte(&sql, "include_raw", "include_pat", conn);
}
/* Append CTEs for the relation exclusion patterns, if any */
if (opts.excludetbl || opts.excludeidx || opts.excludensp)
{
appendPQExpBufferStr(&sql,
" exclude_raw (pattern_id, db_regex, nsp_regex, rel_regex, heap_only, btree_only) AS (");
append_rel_pattern_raw_cte(&sql, &opts.exclude, conn);
appendPQExpBufferStr(&sql, "\n),");
append_rel_pattern_filtered_cte(&sql, "exclude_raw", "exclude_pat", conn);
}
/* Append the relation CTE. */
appendPQExpBufferStr(&sql,
" relation (pattern_id, oid, nspname, relname, reltoastrelid, relpages, is_heap, is_btree) AS ("
"\nSELECT DISTINCT ON (c.oid");
if (!opts.allrel)
appendPQExpBufferStr(&sql, ", ip.pattern_id) ip.pattern_id,");
else
appendPQExpBufferStr(&sql, ") NULL::INTEGER AS pattern_id,");
appendPQExpBuffer(&sql,
"\nc.oid, n.nspname, c.relname, c.reltoastrelid, c.relpages, "
"c.relam = %u AS is_heap, "
"c.relam = %u AS is_btree"
"\nFROM pg_catalog.pg_class c "
"INNER JOIN pg_catalog.pg_namespace n "
"ON c.relnamespace = n.oid",
HEAP_TABLE_AM_OID, BTREE_AM_OID);
if (!opts.allrel)
appendPQExpBuffer(&sql,
"\nINNER JOIN include_pat ip"
"\nON (n.nspname ~ ip.nsp_regex OR ip.nsp_regex IS NULL)"
"\nAND (c.relname ~ ip.rel_regex OR ip.rel_regex IS NULL)"
"\nAND (c.relam = %u OR NOT ip.heap_only)"
"\nAND (c.relam = %u OR NOT ip.btree_only)",
HEAP_TABLE_AM_OID, BTREE_AM_OID);
if (opts.excludetbl || opts.excludeidx || opts.excludensp)
appendPQExpBuffer(&sql,
"\nLEFT OUTER JOIN exclude_pat ep"
"\nON (n.nspname ~ ep.nsp_regex OR ep.nsp_regex IS NULL)"
"\nAND (c.relname ~ ep.rel_regex OR ep.rel_regex IS NULL)"
"\nAND (c.relam = %u OR NOT ep.heap_only OR ep.rel_regex IS NULL)"
"\nAND (c.relam = %u OR NOT ep.btree_only OR ep.rel_regex IS NULL)",
HEAP_TABLE_AM_OID, BTREE_AM_OID);
if (opts.excludetbl || opts.excludeidx || opts.excludensp)
appendPQExpBufferStr(&sql, "\nWHERE ep.pattern_id IS NULL");
else
appendPQExpBufferStr(&sql, "\nWHERE true");
/*
* We need to be careful not to break the --no-dependent-toast and
* --no-dependent-indexes options. By default, the btree indexes, toast
* tables, and toast table btree indexes associated with primary heap
* tables are included, using their own CTEs below. We implement the
* --exclude-* options by not creating those CTEs, but that's no use if
* we've already selected the toast and indexes here. On the other hand,
* we want inclusion patterns that match indexes or toast tables to be
* honored. So, if inclusion patterns were given, we want to select all
* tables, toast tables, or indexes that match the patterns. But if no
* inclusion patterns were given, and we're simply matching all relations,
* then we only want to match the primary tables here.
*/
if (opts.allrel)
appendPQExpBuffer(&sql,
" AND c.relam = %u "
"AND c.relkind IN ('r', 'm', 't') "
"AND c.relnamespace != %u",
HEAP_TABLE_AM_OID, PG_TOAST_NAMESPACE);
else
appendPQExpBuffer(&sql,
" AND c.relam IN (%u, %u)"
"AND c.relkind IN ('r', 'm', 't', 'i') "
"AND ((c.relam = %u AND c.relkind IN ('r', 'm', 't')) OR "
"(c.relam = %u AND c.relkind = 'i'))",
HEAP_TABLE_AM_OID, BTREE_AM_OID,
HEAP_TABLE_AM_OID, BTREE_AM_OID);
appendPQExpBufferStr(&sql,
"\nORDER BY c.oid)");
if (!opts.no_toast_expansion)
{
/*
* Include a CTE for toast tables associated with primary heap tables
* selected above, filtering by exclusion patterns (if any) that match
* toast table names.
*/
appendPQExpBufferStr(&sql,
", toast (oid, nspname, relname, relpages) AS ("
"\nSELECT t.oid, 'pg_toast', t.relname, t.relpages"
"\nFROM pg_catalog.pg_class t "
"INNER JOIN relation r "
"ON r.reltoastrelid = t.oid");
if (opts.excludetbl || opts.excludensp)
appendPQExpBufferStr(&sql,
"\nLEFT OUTER JOIN exclude_pat ep"
"\nON ('pg_toast' ~ ep.nsp_regex OR ep.nsp_regex IS NULL)"
"\nAND (t.relname ~ ep.rel_regex OR ep.rel_regex IS NULL)"
"\nAND ep.heap_only"
"\nWHERE ep.pattern_id IS NULL");
appendPQExpBufferStr(&sql,
"\n)");
}
if (!opts.no_btree_expansion)
{
/*
* Include a CTE for btree indexes associated with primary heap tables
* selected above, filtering by exclusion patterns (if any) that match
* btree index names.
*/
appendPQExpBuffer(&sql,
", index (oid, nspname, relname, relpages) AS ("
"\nSELECT c.oid, r.nspname, c.relname, c.relpages "
"FROM relation r"
"\nINNER JOIN pg_catalog.pg_index i "
"ON r.oid = i.indrelid "
"INNER JOIN pg_catalog.pg_class c "
"ON i.indexrelid = c.oid");
if (opts.excludeidx || opts.excludensp)
appendPQExpBufferStr(&sql,
"\nINNER JOIN pg_catalog.pg_namespace n "
"ON c.relnamespace = n.oid"
"\nLEFT OUTER JOIN exclude_pat ep "
"ON (n.nspname ~ ep.nsp_regex OR ep.nsp_regex IS NULL) "
"AND (c.relname ~ ep.rel_regex OR ep.rel_regex IS NULL) "
"AND ep.btree_only"
"\nWHERE ep.pattern_id IS NULL");
else
appendPQExpBufferStr(&sql,
"\nWHERE true");
appendPQExpBuffer(&sql,
" AND c.relam = %u "
"AND c.relkind = 'i'",
BTREE_AM_OID);
if (opts.no_toast_expansion)
appendPQExpBuffer(&sql,
" AND c.relnamespace != %u",
PG_TOAST_NAMESPACE);
appendPQExpBufferStr(&sql, "\n)");
}
if (!opts.no_toast_expansion && !opts.no_btree_expansion)
{
/*
* Include a CTE for btree indexes associated with toast tables of
* primary heap tables selected above, filtering by exclusion patterns
* (if any) that match the toast index names.
*/
appendPQExpBuffer(&sql,
", toast_index (oid, nspname, relname, relpages) AS ("
"\nSELECT c.oid, 'pg_toast', c.relname, c.relpages "
"FROM toast t "
"INNER JOIN pg_catalog.pg_index i "
"ON t.oid = i.indrelid"
"\nINNER JOIN pg_catalog.pg_class c "
"ON i.indexrelid = c.oid");
if (opts.excludeidx)
appendPQExpBufferStr(&sql,
"\nLEFT OUTER JOIN exclude_pat ep "
"ON ('pg_toast' ~ ep.nsp_regex OR ep.nsp_regex IS NULL) "
"AND (c.relname ~ ep.rel_regex OR ep.rel_regex IS NULL) "
"AND ep.btree_only "
"WHERE ep.pattern_id IS NULL");
else
appendPQExpBufferStr(&sql,
"\nWHERE true");
appendPQExpBuffer(&sql,
" AND c.relam = %u"
" AND c.relkind = 'i')",
BTREE_AM_OID);
}
/*
* Roll-up distinct rows from CTEs.
*
* Relations that match more than one pattern may occur more than once in
* the list, and indexes and toast for primary relations may also have
* matched in their own right, so we rely on UNION to deduplicate the
* list.
*/
appendPQExpBuffer(&sql,
"\nSELECT pattern_id, is_heap, is_btree, oid, nspname, relname, relpages "
"FROM (");
appendPQExpBufferStr(&sql,
/* Inclusion patterns that failed to match */
"\nSELECT pattern_id, is_heap, is_btree, "
"NULL::OID AS oid, "
"NULL::TEXT AS nspname, "
"NULL::TEXT AS relname, "
"NULL::INTEGER AS relpages"
"\nFROM relation "
"WHERE pattern_id IS NOT NULL "
"UNION"
/* Primary relations */
"\nSELECT NULL::INTEGER AS pattern_id, "
"is_heap, is_btree, oid, nspname, relname, relpages "
"FROM relation");
if (!opts.no_toast_expansion)
appendPQExpBufferStr(&sql,
" UNION"
/* Toast tables for primary relations */
"\nSELECT NULL::INTEGER AS pattern_id, TRUE AS is_heap, "
"FALSE AS is_btree, oid, nspname, relname, relpages "
"FROM toast");
if (!opts.no_btree_expansion)
appendPQExpBufferStr(&sql,
" UNION"
/* Indexes for primary relations */
"\nSELECT NULL::INTEGER AS pattern_id, FALSE AS is_heap, "
"TRUE AS is_btree, oid, nspname, relname, relpages "
"FROM index");
if (!opts.no_toast_expansion && !opts.no_btree_expansion)
appendPQExpBufferStr(&sql,
" UNION"
/* Indexes for toast relations */
"\nSELECT NULL::INTEGER AS pattern_id, FALSE AS is_heap, "
"TRUE AS is_btree, oid, nspname, relname, relpages "
"FROM toast_index");
appendPQExpBufferStr(&sql,
"\n) AS combined_records "
"ORDER BY relpages DESC NULLS FIRST, oid");
res = executeQuery(conn, sql.data, opts.echo);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
pg_log_error("query failed: %s", PQerrorMessage(conn));
pg_log_info("query was: %s", sql.data);
disconnectDatabase(conn);
exit(1);
}
termPQExpBuffer(&sql);
ntups = PQntuples(res);
for (i = 0; i < ntups; i++)
{
int pattern_id = -1;
bool is_heap = false;
bool is_btree = false;
Oid oid = InvalidOid;
const char *nspname = NULL;
const char *relname = NULL;
int relpages = 0;
if (!PQgetisnull(res, i, 0))
pattern_id = atoi(PQgetvalue(res, i, 0));
if (!PQgetisnull(res, i, 1))
is_heap = (PQgetvalue(res, i, 1)[0] == 't');
if (!PQgetisnull(res, i, 2))
is_btree = (PQgetvalue(res, i, 2)[0] == 't');
if (!PQgetisnull(res, i, 3))
oid = atooid(PQgetvalue(res, i, 3));
if (!PQgetisnull(res, i, 4))
nspname = PQgetvalue(res, i, 4);
if (!PQgetisnull(res, i, 5))
relname = PQgetvalue(res, i, 5);
if (!PQgetisnull(res, i, 6))
relpages = atoi(PQgetvalue(res, i, 6));
if (pattern_id >= 0)
{
/*
* Current record pertains to an inclusion pattern. Record that
* it matched.
*/
if (pattern_id >= opts.include.len)
{
pg_log_error("internal error: received unexpected relation pattern_id %d",
pattern_id);
exit(1);
}
opts.include.data[pattern_id].matched = true;
}
else
{
/* Current record pertains to a relation */
RelationInfo *rel = (RelationInfo *) pg_malloc0(sizeof(RelationInfo));
Assert(OidIsValid(oid));
Assert((is_heap && !is_btree) || (is_btree && !is_heap));
rel->datinfo = dat;
rel->reloid = oid;
rel->is_heap = is_heap;
rel->nspname = pstrdup(nspname);
rel->relname = pstrdup(relname);
rel->relpages = relpages;
rel->blocks_to_check = relpages;
if (is_heap && (opts.startblock >= 0 || opts.endblock >= 0))
{
/*
* We apply --startblock and --endblock to heap tables, but
* not btree indexes, and for progress purposes we need to
* track how many blocks we expect to check.
*/
if (opts.endblock >= 0 && rel->blocks_to_check > opts.endblock)
rel->blocks_to_check = opts.endblock + 1;
if (opts.startblock >= 0)
{
if (rel->blocks_to_check > opts.startblock)
rel->blocks_to_check -= opts.startblock;
else
rel->blocks_to_check = 0;
}
}
*pagecount += rel->blocks_to_check;
simple_ptr_list_append(relations, rel);
}
}
PQclear(res);
}
use strict;
use warnings;
use TestLib;
use Test::More tests => 8;
program_help_ok('pg_amcheck');
program_version_ok('pg_amcheck');
program_options_handling_ok('pg_amcheck');
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 76;
# Test set-up
my ($node, $port);
$node = get_new_node('test');
$node->init;
$node->start;
$port = $node->port;
# Load the amcheck extension, upon which pg_amcheck depends
$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
#########################################
# Test non-existent databases
# Failing to connect to the initial database is an error.
$node->command_checks_all(
[ 'pg_amcheck', 'qqq' ],
1,
[ qr/^$/ ],
[ qr/FATAL: database "qqq" does not exist/ ],
'checking a non-existent database');
# Failing to resolve a database pattern is an error by default.
$node->command_checks_all(
[ 'pg_amcheck', '-d', 'qqq', '-d', 'postgres' ],
1,
[ qr/^$/ ],
[ qr/pg_amcheck: error: no connectable databases to check matching "qqq"/ ],
'checking an unresolvable database pattern');
# But only a warning under --no-strict-names
$node->command_checks_all(
[ 'pg_amcheck', '--no-strict-names', '-d', 'qqq', '-d', 'postgres' ],
0,
[ qr/^$/ ],
[ qr/pg_amcheck: warning: no connectable databases to check matching "qqq"/ ],
'checking an unresolvable database pattern under --no-strict-names');
# Check that a substring of an existent database name does not get interpreted
# as a matching pattern.
$node->command_checks_all(
[ 'pg_amcheck', '-d', 'post', '-d', 'postgres' ],
1,
[ qr/^$/ ],
[ qr/pg_amcheck: error: no connectable databases to check matching "post"/ ],
'checking an unresolvable database pattern (substring of existent database)');
# Check that a superstring of an existent database name does not get interpreted
# as a matching pattern.
$node->command_checks_all(
[ 'pg_amcheck', '-d', 'postgresql', '-d', 'postgres' ],
1,
[ qr/^$/ ],
[ qr/pg_amcheck: error: no connectable databases to check matching "postgresql"/ ],
'checking an unresolvable database pattern (superstring of existent database)');
#########################################
# Test connecting with a non-existent user
# Failing to connect to the initial database due to bad username is an error.
$node->command_checks_all(
[ 'pg_amcheck', '-U', 'no_such_user', 'postgres' ],
1,
[ qr/^$/ ],
[ qr/role "no_such_user" does not exist/ ],
'checking with a non-existent user');
# Failing to connect to the initial database due to bad username is still an
# error under --no-strict-names.
$node->command_checks_all(
[ 'pg_amcheck', '--no-strict-names', '-U', 'no_such_user', 'postgres' ],
1,
[ qr/^$/ ],
[ qr/role "no_such_user" does not exist/ ],
'checking with a non-existent user under --no-strict-names');
#########################################
# Test checking databases without amcheck installed
# Attempting to check a database by name where amcheck is not installed should
# raise a warning. If all databases are skipped, having no relations to check
# raises an error.
$node->command_checks_all(
[ 'pg_amcheck', 'template1' ],
1,
[ qr/^$/ ],
[ qr/pg_amcheck: warning: skipping database "template1": amcheck is not installed/,
qr/pg_amcheck: error: no relations to check/ ],
'checking a database by name without amcheck installed, no other databases');
# Again, but this time with another database to check, so no error is raised.
$node->command_checks_all(
[ 'pg_amcheck', '-d', 'template1', '-d', 'postgres' ],
0,
[ qr/^$/ ],
[ qr/pg_amcheck: warning: skipping database "template1": amcheck is not installed/ ],
'checking a database by name without amcheck installed, with other databases');
# Again, but by way of checking all databases
$node->command_checks_all(
[ 'pg_amcheck', '--all' ],
0,
[ qr/^$/ ],
[ qr/pg_amcheck: warning: skipping database "template1": amcheck is not installed/ ],
'checking a database by pattern without amcheck installed, with other databases');
#########################################
# Test unreasonable patterns
# Check three-part unreasonable pattern that has zero-length names
$node->command_checks_all(
[ 'pg_amcheck', '-d', 'postgres', '-t', '..' ],
1,
[ qr/^$/ ],
[ qr/pg_amcheck: error: no connectable databases to check matching "\.\."/ ],
'checking table pattern ".."');
# Again, but with non-trivial schema and relation parts
$node->command_checks_all(
[ 'pg_amcheck', '-d', 'postgres', '-t', '.foo.bar' ],
1,
[ qr/^$/ ],
[ qr/pg_amcheck: error: no connectable databases to check matching "\.foo\.bar"/ ],
'checking table pattern ".foo.bar"');
# Check two-part unreasonable pattern that has zero-length names
$node->command_checks_all(
[ 'pg_amcheck', '-d', 'postgres', '-t', '.' ],
1,
[ qr/^$/ ],
[ qr/pg_amcheck: error: no heap tables to check matching "\."/ ],
'checking table pattern "."');
#########################################
# Test checking non-existent databases, schemas, tables, and indexes
# Use --no-strict-names and a single existent table so we only get warnings
# about the failed pattern matches
$node->command_checks_all(
[ 'pg_amcheck', '--no-strict-names',
'-t', 'no_such_table',
'-t', 'no*such*table',
'-i', 'no_such_index',
'-i', 'no*such*index',
'-r', 'no_such_relation',
'-r', 'no*such*relation',
'-d', 'no_such_database',
'-d', 'no*such*database',
'-r', 'none.none',
'-r', 'none.none.none',
'-r', 'this.is.a.really.long.dotted.string',
'-r', 'postgres.none.none',
'-r', 'postgres.long.dotted.string',
'-r', 'postgres.pg_catalog.none',
'-r', 'postgres.none.pg_class',
'-t', 'postgres.pg_catalog.pg_class', # This exists
],
0,
[ qr/^$/ ],
[ qr/pg_amcheck: warning: no heap tables to check matching "no_such_table"/,
qr/pg_amcheck: warning: no heap tables to check matching "no\*such\*table"/,
qr/pg_amcheck: warning: no btree indexes to check matching "no_such_index"/,
qr/pg_amcheck: warning: no btree indexes to check matching "no\*such\*index"/,
qr/pg_amcheck: warning: no relations to check matching "no_such_relation"/,
qr/pg_amcheck: warning: no relations to check matching "no\*such\*relation"/,
qr/pg_amcheck: warning: no heap tables to check matching "no\*such\*table"/,
qr/pg_amcheck: warning: no connectable databases to check matching "no_such_database"/,
qr/pg_amcheck: warning: no connectable databases to check matching "no\*such\*database"/,
qr/pg_amcheck: warning: no relations to check matching "none\.none"/,
qr/pg_amcheck: warning: no connectable databases to check matching "none\.none\.none"/,
qr/pg_amcheck: warning: no connectable databases to check matching "this\.is\.a\.really\.long\.dotted\.string"/,
qr/pg_amcheck: warning: no relations to check matching "postgres\.none\.none"/,
qr/pg_amcheck: warning: no relations to check matching "postgres\.long\.dotted\.string"/,
qr/pg_amcheck: warning: no relations to check matching "postgres\.pg_catalog\.none"/,
qr/pg_amcheck: warning: no relations to check matching "postgres\.none\.pg_class"/,
],
'many unmatched patterns and one matched pattern under --no-strict-names');
#########################################
# Test checking otherwise existent objects but in databases where they do not exist
$node->safe_psql('postgres', q(
CREATE TABLE public.foo (f integer);
CREATE INDEX foo_idx ON foo(f);
));
$node->safe_psql('postgres', q(CREATE DATABASE another_db));
$node->command_checks_all(
[ 'pg_amcheck', '-d', 'postgres', '--no-strict-names',
'-t', 'template1.public.foo',
'-t', 'another_db.public.foo',
'-t', 'no_such_database.public.foo',
'-i', 'template1.public.foo_idx',
'-i', 'another_db.public.foo_idx',
'-i', 'no_such_database.public.foo_idx',
],
1,
[ qr/^$/ ],
[ qr/pg_amcheck: warning: skipping database "template1": amcheck is not installed/,
qr/pg_amcheck: warning: no heap tables to check matching "template1\.public\.foo"/,
qr/pg_amcheck: warning: no heap tables to check matching "another_db\.public\.foo"/,
qr/pg_amcheck: warning: no connectable databases to check matching "no_such_database\.public\.foo"/,
qr/pg_amcheck: warning: no btree indexes to check matching "template1\.public\.foo_idx"/,
qr/pg_amcheck: warning: no btree indexes to check matching "another_db\.public\.foo_idx"/,
qr/pg_amcheck: warning: no connectable databases to check matching "no_such_database\.public\.foo_idx"/,
qr/pg_amcheck: error: no relations to check/,
],
'checking otherwise existent objects in the wrong databases');
#########################################
# Test schema exclusion patterns
# Check with only schema exclusion patterns
$node->command_checks_all(
[ 'pg_amcheck', '--all', '--no-strict-names',
'-S', 'public',
'-S', 'pg_catalog',
'-S', 'pg_toast',
'-S', 'information_schema',
],
1,
[ qr/^$/ ],
[ qr/pg_amcheck: warning: skipping database "template1": amcheck is not installed/,
qr/pg_amcheck: error: no relations to check/ ],
'schema exclusion patterns exclude all relations');
# Check with schema exclusion patterns overriding relation and schema inclusion patterns
$node->command_checks_all(
[ 'pg_amcheck', '--all', '--no-strict-names',
'-s', 'public',
'-s', 'pg_catalog',
'-s', 'pg_toast',
'-s', 'information_schema',
'-t', 'pg_catalog.pg_class',
'-S', '*'
],
1,
[ qr/^$/ ],
[ qr/pg_amcheck: warning: skipping database "template1": amcheck is not installed/,
qr/pg_amcheck: error: no relations to check/ ],
'schema exclusion pattern overrides all inclusion patterns');
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 60;
my ($node, $port, %corrupt_page, %remove_relation);
# Returns the filesystem path for the named relation.
#
# Assumes the test node is running
sub relation_filepath($$)
{
my ($dbname, $relname) = @_;
my $pgdata = $node->data_dir;
my $rel = $node->safe_psql($dbname,
qq(SELECT pg_relation_filepath('$relname')));
die "path not found for relation $relname" unless defined $rel;
return "$pgdata/$rel";
}
# Returns the name of the toast relation associated with the named relation.
#
# Assumes the test node is running
sub relation_toast($$)
{
my ($dbname, $relname) = @_;
my $rel = $node->safe_psql($dbname, qq(
SELECT c.reltoastrelid::regclass
FROM pg_catalog.pg_class c
WHERE c.oid = '$relname'::regclass
AND c.reltoastrelid != 0
));
return undef unless defined $rel;
return $rel;
}
# Adds the relation file for the given (dbname, relname) to the list
# to be corrupted by means of overwriting junk in the first page.
#
# Assumes the test node is running.
sub plan_to_corrupt_first_page($$)
{
my ($dbname, $relname) = @_;
my $relpath = relation_filepath($dbname, $relname);
$corrupt_page{$relpath} = 1;
}
# Adds the relation file for the given (dbname, relname) to the list
# to be corrupted by means of removing the file.
#
# Assumes the test node is running
sub plan_to_remove_relation_file($$)
{
my ($dbname, $relname) = @_;
my $relpath = relation_filepath($dbname, $relname);
$remove_relation{$relpath} = 1;
}
# For the given (dbname, relname), if a corresponding toast table
# exists, adds that toast table's relation file to the list to be
# corrupted by means of removing the file.
#
# Assumes the test node is running.
sub plan_to_remove_toast_file($$)
{
my ($dbname, $relname) = @_;
my $toastname = relation_toast($dbname, $relname);
plan_to_remove_relation_file($dbname, $toastname) if ($toastname);
}
# Corrupts the first page of the given file path
sub corrupt_first_page($)
{
my ($relpath) = @_;
my $fh;
open($fh, '+<', $relpath)
or BAIL_OUT("open failed: $!");
binmode $fh;
# Corrupt some line pointers. The values are chosen to hit the
# various line-pointer-corruption checks in verify_heapam.c
# on both little-endian and big-endian architectures.
seek($fh, 32, 0)
or BAIL_OUT("seek failed: $!");
syswrite(
$fh,
pack("L*",
0xAAA15550, 0xAAA0D550, 0x00010000,
0x00008000, 0x0000800F, 0x001e8000,
0xFFFFFFFF)
) or BAIL_OUT("syswrite failed: $!");
close($fh)
or BAIL_OUT("close failed: $!");
}
# Stops the node, performs all the corruptions previously planned, and
# starts the node again.
#
sub perform_all_corruptions()
{
$node->stop();
for my $relpath (keys %corrupt_page)
{
corrupt_first_page($relpath);
}
for my $relpath (keys %remove_relation)
{
unlink($relpath);
}
$node->start;
}
# Test set-up
$node = get_new_node('test');
$node->init;
$node->start;
$port = $node->port;
for my $dbname (qw(db1 db2 db3))
{
# Create the database
$node->safe_psql('postgres', qq(CREATE DATABASE $dbname));
# Load the amcheck extension, upon which pg_amcheck depends. Put the
# extension in an unexpected location to test that pg_amcheck finds it
# correctly. Create tables with names that look like pg_catalog names to
# check that pg_amcheck does not get confused by them. Create functions in
# schema public that look like amcheck functions to check that pg_amcheck
# does not use them.
$node->safe_psql($dbname, q(
CREATE SCHEMA amcheck_schema;
CREATE EXTENSION amcheck WITH SCHEMA amcheck_schema;
CREATE TABLE amcheck_schema.pg_database (junk text);
CREATE TABLE amcheck_schema.pg_namespace (junk text);
CREATE TABLE amcheck_schema.pg_class (junk text);
CREATE TABLE amcheck_schema.pg_operator (junk text);
CREATE TABLE amcheck_schema.pg_proc (junk text);
CREATE TABLE amcheck_schema.pg_tablespace (junk text);
CREATE FUNCTION public.bt_index_check(index regclass,
heapallindexed boolean default false)
RETURNS VOID AS $$
BEGIN
RAISE EXCEPTION 'Invoked wrong bt_index_check!';
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION public.bt_index_parent_check(index regclass,
heapallindexed boolean default false,
rootdescend boolean default false)
RETURNS VOID AS $$
BEGIN
RAISE EXCEPTION 'Invoked wrong bt_index_parent_check!';
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION public.verify_heapam(relation regclass,
on_error_stop boolean default false,
check_toast boolean default false,
skip text default 'none',
startblock bigint default null,
endblock bigint default null,
blkno OUT bigint,
offnum OUT integer,
attnum OUT integer,
msg OUT text)
RETURNS SETOF record AS $$
BEGIN
RAISE EXCEPTION 'Invoked wrong verify_heapam!';
END;
$$ LANGUAGE plpgsql;
));
# Create tables and indexes in five separate
# schemas. The schemas are all identical to start, but
# we will corrupt them differently later.
#
for my $schema (qw(s1 s2 s3 s4 s5))
{
$node->safe_psql($dbname, qq(
CREATE SCHEMA $schema;
CREATE SEQUENCE $schema.seq1;
CREATE SEQUENCE $schema.seq2;
CREATE TABLE $schema.t1 (
i INTEGER,
b BOX,
ia int4[],
ir int4range,
t TEXT
);
CREATE TABLE $schema.t2 (
i INTEGER,
b BOX,
ia int4[],
ir int4range,
t TEXT
);
CREATE VIEW $schema.t2_view AS (
SELECT i*2, t FROM $schema.t2
);
ALTER TABLE $schema.t2
ALTER COLUMN t
SET STORAGE EXTERNAL;
INSERT INTO $schema.t1 (i, b, ia, ir, t)
(SELECT gs::INTEGER AS i,
box(point(gs,gs+5),point(gs*2,gs*3)) AS b,
array[gs, gs + 1]::int4[] AS ia,
int4range(gs, gs+100) AS ir,
repeat('foo', gs) AS t
FROM generate_series(1,10000,3000) AS gs);
INSERT INTO $schema.t2 (i, b, ia, ir, t)
(SELECT gs::INTEGER AS i,
box(point(gs,gs+5),point(gs*2,gs*3)) AS b,
array[gs, gs + 1]::int4[] AS ia,
int4range(gs, gs+100) AS ir,
repeat('foo', gs) AS t
FROM generate_series(1,10000,3000) AS gs);
CREATE MATERIALIZED VIEW $schema.t1_mv AS SELECT * FROM $schema.t1;
CREATE MATERIALIZED VIEW $schema.t2_mv AS SELECT * FROM $schema.t2;
create table $schema.p1 (a int, b int) PARTITION BY list (a);
create table $schema.p2 (a int, b int) PARTITION BY list (a);
create table $schema.p1_1 partition of $schema.p1 for values in (1, 2, 3);
create table $schema.p1_2 partition of $schema.p1 for values in (4, 5, 6);
create table $schema.p2_1 partition of $schema.p2 for values in (1, 2, 3);
create table $schema.p2_2 partition of $schema.p2 for values in (4, 5, 6);
CREATE INDEX t1_btree ON $schema.t1 USING BTREE (i);
CREATE INDEX t2_btree ON $schema.t2 USING BTREE (i);
CREATE INDEX t1_hash ON $schema.t1 USING HASH (i);
CREATE INDEX t2_hash ON $schema.t2 USING HASH (i);
CREATE INDEX t1_brin ON $schema.t1 USING BRIN (i);
CREATE INDEX t2_brin ON $schema.t2 USING BRIN (i);
CREATE INDEX t1_gist ON $schema.t1 USING GIST (b);
CREATE INDEX t2_gist ON $schema.t2 USING GIST (b);
CREATE INDEX t1_gin ON $schema.t1 USING GIN (ia);
CREATE INDEX t2_gin ON $schema.t2 USING GIN (ia);
CREATE INDEX t1_spgist ON $schema.t1 USING SPGIST (ir);
CREATE INDEX t2_spgist ON $schema.t2 USING SPGIST (ir);
));
}
}
# Database 'db1' corruptions
#
# Corrupt indexes in schema "s1"
plan_to_remove_relation_file('db1', 's1.t1_btree');
plan_to_corrupt_first_page('db1', 's1.t2_btree');
# Corrupt tables in schema "s2"
plan_to_remove_relation_file('db1', 's2.t1');
plan_to_corrupt_first_page('db1', 's2.t2');
# Corrupt tables, partitions, matviews, and btrees in schema "s3"
plan_to_remove_relation_file('db1', 's3.t1');
plan_to_corrupt_first_page('db1', 's3.t2');
plan_to_remove_relation_file('db1', 's3.t1_mv');
plan_to_remove_relation_file('db1', 's3.p1_1');
plan_to_corrupt_first_page('db1', 's3.t2_mv');
plan_to_corrupt_first_page('db1', 's3.p2_1');
plan_to_remove_relation_file('db1', 's3.t1_btree');
plan_to_corrupt_first_page('db1', 's3.t2_btree');
# Corrupt the toast table for s4.t2 in schema "s4"
plan_to_remove_toast_file('db1', 's4.t2');
# Corrupt all other object types in schema "s5". We don't have amcheck support
# for these types, but we check that their corruption does not trigger any
# errors in pg_amcheck
plan_to_remove_relation_file('db1', 's5.seq1');
plan_to_remove_relation_file('db1', 's5.t1_hash');
plan_to_remove_relation_file('db1', 's5.t1_gist');
plan_to_remove_relation_file('db1', 's5.t1_gin');
plan_to_remove_relation_file('db1', 's5.t1_brin');
plan_to_remove_relation_file('db1', 's5.t1_spgist');
plan_to_corrupt_first_page('db1', 's5.seq2');
plan_to_corrupt_first_page('db1', 's5.t2_hash');
plan_to_corrupt_first_page('db1', 's5.t2_gist');
plan_to_corrupt_first_page('db1', 's5.t2_gin');
plan_to_corrupt_first_page('db1', 's5.t2_brin');
plan_to_corrupt_first_page('db1', 's5.t2_spgist');
# Database 'db2' corruptions
#
plan_to_remove_relation_file('db2', 's1.t1');
plan_to_remove_relation_file('db2', 's1.t1_btree');
# Leave 'db3' uncorrupted
#
# Perform the corruptions we planned above using only a single database restart.
#
perform_all_corruptions();
# Standard first arguments to TestLib functions
my @cmd = ('pg_amcheck', '--quiet', '-p', $port);
# Regular expressions to match various expected output
my $no_output_re = qr/^$/;
my $line_pointer_corruption_re = qr/line pointer/;
my $missing_file_re = qr/could not open file ".*": No such file or directory/;
my $index_missing_relation_fork_re = qr/index ".*" lacks a main relation fork/;
# When checking databases that have amcheck installed and contain corrupt
# relations, pg_amcheck should return exit status 2, meaning that corruption
# was found, rather than exit status 1, which would mean that the pg_amcheck
# command itself failed.  Corruption messages should go to stdout, and
# nothing to stderr.
#
$node->command_checks_all(
[ @cmd, 'db1' ],
2,
[ $index_missing_relation_fork_re,
$line_pointer_corruption_re,
$missing_file_re,
],
[ $no_output_re ],
'pg_amcheck all schemas, tables and indexes in database db1');
$node->command_checks_all(
[ @cmd, '-d', 'db1', '-d', 'db2', '-d', 'db3' ],
2,
[ $index_missing_relation_fork_re,
$line_pointer_corruption_re,
$missing_file_re,
],
[ $no_output_re ],
'pg_amcheck all schemas, tables and indexes in databases db1, db2, and db3');
# Scans of indexes in s1 should detect the specific corruption that we created
# above. For missing relation forks, we know what the error message looks
# like. For corrupted index pages, the error might vary depending on how the
# page was formatted on disk, including variations due to alignment differences
# between platforms, so we accept any non-empty error message.
#
# If we don't limit the check to databases with amcheck installed, we expect
# a complaint on stderr; otherwise stderr should be quiet.
#
$node->command_checks_all(
[ @cmd, '--all', '-s', 's1', '-i', 't1_btree' ],
2,
[ $index_missing_relation_fork_re ],
[ qr/pg_amcheck: warning: skipping database "postgres": amcheck is not installed/ ],
'pg_amcheck index s1.t1_btree reports missing main relation fork');
$node->command_checks_all(
[ @cmd, '-d', 'db1', '-s', 's1', '-i', 't2_btree' ],
2,
[ qr/.+/ ], # Any non-empty error message is acceptable
[ $no_output_re ],
'pg_amcheck index s1.t2_btree reports index corruption');
# Checking db1.s1 with indexes excluded should show no corruptions because we
# did not corrupt any tables in db1.s1. Verify that both stdout and stderr
# are quiet.
#
$node->command_checks_all(
[ @cmd, 'db1', '-t', 's1.*', '--no-dependent-indexes' ],
0,
[ $no_output_re ],
[ $no_output_re ],
'pg_amcheck of db1.s1 excluding indexes');
# Checking db2.s1 should show table corruptions if indexes are excluded
#
$node->command_checks_all(
[ @cmd, 'db2', '-t', 's1.*', '--no-dependent-indexes' ],
2,
[ $missing_file_re ],
[ $no_output_re ],
'pg_amcheck of db2.s1 excluding indexes');
# In schema db1.s3, the tables and indexes are both corrupt. We should see
# corruption messages on stdout, and nothing on stderr.
#
$node->command_checks_all(
[ @cmd, 'db1', '-s', 's3' ],
2,
[ $index_missing_relation_fork_re,
$line_pointer_corruption_re,
$missing_file_re,
],
[ $no_output_re ],
'pg_amcheck schema s3 reports table and index errors');
# In schema db1.s4, only toast tables are corrupt. Check that under default
# options the toast corruption is reported, but when excluding toast we get no
# error reports.
$node->command_checks_all(
[ @cmd, 'db1', '-s', 's4' ],
2,
[ $missing_file_re ],
[ $no_output_re ],
'pg_amcheck in schema s4 reports toast corruption');
$node->command_checks_all(
[ @cmd, '--no-dependent-toast', '--exclude-toast-pointers', 'db1', '-s', 's4' ],
0,
[ $no_output_re ],
[ $no_output_re ],
'pg_amcheck in schema s4 excluding toast reports no corruption');
# Check that no corruption is reported in schema db1.s5
$node->command_checks_all(
[ @cmd, 'db1', '-s', 's5' ],
0,
[ $no_output_re ],
[ $no_output_re ],
'pg_amcheck over schema s5 reports no corruption');
# In schema db1.s1, only indexes are corrupt. Verify that when we exclude
# the indexes, no corruption is reported about the schema.
#
$node->command_checks_all(
[ @cmd, 'db1', '-s', 's1', '-I', 't1_btree', '-I', 't2_btree' ],
0,
[ $no_output_re ],
[ $no_output_re ],
'pg_amcheck over schema s1 with corrupt indexes excluded reports no corruption');
# In schema db1.s1, only indexes are corrupt. Verify that when we provide only
# table inclusions, and disable index expansion, no corruption is reported
# about the schema.
#
$node->command_checks_all(
[ @cmd, 'db1', '-t', 's1.*', '--no-dependent-indexes' ],
0,
[ $no_output_re ],
[ $no_output_re ],
'pg_amcheck over schema s1 with all indexes excluded reports no corruption');
# In schema db1.s2, only tables are corrupt.  Verify that when we exclude
# those tables, no corruption is reported.
#
$node->command_checks_all(
[ @cmd, 'db1', '-s', 's2', '-T', 't1', '-T', 't2' ],
0,
[ $no_output_re ],
[ $no_output_re ],
'pg_amcheck over schema s2 with corrupt tables excluded reports no corruption');
# Check errors about bad block range command line arguments. We use schema s5
# to avoid getting messages about corrupt tables or indexes.
#
command_fails_like(
[ @cmd, 'db1', '-s', 's5', '--startblock', 'junk' ],
qr/invalid start block/,
'pg_amcheck rejects garbage startblock');
command_fails_like(
[ @cmd, 'db1', '-s', 's5', '--endblock', '1234junk' ],
qr/invalid end block/,
'pg_amcheck rejects garbage endblock');
command_fails_like(
[ @cmd, 'db1', '-s', 's5', '--startblock', '5', '--endblock', '4' ],
qr/end block precedes start block/,
'pg_amcheck rejects invalid block range');
# Check the bt_index_parent_check variants.  We don't create any index corruption
# that would behave differently under these modes, so just smoke test that the
# arguments are handled sensibly.
#
$node->command_checks_all(
[ @cmd, 'db1', '-s', 's1', '-i', 't1_btree', '--parent-check' ],
2,
[ $index_missing_relation_fork_re ],
[ $no_output_re ],
'pg_amcheck smoke test --parent-check');
$node->command_checks_all(
[ @cmd, 'db1', '-s', 's1', '-i', 't1_btree', '--heapallindexed', '--rootdescend' ],
2,
[ $index_missing_relation_fork_re ],
[ $no_output_re ],
'pg_amcheck smoke test --heapallindexed --rootdescend');
$node->command_checks_all(
[ @cmd, '-d', 'db1', '-d', 'db2', '-d', 'db3', '-S', 's*' ],
0,
[ $no_output_re ],
[ $no_output_re ],
'pg_amcheck excluding all corrupt schemas');
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More;
# This regression test demonstrates that the pg_amcheck binary correctly
# identifies specific kinds of corruption within pages. To test this, we need
# a mechanism to create corrupt pages with predictable, repeatable corruption.
# The postgres backend cannot be expected to help us with this, as its design
# is not consistent with the goal of intentionally corrupting pages.
#
# Instead, we create a table to corrupt, and with careful consideration of how
# postgresql lays out heap pages, we seek to offsets within the page and
# overwrite deliberately chosen bytes with specific values calculated to
# corrupt the page in expected ways. We then verify that pg_amcheck reports
# the corruption, and that it runs without crashing. Note that the backend
# cannot simply be started to run queries against the corrupt table, as the
# backend will crash, at least for some of the corruption types we generate.
#
# Autovacuum potentially touching the table in the background makes the exact
# behavior of this test harder to reason about. We turn it off to keep things
# simpler. We use a "belt and suspenders" approach, turning it off for the
# system generally in postgresql.conf, and turning it off specifically for the
# test table.
#
# This test depends on the table being written to the heap file exactly as we
# expect it to be, so we take care to arrange the columns of the table, and
# insert rows of the table, that give predictable sizes and locations within
# the table page.
#
# The HeapTupleHeaderData has 23 bytes of fixed size fields before the variable
# length t_bits[] array. We have exactly 3 columns in the table, so natts = 3,
# t_bits is 1 byte long, and t_hoff = MAXALIGN(23 + 1) = 24.
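#
# For reference, MAXALIGN rounds its argument up to the platform's maximum
# alignment, which is 8 bytes on the 64-bit platforms this test assumes.  A
# Perl sketch of that macro, shown only to make the arithmetic concrete:
#
#   sub maxalign { ($_[0] + 7) & ~7 }    # maxalign(23 + 1) == 24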
#
# We're not too fussy about which datatypes we use for the test, but we do care
# about some specific properties. We'd like to test both fixed size and
# varlena types. We'd like some varlena data inline and some toasted. And
# we'd like the layout of the table such that the datums land at predictable
# offsets within the tuple. We choose a structure without padding on all
# supported architectures:
#
# a BIGINT
# b TEXT
# c TEXT
#
# We always insert a 7-character ASCII string into field 'b', which with its
# 1-byte varlena header makes an 8-byte inline value.  We always insert a long
# text string in field 'c', long enough to force toast storage.
#
# We choose to read and write binary copies of our table's tuples, using perl's
# pack() and unpack() functions. Perl uses a packing code system in which:
#
# L = "Unsigned 32-bit Long",
# S = "Unsigned 16-bit Short",
# C = "Unsigned 8-bit Octet",
# c = "signed 8-bit octet",
# q = "signed 64-bit quadword"
#
# Each tuple in our table has a layout as follows:
#
# xx xx xx xx t_xmin: xxxx offset = 0 L
# xx xx xx xx t_xmax: xxxx offset = 4 L
# xx xx xx xx t_field3: xxxx offset = 8 L
# xx xx bi_hi: xx offset = 12 S
# xx xx bi_lo: xx offset = 14 S
# xx xx ip_posid: xx offset = 16 S
# xx xx t_infomask2: xx offset = 18 S
# xx xx t_infomask: xx offset = 20 S
# xx t_hoff: x offset = 22 C
# xx t_bits: x offset = 23 C
# xx xx xx xx xx xx xx xx 'a': xxxxxxxx offset = 24 q
# xx xx xx xx xx xx xx xx 'b': xxxxxxxx offset = 32 Cccccccc
# xx xx xx xx xx xx xx xx 'c': xxxxxxxx offset = 40 SSSS
# xx xx xx xx xx xx xx xx : xxxxxxxx ...continued SSSS
# xx xx : xx ...continued S
#
# We could choose to read and write columns 'b' and 'c' in other ways, but
# it is convenient enough to do it this way. We define packing code
# constants here, where they can be compared easily against the layout.
use constant HEAPTUPLE_PACK_CODE => 'LLLSSSSSCCqCcccccccSSSSSSSSS';
use constant HEAPTUPLE_PACK_LENGTH => 58; # Total size
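# As a cross-check (not executed by the test), the pack code above encodes
# 28 values -- 3 L, 5 S, 2 C, 1 q, 1 C, 7 c, and 9 S -- into exactly 58
# bytes, so a mismatch between the code and the length constant could be
# detected by packing a dummy tuple:
#
#   length(pack(HEAPTUPLE_PACK_CODE, (0) x 28)) == HEAPTUPLE_PACK_LENGTH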
# Read a tuple of our table from a heap page.
#
# Takes an open filehandle to the heap file, and the offset of the tuple.
#
# Rather than returning the binary data from the file, unpacks the data into a
# perl hash with named fields. These fields exactly match the ones understood
# by write_tuple(), below. Returns a reference to this hash.
#
sub read_tuple ($$)
{
my ($fh, $offset) = @_;
my ($buffer, %tup);
seek($fh, $offset, 0)
or BAIL_OUT("seek failed: $!");
defined(sysread($fh, $buffer, HEAPTUPLE_PACK_LENGTH))
or BAIL_OUT("sysread failed: $!");
@_ = unpack(HEAPTUPLE_PACK_CODE, $buffer);
%tup = (t_xmin => shift,
t_xmax => shift,
t_field3 => shift,
bi_hi => shift,
bi_lo => shift,
ip_posid => shift,
t_infomask2 => shift,
t_infomask => shift,
t_hoff => shift,
t_bits => shift,
a => shift,
b_header => shift,
b_body1 => shift,
b_body2 => shift,
b_body3 => shift,
b_body4 => shift,
b_body5 => shift,
b_body6 => shift,
b_body7 => shift,
c1 => shift,
c2 => shift,
c3 => shift,
c4 => shift,
c5 => shift,
c6 => shift,
c7 => shift,
c8 => shift,
c9 => shift);
# Stitch together the text for column 'b'
$tup{b} = join('', map { chr($tup{"b_body$_"}) } (1..7));
return \%tup;
}
# Write a tuple of our table to a heap page.
#
# Takes an open filehandle to the heap file, the offset of the tuple, and a
# reference to a hash with the tuple values, as returned by read_tuple().
# Writes the tuple fields from the hash into the heap file.
#
# The purpose of this function is to write a tuple back to disk with some
# subset of fields modified. The function does no error checking. Use
# cautiously.
#
sub write_tuple($$$)
{
my ($fh, $offset, $tup) = @_;
my $buffer = pack(HEAPTUPLE_PACK_CODE,
$tup->{t_xmin},
$tup->{t_xmax},
$tup->{t_field3},
$tup->{bi_hi},
$tup->{bi_lo},
$tup->{ip_posid},
$tup->{t_infomask2},
$tup->{t_infomask},
$tup->{t_hoff},
$tup->{t_bits},
$tup->{a},
$tup->{b_header},
$tup->{b_body1},
$tup->{b_body2},
$tup->{b_body3},
$tup->{b_body4},
$tup->{b_body5},
$tup->{b_body6},
$tup->{b_body7},
$tup->{c1},
$tup->{c2},
$tup->{c3},
$tup->{c4},
$tup->{c5},
$tup->{c6},
$tup->{c7},
$tup->{c8},
$tup->{c9});
seek($fh, $offset, 0)
or BAIL_OUT("seek failed: $!");
defined(syswrite($fh, $buffer, HEAPTUPLE_PACK_LENGTH))
or BAIL_OUT("syswrite failed: $!");
return;
}
# Set umask so test directories and files are created with default permissions
umask(0077);
# Set up the node. Once we create and corrupt the table,
# autovacuum workers visiting the table could crash the backend.
# Disable autovacuum so that won't happen.
my $node = get_new_node('test');
$node->init;
$node->append_conf('postgresql.conf', 'autovacuum=off');
# Start the node and load the extensions. We depend on both
# amcheck and pageinspect for this test.
$node->start;
my $port = $node->port;
my $pgdata = $node->data_dir;
$node->safe_psql('postgres', "CREATE EXTENSION amcheck");
$node->safe_psql('postgres', "CREATE EXTENSION pageinspect");
# Get a non-zero datfrozenxid
$node->safe_psql('postgres', qq(VACUUM FREEZE));
# Create the test table with precisely the schema that our corruption function
# expects.
$node->safe_psql(
'postgres', qq(
CREATE TABLE public.test (a BIGINT, b TEXT, c TEXT);
ALTER TABLE public.test SET (autovacuum_enabled=false);
ALTER TABLE public.test ALTER COLUMN c SET STORAGE EXTERNAL;
CREATE INDEX test_idx ON public.test(a, b);
));
# We want (0 < datfrozenxid < test.relfrozenxid). To achieve this, we freeze
# an otherwise unused table, public.junk, prior to inserting data and freezing
# public.test
$node->safe_psql(
'postgres', qq(
CREATE TABLE public.junk AS SELECT 'junk'::TEXT AS junk_column;
ALTER TABLE public.junk SET (autovacuum_enabled=false);
VACUUM FREEZE public.junk
));
my $rel = $node->safe_psql('postgres', qq(SELECT pg_relation_filepath('public.test')));
my $relpath = "$pgdata/$rel";
# Insert data and freeze public.test
use constant ROWCOUNT => 16;
$node->safe_psql('postgres', qq(
INSERT INTO public.test (a, b, c)
VALUES (
12345678,
'abcdefg',
repeat('w', 10000)
);
VACUUM FREEZE public.test
)) for (1..ROWCOUNT);
my $relfrozenxid = $node->safe_psql('postgres',
q(select relfrozenxid from pg_class where relname = 'test'));
my $datfrozenxid = $node->safe_psql('postgres',
q(select datfrozenxid from pg_database where datname = 'postgres'));
# Sanity check that our 'test' table has a relfrozenxid newer than the
# datfrozenxid for the database, and that the datfrozenxid is greater than the
# first normal xid. We rely on these invariants in some of our tests.
if ($datfrozenxid <= 3 || $datfrozenxid >= $relfrozenxid)
{
$node->clean_node;
plan skip_all => "Xid thresholds not as expected: got datfrozenxid = $datfrozenxid, relfrozenxid = $relfrozenxid";
exit;
}
# Find where each of the tuples is located on the page.
my @lp_off;
for my $tup (0..ROWCOUNT-1)
{
push (@lp_off, $node->safe_psql('postgres', qq(
select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
offset $tup limit 1)));
}
# Sanity check that our 'test' table's on-disk layout matches expectations. If
# this is not so, we will have to skip the test until somebody updates the test
# to work on this platform.
$node->stop;
my $file;
open($file, '+<', $relpath)
or BAIL_OUT("open failed: $!");
binmode $file;
for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
{
my $offnum = $tupidx + 1; # offnum is 1-based, not zero-based
my $offset = $lp_off[$tupidx];
my $tup = read_tuple($file, $offset);
# Sanity-check that the data appears on the page where we expect.
my $a = $tup->{a};
my $b = $tup->{b};
if ($a ne '12345678' || $b ne 'abcdefg')
{
close($file); # ignore errors on close; we're exiting anyway
$node->clean_node;
plan skip_all => qq(Page layout differs from our expectations: expected (12345678, "abcdefg"), got ($a, "$b"));
exit;
}
}
close($file)
or BAIL_OUT("close failed: $!");
$node->start;
# OK, xids and page layout look as expected.  We can run the corruption tests.
plan tests => 20;
# Check that pg_amcheck runs against the uncorrupted table without error.
$node->command_ok(['pg_amcheck', '-p', $port, 'postgres'],
'pg_amcheck test table, prior to corruption');
# Check that pg_amcheck runs against the uncorrupted table and index without error.
$node->command_ok(['pg_amcheck', '-p', $port, 'postgres'],
'pg_amcheck test table and index, prior to corruption');
$node->stop;
# Some #define constants from access/htup_details.h for use while corrupting.
use constant HEAP_HASNULL => 0x0001;
use constant HEAP_XMAX_LOCK_ONLY => 0x0080;
use constant HEAP_XMIN_COMMITTED => 0x0100;
use constant HEAP_XMIN_INVALID => 0x0200;
use constant HEAP_XMAX_COMMITTED => 0x0400;
use constant HEAP_XMAX_INVALID => 0x0800;
use constant HEAP_NATTS_MASK => 0x07FF;
use constant HEAP_XMAX_IS_MULTI => 0x1000;
use constant HEAP_KEYS_UPDATED => 0x2000;
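# These masks are applied below with ordinary bitwise operators.  As an
# illustrative sketch (not part of the test), making a tuple's xmin look
# "neither known-committed nor known-invalid" clears both hint bits at once:
#
#   $tup->{t_infomask} &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID);
#
# which is equivalent to the two separate "&= ~..." statements used below.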
# Helper function to generate a regular expression matching the header we
# expect verify_heapam() to return given which fields we expect to be non-null.
sub header
{
my ($blkno, $offnum, $attnum) = @_;
return qr/heap table "postgres"\."public"\."test", block $blkno, offset $offnum, attribute $attnum:\s+/ms
if (defined $attnum);
return qr/heap table "postgres"\."public"\."test", block $blkno, offset $offnum:\s+/ms
if (defined $offnum);
return qr/heap table "postgres"\."public"\."test", block $blkno:\s+/ms
if (defined $blkno);
return qr/heap table "postgres"\."public"\."test":\s+/ms;
}
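# As an illustrative example (not used by the test), the call
#
#   header(0, 5, undef)
#
# returns a pattern matching report lines of the form
#
#   heap table "postgres"."public"."test", block 0, offset 5: <message>
#
# while header() with all arguments undefined matches a report about the
# table as a whole.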
# Corrupt the tuples, one type of corruption per tuple. Some types of
# corruption cause verify_heapam to skip to the next tuple without
# performing any remaining checks, so we can't exercise the system properly if
# we focus all our corruption on a single tuple.
#
my @expected;
open($file, '+<', $relpath)
or BAIL_OUT("open failed: $!");
binmode $file;
for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
{
my $offnum = $tupidx + 1; # offnum is 1-based, not zero-based
my $offset = $lp_off[$tupidx];
my $tup = read_tuple($file, $offset);
my $header = header(0, $offnum, undef);
if ($offnum == 1)
{
# Corruptly set xmin < relfrozenxid
my $xmin = $relfrozenxid - 1;
$tup->{t_xmin} = $xmin;
$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
$tup->{t_infomask} &= ~HEAP_XMIN_INVALID;
# Expected corruption report
push @expected,
qr/${header}xmin $xmin precedes relation freeze threshold 0:\d+/;
}
if ($offnum == 2)
{
# Corruptly set xmin < datfrozenxid
my $xmin = 3;
$tup->{t_xmin} = $xmin;
$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
$tup->{t_infomask} &= ~HEAP_XMIN_INVALID;
push @expected,
qr/${header}xmin $xmin precedes oldest valid transaction ID 0:\d+/;
}
elsif ($offnum == 3)
{
# Corruptly set xmin < datfrozenxid, further back, noting circularity
# of xid comparison. For a new cluster with epoch = 0, the corrupt
# xmin will be interpreted as in the future
$tup->{t_xmin} = 4026531839;
$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
$tup->{t_infomask} &= ~HEAP_XMIN_INVALID;
push @expected,
qr/${header}xmin 4026531839 equals or exceeds next valid transaction ID 0:\d+/;
}
elsif ($offnum == 4)
{
# Corruptly set xmax < relminmxid;
$tup->{t_xmax} = 4026531839;
$tup->{t_infomask} &= ~HEAP_XMAX_INVALID;
push @expected,
qr/${header}xmax 4026531839 equals or exceeds next valid transaction ID 0:\d+/;
}
elsif ($offnum == 5)
{
# Corrupt the tuple t_hoff, but keep it aligned properly
$tup->{t_hoff} += 128;
push @expected,
qr/${header}data begins at offset 152 beyond the tuple length 58/,
qr/${header}tuple data should begin at byte 24, but actually begins at byte 152 \(3 attributes, no nulls\)/;
}
elsif ($offnum == 6)
{
# Corrupt the tuple t_hoff, wrong alignment
$tup->{t_hoff} += 3;
push @expected,
qr/${header}tuple data should begin at byte 24, but actually begins at byte 27 \(3 attributes, no nulls\)/;
}
elsif ($offnum == 7)
{
# Corrupt the tuple t_hoff, underflow but correct alignment
$tup->{t_hoff} -= 8;
push @expected,
qr/${header}tuple data should begin at byte 24, but actually begins at byte 16 \(3 attributes, no nulls\)/;
}
elsif ($offnum == 8)
{
# Corrupt the tuple t_hoff, underflow and wrong alignment
$tup->{t_hoff} -= 3;
push @expected,
qr/${header}tuple data should begin at byte 24, but actually begins at byte 21 \(3 attributes, no nulls\)/;
}
elsif ($offnum == 9)
{
# Corrupt the tuple to look like it has lots of attributes, not just 3
$tup->{t_infomask2} |= HEAP_NATTS_MASK;
push @expected,
qr/${header}number of attributes 2047 exceeds maximum expected for table 3/;
}
elsif ($offnum == 10)
{
# Corrupt the tuple to look like it has lots of attributes, some of
# them null. This falsely creates the impression that the t_bits
# array is longer than just one byte, but t_hoff still says otherwise.
$tup->{t_infomask} |= HEAP_HASNULL;
$tup->{t_infomask2} |= HEAP_NATTS_MASK;
$tup->{t_bits} = 0xAA;
push @expected,
qr/${header}tuple data should begin at byte 280, but actually begins at byte 24 \(2047 attributes, has nulls\)/;
}
elsif ($offnum == 11)
{
# Same as above, but this time t_hoff plays along
$tup->{t_infomask} |= HEAP_HASNULL;
$tup->{t_infomask2} |= (HEAP_NATTS_MASK & 0x40);
$tup->{t_bits} = 0xAA;
$tup->{t_hoff} = 32;
push @expected,
qr/${header}number of attributes 67 exceeds maximum expected for table 3/;
}
elsif ($offnum == 12)
{
# Corrupt the bits in column 'b' 1-byte varlena header
$tup->{b_header} = 0x80;
$header = header(0, $offnum, 1);
push @expected,
qr/${header}attribute 1 with length 4294967295 ends at offset 416848000 beyond total tuple length 58/;
}
elsif ($offnum == 13)
{
# Corrupt the bits in column 'c' toast pointer
$tup->{c6} = 41;
$tup->{c7} = 41;
$header = header(0, $offnum, 2);
push @expected,
qr/${header}final toast chunk number 0 differs from expected value 6/,
qr/${header}toasted value for attribute 2 missing from toast table/;
}
elsif ($offnum == 14)
{
# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
$tup->{t_infomask} |= HEAP_XMAX_IS_MULTI;
$tup->{t_xmax} = 4;
push @expected,
qr/${header}multitransaction ID 4 equals or exceeds next valid multitransaction ID 1/;
}
elsif ($offnum == 15) # Last corrupted offnum; any remaining tuples are left intact
{
# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
$tup->{t_infomask} |= HEAP_XMAX_IS_MULTI;
$tup->{t_xmax} = 4000000000;
push @expected,
qr/${header}multitransaction ID 4000000000 precedes relation minimum multitransaction ID threshold 1/;
}
write_tuple($file, $offset, $tup);
}
close($file)
or BAIL_OUT("close failed: $!");
$node->start;
# Run pg_amcheck against the corrupt table with epoch=0, comparing actual
# corruption messages against the expected messages
$node->command_checks_all(
['pg_amcheck', '--no-dependent-indexes', '-p', $port, 'postgres'],
2,
[ @expected ],
[ ],
'Expected corruption message output');
$node->teardown_node;
$node->clean_node;
# This regression test checks the behavior of the btree validation in the
# presence of breaking sort order changes.
#
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 5;
my $node = get_new_node('test');
$node->init;
$node->start;
# Create a custom operator class and an index which uses it.
$node->safe_psql('postgres', q(
CREATE EXTENSION amcheck;
CREATE FUNCTION int4_asc_cmp (a int4, b int4) RETURNS int LANGUAGE sql AS $$
SELECT CASE WHEN $1 = $2 THEN 0 WHEN $1 > $2 THEN 1 ELSE -1 END; $$;
CREATE OPERATOR CLASS int4_fickle_ops FOR TYPE int4 USING btree AS
OPERATOR 1 < (int4, int4), OPERATOR 2 <= (int4, int4),
OPERATOR 3 = (int4, int4), OPERATOR 4 >= (int4, int4),
OPERATOR 5 > (int4, int4), FUNCTION 1 int4_asc_cmp(int4, int4);
CREATE TABLE int4tbl (i int4);
INSERT INTO int4tbl (SELECT * FROM generate_series(1,1000) gs);
CREATE INDEX fickleidx ON int4tbl USING btree (i int4_fickle_ops);
));
# We have not yet broken the index, so we should get no corruption
$node->command_like(
[ 'pg_amcheck', '--quiet', '-p', $node->port, 'postgres' ],
qr/^$/,
'pg_amcheck all schemas, tables and indexes reports no corruption');
# Change the operator class to use a function which sorts in a different
# order to corrupt the btree index
$node->safe_psql('postgres', q(
CREATE FUNCTION int4_desc_cmp (int4, int4) RETURNS int LANGUAGE sql AS $$
SELECT CASE WHEN $1 = $2 THEN 0 WHEN $1 > $2 THEN -1 ELSE 1 END; $$;
UPDATE pg_catalog.pg_amproc
SET amproc = 'int4_desc_cmp'::regproc
WHERE amproc = 'int4_asc_cmp'::regproc
));
# Index corruption should now be reported
$node->command_checks_all(
[ 'pg_amcheck', '-p', $node->port, 'postgres' ],
2,
[ qr/item order invariant violated for index "fickleidx"/ ],
[ ],
'pg_amcheck all schemas, tables and indexes reports fickleidx corruption'
);
@@ -20,12 +20,12 @@ our (@ISA, @EXPORT_OK);
my $insttype;
my @client_contribs = ('oid2name', 'pgbench', 'vacuumlo');
my @client_program_files = (
'clusterdb', 'createdb', 'createuser', 'dropdb',
'dropuser', 'ecpg', 'libecpg', 'libecpg_compat',
'libpgtypes', 'libpq', 'pg_basebackup', 'pg_config',
'pg_dump', 'pg_dumpall', 'pg_isready', 'pg_receivewal',
'pg_recvlogical', 'pg_restore', 'psql', 'reindexdb',
'vacuumdb', @client_contribs);
'clusterdb', 'createdb', 'createuser', 'dropdb',
'dropuser', 'ecpg', 'libecpg', 'libecpg_compat',
'libpgtypes', 'libpq', 'pg_amcheck', 'pg_basebackup',
'pg_config', 'pg_dump', 'pg_dumpall', 'pg_isready',
'pg_receivewal', 'pg_recvlogical', 'pg_restore', 'psql',
'reindexdb', 'vacuumdb', @client_contribs);
sub lcopy
{
@@ -54,17 +54,18 @@ my @contrib_excludes = (
# Set of variables for frontend modules
my $frontend_defines = { 'initdb' => 'FRONTEND' };
my @frontend_uselibpq = ('pg_ctl', 'pg_upgrade', 'pgbench', 'psql', 'initdb');
my @frontend_uselibpq = ('pg_amcheck', 'pg_ctl', 'pg_upgrade', 'pgbench', 'psql', 'initdb');
my @frontend_uselibpgport = (
'pg_archivecleanup', 'pg_test_fsync',
'pg_amcheck', 'pg_archivecleanup', 'pg_test_fsync',
'pg_test_timing', 'pg_upgrade',
'pg_waldump', 'pgbench');
my @frontend_uselibpgcommon = (
'pg_archivecleanup', 'pg_test_fsync',
'pg_amcheck', 'pg_archivecleanup', 'pg_test_fsync',
'pg_test_timing', 'pg_upgrade',
'pg_waldump', 'pgbench');
my $frontend_extralibs = {
'initdb' => ['ws2_32.lib'],
'pg_amcheck' => ['ws2_32.lib'],
'pg_restore' => ['ws2_32.lib'],
'pgbench' => ['ws2_32.lib'],
'psql' => ['ws2_32.lib']