Commit 9710d3d4 authored by Alvaro Herrera's avatar Alvaro Herrera

Fix TRUNCATE .. CASCADE on partitions

When running TRUNCATE CASCADE on a child of a partitioned table
referenced by another partitioned table, the truncate was not applied to
partitions of the referencing table; this could leave rows violating the
constraint in the referencing partitioned table.  Repair by walking the
pg_constraint chain all the way up to the topmost referencing table.

Note: any partitioned tables containing FKs that reference other
partitioned tables should be checked for possible violating rows, if
TRUNCATE has occurred in partitions of the referenced table.

Reported-by: Christophe Courtois
Author: Jehan-Guillaume de Rorthais
Discussion: https://postgr.es/m/20200204183906.115f693e@firost
parent cb5b2861
......@@ -124,6 +124,9 @@ TRUNCATE [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [
option can be used to automatically include all dependent tables &mdash;
but be very careful when using this option, or else you might lose data you
did not intend to!
Note in particular that when the table to be truncated is a partition,
siblings partitions are left untouched, but cascading occurs to all
referencing tables and all their partitions with no distinction.
</para>
<para>
......
......@@ -3396,9 +3396,16 @@ List *
heap_truncate_find_FKs(List *relationIds)
{
List *result = NIL;
List *oids = list_copy(relationIds);
List *parent_cons;
ListCell *cell;
ScanKeyData key;
Relation fkeyRel;
SysScanDesc fkeyScan;
HeapTuple tuple;
bool restart;
oids = list_copy(relationIds);
/*
* Must scan pg_constraint. Right now, it is a seqscan because there is
......@@ -3406,6 +3413,10 @@ heap_truncate_find_FKs(List *relationIds)
*/
fkeyRel = table_open(ConstraintRelationId, AccessShareLock);
restart:
restart = false;
parent_cons = NIL;
fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
NULL, 0, NULL);
......@@ -3418,16 +3429,85 @@ heap_truncate_find_FKs(List *relationIds)
continue;
/* Not referencing one of our list of tables */
if (!list_member_oid(relationIds, con->confrelid))
if (!list_member_oid(oids, con->confrelid))
continue;
/* Add referencer to result, unless present in input list */
/*
* If this constraint has a parent constraint which we have not seen
* yet, keep track of it for the second loop, below. Tracking parent
* constraints allows us to climb up to the top-level level constraint
* and look for all possible relations referencing the partitioned
* table.
*/
if (OidIsValid(con->conparentid) &&
!list_member_oid(parent_cons, con->conparentid))
parent_cons = lappend_oid(parent_cons, con->conparentid);
/*
* Add referencer to result, unless present in input list. (Don't
* worry about dupes: we'll fix that below).
*/
if (!list_member_oid(relationIds, con->conrelid))
result = lappend_oid(result, con->conrelid);
}
systable_endscan(fkeyScan);
/*
* Process each parent constraint we found to add the list of referenced
* relations by them to the oids list. If we do add any new such
* relations, redo the first loop above. Also, if we see that the parent
* constraint in turn has a parent, add that so that we process all
* relations in a single additional pass.
*/
foreach(cell, parent_cons)
{
Oid parent = lfirst_oid(cell);
ScanKeyInit(&key,
Anum_pg_constraint_oid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(parent));
fkeyScan = systable_beginscan(fkeyRel, ConstraintOidIndexId,
true, NULL, 1, &key);
tuple = systable_getnext(fkeyScan);
if (HeapTupleIsValid(tuple))
{
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
/*
* pg_constraint rows always appear for partitioned hierarchies
* this way: on the each side of the constraint, one row appears
* for each partition that points to the top-most table on the
* other side.
*
* Because of this arrangement, we can correctly catch all
* relevant relations by adding to 'parent_cons' all rows with
* valid conparentid, and to the 'oids' list all rows with a
* zero conparentid. If any oids are added to 'oids', redo the
* first loop above by setting 'restart'.
*/
if (OidIsValid(con->conparentid))
parent_cons = list_append_unique_oid(parent_cons,
con->conparentid);
else if (!list_member_oid(oids, con->confrelid))
{
oids = lappend_oid(oids, con->confrelid);
restart = true;
}
}
systable_endscan(fkeyScan);
}
list_free(parent_cons);
if (restart)
goto restart;
table_close(fkeyRel, AccessShareLock);
list_free(oids);
/* Now sort and de-duplicate the result list */
list_sort(result, list_oid_cmp);
......
......@@ -542,3 +542,53 @@ SELECT * FROM tp_chk_data();
DROP TABLE truncprim, truncpart;
DROP FUNCTION tp_ins_data(), tp_chk_data();
-- test cascade when referencing a partitioned table
CREATE TABLE trunc_a (a INT PRIMARY KEY) PARTITION BY RANGE (a);
CREATE TABLE trunc_a1 PARTITION OF trunc_a FOR VALUES FROM (0) TO (10);
CREATE TABLE trunc_a2 PARTITION OF trunc_a FOR VALUES FROM (10) TO (20)
PARTITION BY RANGE (a);
CREATE TABLE trunc_a21 PARTITION OF trunc_a2 FOR VALUES FROM (10) TO (12);
CREATE TABLE trunc_a22 PARTITION OF trunc_a2 FOR VALUES FROM (12) TO (16);
CREATE TABLE trunc_a2d PARTITION OF trunc_a2 DEFAULT;
CREATE TABLE trunc_a3 PARTITION OF trunc_a FOR VALUES FROM (20) TO (30);
INSERT INTO trunc_a VALUES (0), (5), (10), (15), (20), (25);
-- truncate a partition cascading to a table
CREATE TABLE ref_b (
b INT PRIMARY KEY,
a INT REFERENCES trunc_a(a) ON DELETE CASCADE
);
INSERT INTO ref_b VALUES (10, 0), (50, 5), (100, 10), (150, 15);
TRUNCATE TABLE trunc_a1 CASCADE;
NOTICE: truncate cascades to table "ref_b"
SELECT a FROM ref_b;
a
---
(0 rows)
DROP TABLE ref_b;
-- truncate a partition cascading to a partitioned table
CREATE TABLE ref_c (
c INT PRIMARY KEY,
a INT REFERENCES trunc_a(a) ON DELETE CASCADE
) PARTITION BY RANGE (c);
CREATE TABLE ref_c1 PARTITION OF ref_c FOR VALUES FROM (100) TO (200);
CREATE TABLE ref_c2 PARTITION OF ref_c FOR VALUES FROM (200) TO (300);
INSERT INTO ref_c VALUES (100, 10), (150, 15), (200, 20), (250, 25);
TRUNCATE TABLE trunc_a21 CASCADE;
NOTICE: truncate cascades to table "ref_c"
NOTICE: truncate cascades to table "ref_c1"
NOTICE: truncate cascades to table "ref_c2"
SELECT a as "from table ref_c" FROM ref_c;
from table ref_c
------------------
(0 rows)
SELECT a as "from table trunc_a" FROM trunc_a ORDER BY a;
from table trunc_a
--------------------
15
20
25
(3 rows)
DROP TABLE trunc_a, ref_c;
......@@ -289,3 +289,41 @@ TRUNCATE TABLE truncpart;
SELECT * FROM tp_chk_data();
DROP TABLE truncprim, truncpart;
DROP FUNCTION tp_ins_data(), tp_chk_data();
-- test cascade when referencing a partitioned table
CREATE TABLE trunc_a (a INT PRIMARY KEY) PARTITION BY RANGE (a);
CREATE TABLE trunc_a1 PARTITION OF trunc_a FOR VALUES FROM (0) TO (10);
CREATE TABLE trunc_a2 PARTITION OF trunc_a FOR VALUES FROM (10) TO (20)
PARTITION BY RANGE (a);
CREATE TABLE trunc_a21 PARTITION OF trunc_a2 FOR VALUES FROM (10) TO (12);
CREATE TABLE trunc_a22 PARTITION OF trunc_a2 FOR VALUES FROM (12) TO (16);
CREATE TABLE trunc_a2d PARTITION OF trunc_a2 DEFAULT;
CREATE TABLE trunc_a3 PARTITION OF trunc_a FOR VALUES FROM (20) TO (30);
INSERT INTO trunc_a VALUES (0), (5), (10), (15), (20), (25);
-- truncate a partition cascading to a table
CREATE TABLE ref_b (
b INT PRIMARY KEY,
a INT REFERENCES trunc_a(a) ON DELETE CASCADE
);
INSERT INTO ref_b VALUES (10, 0), (50, 5), (100, 10), (150, 15);
TRUNCATE TABLE trunc_a1 CASCADE;
SELECT a FROM ref_b;
DROP TABLE ref_b;
-- truncate a partition cascading to a partitioned table
CREATE TABLE ref_c (
c INT PRIMARY KEY,
a INT REFERENCES trunc_a(a) ON DELETE CASCADE
) PARTITION BY RANGE (c);
CREATE TABLE ref_c1 PARTITION OF ref_c FOR VALUES FROM (100) TO (200);
CREATE TABLE ref_c2 PARTITION OF ref_c FOR VALUES FROM (200) TO (300);
INSERT INTO ref_c VALUES (100, 10), (150, 15), (200, 20), (250, 25);
TRUNCATE TABLE trunc_a21 CASCADE;
SELECT a as "from table ref_c" FROM ref_c;
SELECT a as "from table trunc_a" FROM trunc_a ORDER BY a;
DROP TABLE trunc_a, ref_c;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment