Commit 841c29c8 authored by Kevin Grittner's avatar Kevin Grittner

Various cleanups for REFRESH MATERIALIZED VIEW CONCURRENTLY.

Open and lock each index before checking definition in RMVC.  The
ExclusiveLock on the related table is not viewed as sufficient to
ensure that no changes are made to the index definition, and
invalidation messages from other backends might have been missed.
Additionally, use RelationGetIndexExpressions() and check for NIL
rather than doing our own loop.

Protect against redefinition of tid and rowvar operators in RMVC.
While working on this, noticed that the fixes for bugs found during
the CF made the UPDATE statement useless, since no rows could
qualify for that treatment any more.  Ripping out code to support
the UPDATE statement simplified the operator cleanups.

Change slightly confusing local field name.

Use meaningful alias names on queries in refresh_by_match_merge().

Per concerns of raised by Andres Freund and comments and
suggestions from Noah Misch.  Some additional issues remain, which
will be addressed separately.
parent 221e92f6
...@@ -496,19 +496,14 @@ mv_GenerateOper(StringInfo buf, Oid opoid) ...@@ -496,19 +496,14 @@ mv_GenerateOper(StringInfo buf, Oid opoid)
* columns equal. The behavior of NULLs on equality tests and on UNIQUE * columns equal. The behavior of NULLs on equality tests and on UNIQUE
* indexes turns out to be quite convenient here; the tests we need to make * indexes turns out to be quite convenient here; the tests we need to make
* are consistent with default behavior. If there is at least one UNIQUE * are consistent with default behavior. If there is at least one UNIQUE
* index on the materialized view, we have exactly the guarantee we need. By * index on the materialized view, we have exactly the guarantee we need.
* joining based on equality on all columns which are part of any unique
* index, we identify the rows on which we can use UPDATE without any problem.
* If any column is NULL in either the old or new version of a row (or both),
* we must use DELETE and INSERT, since there could be multiple rows which are
* NOT DISTINCT FROM each other, and we could otherwise end up with the wrong
* number of occurrences in the updated relation. The temporary table used to
* hold the diff results contains just the TID of the old record (if matched)
* and the ROW from the new table as a single column of complex record type
* (if matched).
* *
* Once we have the diff table, we perform set-based DELETE, UPDATE, and * The temporary table used to hold the diff results contains just the TID of
* INSERT operations against the materialized view, and discard both temporary * the old record (if matched) and the ROW from the new table as a single
* column of complex record type (if matched).
*
* Once we have the diff table, we perform set-based DELETE and INSERT
* operations against the materialized view, and discard both temporary
* tables. * tables.
* *
* Everything from the generation of the new data to applying the differences * Everything from the generation of the new data to applying the differences
...@@ -567,9 +562,12 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid) ...@@ -567,9 +562,12 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid)
*/ */
resetStringInfo(&querybuf); resetStringInfo(&querybuf);
appendStringInfo(&querybuf, appendStringInfo(&querybuf,
"SELECT x FROM %s x WHERE x IS NOT NULL AND EXISTS " "SELECT newdata FROM %s newdata "
"(SELECT * FROM %s y WHERE y IS NOT NULL " "WHERE newdata IS NOT NULL AND EXISTS "
"AND (y.*) = (x.*) AND y.ctid <> x.ctid) LIMIT 1", "(SELECT * FROM %s newdata2 WHERE newdata2 IS NOT NULL "
"AND newdata2 OPERATOR(pg_catalog.=) newdata "
"AND newdata2.ctid OPERATOR(pg_catalog.<>) "
"newdata.ctid) LIMIT 1",
tempname, tempname); tempname, tempname);
if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT) if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
elog(ERROR, "SPI_exec failed: %s", querybuf.data); elog(ERROR, "SPI_exec failed: %s", querybuf.data);
...@@ -587,7 +585,8 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid) ...@@ -587,7 +585,8 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid)
resetStringInfo(&querybuf); resetStringInfo(&querybuf);
appendStringInfo(&querybuf, appendStringInfo(&querybuf,
"CREATE TEMP TABLE %s AS " "CREATE TEMP TABLE %s AS "
"SELECT x.ctid AS tid, y FROM %s x FULL JOIN %s y ON (", "SELECT mv.ctid AS tid, newdata "
"FROM %s mv FULL JOIN %s newdata ON (",
diffname, matviewname, tempname); diffname, matviewname, tempname);
/* /*
...@@ -603,52 +602,45 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid) ...@@ -603,52 +602,45 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid)
foreach(indexoidscan, indexoidlist) foreach(indexoidscan, indexoidlist)
{ {
Oid indexoid = lfirst_oid(indexoidscan); Oid indexoid = lfirst_oid(indexoidscan);
Relation indexRel;
HeapTuple indexTuple; HeapTuple indexTuple;
Form_pg_index index; Form_pg_index indexStruct;
indexRel = index_open(indexoid, RowExclusiveLock);
indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
if (!HeapTupleIsValid(indexTuple)) /* should not happen */ if (!HeapTupleIsValid(indexTuple)) /* should not happen */
elog(ERROR, "cache lookup failed for index %u", indexoid); elog(ERROR, "cache lookup failed for index %u", indexoid);
index = (Form_pg_index) GETSTRUCT(indexTuple); indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
/* We're only interested if it is unique and valid. */ /* We're only interested if it is unique and valid. */
if (index->indisunique && IndexIsValid(index)) if (indexStruct->indisunique && IndexIsValid(indexStruct))
{ {
int numatts = index->indnatts; int numatts = indexStruct->indnatts;
int i; int i;
bool expr = false;
Relation indexRel;
/* Skip any index on an expression. */ /* Skip any index on an expression. */
for (i = 0; i < numatts; i++) if (RelationGetIndexExpressions(indexRel) != NIL)
{
if (index->indkey.values[i] == 0)
{
expr = true;
break;
}
}
if (expr)
{ {
index_close(indexRel, NoLock);
ReleaseSysCache(indexTuple); ReleaseSysCache(indexTuple);
continue; continue;
} }
/* Skip partial indexes. */ /* Skip partial indexes. */
indexRel = index_open(index->indexrelid, RowExclusiveLock);
if (RelationGetIndexPredicate(indexRel) != NIL) if (RelationGetIndexPredicate(indexRel) != NIL)
{ {
index_close(indexRel, NoLock); index_close(indexRel, NoLock);
ReleaseSysCache(indexTuple); ReleaseSysCache(indexTuple);
continue; continue;
} }
/* Hold the locks, since we're about to run DML which needs them. */ /* Hold the locks, since we're about to run DML which needs them. */
index_close(indexRel, NoLock); index_close(indexRel, NoLock);
/* Add quals for all columns from this index. */ /* Add quals for all columns from this index. */
for (i = 0; i < numatts; i++) for (i = 0; i < numatts; i++)
{ {
int attnum = index->indkey.values[i]; int attnum = indexStruct->indkey.values[i];
Oid type; Oid type;
Oid op; Oid op;
const char *colname; const char *colname;
...@@ -671,11 +663,11 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid) ...@@ -671,11 +663,11 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid)
appendStringInfoString(&querybuf, " AND "); appendStringInfoString(&querybuf, " AND ");
colname = quote_identifier(NameStr((tupdesc->attrs[attnum - 1])->attname)); colname = quote_identifier(NameStr((tupdesc->attrs[attnum - 1])->attname));
appendStringInfo(&querybuf, "y.%s ", colname); appendStringInfo(&querybuf, "newdata.%s ", colname);
type = attnumTypeId(matviewRel, attnum); type = attnumTypeId(matviewRel, attnum);
op = lookup_type_cache(type, TYPECACHE_EQ_OPR)->eq_opr; op = lookup_type_cache(type, TYPECACHE_EQ_OPR)->eq_opr;
mv_GenerateOper(&querybuf, op); mv_GenerateOper(&querybuf, op);
appendStringInfo(&querybuf, " x.%s", colname); appendStringInfo(&querybuf, " mv.%s", colname);
foundUniqueIndex = true; foundUniqueIndex = true;
} }
...@@ -693,7 +685,7 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid) ...@@ -693,7 +685,7 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid)
errhint("Create a UNIQUE index with no WHERE clause on one or more columns of the materialized view."))); errhint("Create a UNIQUE index with no WHERE clause on one or more columns of the materialized view.")));
appendStringInfoString(&querybuf, appendStringInfoString(&querybuf,
" AND y = x) WHERE (y.*) IS DISTINCT FROM (x.*)" " AND newdata = mv) WHERE newdata IS NULL OR mv IS NULL"
" ORDER BY tid"); " ORDER BY tid");
/* Create the temporary "diff" table. */ /* Create the temporary "diff" table. */
...@@ -726,56 +718,19 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid) ...@@ -726,56 +718,19 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid)
/* Deletes must come before inserts; do them first. */ /* Deletes must come before inserts; do them first. */
resetStringInfo(&querybuf); resetStringInfo(&querybuf);
appendStringInfo(&querybuf, appendStringInfo(&querybuf,
"DELETE FROM %s WHERE ctid IN " "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
"(SELECT d.tid FROM %s d " "(SELECT diff.tid FROM %s diff "
"WHERE d.tid IS NOT NULL " "WHERE diff.tid IS NOT NULL "
"AND (d.y) IS NOT DISTINCT FROM NULL)", "AND diff.newdata IS NULL)",
matviewname, diffname); matviewname, diffname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE) if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
elog(ERROR, "SPI_exec failed: %s", querybuf.data); elog(ERROR, "SPI_exec failed: %s", querybuf.data);
/* Updates before inserts gives a better chance at HOT updates. */
resetStringInfo(&querybuf);
appendStringInfo(&querybuf, "UPDATE %s x SET ", matviewname);
{
int i;
bool targetColFound = false;
for (i = 0; i < tupdesc->natts; i++)
{
const char *colname;
if (tupdesc->attrs[i]->attisdropped)
continue;
if (usedForQual[i])
continue;
if (targetColFound)
appendStringInfoString(&querybuf, ", ");
targetColFound = true;
colname = quote_identifier(NameStr((tupdesc->attrs[i])->attname));
appendStringInfo(&querybuf, "%s = (d.y).%s", colname, colname);
}
if (targetColFound)
{
appendStringInfo(&querybuf,
" FROM %s d "
"WHERE d.tid IS NOT NULL AND x.ctid = d.tid",
diffname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_UPDATE)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
}
}
/* Inserts go last. */ /* Inserts go last. */
resetStringInfo(&querybuf); resetStringInfo(&querybuf);
appendStringInfo(&querybuf, appendStringInfo(&querybuf,
"INSERT INTO %s SELECT (y).* FROM %s WHERE tid IS NULL", "INSERT INTO %s SELECT (diff.newdata).* "
"FROM %s diff WHERE tid IS NULL",
matviewname, diffname); matviewname, diffname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT) if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
elog(ERROR, "SPI_exec failed: %s", querybuf.data); elog(ERROR, "SPI_exec failed: %s", querybuf.data);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment