Commit 0e4611c0 authored by Robert Haas's avatar Robert Haas

Add a security_barrier option for views.

When a view is marked as a security barrier, it will not be pulled up
into the containing query, and no quals will be pushed down into it,
so that no function or operator chosen by the user can be applied to
rows not exposed by the view.  Views not configured with this
option cannot provide robust row-level security, but will perform far
better.

Patch by KaiGai Kohei; original problem report by Heikki Linnakangas
(in October 2009!).  Review (in earlier versions) by Noah Misch and
others.  Design advice by Tom Lane and myself.  Further review and
cleanup by me.
parent f90dd280
...@@ -26,6 +26,8 @@ ALTER VIEW <replaceable class="parameter">name</replaceable> ALTER [ COLUMN ] <r ...@@ -26,6 +26,8 @@ ALTER VIEW <replaceable class="parameter">name</replaceable> ALTER [ COLUMN ] <r
ALTER VIEW <replaceable class="parameter">name</replaceable> OWNER TO <replaceable class="PARAMETER">new_owner</replaceable> ALTER VIEW <replaceable class="parameter">name</replaceable> OWNER TO <replaceable class="PARAMETER">new_owner</replaceable>
ALTER VIEW <replaceable class="parameter">name</replaceable> RENAME TO <replaceable class="parameter">new_name</replaceable> ALTER VIEW <replaceable class="parameter">name</replaceable> RENAME TO <replaceable class="parameter">new_name</replaceable>
ALTER VIEW <replaceable class="parameter">name</replaceable> SET SCHEMA <replaceable class="parameter">new_schema</replaceable> ALTER VIEW <replaceable class="parameter">name</replaceable> SET SCHEMA <replaceable class="parameter">new_schema</replaceable>
ALTER VIEW <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">view_option_name</replaceable> [= <replaceable class="parameter">view_option_value</replaceable>] [, ... ] )
ALTER VIEW <replaceable class="parameter">name</replaceable> RESET ( <replaceable class="parameter">view_option_name</replaceable> [, ... ] )
</synopsis> </synopsis>
</refsynopsisdiv> </refsynopsisdiv>
...@@ -102,6 +104,24 @@ ALTER VIEW <replaceable class="parameter">name</replaceable> SET SCHEMA <replace ...@@ -102,6 +104,24 @@ ALTER VIEW <replaceable class="parameter">name</replaceable> SET SCHEMA <replace
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><replaceable class="parameter">view_option_name</replaceable></term>
<listitem>
<para>
The name of a view option to be set or reset.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">view_option_name</replaceable></term>
<listitem>
<para>
The new value for a view option.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
</refsect1> </refsect1>
......
...@@ -22,6 +22,7 @@ PostgreSQL documentation ...@@ -22,6 +22,7 @@ PostgreSQL documentation
<refsynopsisdiv> <refsynopsisdiv>
<synopsis> <synopsis>
CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] VIEW <replaceable class="PARAMETER">name</replaceable> [ ( <replaceable class="PARAMETER">column_name</replaceable> [, ...] ) ] CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] VIEW <replaceable class="PARAMETER">name</replaceable> [ ( <replaceable class="PARAMETER">column_name</replaceable> [, ...] ) ]
[ WITH ( <replaceable class="PARAMETER">view_option_name</replaceable> [= <replaceable class="PARAMETER">view_option_value</replaceable>] [, ... ] ) ]
AS <replaceable class="PARAMETER">query</replaceable> AS <replaceable class="PARAMETER">query</replaceable>
</synopsis> </synopsis>
</refsynopsisdiv> </refsynopsisdiv>
...@@ -98,6 +99,18 @@ CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] VIEW <replaceable class="PARAMETER">n ...@@ -98,6 +99,18 @@ CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] VIEW <replaceable class="PARAMETER">n
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><literal>WITH ( <replaceable class="PARAMETER">view_option_name</replaceable> [= <replaceable class="PARAMETER">view_option_value</replaceable>] [, ... ] )</literal></term>
<listitem>
<para>
This clause specifies optional parameters for a view; currently, the
only suppored parameter name is <literal>security_barrier</literal>,
which should be enabled when a view is intended to provide row-level
security. See <xref linkend="rules-privileges"> for full details.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><replaceable class="parameter">query</replaceable></term> <term><replaceable class="parameter">query</replaceable></term>
<listitem> <listitem>
......
...@@ -1822,8 +1822,9 @@ GRANT SELECT ON phone_number TO secretary; ...@@ -1822,8 +1822,9 @@ GRANT SELECT ON phone_number TO secretary;
<para> <para>
Note that while views can be used to hide the contents of certain Note that while views can be used to hide the contents of certain
columns using the technique shown above, they cannot be used to reliably columns using the technique shown above, they cannot be used to reliably
conceal the data in unseen rows. For example, the following view is conceal the data in unseen rows unless the
insecure: <literal>security_barrier</literal> flag has been set. For example,
the following view is insecure:
<programlisting> <programlisting>
CREATE VIEW phone_number AS CREATE VIEW phone_number AS
SELECT person, phone FROM phone_data WHERE phone NOT LIKE '412%'; SELECT person, phone FROM phone_data WHERE phone NOT LIKE '412%';
...@@ -1870,6 +1871,40 @@ SELECT * FROM phone_number WHERE tricky(person, phone); ...@@ -1870,6 +1871,40 @@ SELECT * FROM phone_number WHERE tricky(person, phone);
which references <literal>shoelace_log</> is an unqualified which references <literal>shoelace_log</> is an unqualified
<literal>INSERT</>. This might not be true in more complex scenarios. <literal>INSERT</>. This might not be true in more complex scenarios.
</para> </para>
<para>
When it is necessary for a view to provide row-level security, the
<literal>security_barrier</literal> attribute should be applied to
the view. This prevents maliciously-chosen functions and operators from
being invoked on rows until afterthe view has done its work. For
example, if the view shown above had been created like this, it would
be secure:
<programlisting>
CREATE VIEW phone_number WITH (security_barrier) AS
SELECT person, phone FROM phone_data WHERE phone NOT LIKE '412%';
</programlisting>
Views created with the <literal>security_barrier</literal> may perform
far worse than views created without this option. In general, there is
no way to avoid this: the fastest possible plan must be rejected
if it may compromise security. For this reason, this option is not
enabled by default.
</para>
<para>
It is important to understand that even a view created with the
<literal>security_barrier</literal> option is intended to be secure only
in the limited sense that the contents of the invisible tuples will not
passed to possibly-insecure functions. The user may well have other means
of making inferences about the unseen data; for example, they can see the
query plan using <command>EXPLAIN</command>, or measure the runtime of
queries against the view. A malicious attacker might be able to infer
something about the amount of unseen data, or even gain some information
about the data distribution or most common values (since these things may
affect the runtime of the plan; or even, since they are also reflected in
the optimizer statistics, the choice of plan). If these types of "covert
channel" attacks are of concern, it is probably unwise to grant any access
to the data at all.
</para>
</sect1> </sect1>
<sect1 id="rules-status"> <sect1 id="rules-status">
......
...@@ -67,6 +67,14 @@ static relopt_bool boolRelOpts[] = ...@@ -67,6 +67,14 @@ static relopt_bool boolRelOpts[] =
}, },
true true
}, },
{
{
"security_barrier",
"View acts as a row security barrier",
RELOPT_KIND_VIEW
},
false
},
/* list terminator */ /* list terminator */
{{NULL}} {{NULL}}
}; };
...@@ -781,6 +789,7 @@ extractRelOptions(HeapTuple tuple, TupleDesc tupdesc, Oid amoptions) ...@@ -781,6 +789,7 @@ extractRelOptions(HeapTuple tuple, TupleDesc tupdesc, Oid amoptions)
{ {
case RELKIND_RELATION: case RELKIND_RELATION:
case RELKIND_TOASTVALUE: case RELKIND_TOASTVALUE:
case RELKIND_VIEW:
case RELKIND_UNCATALOGED: case RELKIND_UNCATALOGED:
options = heap_reloptions(classForm->relkind, datum, false); options = heap_reloptions(classForm->relkind, datum, false);
break; break;
...@@ -1139,7 +1148,9 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind) ...@@ -1139,7 +1148,9 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
{"autovacuum_vacuum_scale_factor", RELOPT_TYPE_REAL, {"autovacuum_vacuum_scale_factor", RELOPT_TYPE_REAL,
offsetof(StdRdOptions, autovacuum) +offsetof(AutoVacOpts, vacuum_scale_factor)}, offsetof(StdRdOptions, autovacuum) +offsetof(AutoVacOpts, vacuum_scale_factor)},
{"autovacuum_analyze_scale_factor", RELOPT_TYPE_REAL, {"autovacuum_analyze_scale_factor", RELOPT_TYPE_REAL,
offsetof(StdRdOptions, autovacuum) +offsetof(AutoVacOpts, analyze_scale_factor)} offsetof(StdRdOptions, autovacuum) +offsetof(AutoVacOpts, analyze_scale_factor)},
{"security_barrier", RELOPT_TYPE_BOOL,
offsetof(StdRdOptions, security_barrier)},
}; };
options = parseRelOptions(reloptions, validate, kind, &numoptions); options = parseRelOptions(reloptions, validate, kind, &numoptions);
...@@ -1159,7 +1170,7 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind) ...@@ -1159,7 +1170,7 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
} }
/* /*
* Parse options for heaps and toast tables. * Parse options for heaps, views and toast tables.
*/ */
bytea * bytea *
heap_reloptions(char relkind, Datum reloptions, bool validate) heap_reloptions(char relkind, Datum reloptions, bool validate)
...@@ -1181,6 +1192,8 @@ heap_reloptions(char relkind, Datum reloptions, bool validate) ...@@ -1181,6 +1192,8 @@ heap_reloptions(char relkind, Datum reloptions, bool validate)
return (bytea *) rdopts; return (bytea *) rdopts;
case RELKIND_RELATION: case RELKIND_RELATION:
return default_reloptions(reloptions, validate, RELOPT_KIND_HEAP); return default_reloptions(reloptions, validate, RELOPT_KIND_HEAP);
case RELKIND_VIEW:
return default_reloptions(reloptions, validate, RELOPT_KIND_VIEW);
default: default:
/* other relkinds are not supported */ /* other relkinds are not supported */
return NULL; return NULL;
......
...@@ -366,7 +366,9 @@ static void ATExecDropCluster(Relation rel, LOCKMODE lockmode); ...@@ -366,7 +366,9 @@ static void ATExecDropCluster(Relation rel, LOCKMODE lockmode);
static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
char *tablespacename, LOCKMODE lockmode); char *tablespacename, LOCKMODE lockmode);
static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace, LOCKMODE lockmode); static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace, LOCKMODE lockmode);
static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode); static void ATExecSetRelOptions(Relation rel, List *defList,
AlterTableType operation,
LOCKMODE lockmode);
static void ATExecEnableDisableTrigger(Relation rel, char *trigname, static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
char fires_when, bool skip_system, LOCKMODE lockmode); char fires_when, bool skip_system, LOCKMODE lockmode);
static void ATExecEnableDisableRule(Relation rel, char *rulename, static void ATExecEnableDisableRule(Relation rel, char *rulename,
...@@ -2866,6 +2868,7 @@ AlterTableGetLockLevel(List *cmds) ...@@ -2866,6 +2868,7 @@ AlterTableGetLockLevel(List *cmds)
case AT_DropCluster: case AT_DropCluster:
case AT_SetRelOptions: case AT_SetRelOptions:
case AT_ResetRelOptions: case AT_ResetRelOptions:
case AT_ReplaceRelOptions:
case AT_SetOptions: case AT_SetOptions:
case AT_ResetOptions: case AT_ResetOptions:
case AT_SetStorage: case AT_SetStorage:
...@@ -3095,7 +3098,8 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -3095,7 +3098,8 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
break; break;
case AT_SetRelOptions: /* SET (...) */ case AT_SetRelOptions: /* SET (...) */
case AT_ResetRelOptions: /* RESET (...) */ case AT_ResetRelOptions: /* RESET (...) */
ATSimplePermissions(rel, ATT_TABLE | ATT_INDEX); case AT_ReplaceRelOptions: /* reset them all, then set just these */
ATSimplePermissions(rel, ATT_TABLE | ATT_INDEX | ATT_VIEW);
/* This command never recurses */ /* This command never recurses */
/* No command-specific prep needed */ /* No command-specific prep needed */
pass = AT_PASS_MISC; pass = AT_PASS_MISC;
...@@ -3338,12 +3342,10 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -3338,12 +3342,10 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
*/ */
break; break;
case AT_SetRelOptions: /* SET (...) */ case AT_SetRelOptions: /* SET (...) */
ATExecSetRelOptions(rel, (List *) cmd->def, false, lockmode);
break;
case AT_ResetRelOptions: /* RESET (...) */ case AT_ResetRelOptions: /* RESET (...) */
ATExecSetRelOptions(rel, (List *) cmd->def, true, lockmode); case AT_ReplaceRelOptions: /* replace entire option list */
ATExecSetRelOptions(rel, (List *) cmd->def, cmd->subtype, lockmode);
break; break;
case AT_EnableTrig: /* ENABLE TRIGGER name */ case AT_EnableTrig: /* ENABLE TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name, ATExecEnableDisableTrigger(rel, cmd->name,
TRIGGER_FIRES_ON_ORIGIN, false, lockmode); TRIGGER_FIRES_ON_ORIGIN, false, lockmode);
...@@ -8271,10 +8273,11 @@ ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename, L ...@@ -8271,10 +8273,11 @@ ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename, L
} }
/* /*
* ALTER TABLE/INDEX SET (...) or RESET (...) * Set, reset, or replace reloptions.
*/ */
static void static void
ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode) ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
LOCKMODE lockmode)
{ {
Oid relid; Oid relid;
Relation pgclass; Relation pgclass;
...@@ -8288,28 +8291,44 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode ...@@ -8288,28 +8291,44 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode
bool repl_repl[Natts_pg_class]; bool repl_repl[Natts_pg_class];
static char *validnsps[] = HEAP_RELOPT_NAMESPACES; static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
if (defList == NIL) if (defList == NIL && operation != AT_ReplaceRelOptions)
return; /* nothing to do */ return; /* nothing to do */
pgclass = heap_open(RelationRelationId, RowExclusiveLock); pgclass = heap_open(RelationRelationId, RowExclusiveLock);
/* Get the old reloptions */ /* Fetch heap tuple */
relid = RelationGetRelid(rel); relid = RelationGetRelid(rel);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple)) if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", relid); elog(ERROR, "cache lookup failed for relation %u", relid);
datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions, &isnull); if (operation == AT_ReplaceRelOptions)
{
/*
* If we're supposed to replace the reloptions list, we just pretend
* there were none before.
*/
datum = (Datum) 0;
isnull = true;
}
else
{
/* Get the old reloptions */
datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
&isnull);
}
/* Generate new proposed reloptions (text array) */ /* Generate new proposed reloptions (text array) */
newOptions = transformRelOptions(isnull ? (Datum) 0 : datum, newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
defList, NULL, validnsps, false, isReset); defList, NULL, validnsps, false,
operation == AT_ResetRelOptions);
/* Validate */ /* Validate */
switch (rel->rd_rel->relkind) switch (rel->rd_rel->relkind)
{ {
case RELKIND_RELATION: case RELKIND_RELATION:
case RELKIND_TOASTVALUE: case RELKIND_TOASTVALUE:
case RELKIND_VIEW:
(void) heap_reloptions(rel->rd_rel->relkind, newOptions, true); (void) heap_reloptions(rel->rd_rel->relkind, newOptions, true);
break; break;
case RELKIND_INDEX: case RELKIND_INDEX:
...@@ -8357,15 +8376,30 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode ...@@ -8357,15 +8376,30 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode
toastrel = heap_open(toastid, lockmode); toastrel = heap_open(toastid, lockmode);
/* Get the old reloptions */ /* Fetch heap tuple */
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid)); tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
if (!HeapTupleIsValid(tuple)) if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", toastid); elog(ERROR, "cache lookup failed for relation %u", toastid);
datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions, &isnull); if (operation == AT_ReplaceRelOptions)
{
/*
* If we're supposed to replace the reloptions list, we just
* pretend there were none before.
*/
datum = (Datum) 0;
isnull = true;
}
else
{
/* Get the old reloptions */
datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
&isnull);
}
newOptions = transformRelOptions(isnull ? (Datum) 0 : datum, newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
defList, "toast", validnsps, false, isReset); defList, "toast", validnsps, false,
operation == AT_ResetRelOptions);
(void) heap_reloptions(RELKIND_TOASTVALUE, newOptions, true); (void) heap_reloptions(RELKIND_TOASTVALUE, newOptions, true);
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/syscache.h"
static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc); static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
...@@ -98,7 +99,7 @@ isViewOnTempTable_walker(Node *node, void *context) ...@@ -98,7 +99,7 @@ isViewOnTempTable_walker(Node *node, void *context)
*/ */
static Oid static Oid
DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
Oid namespaceId) Oid namespaceId, List *options)
{ {
Oid viewOid; Oid viewOid;
CreateStmt *createStmt = makeNode(CreateStmt); CreateStmt *createStmt = makeNode(CreateStmt);
...@@ -166,6 +167,8 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, ...@@ -166,6 +167,8 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
{ {
Relation rel; Relation rel;
TupleDesc descriptor; TupleDesc descriptor;
List *atcmds = NIL;
AlterTableCmd *atcmd;
/* /*
* Yes. Get exclusive lock on the existing view ... * Yes. Get exclusive lock on the existing view ...
...@@ -203,6 +206,15 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, ...@@ -203,6 +206,15 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
descriptor = BuildDescForRelation(attrList); descriptor = BuildDescForRelation(attrList);
checkViewTupleDesc(descriptor, rel->rd_att); checkViewTupleDesc(descriptor, rel->rd_att);
/*
* The new options list replaces the existing options list, even
* if it's empty.
*/
atcmd = makeNode(AlterTableCmd);
atcmd->subtype = AT_ReplaceRelOptions;
atcmd->def = (Node *) options;
atcmds = lappend(atcmds, atcmd);
/* /*
* If new attributes have been added, we must add pg_attribute entries * If new attributes have been added, we must add pg_attribute entries
* for them. It is convenient (although overkill) to use the ALTER * for them. It is convenient (although overkill) to use the ALTER
...@@ -210,14 +222,11 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, ...@@ -210,14 +222,11 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
*/ */
if (list_length(attrList) > rel->rd_att->natts) if (list_length(attrList) > rel->rd_att->natts)
{ {
List *atcmds = NIL;
ListCell *c; ListCell *c;
int skip = rel->rd_att->natts; int skip = rel->rd_att->natts;
foreach(c, attrList) foreach(c, attrList)
{ {
AlterTableCmd *atcmd;
if (skip > 0) if (skip > 0)
{ {
skip--; skip--;
...@@ -228,9 +237,11 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, ...@@ -228,9 +237,11 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
atcmd->def = (Node *) lfirst(c); atcmd->def = (Node *) lfirst(c);
atcmds = lappend(atcmds, atcmd); atcmds = lappend(atcmds, atcmd);
} }
AlterTableInternal(viewOid, atcmds, true);
} }
/* OK, let's do it. */
AlterTableInternal(viewOid, atcmds, true);
/* /*
* Seems okay, so return the OID of the pre-existing view. * Seems okay, so return the OID of the pre-existing view.
*/ */
...@@ -250,7 +261,8 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, ...@@ -250,7 +261,8 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
createStmt->tableElts = attrList; createStmt->tableElts = attrList;
createStmt->inhRelations = NIL; createStmt->inhRelations = NIL;
createStmt->constraints = NIL; createStmt->constraints = NIL;
createStmt->options = list_make1(defWithOids(false)); createStmt->options = options;
createStmt->options = lappend(options, defWithOids(false));
createStmt->oncommit = ONCOMMIT_NOOP; createStmt->oncommit = ONCOMMIT_NOOP;
createStmt->tablespacename = NULL; createStmt->tablespacename = NULL;
createStmt->if_not_exists = false; createStmt->if_not_exists = false;
...@@ -513,7 +525,7 @@ DefineView(ViewStmt *stmt, const char *queryString) ...@@ -513,7 +525,7 @@ DefineView(ViewStmt *stmt, const char *queryString)
* aborted. * aborted.
*/ */
viewOid = DefineVirtualRelation(view, viewParse->targetList, viewOid = DefineVirtualRelation(view, viewParse->targetList,
stmt->replace, namespaceId); stmt->replace, namespaceId, stmt->options);
/* /*
* The relation we have just created is not visible to any other commands * The relation we have just created is not visible to any other commands
......
...@@ -1965,6 +1965,7 @@ _copyRangeTblEntry(const RangeTblEntry *from) ...@@ -1965,6 +1965,7 @@ _copyRangeTblEntry(const RangeTblEntry *from)
COPY_SCALAR_FIELD(relid); COPY_SCALAR_FIELD(relid);
COPY_SCALAR_FIELD(relkind); COPY_SCALAR_FIELD(relkind);
COPY_NODE_FIELD(subquery); COPY_NODE_FIELD(subquery);
COPY_SCALAR_FIELD(security_barrier);
COPY_SCALAR_FIELD(jointype); COPY_SCALAR_FIELD(jointype);
COPY_NODE_FIELD(joinaliasvars); COPY_NODE_FIELD(joinaliasvars);
COPY_NODE_FIELD(funcexpr); COPY_NODE_FIELD(funcexpr);
......
...@@ -2228,6 +2228,7 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b) ...@@ -2228,6 +2228,7 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b)
COMPARE_SCALAR_FIELD(relid); COMPARE_SCALAR_FIELD(relid);
COMPARE_SCALAR_FIELD(relkind); COMPARE_SCALAR_FIELD(relkind);
COMPARE_NODE_FIELD(subquery); COMPARE_NODE_FIELD(subquery);
COMPARE_SCALAR_FIELD(security_barrier);
COMPARE_SCALAR_FIELD(jointype); COMPARE_SCALAR_FIELD(jointype);
COMPARE_NODE_FIELD(joinaliasvars); COMPARE_NODE_FIELD(joinaliasvars);
COMPARE_NODE_FIELD(funcexpr); COMPARE_NODE_FIELD(funcexpr);
......
...@@ -2321,6 +2321,7 @@ _outRangeTblEntry(StringInfo str, const RangeTblEntry *node) ...@@ -2321,6 +2321,7 @@ _outRangeTblEntry(StringInfo str, const RangeTblEntry *node)
break; break;
case RTE_SUBQUERY: case RTE_SUBQUERY:
WRITE_NODE_FIELD(subquery); WRITE_NODE_FIELD(subquery);
WRITE_BOOL_FIELD(security_barrier);
break; break;
case RTE_JOIN: case RTE_JOIN:
WRITE_ENUM_FIELD(jointype, JoinType); WRITE_ENUM_FIELD(jointype, JoinType);
......
...@@ -1192,6 +1192,7 @@ _readRangeTblEntry(void) ...@@ -1192,6 +1192,7 @@ _readRangeTblEntry(void)
break; break;
case RTE_SUBQUERY: case RTE_SUBQUERY:
READ_NODE_FIELD(subquery); READ_NODE_FIELD(subquery);
READ_BOOL_FIELD(security_barrier);
break; break;
case RTE_JOIN: case RTE_JOIN:
READ_ENUM_FIELD(jointype, JoinType); READ_ENUM_FIELD(jointype, JoinType);
......
...@@ -744,6 +744,11 @@ set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel, ...@@ -744,6 +744,11 @@ set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel,
* pseudoconstant clauses; better to have the gating node above the * pseudoconstant clauses; better to have the gating node above the
* subquery. * subquery.
* *
* Also, if the sub-query has "security_barrier" flag, it means the
* sub-query originated from a view that must enforce row-level security.
* We must not push down quals in order to avoid information leaks, either
* via side-effects or error output.
*
* Non-pushed-down clauses will get evaluated as qpquals of the * Non-pushed-down clauses will get evaluated as qpquals of the
* SubqueryScan node. * SubqueryScan node.
* *
...@@ -762,7 +767,16 @@ set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel, ...@@ -762,7 +767,16 @@ set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel,
RestrictInfo *rinfo = (RestrictInfo *) lfirst(l); RestrictInfo *rinfo = (RestrictInfo *) lfirst(l);
Node *clause = (Node *) rinfo->clause; Node *clause = (Node *) rinfo->clause;
/*
* XXX. You might wonder why we're testing rte->security_barrier
* qual-by-qual here rather than hoisting the test up into the
* surrounding if statement; after all, the answer will be the
* same for all quals. The answer is that we expect to shortly
* change this logic to allow pushing down some quals that use only
* "leakproof" operators even through a security barrier.
*/
if (!rinfo->pseudoconstant && if (!rinfo->pseudoconstant &&
!rte->security_barrier &&
qual_is_pushdown_safe(subquery, rti, clause, differentTypes)) qual_is_pushdown_safe(subquery, rti, clause, differentTypes))
{ {
/* Push it down */ /* Push it down */
......
...@@ -543,6 +543,7 @@ pull_up_subqueries(PlannerInfo *root, Node *jtnode, ...@@ -543,6 +543,7 @@ pull_up_subqueries(PlannerInfo *root, Node *jtnode,
*/ */
if (rte->rtekind == RTE_SUBQUERY && if (rte->rtekind == RTE_SUBQUERY &&
is_simple_subquery(rte->subquery) && is_simple_subquery(rte->subquery) &&
!rte->security_barrier &&
(containing_appendrel == NULL || (containing_appendrel == NULL ||
is_safe_append_member(rte->subquery))) is_safe_append_member(rte->subquery)))
return pull_up_simple_subquery(root, jtnode, rte, return pull_up_simple_subquery(root, jtnode, rte,
...@@ -712,6 +713,7 @@ pull_up_simple_subquery(PlannerInfo *root, Node *jtnode, RangeTblEntry *rte, ...@@ -712,6 +713,7 @@ pull_up_simple_subquery(PlannerInfo *root, Node *jtnode, RangeTblEntry *rte,
* pull_up_subqueries. * pull_up_subqueries.
*/ */
if (is_simple_subquery(subquery) && if (is_simple_subquery(subquery) &&
!rte->security_barrier &&
(containing_appendrel == NULL || is_safe_append_member(subquery))) (containing_appendrel == NULL || is_safe_append_member(subquery)))
{ {
/* good to go */ /* good to go */
......
...@@ -7322,26 +7322,28 @@ transaction_mode_list_or_empty: ...@@ -7322,26 +7322,28 @@ transaction_mode_list_or_empty:
* *
*****************************************************************************/ *****************************************************************************/
ViewStmt: CREATE OptTemp VIEW qualified_name opt_column_list ViewStmt: CREATE OptTemp VIEW qualified_name opt_column_list opt_reloptions
AS SelectStmt opt_check_option AS SelectStmt opt_check_option
{ {
ViewStmt *n = makeNode(ViewStmt); ViewStmt *n = makeNode(ViewStmt);
n->view = $4; n->view = $4;
n->view->relpersistence = $2; n->view->relpersistence = $2;
n->aliases = $5; n->aliases = $5;
n->query = $7; n->query = $8;
n->replace = false; n->replace = false;
n->options = $6;
$$ = (Node *) n; $$ = (Node *) n;
} }
| CREATE OR REPLACE OptTemp VIEW qualified_name opt_column_list | CREATE OR REPLACE OptTemp VIEW qualified_name opt_column_list opt_reloptions
AS SelectStmt opt_check_option AS SelectStmt opt_check_option
{ {
ViewStmt *n = makeNode(ViewStmt); ViewStmt *n = makeNode(ViewStmt);
n->view = $6; n->view = $6;
n->view->relpersistence = $4; n->view->relpersistence = $4;
n->aliases = $7; n->aliases = $7;
n->query = $9; n->query = $10;
n->replace = true; n->replace = true;
n->options = $8;
$$ = (Node *) n; $$ = (Node *) n;
} }
; ;
......
...@@ -1382,6 +1382,7 @@ ApplyRetrieveRule(Query *parsetree, ...@@ -1382,6 +1382,7 @@ ApplyRetrieveRule(Query *parsetree,
rte->rtekind = RTE_SUBQUERY; rte->rtekind = RTE_SUBQUERY;
rte->relid = InvalidOid; rte->relid = InvalidOid;
rte->security_barrier = RelationIsSecurityView(relation);
rte->subquery = rule_action; rte->subquery = rule_action;
rte->inh = false; /* must not be set for a subquery */ rte->inh = false; /* must not be set for a subquery */
......
...@@ -4372,6 +4372,19 @@ examine_simple_variable(PlannerInfo *root, Var *var, ...@@ -4372,6 +4372,19 @@ examine_simple_variable(PlannerInfo *root, Var *var,
subquery->distinctClause) subquery->distinctClause)
return; return;
/*
* If the sub-query originated from a view with the security_barrier
* attribute, we treat it as a black-box from outside of the view.
* This is probably a harsher restriction than necessary; it's
* certainly OK for the selectivity estimator (which is a C function,
* and therefore omnipotent anyway) to look at the statistics. But
* many selectivity estimators will happily *invoke the operator
* function* to try to work out a good estimate - and that's not OK.
* So for now, we do this.
*/
if (rte->security_barrier)
return;
/* /*
* OK, fetch RelOptInfo for subquery. Note that we don't change the * OK, fetch RelOptInfo for subquery. Note that we don't change the
* rel returned in vardata, since caller expects it to be a rel of the * rel returned in vardata, since caller expects it to be a rel of the
......
...@@ -374,6 +374,7 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple) ...@@ -374,6 +374,7 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple)
case RELKIND_RELATION: case RELKIND_RELATION:
case RELKIND_TOASTVALUE: case RELKIND_TOASTVALUE:
case RELKIND_INDEX: case RELKIND_INDEX:
case RELKIND_VIEW:
break; break;
default: default:
return; return;
......
...@@ -12397,8 +12397,10 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo) ...@@ -12397,8 +12397,10 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
if (binary_upgrade) if (binary_upgrade)
binary_upgrade_set_pg_class_oids(q, tbinfo->dobj.catId.oid, false); binary_upgrade_set_pg_class_oids(q, tbinfo->dobj.catId.oid, false);
appendPQExpBuffer(q, "CREATE VIEW %s AS\n %s\n", appendPQExpBuffer(q, "CREATE VIEW %s", fmtId(tbinfo->dobj.name));
fmtId(tbinfo->dobj.name), viewdef); if (tbinfo->reloptions && strlen(tbinfo->reloptions) > 0)
appendPQExpBuffer(q, " WITH (%s)", tbinfo->reloptions);
appendPQExpBuffer(q, " AS\n %s\n", viewdef);
appendPQExpBuffer(labelq, "VIEW %s", appendPQExpBuffer(labelq, "VIEW %s",
fmtId(tbinfo->dobj.name)); fmtId(tbinfo->dobj.name));
......
...@@ -43,8 +43,9 @@ typedef enum relopt_kind ...@@ -43,8 +43,9 @@ typedef enum relopt_kind
RELOPT_KIND_ATTRIBUTE = (1 << 6), RELOPT_KIND_ATTRIBUTE = (1 << 6),
RELOPT_KIND_TABLESPACE = (1 << 7), RELOPT_KIND_TABLESPACE = (1 << 7),
RELOPT_KIND_SPGIST = (1 << 8), RELOPT_KIND_SPGIST = (1 << 8),
RELOPT_KIND_VIEW = (1 << 9),
/* if you add a new kind, make sure you update "last_default" too */ /* if you add a new kind, make sure you update "last_default" too */
RELOPT_KIND_LAST_DEFAULT = RELOPT_KIND_SPGIST, RELOPT_KIND_LAST_DEFAULT = RELOPT_KIND_VIEW,
/* some compilers treat enums as signed ints, so we can't use 1 << 31 */ /* some compilers treat enums as signed ints, so we can't use 1 << 31 */
RELOPT_KIND_MAX = (1 << 30) RELOPT_KIND_MAX = (1 << 30)
} relopt_kind; } relopt_kind;
......
...@@ -706,6 +706,7 @@ typedef struct RangeTblEntry ...@@ -706,6 +706,7 @@ typedef struct RangeTblEntry
* Fields valid for a subquery RTE (else NULL): * Fields valid for a subquery RTE (else NULL):
*/ */
Query *subquery; /* the sub-query */ Query *subquery; /* the sub-query */
bool security_barrier; /* subquery from security_barrier view */
/* /*
* Fields valid for a join RTE (else NULL/zero): * Fields valid for a join RTE (else NULL/zero):
...@@ -1208,6 +1209,7 @@ typedef enum AlterTableType ...@@ -1208,6 +1209,7 @@ typedef enum AlterTableType
AT_SetTableSpace, /* SET TABLESPACE */ AT_SetTableSpace, /* SET TABLESPACE */
AT_SetRelOptions, /* SET (...) -- AM specific parameters */ AT_SetRelOptions, /* SET (...) -- AM specific parameters */
AT_ResetRelOptions, /* RESET (...) -- AM specific parameters */ AT_ResetRelOptions, /* RESET (...) -- AM specific parameters */
AT_ReplaceRelOptions, /* replace reloption list in its entirety */
AT_EnableTrig, /* ENABLE TRIGGER name */ AT_EnableTrig, /* ENABLE TRIGGER name */
AT_EnableAlwaysTrig, /* ENABLE ALWAYS TRIGGER name */ AT_EnableAlwaysTrig, /* ENABLE ALWAYS TRIGGER name */
AT_EnableReplicaTrig, /* ENABLE REPLICA TRIGGER name */ AT_EnableReplicaTrig, /* ENABLE REPLICA TRIGGER name */
...@@ -2277,6 +2279,7 @@ typedef struct ViewStmt ...@@ -2277,6 +2279,7 @@ typedef struct ViewStmt
List *aliases; /* target column names */ List *aliases; /* target column names */
Node *query; /* the SELECT query */ Node *query; /* the SELECT query */
bool replace; /* replace an existing view? */ bool replace; /* replace an existing view? */
List *options; /* options from WITH clause */
} ViewStmt; } ViewStmt;
/* ---------------------- /* ----------------------
......
...@@ -195,6 +195,7 @@ typedef struct StdRdOptions ...@@ -195,6 +195,7 @@ typedef struct StdRdOptions
int32 vl_len_; /* varlena header (do not touch directly!) */ int32 vl_len_; /* varlena header (do not touch directly!) */
int fillfactor; /* page fill factor in percent (0..100) */ int fillfactor; /* page fill factor in percent (0..100) */
AutoVacOpts autovacuum; /* autovacuum-related options */ AutoVacOpts autovacuum; /* autovacuum-related options */
bool security_barrier; /* for views */
} StdRdOptions; } StdRdOptions;
#define HEAP_MIN_FILLFACTOR 10 #define HEAP_MIN_FILLFACTOR 10
...@@ -222,6 +223,14 @@ typedef struct StdRdOptions ...@@ -222,6 +223,14 @@ typedef struct StdRdOptions
#define RelationGetTargetPageFreeSpace(relation, defaultff) \ #define RelationGetTargetPageFreeSpace(relation, defaultff) \
(BLCKSZ * (100 - RelationGetFillFactor(relation, defaultff)) / 100) (BLCKSZ * (100 - RelationGetFillFactor(relation, defaultff)) / 100)
/*
* RelationIsSecurityView
* Returns whether the relation is security view, or not
*/
#define RelationIsSecurityView(relation) \
((relation)->rd_options ? \
((StdRdOptions *) (relation)->rd_options)->security_barrier : false)
/* /*
* RelationIsValid * RelationIsValid
* True iff relation descriptor is valid. * True iff relation descriptor is valid.
......
...@@ -239,6 +239,55 @@ And relnamespace IN (SELECT OID FROM pg_namespace WHERE nspname LIKE 'pg_temp%') ...@@ -239,6 +239,55 @@ And relnamespace IN (SELECT OID FROM pg_namespace WHERE nspname LIKE 'pg_temp%')
1 1
(1 row) (1 row)
--
-- CREATE VIEW and WITH(...) clause
--
CREATE VIEW mysecview1
AS SELECT * FROM tbl1 WHERE a = 0;
CREATE VIEW mysecview2 WITH (security_barrier=true)
AS SELECT * FROM tbl1 WHERE a > 0;
CREATE VIEW mysecview3 WITH (security_barrier=false)
AS SELECT * FROM tbl1 WHERE a < 0;
CREATE VIEW mysecview4 WITH (security_barrier)
AS SELECT * FROM tbl1 WHERE a <> 0;
CREATE VIEW mysecview5 WITH (security_barrier=100) -- Error
AS SELECT * FROM tbl1 WHERE a > 100;
ERROR: invalid value for boolean option "security_barrier": 100
CREATE VIEW mysecview6 WITH (invalid_option) -- Error
AS SELECT * FROM tbl1 WHERE a < 100;
ERROR: unrecognized parameter "invalid_option"
SELECT relname, relkind, reloptions FROM pg_class
WHERE oid in ('mysecview1'::regclass, 'mysecview2'::regclass,
'mysecview3'::regclass, 'mysecview4'::regclass)
ORDER BY relname;
relname | relkind | reloptions
------------+---------+--------------------------
mysecview1 | v |
mysecview2 | v | {security_barrier=true}
mysecview3 | v | {security_barrier=false}
mysecview4 | v | {security_barrier=true}
(4 rows)
CREATE OR REPLACE VIEW mysecview1
AS SELECT * FROM tbl1 WHERE a = 256;
CREATE OR REPLACE VIEW mysecview2
AS SELECT * FROM tbl1 WHERE a > 256;
CREATE OR REPLACE VIEW mysecview3 WITH (security_barrier=true)
AS SELECT * FROM tbl1 WHERE a < 256;
CREATE OR REPLACE VIEW mysecview4 WITH (security_barrier=false)
AS SELECT * FROM tbl1 WHERE a <> 256;
SELECT relname, relkind, reloptions FROM pg_class
WHERE oid in ('mysecview1'::regclass, 'mysecview2'::regclass,
'mysecview3'::regclass, 'mysecview4'::regclass)
ORDER BY relname;
relname | relkind | reloptions
------------+---------+--------------------------
mysecview1 | v |
mysecview2 | v |
mysecview3 | v | {security_barrier=true}
mysecview4 | v | {security_barrier=false}
(4 rows)
DROP SCHEMA temp_view_test CASCADE; DROP SCHEMA temp_view_test CASCADE;
NOTICE: drop cascades to 22 other objects NOTICE: drop cascades to 22 other objects
DETAIL: drop cascades to table temp_view_test.base_table DETAIL: drop cascades to table temp_view_test.base_table
...@@ -264,7 +313,7 @@ drop cascades to view temp_view_test.v8 ...@@ -264,7 +313,7 @@ drop cascades to view temp_view_test.v8
drop cascades to sequence temp_view_test.seq1 drop cascades to sequence temp_view_test.seq1
drop cascades to view temp_view_test.v9 drop cascades to view temp_view_test.v9
DROP SCHEMA testviewschm2 CASCADE; DROP SCHEMA testviewschm2 CASCADE;
NOTICE: drop cascades to 16 other objects NOTICE: drop cascades to 20 other objects
DETAIL: drop cascades to table t1 DETAIL: drop cascades to table t1
drop cascades to view temporal1 drop cascades to view temporal1
drop cascades to view temporal2 drop cascades to view temporal2
...@@ -281,4 +330,8 @@ drop cascades to table tbl3 ...@@ -281,4 +330,8 @@ drop cascades to table tbl3
drop cascades to table tbl4 drop cascades to table tbl4
drop cascades to view mytempview drop cascades to view mytempview
drop cascades to view pubview drop cascades to view pubview
drop cascades to view mysecview1
drop cascades to view mysecview2
drop cascades to view mysecview3
drop cascades to view mysecview4
SET search_path to public; SET search_path to public;
...@@ -191,6 +191,39 @@ AND NOT EXISTS (SELECT g FROM tbl4 LEFT JOIN tmptbl ON tbl4.h = tmptbl.j); ...@@ -191,6 +191,39 @@ AND NOT EXISTS (SELECT g FROM tbl4 LEFT JOIN tmptbl ON tbl4.h = tmptbl.j);
SELECT count(*) FROM pg_class where relname LIKE 'mytempview' SELECT count(*) FROM pg_class where relname LIKE 'mytempview'
And relnamespace IN (SELECT OID FROM pg_namespace WHERE nspname LIKE 'pg_temp%'); And relnamespace IN (SELECT OID FROM pg_namespace WHERE nspname LIKE 'pg_temp%');
--
-- CREATE VIEW and WITH(...) clause
--
CREATE VIEW mysecview1
AS SELECT * FROM tbl1 WHERE a = 0;
CREATE VIEW mysecview2 WITH (security_barrier=true)
AS SELECT * FROM tbl1 WHERE a > 0;
CREATE VIEW mysecview3 WITH (security_barrier=false)
AS SELECT * FROM tbl1 WHERE a < 0;
CREATE VIEW mysecview4 WITH (security_barrier)
AS SELECT * FROM tbl1 WHERE a <> 0;
CREATE VIEW mysecview5 WITH (security_barrier=100) -- Error
AS SELECT * FROM tbl1 WHERE a > 100;
CREATE VIEW mysecview6 WITH (invalid_option) -- Error
AS SELECT * FROM tbl1 WHERE a < 100;
SELECT relname, relkind, reloptions FROM pg_class
WHERE oid in ('mysecview1'::regclass, 'mysecview2'::regclass,
'mysecview3'::regclass, 'mysecview4'::regclass)
ORDER BY relname;
CREATE OR REPLACE VIEW mysecview1
AS SELECT * FROM tbl1 WHERE a = 256;
CREATE OR REPLACE VIEW mysecview2
AS SELECT * FROM tbl1 WHERE a > 256;
CREATE OR REPLACE VIEW mysecview3 WITH (security_barrier=true)
AS SELECT * FROM tbl1 WHERE a < 256;
CREATE OR REPLACE VIEW mysecview4 WITH (security_barrier=false)
AS SELECT * FROM tbl1 WHERE a <> 256;
SELECT relname, relkind, reloptions FROM pg_class
WHERE oid in ('mysecview1'::regclass, 'mysecview2'::regclass,
'mysecview3'::regclass, 'mysecview4'::regclass)
ORDER BY relname;
DROP SCHEMA temp_view_test CASCADE; DROP SCHEMA temp_view_test CASCADE;
DROP SCHEMA testviewschm2 CASCADE; DROP SCHEMA testviewschm2 CASCADE;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment