Commit 842faa71 authored by Stephen Frost's avatar Stephen Frost

Make security barrier views automatically updatable

Views which are marked as security_barrier must have their quals
applied before any user-defined quals are called, to prevent
user-defined functions from being able to see rows which the
security barrier view is intended to prevent them from seeing.

Remove the restriction on security barrier views being automatically
updatable by adding a new securityQuals list to the RTE structure
which keeps track of the quals from security barrier views at each
level, independently of the user-supplied quals.  When RTEs are
later discovered which have securityQuals populated, they are turned
into subquery RTEs which are marked as security_barrier to prevent
any user-supplied quals being pushed down (modulo LEAKPROOF quals).

Dean Rasheed, reviewed by Craig Ringer, Simon Riggs, KaiGai Kohei
parent 9d229f39
...@@ -323,12 +323,6 @@ CREATE VIEW vista AS SELECT text 'Hello World' AS hello; ...@@ -323,12 +323,6 @@ CREATE VIEW vista AS SELECT text 'Hello World' AS hello;
or set-returning functions. or set-returning functions.
</para> </para>
</listitem> </listitem>
<listitem>
<para>
The view must not have the <literal>security_barrier</> property.
</para>
</listitem>
</itemizedlist> </itemizedlist>
</para> </para>
...@@ -361,6 +355,19 @@ CREATE VIEW vista AS SELECT text 'Hello World' AS hello; ...@@ -361,6 +355,19 @@ CREATE VIEW vista AS SELECT text 'Hello World' AS hello;
such rows that are not visible through the view. such rows that are not visible through the view.
</para> </para>
<para>
If an automatically updatable view is marked with the
<literal>security_barrier</> property then all the view's <literal>WHERE</>
conditions (and any conditions using operators which are marked as LEAKPROOF)
will always be evaluated before any conditions that a user of the view has
added. See <xref linkend="rules-privileges"> for full details. Note that,
due to this, rows which are not ultimately returned (because they do not
pass the user's <literal>WHERE</> conditions) may still end up being locked.
<command>EXPLAIN</command> can be used to see which conditions are
applied at the relation level (and therefore do not lock rows) and which are
not.
</para>
<para> <para>
A more complex view that does not satisfy all these conditions is A more complex view that does not satisfy all these conditions is
read-only by default: the system will not allow an insert, update, or read-only by default: the system will not allow an insert, update, or
......
...@@ -8910,7 +8910,6 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation, ...@@ -8910,7 +8910,6 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
List *view_options = untransformRelOptions(newOptions); List *view_options = untransformRelOptions(newOptions);
ListCell *cell; ListCell *cell;
bool check_option = false; bool check_option = false;
bool security_barrier = false;
foreach(cell, view_options) foreach(cell, view_options)
{ {
...@@ -8918,8 +8917,6 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation, ...@@ -8918,8 +8917,6 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
if (pg_strcasecmp(defel->defname, "check_option") == 0) if (pg_strcasecmp(defel->defname, "check_option") == 0)
check_option = true; check_option = true;
if (pg_strcasecmp(defel->defname, "security_barrier") == 0)
security_barrier = defGetBoolean(defel);
} }
/* /*
...@@ -8929,8 +8926,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation, ...@@ -8929,8 +8926,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
if (check_option) if (check_option)
{ {
const char *view_updatable_error = const char *view_updatable_error =
view_query_is_auto_updatable(view_query, view_query_is_auto_updatable(view_query, true);
security_barrier, true);
if (view_updatable_error) if (view_updatable_error)
ereport(ERROR, ereport(ERROR,
......
...@@ -396,7 +396,6 @@ DefineView(ViewStmt *stmt, const char *queryString) ...@@ -396,7 +396,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
RangeVar *view; RangeVar *view;
ListCell *cell; ListCell *cell;
bool check_option; bool check_option;
bool security_barrier;
/* /*
* Run parse analysis to convert the raw parse tree to a Query. Note this * Run parse analysis to convert the raw parse tree to a Query. Note this
...@@ -451,7 +450,6 @@ DefineView(ViewStmt *stmt, const char *queryString) ...@@ -451,7 +450,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
* specified. * specified.
*/ */
check_option = false; check_option = false;
security_barrier = false;
foreach(cell, stmt->options) foreach(cell, stmt->options)
{ {
...@@ -459,8 +457,6 @@ DefineView(ViewStmt *stmt, const char *queryString) ...@@ -459,8 +457,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
if (pg_strcasecmp(defel->defname, "check_option") == 0) if (pg_strcasecmp(defel->defname, "check_option") == 0)
check_option = true; check_option = true;
if (pg_strcasecmp(defel->defname, "security_barrier") == 0)
security_barrier = defGetBoolean(defel);
} }
/* /*
...@@ -470,7 +466,7 @@ DefineView(ViewStmt *stmt, const char *queryString) ...@@ -470,7 +466,7 @@ DefineView(ViewStmt *stmt, const char *queryString)
if (check_option) if (check_option)
{ {
const char *view_updatable_error = const char *view_updatable_error =
view_query_is_auto_updatable(viewParse, security_barrier, true); view_query_is_auto_updatable(viewParse, true);
if (view_updatable_error) if (view_updatable_error)
ereport(ERROR, ereport(ERROR,
......
...@@ -1998,6 +1998,7 @@ _copyRangeTblEntry(const RangeTblEntry *from) ...@@ -1998,6 +1998,7 @@ _copyRangeTblEntry(const RangeTblEntry *from)
COPY_SCALAR_FIELD(checkAsUser); COPY_SCALAR_FIELD(checkAsUser);
COPY_BITMAPSET_FIELD(selectedCols); COPY_BITMAPSET_FIELD(selectedCols);
COPY_BITMAPSET_FIELD(modifiedCols); COPY_BITMAPSET_FIELD(modifiedCols);
COPY_NODE_FIELD(securityQuals);
return newnode; return newnode;
} }
......
...@@ -2296,6 +2296,7 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b) ...@@ -2296,6 +2296,7 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b)
COMPARE_SCALAR_FIELD(checkAsUser); COMPARE_SCALAR_FIELD(checkAsUser);
COMPARE_BITMAPSET_FIELD(selectedCols); COMPARE_BITMAPSET_FIELD(selectedCols);
COMPARE_BITMAPSET_FIELD(modifiedCols); COMPARE_BITMAPSET_FIELD(modifiedCols);
COMPARE_NODE_FIELD(securityQuals);
return true; return true;
} }
......
...@@ -2020,6 +2020,9 @@ range_table_walker(List *rtable, ...@@ -2020,6 +2020,9 @@ range_table_walker(List *rtable,
return true; return true;
break; break;
} }
if (walker(rte->securityQuals, context))
return true;
} }
return false; return false;
} }
...@@ -2755,6 +2758,7 @@ range_table_mutator(List *rtable, ...@@ -2755,6 +2758,7 @@ range_table_mutator(List *rtable,
MUTATE(newrte->values_lists, rte->values_lists, List *); MUTATE(newrte->values_lists, rte->values_lists, List *);
break; break;
} }
MUTATE(newrte->securityQuals, rte->securityQuals, List *);
newrt = lappend(newrt, newrte); newrt = lappend(newrt, newrte);
} }
return newrt; return newrt;
......
...@@ -2409,6 +2409,7 @@ _outRangeTblEntry(StringInfo str, const RangeTblEntry *node) ...@@ -2409,6 +2409,7 @@ _outRangeTblEntry(StringInfo str, const RangeTblEntry *node)
WRITE_OID_FIELD(checkAsUser); WRITE_OID_FIELD(checkAsUser);
WRITE_BITMAPSET_FIELD(selectedCols); WRITE_BITMAPSET_FIELD(selectedCols);
WRITE_BITMAPSET_FIELD(modifiedCols); WRITE_BITMAPSET_FIELD(modifiedCols);
WRITE_NODE_FIELD(securityQuals);
} }
static void static void
......
...@@ -1252,6 +1252,7 @@ _readRangeTblEntry(void) ...@@ -1252,6 +1252,7 @@ _readRangeTblEntry(void)
READ_OID_FIELD(checkAsUser); READ_OID_FIELD(checkAsUser);
READ_BITMAPSET_FIELD(selectedCols); READ_BITMAPSET_FIELD(selectedCols);
READ_BITMAPSET_FIELD(modifiedCols); READ_BITMAPSET_FIELD(modifiedCols);
READ_NODE_FIELD(securityQuals);
READ_DONE(); READ_DONE();
} }
......
...@@ -915,6 +915,12 @@ inheritance_planner(PlannerInfo *root) ...@@ -915,6 +915,12 @@ inheritance_planner(PlannerInfo *root)
/* Generate plan */ /* Generate plan */
subplan = grouping_planner(&subroot, 0.0 /* retrieve all tuples */ ); subplan = grouping_planner(&subroot, 0.0 /* retrieve all tuples */ );
/*
* Planning may have modified the query result relation (if there
* were security barrier quals on the result RTE).
*/
appinfo->child_relid = subroot.parse->resultRelation;
/* /*
* If this child rel was excluded by constraint exclusion, exclude it * If this child rel was excluded by constraint exclusion, exclude it
* from the result plan. * from the result plan.
...@@ -932,9 +938,40 @@ inheritance_planner(PlannerInfo *root) ...@@ -932,9 +938,40 @@ inheritance_planner(PlannerInfo *root)
if (final_rtable == NIL) if (final_rtable == NIL)
final_rtable = subroot.parse->rtable; final_rtable = subroot.parse->rtable;
else else
final_rtable = list_concat(final_rtable, {
List *tmp_rtable = NIL;
ListCell *cell1, *cell2;
/*
* Check to see if any of the original RTEs were turned into
* subqueries during planning. Currently, this should only ever
* happen due to securityQuals being involved which push a
* relation down under a subquery, to ensure that the security
* barrier quals are evaluated first.
*
* When this happens, we want to use the new subqueries in the
* final rtable.
*/
forboth(cell1, final_rtable, cell2, subroot.parse->rtable)
{
RangeTblEntry *rte1 = (RangeTblEntry *) lfirst(cell1);
RangeTblEntry *rte2 = (RangeTblEntry *) lfirst(cell2);
if (rte1->rtekind == RTE_RELATION &&
rte2->rtekind == RTE_SUBQUERY)
{
/* Should only be when there are securityQuals today */
Assert(rte1->securityQuals != NIL);
tmp_rtable = lappend(tmp_rtable, rte2);
}
else
tmp_rtable = lappend(tmp_rtable, rte1);
}
final_rtable = list_concat(tmp_rtable,
list_copy_tail(subroot.parse->rtable, list_copy_tail(subroot.parse->rtable,
list_length(final_rtable))); list_length(final_rtable)));
}
/* /*
* We need to collect all the RelOptInfos from all child plans into * We need to collect all the RelOptInfos from all child plans into
...@@ -1162,6 +1199,12 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) ...@@ -1162,6 +1199,12 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
/* Preprocess targetlist */ /* Preprocess targetlist */
tlist = preprocess_targetlist(root, tlist); tlist = preprocess_targetlist(root, tlist);
/*
* Expand any rangetable entries that have security barrier quals.
* This may add new security barrier subquery RTEs to the rangetable.
*/
expand_security_quals(root, tlist);
/* /*
* Locate any window functions in the tlist. (We don't need to look * Locate any window functions in the tlist. (We don't need to look
* anywhere else, since expressions used in ORDER BY will be in there * anywhere else, since expressions used in ORDER BY will be in there
......
...@@ -12,6 +12,6 @@ subdir = src/backend/optimizer/prep ...@@ -12,6 +12,6 @@ subdir = src/backend/optimizer/prep
top_builddir = ../../../.. top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global include $(top_builddir)/src/Makefile.global
OBJS = prepjointree.o prepqual.o preptlist.o prepunion.o OBJS = prepjointree.o prepqual.o prepsecurity.o preptlist.o prepunion.o
include $(top_srcdir)/src/backend/common.mk include $(top_srcdir)/src/backend/common.mk
This diff is collapsed.
...@@ -55,6 +55,7 @@ typedef struct ...@@ -55,6 +55,7 @@ typedef struct
{ {
PlannerInfo *root; PlannerInfo *root;
AppendRelInfo *appinfo; AppendRelInfo *appinfo;
int sublevels_up;
} adjust_appendrel_attrs_context; } adjust_appendrel_attrs_context;
static Plan *recurse_set_operations(Node *setOp, PlannerInfo *root, static Plan *recurse_set_operations(Node *setOp, PlannerInfo *root,
...@@ -1580,8 +1581,9 @@ translate_col_privs(const Bitmapset *parent_privs, ...@@ -1580,8 +1581,9 @@ translate_col_privs(const Bitmapset *parent_privs,
* child rel instead. We also update rtindexes appearing outside Vars, * child rel instead. We also update rtindexes appearing outside Vars,
* such as resultRelation and jointree relids. * such as resultRelation and jointree relids.
* *
* Note: this is only applied after conversion of sublinks to subplans, * Note: this is applied after conversion of sublinks to subplans in the
* so we don't need to cope with recursion into sub-queries. * query jointree, but there may still be sublinks in the security barrier
* quals of RTEs, so we do need to cope with recursion into sub-queries.
* *
* Note: this is not hugely different from what pullup_replace_vars() does; * Note: this is not hugely different from what pullup_replace_vars() does;
* maybe we should try to fold the two routines together. * maybe we should try to fold the two routines together.
...@@ -1594,9 +1596,12 @@ adjust_appendrel_attrs(PlannerInfo *root, Node *node, AppendRelInfo *appinfo) ...@@ -1594,9 +1596,12 @@ adjust_appendrel_attrs(PlannerInfo *root, Node *node, AppendRelInfo *appinfo)
context.root = root; context.root = root;
context.appinfo = appinfo; context.appinfo = appinfo;
context.sublevels_up = 0;
/* /*
* Must be prepared to start with a Query or a bare expression tree. * Must be prepared to start with a Query or a bare expression tree; if
* it's a Query, go straight to query_tree_walker to make sure that
* sublevels_up doesn't get incremented prematurely.
*/ */
if (node && IsA(node, Query)) if (node && IsA(node, Query))
{ {
...@@ -1635,7 +1640,7 @@ adjust_appendrel_attrs_mutator(Node *node, ...@@ -1635,7 +1640,7 @@ adjust_appendrel_attrs_mutator(Node *node,
{ {
Var *var = (Var *) copyObject(node); Var *var = (Var *) copyObject(node);
if (var->varlevelsup == 0 && if (var->varlevelsup == context->sublevels_up &&
var->varno == appinfo->parent_relid) var->varno == appinfo->parent_relid)
{ {
var->varno = appinfo->child_relid; var->varno = appinfo->child_relid;
...@@ -1652,6 +1657,7 @@ adjust_appendrel_attrs_mutator(Node *node, ...@@ -1652,6 +1657,7 @@ adjust_appendrel_attrs_mutator(Node *node,
if (newnode == NULL) if (newnode == NULL)
elog(ERROR, "attribute %d of relation \"%s\" does not exist", elog(ERROR, "attribute %d of relation \"%s\" does not exist",
var->varattno, get_rel_name(appinfo->parent_reloid)); var->varattno, get_rel_name(appinfo->parent_reloid));
((Var *) newnode)->varlevelsup += context->sublevels_up;
return newnode; return newnode;
} }
else if (var->varattno == 0) else if (var->varattno == 0)
...@@ -1694,10 +1700,16 @@ adjust_appendrel_attrs_mutator(Node *node, ...@@ -1694,10 +1700,16 @@ adjust_appendrel_attrs_mutator(Node *node,
RowExpr *rowexpr; RowExpr *rowexpr;
List *fields; List *fields;
RangeTblEntry *rte; RangeTblEntry *rte;
ListCell *lc;
rte = rt_fetch(appinfo->parent_relid, rte = rt_fetch(appinfo->parent_relid,
context->root->parse->rtable); context->root->parse->rtable);
fields = (List *) copyObject(appinfo->translated_vars); fields = (List *) copyObject(appinfo->translated_vars);
foreach(lc, fields)
{
Var *field = (Var *) lfirst(lc);
field->varlevelsup += context->sublevels_up;
}
rowexpr = makeNode(RowExpr); rowexpr = makeNode(RowExpr);
rowexpr->args = fields; rowexpr->args = fields;
rowexpr->row_typeid = var->vartype; rowexpr->row_typeid = var->vartype;
...@@ -1716,7 +1728,8 @@ adjust_appendrel_attrs_mutator(Node *node, ...@@ -1716,7 +1728,8 @@ adjust_appendrel_attrs_mutator(Node *node,
{ {
CurrentOfExpr *cexpr = (CurrentOfExpr *) copyObject(node); CurrentOfExpr *cexpr = (CurrentOfExpr *) copyObject(node);
if (cexpr->cvarno == appinfo->parent_relid) if (context->sublevels_up == 0 &&
cexpr->cvarno == appinfo->parent_relid)
cexpr->cvarno = appinfo->child_relid; cexpr->cvarno = appinfo->child_relid;
return (Node *) cexpr; return (Node *) cexpr;
} }
...@@ -1724,7 +1737,8 @@ adjust_appendrel_attrs_mutator(Node *node, ...@@ -1724,7 +1737,8 @@ adjust_appendrel_attrs_mutator(Node *node,
{ {
RangeTblRef *rtr = (RangeTblRef *) copyObject(node); RangeTblRef *rtr = (RangeTblRef *) copyObject(node);
if (rtr->rtindex == appinfo->parent_relid) if (context->sublevels_up == 0 &&
rtr->rtindex == appinfo->parent_relid)
rtr->rtindex = appinfo->child_relid; rtr->rtindex = appinfo->child_relid;
return (Node *) rtr; return (Node *) rtr;
} }
...@@ -1737,7 +1751,8 @@ adjust_appendrel_attrs_mutator(Node *node, ...@@ -1737,7 +1751,8 @@ adjust_appendrel_attrs_mutator(Node *node,
adjust_appendrel_attrs_mutator, adjust_appendrel_attrs_mutator,
(void *) context); (void *) context);
/* now fix JoinExpr's rtindex (probably never happens) */ /* now fix JoinExpr's rtindex (probably never happens) */
if (j->rtindex == appinfo->parent_relid) if (context->sublevels_up == 0 &&
j->rtindex == appinfo->parent_relid)
j->rtindex = appinfo->child_relid; j->rtindex = appinfo->child_relid;
return (Node *) j; return (Node *) j;
} }
...@@ -1750,7 +1765,7 @@ adjust_appendrel_attrs_mutator(Node *node, ...@@ -1750,7 +1765,7 @@ adjust_appendrel_attrs_mutator(Node *node,
adjust_appendrel_attrs_mutator, adjust_appendrel_attrs_mutator,
(void *) context); (void *) context);
/* now fix PlaceHolderVar's relid sets */ /* now fix PlaceHolderVar's relid sets */
if (phv->phlevelsup == 0) if (phv->phlevelsup == context->sublevels_up)
phv->phrels = adjust_relid_set(phv->phrels, phv->phrels = adjust_relid_set(phv->phrels,
appinfo->parent_relid, appinfo->parent_relid,
appinfo->child_relid); appinfo->child_relid);
...@@ -1822,12 +1837,29 @@ adjust_appendrel_attrs_mutator(Node *node, ...@@ -1822,12 +1837,29 @@ adjust_appendrel_attrs_mutator(Node *node,
return (Node *) newinfo; return (Node *) newinfo;
} }
/* if (IsA(node, Query))
* NOTE: we do not need to recurse into sublinks, because they should {
* already have been converted to subplans before we see them. /*
*/ * Recurse into sublink subqueries. This should only be possible in
Assert(!IsA(node, SubLink)); * security barrier quals of top-level RTEs. All other sublinks should
Assert(!IsA(node, Query)); * have already been converted to subplans during expression
* preprocessing, but this doesn't happen for security barrier quals,
* since they are destined to become quals of a subquery RTE, which
* will be recursively planned, and so should not be preprocessed at
* this stage.
*
* We don't explicitly Assert() for securityQuals here simply because
* it's not trivial to do so.
*/
Query *newnode;
context->sublevels_up++;
newnode = query_tree_mutator((Query *) node,
adjust_appendrel_attrs_mutator,
(void *) context, 0);
context->sublevels_up--;
return (Node *) newnode;
}
return expression_tree_mutator(node, adjust_appendrel_attrs_mutator, return expression_tree_mutator(node, adjust_appendrel_attrs_mutator,
(void *) context); (void *) context);
......
...@@ -2023,8 +2023,7 @@ view_col_is_auto_updatable(RangeTblRef *rtr, TargetEntry *tle) ...@@ -2023,8 +2023,7 @@ view_col_is_auto_updatable(RangeTblRef *rtr, TargetEntry *tle)
* updatable. * updatable.
*/ */
const char * const char *
view_query_is_auto_updatable(Query *viewquery, bool security_barrier, view_query_is_auto_updatable(Query *viewquery, bool check_cols)
bool check_cols)
{ {
RangeTblRef *rtr; RangeTblRef *rtr;
RangeTblEntry *base_rte; RangeTblEntry *base_rte;
...@@ -2097,14 +2096,6 @@ view_query_is_auto_updatable(Query *viewquery, bool security_barrier, ...@@ -2097,14 +2096,6 @@ view_query_is_auto_updatable(Query *viewquery, bool security_barrier,
if (expression_returns_set((Node *) viewquery->targetList)) if (expression_returns_set((Node *) viewquery->targetList))
return gettext_noop("Views that return set-returning functions are not automatically updatable."); return gettext_noop("Views that return set-returning functions are not automatically updatable.");
/*
* For now, we also don't support security-barrier views, because of the
* difficulty of keeping upper-level qual expressions away from
* lower-level data. This might get relaxed in the future.
*/
if (security_barrier)
return gettext_noop("Security-barrier views are not automatically updatable.");
/* /*
* The view query should select from a single base relation, which must be * The view query should select from a single base relation, which must be
* a table or another view. * a table or another view.
...@@ -2353,9 +2344,7 @@ relation_is_updatable(Oid reloid, ...@@ -2353,9 +2344,7 @@ relation_is_updatable(Oid reloid,
{ {
Query *viewquery = get_view_query(rel); Query *viewquery = get_view_query(rel);
if (view_query_is_auto_updatable(viewquery, if (view_query_is_auto_updatable(viewquery, false) == NULL)
RelationIsSecurityView(rel),
false) == NULL)
{ {
Bitmapset *updatable_cols; Bitmapset *updatable_cols;
int auto_events; int auto_events;
...@@ -2510,7 +2499,6 @@ rewriteTargetView(Query *parsetree, Relation view) ...@@ -2510,7 +2499,6 @@ rewriteTargetView(Query *parsetree, Relation view)
auto_update_detail = auto_update_detail =
view_query_is_auto_updatable(viewquery, view_query_is_auto_updatable(viewquery,
RelationIsSecurityView(view),
parsetree->commandType != CMD_DELETE); parsetree->commandType != CMD_DELETE);
if (auto_update_detail) if (auto_update_detail)
...@@ -2713,6 +2701,14 @@ rewriteTargetView(Query *parsetree, Relation view) ...@@ -2713,6 +2701,14 @@ rewriteTargetView(Query *parsetree, Relation view)
new_rte->modifiedCols = adjust_view_column_set(view_rte->modifiedCols, new_rte->modifiedCols = adjust_view_column_set(view_rte->modifiedCols,
view_targetlist); view_targetlist);
/*
* Move any security barrier quals from the view RTE onto the new target
* RTE. Any such quals should now apply to the new target RTE and will not
* reference the original view RTE in the rewritten query.
*/
new_rte->securityQuals = view_rte->securityQuals;
view_rte->securityQuals = NIL;
/* /*
* For UPDATE/DELETE, rewriteTargetListUD will have added a wholerow junk * For UPDATE/DELETE, rewriteTargetListUD will have added a wholerow junk
* TLE for the view to the end of the targetlist, which we no longer need. * TLE for the view to the end of the targetlist, which we no longer need.
...@@ -2793,6 +2789,10 @@ rewriteTargetView(Query *parsetree, Relation view) ...@@ -2793,6 +2789,10 @@ rewriteTargetView(Query *parsetree, Relation view)
* only adjust their varnos to reference the new target (just the same as * only adjust their varnos to reference the new target (just the same as
* we did with the view targetlist). * we did with the view targetlist).
* *
* Note that there is special-case handling for the quals of a security
* barrier view, since they need to be kept separate from any user-supplied
* quals, so these quals are kept on the new target RTE.
*
* For INSERT, the view's quals can be ignored in the main query. * For INSERT, the view's quals can be ignored in the main query.
*/ */
if (parsetree->commandType != CMD_INSERT && if (parsetree->commandType != CMD_INSERT &&
...@@ -2801,7 +2801,25 @@ rewriteTargetView(Query *parsetree, Relation view) ...@@ -2801,7 +2801,25 @@ rewriteTargetView(Query *parsetree, Relation view)
Node *viewqual = (Node *) copyObject(viewquery->jointree->quals); Node *viewqual = (Node *) copyObject(viewquery->jointree->quals);
ChangeVarNodes(viewqual, base_rt_index, new_rt_index, 0); ChangeVarNodes(viewqual, base_rt_index, new_rt_index, 0);
AddQual(parsetree, (Node *) viewqual);
if (RelationIsSecurityView(view))
{
/*
* Note: the parsetree has been mutated, so the new_rte pointer is
* stale and needs to be re-computed.
*/
new_rte = rt_fetch(new_rt_index, parsetree->rtable);
new_rte->securityQuals = lcons(viewqual, new_rte->securityQuals);
/*
* Make sure that the query is marked correctly if the added qual
* has sublinks.
*/
if (!parsetree->hasSubLinks)
parsetree->hasSubLinks = checkExprHasSubLink(viewqual);
}
else
AddQual(parsetree, (Node *) viewqual);
} }
/* /*
...@@ -2863,9 +2881,8 @@ rewriteTargetView(Query *parsetree, Relation view) ...@@ -2863,9 +2881,8 @@ rewriteTargetView(Query *parsetree, Relation view)
* Make sure that the query is marked correctly if the added * Make sure that the query is marked correctly if the added
* qual has sublinks. We can skip this check if the query is * qual has sublinks. We can skip this check if the query is
* already marked, or if the command is an UPDATE, in which * already marked, or if the command is an UPDATE, in which
* case the same qual will have already been added to the * case the same qual will have already been added, and this
* query's WHERE clause, and AddQual will have already done * check will already have been done.
* this check.
*/ */
if (!parsetree->hasSubLinks && if (!parsetree->hasSubLinks &&
parsetree->commandType != CMD_UPDATE) parsetree->commandType != CMD_UPDATE)
......
...@@ -801,6 +801,7 @@ typedef struct RangeTblEntry ...@@ -801,6 +801,7 @@ typedef struct RangeTblEntry
Oid checkAsUser; /* if valid, check access as this role */ Oid checkAsUser; /* if valid, check access as this role */
Bitmapset *selectedCols; /* columns needing SELECT permission */ Bitmapset *selectedCols; /* columns needing SELECT permission */
Bitmapset *modifiedCols; /* columns needing INSERT/UPDATE permission */ Bitmapset *modifiedCols; /* columns needing INSERT/UPDATE permission */
List *securityQuals; /* any security barrier quals to apply */
} RangeTblEntry; } RangeTblEntry;
/* /*
......
...@@ -35,6 +35,11 @@ extern Relids get_relids_for_join(PlannerInfo *root, int joinrelid); ...@@ -35,6 +35,11 @@ extern Relids get_relids_for_join(PlannerInfo *root, int joinrelid);
extern Node *negate_clause(Node *node); extern Node *negate_clause(Node *node);
extern Expr *canonicalize_qual(Expr *qual); extern Expr *canonicalize_qual(Expr *qual);
/*
* prototypes for prepsecurity.c
*/
extern void expand_security_quals(PlannerInfo *root, List *tlist);
/* /*
* prototypes for preptlist.c * prototypes for preptlist.c
*/ */
......
...@@ -25,7 +25,6 @@ extern void AcquireRewriteLocks(Query *parsetree, ...@@ -25,7 +25,6 @@ extern void AcquireRewriteLocks(Query *parsetree,
extern Node *build_column_default(Relation rel, int attrno); extern Node *build_column_default(Relation rel, int attrno);
extern Query *get_view_query(Relation view); extern Query *get_view_query(Relation view);
extern const char *view_query_is_auto_updatable(Query *viewquery, extern const char *view_query_is_auto_updatable(Query *viewquery,
bool security_barrier,
bool check_cols); bool check_cols);
extern int relation_is_updatable(Oid reloid, extern int relation_is_updatable(Oid reloid,
bool include_triggers, bool include_triggers,
......
...@@ -252,7 +252,7 @@ CREATE VIEW mysecview4 WITH (security_barrier) ...@@ -252,7 +252,7 @@ CREATE VIEW mysecview4 WITH (security_barrier)
AS SELECT * FROM tbl1 WHERE a <> 0; AS SELECT * FROM tbl1 WHERE a <> 0;
CREATE VIEW mysecview5 WITH (security_barrier=100) -- Error CREATE VIEW mysecview5 WITH (security_barrier=100) -- Error
AS SELECT * FROM tbl1 WHERE a > 100; AS SELECT * FROM tbl1 WHERE a > 100;
ERROR: security_barrier requires a Boolean value ERROR: invalid value for boolean option "security_barrier": 100
CREATE VIEW mysecview6 WITH (invalid_option) -- Error CREATE VIEW mysecview6 WITH (invalid_option) -- Error
AS SELECT * FROM tbl1 WHERE a < 100; AS SELECT * FROM tbl1 WHERE a < 100;
ERROR: unrecognized parameter "invalid_option" ERROR: unrecognized parameter "invalid_option"
......
...@@ -25,12 +25,10 @@ CREATE VIEW rw_view14 AS SELECT ctid, a, b FROM base_tbl; -- System columns may ...@@ -25,12 +25,10 @@ CREATE VIEW rw_view14 AS SELECT ctid, a, b FROM base_tbl; -- System columns may
CREATE VIEW rw_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function may be part of an updatable view CREATE VIEW rw_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function may be part of an updatable view
CREATE VIEW rw_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column may be part of an updatable view CREATE VIEW rw_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column may be part of an updatable view
CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable
CREATE VIEW ro_view18 WITH (security_barrier = true) CREATE VIEW ro_view18 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable
AS SELECT * FROM base_tbl; -- Security barrier views not updatable
CREATE VIEW ro_view19 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable
CREATE SEQUENCE seq; CREATE SEQUENCE seq;
CREATE VIEW ro_view20 AS SELECT * FROM seq; -- View based on a sequence CREATE VIEW ro_view19 AS SELECT * FROM seq; -- View based on a sequence
CREATE VIEW ro_view21 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported CREATE VIEW ro_view20 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported
SELECT table_name, is_insertable_into SELECT table_name, is_insertable_into
FROM information_schema.tables FROM information_schema.tables
...@@ -87,13 +85,12 @@ SELECT * FROM base_tbl; ...@@ -87,13 +85,12 @@ SELECT * FROM base_tbl;
DELETE FROM rw_view16 WHERE a=-3; -- should be OK DELETE FROM rw_view16 WHERE a=-3; -- should be OK
-- Read-only views -- Read-only views
INSERT INTO ro_view17 VALUES (3, 'ROW 3'); INSERT INTO ro_view17 VALUES (3, 'ROW 3');
INSERT INTO ro_view18 VALUES (3, 'ROW 3'); DELETE FROM ro_view18;
DELETE FROM ro_view19; UPDATE ro_view19 SET max_value=1000;
UPDATE ro_view20 SET max_value=1000; UPDATE ro_view20 SET b=upper(b);
UPDATE ro_view21 SET b=upper(b);
DROP TABLE base_tbl CASCADE; DROP TABLE base_tbl CASCADE;
DROP VIEW ro_view10, ro_view12, ro_view19; DROP VIEW ro_view10, ro_view12, ro_view18;
DROP SEQUENCE seq CASCADE; DROP SEQUENCE seq CASCADE;
-- simple updatable view -- simple updatable view
...@@ -828,3 +825,166 @@ CREATE VIEW rw_view2 AS ...@@ -828,3 +825,166 @@ CREATE VIEW rw_view2 AS
SELECT * FROM rw_view1 WHERE a > b WITH LOCAL CHECK OPTION; SELECT * FROM rw_view1 WHERE a > b WITH LOCAL CHECK OPTION;
INSERT INTO rw_view2 VALUES (2,3); -- ok, but not in view (doesn't fail rw_view2's check) INSERT INTO rw_view2 VALUES (2,3); -- ok, but not in view (doesn't fail rw_view2's check)
DROP TABLE base_tbl CASCADE; DROP TABLE base_tbl CASCADE;
-- security barrier view
CREATE TABLE base_tbl (person text, visibility text);
INSERT INTO base_tbl VALUES ('Tom', 'public'),
('Dick', 'private'),
('Harry', 'public');
CREATE VIEW rw_view1 AS
SELECT person FROM base_tbl WHERE visibility = 'public';
CREATE FUNCTION snoop(anyelement)
RETURNS boolean AS
$$
BEGIN
RAISE NOTICE 'snooped value: %', $1;
RETURN true;
END;
$$
LANGUAGE plpgsql COST 0.000001;
CREATE OR REPLACE FUNCTION leakproof(anyelement)
RETURNS boolean AS
$$
BEGIN
RETURN true;
END;
$$
LANGUAGE plpgsql STRICT IMMUTABLE LEAKPROOF;
SELECT * FROM rw_view1 WHERE snoop(person);
UPDATE rw_view1 SET person=person WHERE snoop(person);
DELETE FROM rw_view1 WHERE NOT snoop(person);
ALTER VIEW rw_view1 SET (security_barrier = true);
SELECT table_name, is_insertable_into
FROM information_schema.tables
WHERE table_name = 'rw_view1';
SELECT table_name, is_updatable, is_insertable_into
FROM information_schema.views
WHERE table_name = 'rw_view1';
SELECT table_name, column_name, is_updatable
FROM information_schema.columns
WHERE table_name = 'rw_view1'
ORDER BY ordinal_position;
SELECT * FROM rw_view1 WHERE snoop(person);
UPDATE rw_view1 SET person=person WHERE snoop(person);
DELETE FROM rw_view1 WHERE NOT snoop(person);
EXPLAIN (costs off) SELECT * FROM rw_view1 WHERE snoop(person);
EXPLAIN (costs off) UPDATE rw_view1 SET person=person WHERE snoop(person);
EXPLAIN (costs off) DELETE FROM rw_view1 WHERE NOT snoop(person);
-- security barrier view on top of security barrier view
CREATE VIEW rw_view2 WITH (security_barrier = true) AS
SELECT * FROM rw_view1 WHERE snoop(person);
SELECT table_name, is_insertable_into
FROM information_schema.tables
WHERE table_name = 'rw_view2';
SELECT table_name, is_updatable, is_insertable_into
FROM information_schema.views
WHERE table_name = 'rw_view2';
SELECT table_name, column_name, is_updatable
FROM information_schema.columns
WHERE table_name = 'rw_view2'
ORDER BY ordinal_position;
SELECT * FROM rw_view2 WHERE snoop(person);
UPDATE rw_view2 SET person=person WHERE snoop(person);
DELETE FROM rw_view2 WHERE NOT snoop(person);
EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(person);
EXPLAIN (costs off) UPDATE rw_view2 SET person=person WHERE snoop(person);
EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(person);
DROP TABLE base_tbl CASCADE;
-- security barrier view on top of table with rules
CREATE TABLE base_tbl(id int PRIMARY KEY, data text, deleted boolean);
INSERT INTO base_tbl VALUES (1, 'Row 1', false), (2, 'Row 2', true);
CREATE RULE base_tbl_ins_rule AS ON INSERT TO base_tbl
WHERE EXISTS (SELECT 1 FROM base_tbl t WHERE t.id = new.id)
DO INSTEAD
UPDATE base_tbl SET data = new.data, deleted = false WHERE id = new.id;
CREATE RULE base_tbl_del_rule AS ON DELETE TO base_tbl
DO INSTEAD
UPDATE base_tbl SET deleted = true WHERE id = old.id;
CREATE VIEW rw_view1 WITH (security_barrier=true) AS
SELECT id, data FROM base_tbl WHERE NOT deleted;
SELECT * FROM rw_view1;
EXPLAIN (costs off) DELETE FROM rw_view1 WHERE id = 1 AND snoop(data);
DELETE FROM rw_view1 WHERE id = 1 AND snoop(data);
EXPLAIN (costs off) INSERT INTO rw_view1 VALUES (2, 'New row 2');
INSERT INTO rw_view1 VALUES (2, 'New row 2');
SELECT * FROM base_tbl;
DROP TABLE base_tbl CASCADE;
-- security barrier view based on inheiritance set
CREATE TABLE t1 (a int, b float, c text);
CREATE INDEX t1_a_idx ON t1(a);
INSERT INTO t1
SELECT i,i,'t1' FROM generate_series(1,10) g(i);
CREATE TABLE t11 (d text) INHERITS (t1);
CREATE INDEX t11_a_idx ON t11(a);
INSERT INTO t11
SELECT i,i,'t11','t11d' FROM generate_series(1,10) g(i);
CREATE TABLE t12 (e int[]) INHERITS (t1);
CREATE INDEX t12_a_idx ON t12(a);
INSERT INTO t12
SELECT i,i,'t12','{1,2}'::int[] FROM generate_series(1,10) g(i);
CREATE TABLE t111 () INHERITS (t11, t12);
CREATE INDEX t111_a_idx ON t111(a);
INSERT INTO t111
SELECT i,i,'t111','t111d','{1,1,1}'::int[] FROM generate_series(1,10) g(i);
CREATE VIEW v1 WITH (security_barrier=true) AS
SELECT *, (SELECT d FROM t11 WHERE t11.a = t1.a LIMIT 1) AS d
FROM t1
WHERE a > 5 AND EXISTS(SELECT 1 FROM t12 WHERE t12.a = t1.a);
SELECT * FROM v1 WHERE a=3; -- should not see anything
SELECT * FROM v1 WHERE a=8;
EXPLAIN (VERBOSE, COSTS OFF)
UPDATE v1 SET a=100 WHERE snoop(a) AND leakproof(a) AND a = 3;
UPDATE v1 SET a=100 WHERE snoop(a) AND leakproof(a) AND a = 3;
SELECT * FROM v1 WHERE a=100; -- Nothing should have been changed to 100
SELECT * FROM t1 WHERE a=100; -- Nothing should have been changed to 100
EXPLAIN (VERBOSE, COSTS OFF)
UPDATE v1 SET a=a+1 WHERE snoop(a) AND leakproof(a) AND a = 8;
UPDATE v1 SET a=a+1 WHERE snoop(a) AND leakproof(a) AND a = 8;
SELECT * FROM v1 WHERE b=8;
DELETE FROM v1 WHERE snoop(a) AND leakproof(a); -- should not delete everything, just where a>5
TABLE t1; -- verify all a<=5 are intact
DROP TABLE t1, t11, t12, t111 CASCADE;
DROP FUNCTION snoop(anyelement);
DROP FUNCTION leakproof(anyelement);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment