Commit f841ceb2 authored by Michael Paquier's avatar Michael Paquier

Improve TRUNCATE by avoiding early lock queue

A caller of TRUNCATE could previously queue for an access exclusive lock
on a relation it may not have permission to truncate, potentially
interfering with users authorized to work on it.  This can be very
intrusive depending on the lock attempted to be taken.  For example,
pg_authid could be blocked, preventing any authentication attempt to
happen on a PostgreSQL instance.

This commit fixes the case of TRUNCATE so as RangeVarGetRelidExtended is
used with a callback doing the necessary ACL checks at an earlier stage,
avoiding lock queuing issues, so as an immediate failure happens for
unprivileged users instead of waiting on a lock that would not be
taken.

This is rather similar to the type of work done in cbe24a6d for CLUSTER,
and the code of TRUNCATE is this time refactored so as there is no
user-facing changes.  As the commit for CLUSTER, no back-patch is done.

Reported-by: Lloyd Albin, Jeremy Schneider
Author: Michael Paquier
Reviewed by: Nathan Bossart, Kyotaro Horiguchi
Discussion: https://postgr.es/m/152512087100.19803.12733865831237526317@wrigleys.postgresql.org
Discussion: https://postgr.es/m/20180806165816.GA19883@paquier.xyz
parent 2b13702d
...@@ -300,7 +300,10 @@ struct DropRelationCallbackState ...@@ -300,7 +300,10 @@ struct DropRelationCallbackState
#define child_dependency_type(child_is_partition) \ #define child_dependency_type(child_is_partition) \
((child_is_partition) ? DEPENDENCY_AUTO : DEPENDENCY_NORMAL) ((child_is_partition) ? DEPENDENCY_AUTO : DEPENDENCY_NORMAL)
static void truncate_check_rel(Relation rel); static void truncate_check_rel(Oid relid, Form_pg_class reltuple);
static void truncate_check_activity(Relation rel);
static void RangeVarCallbackForTruncate(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);
static List *MergeAttributes(List *schema, List *supers, char relpersistence, static List *MergeAttributes(List *schema, List *supers, char relpersistence,
bool is_partition, List **supOids, List **supconstr, bool is_partition, List **supOids, List **supconstr,
int *supOidCount); int *supOidCount);
...@@ -1336,15 +1339,26 @@ ExecuteTruncate(TruncateStmt *stmt) ...@@ -1336,15 +1339,26 @@ ExecuteTruncate(TruncateStmt *stmt)
bool recurse = rv->inh; bool recurse = rv->inh;
Oid myrelid; Oid myrelid;
rel = heap_openrv(rv, AccessExclusiveLock); myrelid = RangeVarGetRelidExtended(rv, AccessExclusiveLock,
myrelid = RelationGetRelid(rel); 0, RangeVarCallbackForTruncate,
NULL);
/* open the relation, we already hold a lock on it */
rel = heap_open(myrelid, NoLock);
/* don't throw error for "TRUNCATE foo, foo" */ /* don't throw error for "TRUNCATE foo, foo" */
if (list_member_oid(relids, myrelid)) if (list_member_oid(relids, myrelid))
{ {
heap_close(rel, AccessExclusiveLock); heap_close(rel, AccessExclusiveLock);
continue; continue;
} }
truncate_check_rel(rel);
/*
* RangeVarGetRelidExtended() has done most checks with its callback,
* but other checks with the now-opened Relation remain.
*/
truncate_check_activity(rel);
rels = lappend(rels, rel); rels = lappend(rels, rel);
relids = lappend_oid(relids, myrelid); relids = lappend_oid(relids, myrelid);
/* Log this relation only if needed for logical decoding */ /* Log this relation only if needed for logical decoding */
...@@ -1367,7 +1381,9 @@ ExecuteTruncate(TruncateStmt *stmt) ...@@ -1367,7 +1381,9 @@ ExecuteTruncate(TruncateStmt *stmt)
/* find_all_inheritors already got lock */ /* find_all_inheritors already got lock */
rel = heap_open(childrelid, NoLock); rel = heap_open(childrelid, NoLock);
truncate_check_rel(rel); truncate_check_rel(RelationGetRelid(rel), rel->rd_rel);
truncate_check_activity(rel);
rels = lappend(rels, rel); rels = lappend(rels, rel);
relids = lappend_oid(relids, childrelid); relids = lappend_oid(relids, childrelid);
/* Log this relation only if needed for logical decoding */ /* Log this relation only if needed for logical decoding */
...@@ -1450,7 +1466,8 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, ...@@ -1450,7 +1466,8 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
ereport(NOTICE, ereport(NOTICE,
(errmsg("truncate cascades to table \"%s\"", (errmsg("truncate cascades to table \"%s\"",
RelationGetRelationName(rel)))); RelationGetRelationName(rel))));
truncate_check_rel(rel); truncate_check_rel(relid, rel->rd_rel);
truncate_check_activity(rel);
rels = lappend(rels, rel); rels = lappend(rels, rel);
relids = lappend_oid(relids, relid); relids = lappend_oid(relids, relid);
/* Log this relation only if needed for logical decoding */ /* Log this relation only if needed for logical decoding */
...@@ -1700,38 +1717,47 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, ...@@ -1700,38 +1717,47 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
} }
/* /*
* Check that a given rel is safe to truncate. Subroutine for ExecuteTruncate * Check that a given relation is safe to truncate. Subroutine for
* ExecuteTruncate() and RangeVarCallbackForTruncate().
*/ */
static void static void
truncate_check_rel(Relation rel) truncate_check_rel(Oid relid, Form_pg_class reltuple)
{ {
AclResult aclresult; AclResult aclresult;
char *relname = NameStr(reltuple->relname);
/* /*
* Only allow truncate on regular tables and partitioned tables (although, * Only allow truncate on regular tables and partitioned tables (although,
* the latter are only being included here for the following checks; no * the latter are only being included here for the following checks; no
* physical truncation will occur in their case.) * physical truncation will occur in their case.)
*/ */
if (rel->rd_rel->relkind != RELKIND_RELATION && if (reltuple->relkind != RELKIND_RELATION &&
rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) reltuple->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE), (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table", errmsg("\"%s\" is not a table", relname)));
RelationGetRelationName(rel))));
/* Permissions checks */ /* Permissions checks */
aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_TRUNCATE);
ACL_TRUNCATE);
if (aclresult != ACLCHECK_OK) if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind), aclcheck_error(aclresult, get_relkind_objtype(reltuple->relkind),
RelationGetRelationName(rel)); relname);
if (!allowSystemTableMods && IsSystemRelation(rel)) if (!allowSystemTableMods && IsSystemClass(relid, reltuple))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog", errmsg("permission denied: \"%s\" is a system catalog",
RelationGetRelationName(rel)))); relname)));
}
/*
* Set of extra sanity checks to check if a given relation is safe to
* truncate. This is split with truncate_check_rel() as
* RangeVarCallbackForTruncate() cannot open a Relation yet.
*/
static void
truncate_check_activity(Relation rel)
{
/* /*
* Don't allow truncate on temp tables of other backends ... their local * Don't allow truncate on temp tables of other backends ... their local
* buffer manager is not going to cope. * buffer manager is not going to cope.
...@@ -13419,6 +13445,28 @@ RangeVarCallbackOwnsTable(const RangeVar *relation, ...@@ -13419,6 +13445,28 @@ RangeVarCallbackOwnsTable(const RangeVar *relation,
aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relId)), relation->relname); aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relId)), relation->relname);
} }
/*
* Callback to RangeVarGetRelidExtended() for TRUNCATE processing.
*/
static void
RangeVarCallbackForTruncate(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg)
{
HeapTuple tuple;
/* Nothing to do if the relation was not found. */
if (!OidIsValid(relId))
return;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId));
if (!HeapTupleIsValid(tuple)) /* should not happen */
elog(ERROR, "cache lookup failed for relation %u", relId);
truncate_check_rel(relId, (Form_pg_class) GETSTRUCT(tuple));
ReleaseSysCache(tuple);
}
/* /*
* Callback to RangeVarGetRelidExtended(), similar to * Callback to RangeVarGetRelidExtended(), similar to
* RangeVarCallbackOwnsTable() but without checks on the type of the relation. * RangeVarCallbackOwnsTable() but without checks on the type of the relation.
......
Parsed test spec with 2 sessions
starting permutation: s1_begin s1_tab_lookup s2_auth s2_truncate s1_commit s2_reset
step s1_begin: BEGIN;
step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
?column?
t
step s2_auth: SET ROLE regress_truncate_conflict;
step s2_truncate: TRUNCATE truncate_tab;
ERROR: permission denied for table truncate_tab
step s1_commit: COMMIT;
step s2_reset: RESET ROLE;
starting permutation: s1_begin s2_auth s2_truncate s1_tab_lookup s1_commit s2_reset
step s1_begin: BEGIN;
step s2_auth: SET ROLE regress_truncate_conflict;
step s2_truncate: TRUNCATE truncate_tab;
ERROR: permission denied for table truncate_tab
step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
?column?
t
step s1_commit: COMMIT;
step s2_reset: RESET ROLE;
starting permutation: s1_begin s2_auth s1_tab_lookup s2_truncate s1_commit s2_reset
step s1_begin: BEGIN;
step s2_auth: SET ROLE regress_truncate_conflict;
step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
?column?
t
step s2_truncate: TRUNCATE truncate_tab;
ERROR: permission denied for table truncate_tab
step s1_commit: COMMIT;
step s2_reset: RESET ROLE;
starting permutation: s2_auth s2_truncate s1_begin s1_tab_lookup s1_commit s2_reset
step s2_auth: SET ROLE regress_truncate_conflict;
step s2_truncate: TRUNCATE truncate_tab;
ERROR: permission denied for table truncate_tab
step s1_begin: BEGIN;
step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
?column?
t
step s1_commit: COMMIT;
step s2_reset: RESET ROLE;
starting permutation: s1_begin s1_tab_lookup s2_grant s2_auth s2_truncate s1_commit s2_reset
step s1_begin: BEGIN;
step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
?column?
t
step s2_grant: GRANT TRUNCATE ON truncate_tab TO regress_truncate_conflict;
step s2_auth: SET ROLE regress_truncate_conflict;
step s2_truncate: TRUNCATE truncate_tab; <waiting ...>
step s1_commit: COMMIT;
step s2_truncate: <... completed>
step s2_reset: RESET ROLE;
starting permutation: s1_begin s2_grant s2_auth s2_truncate s1_tab_lookup s1_commit s2_reset
step s1_begin: BEGIN;
step s2_grant: GRANT TRUNCATE ON truncate_tab TO regress_truncate_conflict;
step s2_auth: SET ROLE regress_truncate_conflict;
step s2_truncate: TRUNCATE truncate_tab;
step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
?column?
t
step s1_commit: COMMIT;
step s2_reset: RESET ROLE;
starting permutation: s1_begin s2_grant s2_auth s1_tab_lookup s2_truncate s1_commit s2_reset
step s1_begin: BEGIN;
step s2_grant: GRANT TRUNCATE ON truncate_tab TO regress_truncate_conflict;
step s2_auth: SET ROLE regress_truncate_conflict;
step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
?column?
t
step s2_truncate: TRUNCATE truncate_tab; <waiting ...>
step s1_commit: COMMIT;
step s2_truncate: <... completed>
step s2_reset: RESET ROLE;
starting permutation: s2_grant s2_auth s2_truncate s1_begin s1_tab_lookup s1_commit s2_reset
step s2_grant: GRANT TRUNCATE ON truncate_tab TO regress_truncate_conflict;
step s2_auth: SET ROLE regress_truncate_conflict;
step s2_truncate: TRUNCATE truncate_tab;
step s1_begin: BEGIN;
step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
?column?
t
step s1_commit: COMMIT;
step s2_reset: RESET ROLE;
...@@ -76,3 +76,4 @@ test: partition-key-update-2 ...@@ -76,3 +76,4 @@ test: partition-key-update-2
test: partition-key-update-3 test: partition-key-update-3
test: partition-key-update-4 test: partition-key-update-4
test: plpgsql-toast test: plpgsql-toast
test: truncate-conflict
# Tests for locking conflicts with TRUNCATE commands.
setup
{
CREATE ROLE regress_truncate_conflict;
CREATE TABLE truncate_tab (a int);
}
teardown
{
DROP TABLE truncate_tab;
DROP ROLE regress_truncate_conflict;
}
session "s1"
step "s1_begin" { BEGIN; }
step "s1_tab_lookup" { SELECT count(*) >= 0 FROM truncate_tab; }
step "s1_commit" { COMMIT; }
session "s2"
step "s2_grant" { GRANT TRUNCATE ON truncate_tab TO regress_truncate_conflict; }
step "s2_auth" { SET ROLE regress_truncate_conflict; }
step "s2_truncate" { TRUNCATE truncate_tab; }
step "s2_reset" { RESET ROLE; }
# The role doesn't have privileges to truncate the table, so TRUNCATE should
# immediately fail without waiting for a lock.
permutation "s1_begin" "s1_tab_lookup" "s2_auth" "s2_truncate" "s1_commit" "s2_reset"
permutation "s1_begin" "s2_auth" "s2_truncate" "s1_tab_lookup" "s1_commit" "s2_reset"
permutation "s1_begin" "s2_auth" "s1_tab_lookup" "s2_truncate" "s1_commit" "s2_reset"
permutation "s2_auth" "s2_truncate" "s1_begin" "s1_tab_lookup" "s1_commit" "s2_reset"
# The role has privileges to truncate the table, TRUNCATE will block if
# another session holds a lock on the table and succeed in all cases.
permutation "s1_begin" "s1_tab_lookup" "s2_grant" "s2_auth" "s2_truncate" "s1_commit" "s2_reset"
permutation "s1_begin" "s2_grant" "s2_auth" "s2_truncate" "s1_tab_lookup" "s1_commit" "s2_reset"
permutation "s1_begin" "s2_grant" "s2_auth" "s1_tab_lookup" "s2_truncate" "s1_commit" "s2_reset"
permutation "s2_grant" "s2_auth" "s2_truncate" "s1_begin" "s1_tab_lookup" "s1_commit" "s2_reset"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment