Commit 64c60489 authored by Tom Lane's avatar Tom Lane

Improve the -l (limit) option recently added to contrib/vacuumlo.

Instead of just stopping after removing an arbitrary subset of orphaned
large objects, commit and start a new transaction after each -l objects.
This is just as effective as the original patch at limiting the number of
locks used, and it doesn't require doing the OID collection process
repeatedly to get everything.  Since the option no longer changes the
fundamental behavior of vacuumlo, and it avoids a known server-side
limitation, enable it by default (with a default limit of 1000 LOs per
transaction).

In passing, be more careful about properly quoting the names of tables
and fields, and do some other cosmetic cleanup.
parent 9d23a70d
...@@ -29,8 +29,7 @@ ...@@ -29,8 +29,7 @@
extern char *optarg; extern char *optarg;
extern int optind, extern int optind,
opterr, opterr;
optopt;
enum trivalue enum trivalue
{ {
...@@ -50,16 +49,16 @@ struct _param ...@@ -50,16 +49,16 @@ struct _param
long transaction_limit; long transaction_limit;
}; };
int vacuumlo(char *, struct _param *); static int vacuumlo(const char *database, const struct _param * param);
void usage(const char *progname); static void usage(const char *progname);
/* /*
* This vacuums LOs of one database. It returns 0 on success, -1 on failure. * This vacuums LOs of one database. It returns 0 on success, -1 on failure.
*/ */
int static int
vacuumlo(char *database, struct _param * param) vacuumlo(const char *database, const struct _param * param)
{ {
PGconn *conn; PGconn *conn;
PGresult *res, PGresult *res,
...@@ -72,6 +71,7 @@ vacuumlo(char *database, struct _param * param) ...@@ -72,6 +71,7 @@ vacuumlo(char *database, struct _param * param)
bool new_pass; bool new_pass;
bool success = true; bool success = true;
/* Note: password can be carried over from a previous call */
if (param->pg_prompt == TRI_YES && password == NULL) if (param->pg_prompt == TRI_YES && password == NULL)
password = simple_prompt("Password: ", 100, false); password = simple_prompt("Password: ", 100, false);
...@@ -119,7 +119,7 @@ vacuumlo(char *database, struct _param * param) ...@@ -119,7 +119,7 @@ vacuumlo(char *database, struct _param * param)
if (param->verbose) if (param->verbose)
{ {
fprintf(stdout, "Connected to %s\n", database); fprintf(stdout, "Connected to database \"%s\"\n", database);
if (param->dry_run) if (param->dry_run)
fprintf(stdout, "Test run: no large objects will be removed!\n"); fprintf(stdout, "Test run: no large objects will be removed!\n");
} }
...@@ -220,9 +220,21 @@ vacuumlo(char *database, struct _param * param) ...@@ -220,9 +220,21 @@ vacuumlo(char *database, struct _param * param)
if (param->verbose) if (param->verbose)
fprintf(stdout, "Checking %s in %s.%s\n", field, schema, table); fprintf(stdout, "Checking %s in %s.%s\n", field, schema, table);
schema = PQescapeIdentifier(conn, schema, strlen(schema));
table = PQescapeIdentifier(conn, table, strlen(table));
field = PQescapeIdentifier(conn, field, strlen(field));
if (!schema || !table || !field)
{
fprintf(stderr, "Out of memory\n");
PQclear(res);
PQfinish(conn);
return -1;
}
snprintf(buf, BUFSIZE, snprintf(buf, BUFSIZE,
"DELETE FROM vacuum_l " "DELETE FROM vacuum_l "
"WHERE lo IN (SELECT \"%s\" FROM \"%s\".\"%s\")", "WHERE lo IN (SELECT %s FROM %s.%s)",
field, schema, table); field, schema, table);
res2 = PQexec(conn, buf); res2 = PQexec(conn, buf);
if (PQresultStatus(res2) != PGRES_COMMAND_OK) if (PQresultStatus(res2) != PGRES_COMMAND_OK)
...@@ -236,23 +248,35 @@ vacuumlo(char *database, struct _param * param) ...@@ -236,23 +248,35 @@ vacuumlo(char *database, struct _param * param)
return -1; return -1;
} }
PQclear(res2); PQclear(res2);
PQfreemem(schema);
PQfreemem(table);
PQfreemem(field);
} }
PQclear(res); PQclear(res);
/* /*
* Run the actual deletes in a single transaction. Note that this would * Now, those entries remaining in vacuum_l are orphans. Delete 'em.
* be a bad idea in pre-7.1 Postgres releases (since rolling back a table *
* delete used to cause problems), but it should be safe now. * We don't want to run each delete as an individual transaction, because
* the commit overhead would be high. However, since 9.0 the backend will
* acquire a lock per deleted LO, so deleting too many LOs per transaction
* risks running out of room in the shared-memory lock table.
* Accordingly, we delete up to transaction_limit LOs per transaction.
*/ */
res = PQexec(conn, "begin"); res = PQexec(conn, "begin");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed to start transaction:\n");
fprintf(stderr, "%s", PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
return -1;
}
PQclear(res); PQclear(res);
/*
* Finally, those entries remaining in vacuum_l are orphans.
*/
buf[0] = '\0'; buf[0] = '\0';
strcat(buf, "SELECT lo "); strcat(buf, "SELECT lo FROM vacuum_l");
strcat(buf, "FROM vacuum_l");
res = PQexec(conn, buf); res = PQexec(conn, buf);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
...@@ -292,15 +316,47 @@ vacuumlo(char *database, struct _param * param) ...@@ -292,15 +316,47 @@ vacuumlo(char *database, struct _param * param)
} }
else else
deleted++; deleted++;
if (param->transaction_limit != 0 && deleted >= param->transaction_limit) if (param->transaction_limit > 0 &&
break; (deleted % param->transaction_limit) == 0)
{
res2 = PQexec(conn, "commit");
if (PQresultStatus(res2) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed to commit transaction:\n");
fprintf(stderr, "%s", PQerrorMessage(conn));
PQclear(res2);
PQclear(res);
PQfinish(conn);
return -1;
}
PQclear(res2);
res2 = PQexec(conn, "begin");
if (PQresultStatus(res2) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed to start transaction:\n");
fprintf(stderr, "%s", PQerrorMessage(conn));
PQclear(res2);
PQclear(res);
PQfinish(conn);
return -1;
}
PQclear(res2);
}
} }
PQclear(res); PQclear(res);
/* /*
* That's all folks! * That's all folks!
*/ */
res = PQexec(conn, "end"); res = PQexec(conn, "commit");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed to commit transaction:\n");
fprintf(stderr, "%s", PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
return -1;
}
PQclear(res); PQclear(res);
PQfinish(conn); PQfinish(conn);
...@@ -308,28 +364,28 @@ vacuumlo(char *database, struct _param * param) ...@@ -308,28 +364,28 @@ vacuumlo(char *database, struct _param * param)
if (param->verbose) if (param->verbose)
{ {
if (param->dry_run) if (param->dry_run)
fprintf(stdout, "\rWould remove %ld large objects from %s.\n", fprintf(stdout, "\rWould remove %ld large objects from database \"%s\".\n",
deleted, database); deleted, database);
else if (success) else if (success)
fprintf(stdout, fprintf(stdout,
"\rSuccessfully removed %ld large objects from %s.\n", "\rSuccessfully removed %ld large objects from database \"%s\".\n",
deleted, database); deleted, database);
else else
fprintf(stdout, "\rRemoval from %s failed at object %ld of %ld.\n", fprintf(stdout, "\rRemoval from database \"%s\" failed at object %ld of %ld.\n",
database, deleted, matched); database, deleted, matched);
} }
return ((param->dry_run || success) ? 0 : -1); return ((param->dry_run || success) ? 0 : -1);
} }
void static void
usage(const char *progname) usage(const char *progname)
{ {
printf("%s removes unreferenced large objects from databases.\n\n", progname); printf("%s removes unreferenced large objects from databases.\n\n", progname);
printf("Usage:\n %s [OPTION]... DBNAME...\n\n", progname); printf("Usage:\n %s [OPTION]... DBNAME...\n\n", progname);
printf("Options:\n"); printf("Options:\n");
printf(" -h HOSTNAME database server host or socket directory\n"); printf(" -h HOSTNAME database server host or socket directory\n");
printf(" -l LIMIT stop after removing LIMIT large objects\n"); printf(" -l LIMIT commit after removing each LIMIT large objects\n");
printf(" -n don't remove large objects, just show what would be done\n"); printf(" -n don't remove large objects, just show what would be done\n");
printf(" -p PORT database server port\n"); printf(" -p PORT database server port\n");
printf(" -U USERNAME user name to connect as\n"); printf(" -U USERNAME user name to connect as\n");
...@@ -354,15 +410,16 @@ main(int argc, char **argv) ...@@ -354,15 +410,16 @@ main(int argc, char **argv)
progname = get_progname(argv[0]); progname = get_progname(argv[0]);
/* Parameter handling */ /* Set default parameter values */
param.pg_user = NULL; param.pg_user = NULL;
param.pg_prompt = TRI_DEFAULT; param.pg_prompt = TRI_DEFAULT;
param.pg_host = NULL; param.pg_host = NULL;
param.pg_port = NULL; param.pg_port = NULL;
param.verbose = 0; param.verbose = 0;
param.dry_run = 0; param.dry_run = 0;
param.transaction_limit = 0; param.transaction_limit = 1000;
/* Process command-line arguments */
if (argc > 1) if (argc > 1)
{ {
if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
...@@ -397,6 +454,16 @@ main(int argc, char **argv) ...@@ -397,6 +454,16 @@ main(int argc, char **argv)
param.dry_run = 1; param.dry_run = 1;
param.verbose = 1; param.verbose = 1;
break; break;
case 'l':
param.transaction_limit = strtol(optarg, NULL, 10);
if (param.transaction_limit < 0)
{
fprintf(stderr,
"%s: transaction limit must not be negative (0 disables)\n",
progname);
exit(1);
}
break;
case 'U': case 'U':
param.pg_user = strdup(optarg); param.pg_user = strdup(optarg);
break; break;
...@@ -415,16 +482,6 @@ main(int argc, char **argv) ...@@ -415,16 +482,6 @@ main(int argc, char **argv)
} }
param.pg_port = strdup(optarg); param.pg_port = strdup(optarg);
break; break;
case 'l':
param.transaction_limit = strtol(optarg, NULL, 10);
if (param.transaction_limit < 0)
{
fprintf(stderr,
"%s: transaction limit must not be negative (0 disables)\n",
progname);
exit(1);
}
break;
case 'h': case 'h':
param.pg_host = strdup(optarg); param.pg_host = strdup(optarg);
break; break;
...@@ -435,7 +492,7 @@ main(int argc, char **argv) ...@@ -435,7 +492,7 @@ main(int argc, char **argv)
if (optind >= argc) if (optind >= argc)
{ {
fprintf(stderr, "vacuumlo: missing required argument: database name\n"); fprintf(stderr, "vacuumlo: missing required argument: database name\n");
fprintf(stderr, "Try 'vacuumlo -?' for help.\n"); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1); exit(1);
} }
......
...@@ -50,19 +50,22 @@ vacuumlo [options] database [database2 ... databaseN] ...@@ -50,19 +50,22 @@ vacuumlo [options] database [database2 ... databaseN]
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term><option>-U</option> <replaceable>username</></term> <term><option>-l</option> <replaceable>limit</></term>
<listitem> <listitem>
<para>User name to connect as.</para> <para>
Remove no more than <replaceable>limit</> large objects per
transaction (default 1000). Since the server acquires a lock per LO
removed, removing too many LOs in one transaction risks exceeding
<xref linkend="guc-max-locks-per-transaction">. Set the limit to
zero if you want all removals done in a single transaction.
</para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term><option>-l</option> <replaceable>limit</></term> <term><option>-U</option> <replaceable>username</></term>
<listitem> <listitem>
<para> <para>User name to connect as.</para>
Stop after removing LIMIT large objects. Useful to avoid
exceeding <xref linkend="guc-max-locks-per-transaction">.
</para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -120,18 +123,19 @@ vacuumlo [options] database [database2 ... databaseN] ...@@ -120,18 +123,19 @@ vacuumlo [options] database [database2 ... databaseN]
<title>Method</title> <title>Method</title>
<para> <para>
First, it builds a temporary table which contains all of the OIDs of the First, <application>vacuumlo</> builds a temporary table which contains all
large objects in that database. of the OIDs of the large objects in the selected database.
</para> </para>
<para> <para>
It then scans through all columns in the database that are of type It then scans through all columns in the database that are of type
<type>oid</> or <type>lo</>, and removes matching entries from the <type>oid</> or <type>lo</>, and removes matching entries from the
temporary table. temporary table. (Note: only types with these names are considered;
in particular, domains over them are not considered.)
</para> </para>
<para> <para>
The remaining entries in the temp table identify orphaned LOs. The remaining entries in the temporary table identify orphaned LOs.
These are removed. These are removed.
</para> </para>
</sect2> </sect2>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment