Commit c09b18f2 authored by Alvaro Herrera's avatar Alvaro Herrera

Support \crosstabview in psql

\crosstabview is a completely different way to display results from a
query: instead of a vertical display of rows, the data values are placed
in a grid where the column and row headers come from the data itself,
similar to a spreadsheet.

The sort order of the horizontal header can be specified by using
another column in the query, and the vertical header determines its
ordering from the order in which they appear in the query.

This only allows displaying a single value in each cell.  If more than
one value correspond to the same cell, an error is thrown.  Merging of
values can be done in the query itself, if necessary.  This may be
revisited in the future.

Author: Daniel Verité
Reviewed-by: Pavel Stehule, Dean Rasheed
parent 279d86af
......@@ -989,6 +989,106 @@ testdb=>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>\crosstabview [
<replaceable class="parameter">colV</replaceable>
<replaceable class="parameter">colH</replaceable>
[:<replaceable class="parameter">scolH</replaceable>]
[<replaceable class="parameter">colD</replaceable>]
] </literal></term>
<listitem>
<para>
Execute the current query buffer (like <literal>\g</literal>) and shows
the results inside a crosstab grid.
The query must return at least three columns.
The output column <replaceable class="parameter">colV</replaceable>
becomes a vertical header
and the output column <replaceable class="parameter">colH</replaceable>
becomes a horizontal header, optionally sorted by ranking data obtained
from <replaceable class="parameter">scolH</replaceable>.
<replaceable class="parameter">colD</replaceable>
is the output column to project into the grid. If this is not
specified and there are exactly three columns in the result set,
the column that isn't
<replaceable class="parameter">colV</replaceable> nor
<replaceable class="parameter">colH</replaceable>
is displayed; if there are more columns, an error is thrown.
</para>
<para>
All columns can be refered to by their position (starting at 1), or by
their name. Normal case folding and quoting rules apply on column
names. By default,
<replaceable class="parameter">colV</replaceable> corresponds to column 1
and <replaceable class="parameter">colH</replaceable> to column 2.
A query having only one output column cannot be viewed in crosstab, and
<replaceable class="parameter">colH</replaceable> must differ from
<replaceable class="parameter">colV</replaceable>.
</para>
<para>
The vertical header, displayed as the leftmost column,
contains the deduplicated values found in
column <replaceable class="parameter">colV</replaceable>, in the same
order as in the query results.
</para>
<para>
The horizontal header, displayed as the first row,
contains the deduplicated values found in
column <replaceable class="parameter">colH</replaceable>, in
the order of appearance in the query results.
If specified, the optional <replaceable class="parameter">scolH</replaceable>
argument refers to a column whose values should be integer numbers
by which <replaceable class="parameter">colH</replaceable> will be sorted
to be positioned in the horizontal header.
</para>
<para>
Inside the crosstab grid,
given a query output with <literal>N</literal> columns
(including <replaceable class="parameter">colV</replaceable> and
<replaceable class="parameter">colH</replaceable>),
for each distinct value <literal>x</literal> of
<replaceable class="parameter">colH</replaceable>
and each distinct value <literal>y</literal> of
<replaceable class="parameter">colV</replaceable>,
the contents of a cell located at the intersection
<literal>(x,y)</literal> is determined by these rules:
<itemizedlist>
<listitem>
<para>
if there is no corresponding row in the query results such that the
value for <replaceable class="parameter">colH</replaceable>
is <literal>x</literal> and the value
for <replaceable class="parameter">colV</replaceable>
is <literal>y</literal>, the cell is empty.
</para>
</listitem>
<listitem>
<para>
if there is exactly one row such that the value
for <replaceable class="parameter">colH</replaceable>
is <literal>x</literal> and the value
for <replaceable class="parameter">colV</replaceable>
is <literal>y</literal>, then the <literal>colD</literal> column
is displayed.
</para>
</listitem>
<listitem>
<para>
if there are several such rows, an error is thrown.
</para>
</listitem>
</itemizedlist>
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>\d[S+] [ <link linkend="APP-PSQL-patterns"><replaceable class="parameter">pattern</replaceable></link> ]</literal></term>
......@@ -4070,6 +4170,47 @@ first | 4
second | four
</programlisting></para>
<para>
When suitable, query results can be shown in a crosstab representation
with the \crosstabview command:
<programlisting>
testdb=&gt; <userinput>SELECT first, second, first &gt; 2 AS gt2 FROM my_table;</userinput>
first | second | ge2
-------+--------+-----
1 | one | f
2 | two | f
3 | three | t
4 | four | t
(4 rows)
testdb=&gt; <userinput>\crosstabview first second</userinput>
first | one | two | three | four
-------+-----+-----+-------+------
1 | f | | |
2 | | f | |
3 | | | t |
4 | | | | t
(4 rows)
</programlisting>
This second example shows a multiplication table with rows sorted in reverse
numerical order and columns with an independant, ascending numerical order.
<programlisting>
testdb=&gt; <userinput>SELECT t1.first as "A", t2.first+100 AS "B", t1.first*(t2.first+100) as "AxB",</userinput>
testdb(&gt; <userinput>row_number() over(order by t2.first) AS ord</userinput>
testdb(&gt; <userinput>FROM my_table t1 CROSS JOIN my_table t2 ORDER BY 1 DESC</userinput>
testdb(&gt; <userinput>\crosstabview A B:ord AxB</userinput>
A | 101 | 102 | 103 | 104
---+-----+-----+-----+-----
4 | 404 | 408 | 412 | 416
3 | 303 | 306 | 309 | 312
2 | 202 | 204 | 206 | 208
1 | 101 | 102 | 103 | 104
(4 rows)
</programlisting>
</para>
</refsect1>
</refentry>
......@@ -23,7 +23,7 @@ LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq
OBJS= command.o common.o help.o input.o stringutils.o mainloop.o copy.o \
startup.o prompt.o variables.o large_obj.o describe.o \
tab-complete.o \
crosstabview.o tab-complete.o \
sql_help.o psqlscanslash.o \
$(WIN32RES)
......
......@@ -39,6 +39,7 @@
#include "common.h"
#include "copy.h"
#include "crosstabview.h"
#include "describe.h"
#include "help.h"
#include "input.h"
......@@ -364,6 +365,20 @@ exec_command(const char *cmd,
else if (strcmp(cmd, "copyright") == 0)
print_copyright();
/* \crosstabview -- execute a query and display results in crosstab */
else if (strcmp(cmd, "crosstabview") == 0)
{
pset.ctv_col_V = psql_scan_slash_option(scan_state,
OT_NORMAL, NULL, false);
pset.ctv_col_H = psql_scan_slash_option(scan_state,
OT_NORMAL, NULL, false);
pset.ctv_col_D = psql_scan_slash_option(scan_state,
OT_NORMAL, NULL, false);
pset.crosstab_flag = true;
status = PSQL_CMD_SEND;
}
/* \d* commands */
else if (cmd[0] == 'd')
{
......
......@@ -23,6 +23,7 @@
#include "settings.h"
#include "command.h"
#include "copy.h"
#include "crosstabview.h"
#include "fe_utils/mbprint.h"
......@@ -1064,6 +1065,8 @@ PrintQueryResults(PGresult *results)
success = StoreQueryTuple(results);
else if (pset.gexec_flag)
success = ExecQueryTuples(results);
else if (pset.crosstab_flag)
success = PrintResultsInCrosstab(results);
else
success = PrintQueryTuples(results);
/* if it's INSERT/UPDATE/DELETE RETURNING, also print status */
......@@ -1213,7 +1216,8 @@ SendQuery(const char *query)
}
}
if (pset.fetch_count <= 0 || pset.gexec_flag || !is_select_command(query))
if (pset.fetch_count <= 0 || pset.gexec_flag ||
pset.crosstab_flag || !is_select_command(query))
{
/* Default fetch-it-all-and-print mode */
instr_time before,
......@@ -1356,6 +1360,24 @@ sendquery_cleanup:
/* reset \gexec trigger */
pset.gexec_flag = false;
/* reset \crosstabview trigger */
pset.crosstab_flag = false;
if (pset.ctv_col_V)
{
free(pset.ctv_col_V);
pset.ctv_col_V = NULL;
}
if (pset.ctv_col_H)
{
free(pset.ctv_col_H);
pset.ctv_col_H = NULL;
}
if (pset.ctv_col_D)
{
free(pset.ctv_col_D);
pset.ctv_col_D = NULL;
}
return OK;
}
......@@ -1501,7 +1523,7 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec)
break;
}
/* Note we do not deal with \gexec mode here */
/* Note we do not deal with \gexec or \crosstabview modes here */
ntuples = PQntuples(results);
......
/*
* psql - the PostgreSQL interactive terminal
*
* Copyright (c) 2000-2016, PostgreSQL Global Development Group
*
* src/bin/psql/crosstabview.c
*/
#include "postgres_fe.h"
#include <string.h>
#include "common.h"
#include "crosstabview.h"
#include "pqexpbuffer.h"
#include "settings.h"
/*
* Value/position from the resultset that goes into the horizontal or vertical
* crosstabview header.
*/
typedef struct _pivot_field
{
/*
* Pointer obtained from PQgetvalue() for colV or colH. Each distinct
* value becomes an entry in the vertical header (colV), or horizontal
* header (colH). A Null value is represented by a NULL pointer.
*/
char *name;
/*
* When a sort is requested on an alternative column, this holds
* PQgetvalue() for the sort column corresponding to <name>. If <name>
* appear multiple times, it's the first value in the order of the results
* that is kept. A Null value is represented by a NULL pointer.
*/
char *sort_value;
/*
* Rank of this value, starting at 0. Initially, it's the relative
* position of the first appearance of <name> in the resultset. For
* example, if successive rows contain B,A,C,A,D then it's B:0,A:1,C:2,D:3
* When a sort column is specified, ranks get updated in a final pass to
* reflect the desired order.
*/
int rank;
} pivot_field;
/* Node in avl_tree */
typedef struct _avl_node
{
/* Node contents */
pivot_field field;
/*
* Height of this node in the tree (number of nodes on the longest path to
* a leaf).
*/
int height;
/*
* Child nodes. [0] points to left subtree, [1] to right subtree. Never
* NULL, points to the empty node avl_tree.end when no left or right
* value.
*/
struct _avl_node *children[2];
} avl_node;
/*
* Control structure for the AVL tree (binary search tree kept
* balanced with the AVL algorithm)
*/
typedef struct _avl_tree
{
int count; /* Total number of nodes */
avl_node *root; /* root of the tree */
avl_node *end; /* Immutable dereferenceable empty tree */
} avl_tree;
static bool printCrosstab(const PGresult *results,
int num_columns, pivot_field *piv_columns, int field_for_columns,
int num_rows, pivot_field *piv_rows, int field_for_rows,
int field_for_data);
static int parseColumnRefs(char *arg, PGresult *res, int **col_numbers,
int max_columns, char separator);
static void avlInit(avl_tree *tree);
static void avlMergeValue(avl_tree *tree, char *name, char *sort_value);
static int avlCollectFields(avl_tree *tree, avl_node *node,
pivot_field *fields, int idx);
static void avlFree(avl_tree *tree, avl_node *node);
static void rankSort(int num_columns, pivot_field *piv_columns);
static int indexOfColumn(const char *arg, PGresult *res);
static int pivotFieldCompare(const void *a, const void *b);
static int rankCompare(const void *a, const void *b);
/*
* Main entry point to this module.
*
* Process the data from *res according the display options in pset (global),
* to generate the horizontal and vertical headers contents,
* then call printCrosstab() for the actual output.
*/
bool
PrintResultsInCrosstab(PGresult *res)
{
char *opt_field_for_rows = pset.ctv_col_V;
char *opt_field_for_columns = pset.ctv_col_H;
char *opt_field_for_data = pset.ctv_col_D;
int rn;
avl_tree piv_columns;
avl_tree piv_rows;
pivot_field *array_columns = NULL;
pivot_field *array_rows = NULL;
int num_columns = 0;
int num_rows = 0;
int *colsV = NULL,
*colsH = NULL,
*colsD = NULL;
int n;
int field_for_columns;
int sort_field_for_columns = -1;
int field_for_rows;
int field_for_data = -1;
bool retval = false;
avlInit(&piv_rows);
avlInit(&piv_columns);
if (res == NULL)
{
psql_error(_("No result\n"));
goto error_return;
}
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
psql_error(_("The query must return results to be shown in crosstab\n"));
goto error_return;
}
if (opt_field_for_rows && !opt_field_for_columns)
{
psql_error(_("A second column must be specified for the horizontal header\n"));
goto error_return;
}
if (PQnfields(res) <= 2)
{
psql_error(_("The query must return at least two columns to be shown in crosstab\n"));
goto error_return;
}
/*
* Arguments processing for the vertical header (1st arg) displayed in the
* left-most column. Only a reference to a field is accepted (no sort
* column).
*/
if (opt_field_for_rows == NULL)
{
field_for_rows = 0;
}
else
{
n = parseColumnRefs(opt_field_for_rows, res, &colsV, 1, ':');
if (n != 1)
goto error_return;
field_for_rows = colsV[0];
}
if (field_for_rows < 0)
goto error_return;
/*----------
* Arguments processing for the horizontal header (2nd arg)
* (pivoted column that gets displayed as the first row).
* Determine:
* - the field number for the horizontal header column
* - the field number of the associated sort column, if any
*/
if (opt_field_for_columns == NULL)
field_for_columns = 1;
else
{
n = parseColumnRefs(opt_field_for_columns, res, &colsH, 2, ':');
if (n <= 0)
goto error_return;
if (n == 1)
field_for_columns = colsH[0];
else
{
field_for_columns = colsH[0];
sort_field_for_columns = colsH[1];
}
if (field_for_columns < 0)
goto error_return;
}
if (field_for_columns == field_for_rows)
{
psql_error(_("The same column cannot be used for both vertical and horizontal headers\n"));
goto error_return;
}
/*
* Arguments processing for the data columns (3rd arg). Determine the
* column to display in the grid.
*/
if (opt_field_for_data == NULL)
{
int i;
/*
* If the data column was not specified, we search for the one not
* used as either vertical or horizontal headers. If the result has
* more than three columns, raise an error.
*/
if (PQnfields(res) > 3)
{
psql_error(_("Data column must be specified when the result set has more than three columns\n"));
goto error_return;
}
for (i = 0; i < PQnfields(res); i++)
{
if (i != field_for_rows && i != field_for_columns)
{
field_for_data = i;
break;
}
}
Assert(field_for_data >= 0);
}
else
{
int num_fields;
/* If a field was given, find out what it is. Only one is allowed. */
num_fields = parseColumnRefs(opt_field_for_data, res, &colsD, 1, ',');
if (num_fields < 1)
goto error_return;
field_for_data = colsD[0];
}
/*
* First part: accumulate the names that go into the vertical and
* horizontal headers, each into an AVL binary tree to build the set of
* DISTINCT values.
*/
for (rn = 0; rn < PQntuples(res); rn++)
{
char *val;
char *val1;
/* horizontal */
val = PQgetisnull(res, rn, field_for_columns) ? NULL :
PQgetvalue(res, rn, field_for_columns);
val1 = NULL;
if (sort_field_for_columns >= 0 &&
!PQgetisnull(res, rn, sort_field_for_columns))
val1 = PQgetvalue(res, rn, sort_field_for_columns);
avlMergeValue(&piv_columns, val, val1);
if (piv_columns.count > CROSSTABVIEW_MAX_COLUMNS)
{
psql_error(_("Maximum number of columns (%d) exceeded\n"),
CROSSTABVIEW_MAX_COLUMNS);
goto error_return;
}
/* vertical */
val = PQgetisnull(res, rn, field_for_rows) ? NULL :
PQgetvalue(res, rn, field_for_rows);
avlMergeValue(&piv_rows, val, NULL);
}
/*
* Second part: Generate sorted arrays from the AVL trees.
*/
num_columns = piv_columns.count;
num_rows = piv_rows.count;
array_columns = (pivot_field *)
pg_malloc(sizeof(pivot_field) * num_columns);
array_rows = (pivot_field *)
pg_malloc(sizeof(pivot_field) * num_rows);
avlCollectFields(&piv_columns, piv_columns.root, array_columns, 0);
avlCollectFields(&piv_rows, piv_rows.root, array_rows, 0);
/*
* Third part: optionally, process the ranking data for the horizontal
* header
*/
if (sort_field_for_columns >= 0)
rankSort(num_columns, array_columns);
/*
* Fourth part: print the crosstab'ed results.
*/
retval = printCrosstab(res,
num_columns, array_columns, field_for_columns,
num_rows, array_rows, field_for_rows,
field_for_data);
error_return:
avlFree(&piv_columns, piv_columns.root);
avlFree(&piv_rows, piv_rows.root);
pg_free(array_columns);
pg_free(array_rows);
pg_free(colsV);
pg_free(colsH);
pg_free(colsD);
return retval;
}
/*
* Output the pivoted resultset with the printTable* functions. Return true
* if successful, false otherwise.
*/
static bool
printCrosstab(const PGresult *results,
int num_columns, pivot_field *piv_columns, int field_for_columns,
int num_rows, pivot_field *piv_rows, int field_for_rows,
int field_for_data)
{
printQueryOpt popt = pset.popt;
printTableContent cont;
int i,
rn;
char col_align;
int *horiz_map;
bool retval = false;
printTableInit(&cont, &popt.topt, popt.title, num_columns + 1, num_rows);
/* Step 1: set target column names (horizontal header) */
/* The name of the first column is kept unchanged by the pivoting */
printTableAddHeader(&cont,
PQfname(results, field_for_rows),
false,
column_type_alignment(PQftype(results,
field_for_rows)));
/*
* To iterate over piv_columns[] by piv_columns[].rank, create a reverse
* map associating each piv_columns[].rank to its index in piv_columns.
* This avoids an O(N^2) loop later.
*/
horiz_map = (int *) pg_malloc(sizeof(int) * num_columns);
for (i = 0; i < num_columns; i++)
horiz_map[piv_columns[i].rank] = i;
/*
* The display alignment depends on its PQftype().
*/
col_align = column_type_alignment(PQftype(results, field_for_data));
for (i = 0; i < num_columns; i++)
{
char *colname;
colname = piv_columns[horiz_map[i]].name ?
piv_columns[horiz_map[i]].name :
(popt.nullPrint ? popt.nullPrint : "");
printTableAddHeader(&cont, colname, false, col_align);
}
pg_free(horiz_map);
/* Step 2: set row names in the first output column (vertical header) */
for (i = 0; i < num_rows; i++)
{
int k = piv_rows[i].rank;
cont.cells[k * (num_columns + 1)] = piv_rows[i].name ?
piv_rows[i].name :
(popt.nullPrint ? popt.nullPrint : "");
}
cont.cellsadded = num_rows * (num_columns + 1);
/*
* Step 3: fill in the content cells.
*/
for (rn = 0; rn < PQntuples(results); rn++)
{
int row_number;
int col_number;
pivot_field *p;
pivot_field elt;
/* Find target row */
if (!PQgetisnull(results, rn, field_for_rows))
elt.name = PQgetvalue(results, rn, field_for_rows);
else
elt.name = NULL;
p = (pivot_field *) bsearch(&elt,
piv_rows,
num_rows,
sizeof(pivot_field),
pivotFieldCompare);
Assert(p != NULL);
row_number = p->rank;
/* Find target column */
if (!PQgetisnull(results, rn, field_for_columns))
elt.name = PQgetvalue(results, rn, field_for_columns);
else
elt.name = NULL;
p = (pivot_field *) bsearch(&elt,
piv_columns,
num_columns,
sizeof(pivot_field),
pivotFieldCompare);
Assert(p != NULL);
col_number = p->rank;
/* Place value into cell */
if (col_number >= 0 && row_number >= 0)
{
int idx;
/* index into the cont.cells array */
idx = 1 + col_number + row_number * (num_columns + 1);
/*
* If the cell already contains a value, raise an error.
*/
if (cont.cells[idx] != NULL)
{
psql_error(_("data cell already contains a value: (row: \"%s\", column: \"%s\")\n"),
piv_rows[row_number].name ? piv_rows[row_number].name :
popt.nullPrint ? popt.nullPrint : "(null)",
piv_columns[col_number].name ? piv_columns[col_number].name :
popt.nullPrint ? popt.nullPrint : "(null)");
goto error;
}
cont.cells[idx] = !PQgetisnull(results, rn, field_for_data) ?
PQgetvalue(results, rn, field_for_data) :
(popt.nullPrint ? popt.nullPrint : "");
}
}
/*
* The non-initialized cells must be set to an empty string for the print
* functions
*/
for (i = 0; i < cont.cellsadded; i++)
{
if (cont.cells[i] == NULL)
cont.cells[i] = "";
}
printTable(&cont, pset.queryFout, false, pset.logfile);
retval = true;
error:
printTableCleanup(&cont);
return retval;
}
/*
* Parse col1[<sep>col2][<sep>col3]...
* where colN can be:
* - a number from 1 to PQnfields(res)
* - an unquoted column name matching (case insensitively) one of PQfname(res,...)
* - a quoted column name matching (case sensitively) one of PQfname(res,...)
* max_columns: 0 if no maximum
*/
static int
parseColumnRefs(char *arg,
PGresult *res,
int **col_numbers,
int max_columns,
char separator)
{
char *p = arg;
char c;
int col_num = -1;
int nb_cols = 0;
char *field_start = NULL;
*col_numbers = NULL;
while ((c = *p) != '\0')
{
bool quoted_field = false;
field_start = p;
/* first char */
if (c == '"')
{
quoted_field = true;
p++;
}
while ((c = *p) != '\0')
{
if (c == separator && !quoted_field)
break;
if (c == '"') /* end of field or embedded double quote */
{
p++;
if (*p == '"')
{
if (quoted_field)
{
p++;
continue;
}
}
else if (quoted_field && *p == separator)
break;
}
if (*p)
p += PQmblen(p, pset.encoding);
}
if (p != field_start)
{
/* look up the column and add its index into *col_numbers */
if (max_columns != 0 && nb_cols == max_columns)
{
psql_error(_("No more than %d column references expected\n"), max_columns);
goto errfail;
}
c = *p;
*p = '\0';
col_num = indexOfColumn(field_start, res);
if (col_num < 0)
goto errfail;
*p = c;
*col_numbers = (int *) pg_realloc(*col_numbers, (1 + nb_cols) * sizeof(int));
(*col_numbers)[nb_cols++] = col_num;
}
else
{
psql_error(_("Empty column reference\n"));
goto errfail;
}
if (*p)
p += PQmblen(p, pset.encoding);
}
return nb_cols;
errfail:
pg_free(*col_numbers);
*col_numbers = NULL;
return -1;
}
/*
* The avl* functions below provide a minimalistic implementation of AVL binary
* trees, to efficiently collect the distinct values that will form the horizontal
* and vertical headers. It only supports adding new values, no removal or even
* search.
*/
static void
avlInit(avl_tree *tree)
{
tree->end = (avl_node *) pg_malloc0(sizeof(avl_node));
tree->end->children[0] = tree->end->children[1] = tree->end;
tree->count = 0;
tree->root = tree->end;
}
/* Deallocate recursively an AVL tree, starting from node */
static void
avlFree(avl_tree *tree, avl_node *node)
{
if (node->children[0] != tree->end)
{
avlFree(tree, node->children[0]);
pg_free(node->children[0]);
}
if (node->children[1] != tree->end)
{
avlFree(tree, node->children[1]);
pg_free(node->children[1]);
}
if (node == tree->root)
{
/* free the root separately as it's not child of anything */
if (node != tree->end)
pg_free(node);
/* free the tree->end struct only once and when all else is freed */
pg_free(tree->end);
}
}
/* Set the height to 1 plus the greatest of left and right heights */
static void
avlUpdateHeight(avl_node *n)
{
n->height = 1 + (n->children[0]->height > n->children[1]->height ?
n->children[0]->height :
n->children[1]->height);
}
/* Rotate a subtree left (dir=0) or right (dir=1). Not recursive */
static avl_node *
avlRotate(avl_node **current, int dir)
{
avl_node *before = *current;
avl_node *after = (*current)->children[dir];
*current = after;
before->children[dir] = after->children[!dir];
avlUpdateHeight(before);
after->children[!dir] = before;
return after;
}
static int
avlBalance(avl_node *n)
{
return n->children[0]->height - n->children[1]->height;
}
/*
* After an insertion, possibly rebalance the tree so that the left and right
* node heights don't differ by more than 1.
* May update *node.
*/
static void
avlAdjustBalance(avl_tree *tree, avl_node **node)
{
avl_node *current = *node;
int b = avlBalance(current) / 2;
if (b != 0)
{
int dir = (1 - b) / 2;
if (avlBalance(current->children[dir]) == -b)
avlRotate(&current->children[dir], !dir);
current = avlRotate(node, dir);
}
if (current != tree->end)
avlUpdateHeight(current);
}
/*
* Insert a new value/field, starting from *node, reaching the correct position
* in the tree by recursion. Possibly rebalance the tree and possibly update
* *node. Do nothing if the value is already present in the tree.
*/
static void
avlInsertNode(avl_tree *tree, avl_node **node, pivot_field field)
{
avl_node *current = *node;
if (current == tree->end)
{
avl_node *new_node = (avl_node *)
pg_malloc(sizeof(avl_node));
new_node->height = 1;
new_node->field = field;
new_node->children[0] = new_node->children[1] = tree->end;
tree->count++;
*node = new_node;
}
else
{
int cmp = pivotFieldCompare(&field, &current->field);
if (cmp != 0)
{
avlInsertNode(tree,
cmp > 0 ? &current->children[1] : &current->children[0],
field);
avlAdjustBalance(tree, node);
}
}
}
/* Insert the value into the AVL tree, if it does not preexist */
static void
avlMergeValue(avl_tree *tree, char *name, char *sort_value)
{
pivot_field field;
field.name = name;
field.rank = tree->count;
field.sort_value = sort_value;
avlInsertNode(tree, &tree->root, field);
}
/*
* Recursively extract node values into the names array, in sorted order with a
* left-to-right tree traversal.
* Return the next candidate offset to write into the names array.
* fields[] must be preallocated to hold tree->count entries
*/
static int
avlCollectFields(avl_tree *tree, avl_node *node, pivot_field *fields, int idx)
{
if (node == tree->end)
return idx;
idx = avlCollectFields(tree, node->children[0], fields, idx);
fields[idx] = node->field;
return avlCollectFields(tree, node->children[1], fields, idx + 1);
}
static void
rankSort(int num_columns, pivot_field *piv_columns)
{
int *hmap; /* [[offset in piv_columns, rank], ...for
* every header entry] */
int i;
hmap = (int *) pg_malloc(sizeof(int) * num_columns * 2);
for (i = 0; i < num_columns; i++)
{
char *val = piv_columns[i].sort_value;
/* ranking information is valid if non null and matches /^-?\d+$/ */
if (val &&
((*val == '-' &&
strspn(val + 1, "0123456789") == strlen(val + 1)) ||
strspn(val, "0123456789") == strlen(val)))
{
hmap[i * 2] = atoi(val);
hmap[i * 2 + 1] = i;
}
else
{
/* invalid rank information ignored (equivalent to rank 0) */
hmap[i * 2] = 0;
hmap[i * 2 + 1] = i;
}
}
qsort(hmap, num_columns, sizeof(int) * 2, rankCompare);
for (i = 0; i < num_columns; i++)
{
piv_columns[hmap[i * 2 + 1]].rank = i;
}
pg_free(hmap);
}
/*
* Compare a user-supplied argument against a field name obtained by PQfname(),
* which is already case-folded.
* If arg is not enclosed in double quotes, pg_strcasecmp applies, otherwise
* do a case-sensitive comparison with these rules:
* - double quotes enclosing 'arg' are filtered out
* - double quotes inside 'arg' are expected to be doubled
*/
static bool
fieldNameEquals(const char *arg, const char *fieldname)
{
const char *p = arg;
const char *f = fieldname;
char c;
if (*p++ != '"')
return !pg_strcasecmp(arg, fieldname);
while ((c = *p++))
{
if (c == '"')
{
if (*p == '"')
p++; /* skip second quote and continue */
else if (*p == '\0')
return (*f == '\0'); /* p is shorter than f, or is
* identical */
}
if (*f == '\0')
return false; /* f is shorter than p */
if (c != *f) /* found one byte that differs */
return false;
f++;
}
return (*f == '\0');
}
/*
* arg can be a number or a column name, possibly quoted (like in an ORDER BY clause)
* Returns:
* on success, the 0-based index of the column
* or -1 if the column number or name is not found in the result's structure,
* or if it's ambiguous (arg corresponding to several columns)
*/
static int
indexOfColumn(const char *arg, PGresult *res)
{
int idx;
if (strspn(arg, "0123456789") == strlen(arg))
{
/* if arg contains only digits, it's a column number */
idx = atoi(arg) - 1;
if (idx < 0 || idx >= PQnfields(res))
{
psql_error(_("Invalid column number: %s\n"), arg);
return -1;
}
}
else
{
int i;
idx = -1;
for (i = 0; i < PQnfields(res); i++)
{
if (fieldNameEquals(arg, PQfname(res, i)))
{
if (idx >= 0)
{
/* if another idx was already found for the same name */
psql_error(_("Ambiguous column name: %s\n"), arg);
return -1;
}
idx = i;
}
}
if (idx == -1)
{
psql_error(_("Invalid column name: %s\n"), arg);
return -1;
}
}
return idx;
}
/*
* Value comparator for vertical and horizontal headers
* used for deduplication only.
* - null values are considered equal
* - non-null < null
* - non-null values are compared with strcmp()
*/
static int
pivotFieldCompare(const void *a, const void *b)
{
pivot_field *pa = (pivot_field *) a;
pivot_field *pb = (pivot_field *) b;
/* test null values */
if (!pb->name)
return pa->name ? -1 : 0;
else if (!pa->name)
return 1;
/* non-null values */
return strcmp(((pivot_field *) a)->name,
((pivot_field *) b)->name);
}
static int
rankCompare(const void *a, const void *b)
{
return *((int *) a) - *((int *) b);
}
/*
* psql - the PostgreSQL interactive terminal
*
* Copyright (c) 2000-2016, PostgreSQL Global Development Group
*
* src/bin/psql/crosstabview.h
*/
#ifndef CROSSTABVIEW_H
#define CROSSTABVIEW_H
/*
* Limit the number of output columns generated in memory by the crosstabview
* algorithm. A new output column is added for each distinct value found in the
* column that pivots (to form the horizontal header).
* The purpose of this limit is to fail early instead of over-allocating or spending
* too much time if the crosstab to generate happens to be unreasonably large
* (worst case: a NxN cartesian product with N=number of tuples).
* The value of 1600 corresponds to the maximum columns per table in storage,
* but it could be as much as INT_MAX theorically.
*/
#define CROSSTABVIEW_MAX_COLUMNS 1600
/* prototypes */
extern bool PrintResultsInCrosstab(PGresult *res);
#endif /* CROSSTABVIEW_H */
......@@ -177,6 +177,7 @@ slashUsage(unsigned short int pager)
fprintf(output, _(" \\gexec execute query, then execute each value in its result\n"));
fprintf(output, _(" \\gset [PREFIX] execute query and store results in psql variables\n"));
fprintf(output, _(" \\q quit psql\n"));
fprintf(output, _(" \\crosstabview [COLUMNS] execute query and display results in crosstab\n"));
fprintf(output, _(" \\watch [SEC] execute query every SEC seconds\n"));
fprintf(output, "\n");
......
......@@ -93,6 +93,10 @@ typedef struct _psqlSettings
char *gfname; /* one-shot file output argument for \g */
char *gset_prefix; /* one-shot prefix argument for \gset */
bool gexec_flag; /* one-shot flag to execute query's results */
bool crosstab_flag; /* one-shot request to crosstab results */
char *ctv_col_V; /* \crosstabview 1st argument */
char *ctv_col_H; /* \crosstabview 2nd argument */
char *ctv_col_D; /* \crosstabview 3nd argument */
bool notty; /* stdin or stdout is not a tty (as determined
* on startup) */
......
......@@ -1274,7 +1274,8 @@ psql_completion(const char *text, int start, int end)
/* psql's backslash commands. */
static const char *const backslash_commands[] = {
"\\a", "\\connect", "\\conninfo", "\\C", "\\cd", "\\copy", "\\copyright",
"\\a", "\\connect", "\\conninfo", "\\C", "\\cd", "\\copy",
"\\copyright", "\\crosstabview",
"\\d", "\\da", "\\db", "\\dc", "\\dC", "\\dd", "\\ddp", "\\dD",
"\\des", "\\det", "\\deu", "\\dew", "\\dE", "\\df",
"\\dF", "\\dFd", "\\dFp", "\\dFt", "\\dg", "\\di", "\\dl", "\\dL",
......
......@@ -3295,30 +3295,9 @@ printQuery(const PGresult *result, const printQueryOpt *opt,
for (i = 0; i < cont.ncolumns; i++)
{
char align;
Oid ftype = PQftype(result, i);
switch (ftype)
{
case INT2OID:
case INT4OID:
case INT8OID:
case FLOAT4OID:
case FLOAT8OID:
case NUMERICOID:
case OIDOID:
case XIDOID:
case CIDOID:
case CASHOID:
align = 'r';
break;
default:
align = 'l';
break;
}
printTableAddHeader(&cont, PQfname(result, i),
opt->translate_header, align);
opt->translate_header,
column_type_alignment(PQftype(result, i)));
}
/* set cells */
......@@ -3360,6 +3339,31 @@ printQuery(const PGresult *result, const printQueryOpt *opt,
printTableCleanup(&cont);
}
char
column_type_alignment(Oid ftype)
{
char align;
switch (ftype)
{
case INT2OID:
case INT4OID:
case INT8OID:
case FLOAT4OID:
case FLOAT8OID:
case NUMERICOID:
case OIDOID:
case XIDOID:
case CIDOID:
case CASHOID:
align = 'r';
break;
default:
align = 'l';
break;
}
return align;
}
void
setDecimalLocale(void)
......
......@@ -206,6 +206,8 @@ extern void printTable(const printTableContent *cont,
extern void printQuery(const PGresult *result, const printQueryOpt *opt,
FILE *fout, bool is_pager, FILE *flog);
extern char column_type_alignment(Oid);
extern void setDecimalLocale(void);
extern const printTextFormat *get_line_style(const printTableOpt *opt);
extern void refresh_utf8format(const printTableOpt *opt);
......
......@@ -2476,6 +2476,7 @@ execute q;
+------------------+-------------------+
deallocate q;
\pset linestyle ascii
prepare q as select ' | = | lkjsafi\\/ /oeu rio)(!@&*#)*(!&@*) \ (&' as " | -- | 012345678 9abc def!*@#&!@(*&*~~_+-=\ \", '11' as "0123456789", 11 as int from generate_series(1,10) as n;
\pset format asciidoc
\pset expanded off
......@@ -2682,6 +2683,9 @@ execute q;
<l|int >l|11
|====
deallocate q;
\pset format aligned
\pset expanded off
\pset border 1
-- SHOW_CONTEXT
\set SHOW_CONTEXT never
do $$
......@@ -2710,3 +2714,188 @@ NOTICE: foo
CONTEXT: PL/pgSQL function inline_code_block line 3 at RAISE
ERROR: bar
CONTEXT: PL/pgSQL function inline_code_block line 4 at RAISE
--
-- \crosstabview
--
CREATE TABLE ctv_data (v, h, c, i, d) AS
VALUES
('v1','h2','foo', 3, '2015-04-01'::date),
('v2','h1','bar', 3, '2015-01-02'),
('v1','h0','baz', NULL, '2015-07-12'),
('v0','h4','qux', 4, '2015-07-15'),
('v0','h4','dbl', -3, '2014-12-15'),
('v0',NULL,'qux', 5, '2014-07-15'),
('v1','h2','quux',7, '2015-04-04');
-- running \crosstabview after query uses query in buffer
SELECT v, EXTRACT(year FROM d), count(*)
FROM ctv_data
GROUP BY 1, 2
ORDER BY 1, 2;
v | date_part | count
----+-----------+-------
v0 | 2014 | 2
v0 | 2015 | 1
v1 | 2015 | 3
v2 | 2015 | 1
(4 rows)
-- basic usage with 3 columns
\crosstabview
v | 2014 | 2015
----+------+------
v0 | 2 | 1
v1 | | 3
v2 | | 1
(3 rows)
-- ordered months in horizontal header, quoted column name
SELECT v, to_char(d, 'Mon') AS "month name", EXTRACT(month FROM d) AS num,
count(*) FROM ctv_data GROUP BY 1,2,3 ORDER BY 1
\crosstabview v "month name":num 4
v | Jan | Apr | Jul | Dec
----+-----+-----+-----+-----
v0 | | | 2 | 1
v1 | | 2 | 1 |
v2 | 1 | | |
(3 rows)
-- ordered months in vertical header, ordered years in horizontal header
SELECT EXTRACT(year FROM d) AS year, to_char(d,'Mon') AS "month name",
EXTRACT(month FROM d) AS month,
format('sum=%s avg=%s', sum(i), avg(i)::numeric(2,1))
FROM ctv_data
GROUP BY EXTRACT(year FROM d), to_char(d,'Mon'), EXTRACT(month FROM d)
ORDER BY month
\crosstabview "month name" year:year format
month name | 2014 | 2015
------------+-----------------+----------------
Jan | | sum=3 avg=3.0
Apr | | sum=10 avg=5.0
Jul | sum=5 avg=5.0 | sum=4 avg=4.0
Dec | sum=-3 avg=-3.0 |
(4 rows)
-- combine contents vertically into the same cell (V/H duplicates)
SELECT v, h, string_agg(c, E'\n') FROM ctv_data GROUP BY v, h ORDER BY 1,2,3
\crosstabview 1 2 3
v | h4 | | h0 | h2 | h1
----+-----+-----+-----+------+-----
v0 | qux+| qux | | |
| dbl | | | |
v1 | | | baz | foo +|
| | | | quux |
v2 | | | | | bar
(3 rows)
-- horizontal ASC order from window function
SELECT v,h, string_agg(c, E'\n') AS c, row_number() OVER(ORDER BY h) AS r
FROM ctv_data GROUP BY v, h ORDER BY 1,3,2
\crosstabview v h:r c
v | h0 | h1 | h2 | h4 |
----+-----+-----+------+-----+-----
v0 | | | | qux+| qux
| | | | dbl |
v1 | baz | | foo +| |
| | | quux | |
v2 | | bar | | |
(3 rows)
-- horizontal DESC order from window function
SELECT v, h, string_agg(c, E'\n') AS c, row_number() OVER(ORDER BY h DESC) AS r
FROM ctv_data GROUP BY v, h ORDER BY 1,3,2
\crosstabview v h:r c
v | | h4 | h2 | h1 | h0
----+-----+-----+------+-----+-----
v0 | qux | qux+| | |
| | dbl | | |
v1 | | | foo +| | baz
| | | quux | |
v2 | | | | bar |
(3 rows)
-- horizontal ASC order from window function, NULLs pushed rightmost
SELECT v,h, string_agg(c, E'\n') AS c, row_number() OVER(ORDER BY h NULLS LAST) AS r
FROM ctv_data GROUP BY v, h ORDER BY 1,3,2
\crosstabview v h:r c
v | h0 | h1 | h2 | h4 |
----+-----+-----+------+-----+-----
v0 | | | | qux+| qux
| | | | dbl |
v1 | baz | | foo +| |
| | | quux | |
v2 | | bar | | |
(3 rows)
-- only null, no column name, 2 columns: error
SELECT null,null \crosstabview
The query must return at least two columns to be shown in crosstab
-- only null, no column name, 3 columns: works
SELECT null,null,null \crosstabview
?column? |
----------+--
|
(1 row)
-- null display
\pset null '#null#'
SELECT v,h, string_agg(i::text, E'\n') AS i FROM ctv_data
GROUP BY v, h ORDER BY h,v
\crosstabview v h i
v | h0 | h1 | h2 | h4 | #null#
----+--------+----+----+----+--------
v1 | #null# | | 3 +| |
| | | 7 | |
v2 | | 3 | | |
v0 | | | | 4 +| 5
| | | | -3 |
(3 rows)
\pset null ''
-- refer to columns by position
SELECT v,h,string_agg(i::text, E'\n'), string_agg(c, E'\n')
FROM ctv_data GROUP BY v, h ORDER BY h,v
\crosstabview 2 1 4
h | v1 | v2 | v0
----+------+-----+-----
h0 | baz | |
h1 | | bar |
h2 | foo +| |
| quux | |
h4 | | | qux+
| | | dbl
| | | qux
(5 rows)
-- refer to columns by positions and names mixed
SELECT v,h, string_agg(i::text, E'\n') AS i, string_agg(c, E'\n') AS c
FROM ctv_data GROUP BY v, h ORDER BY h,v
\crosstabview 1 "h" 4
v | h0 | h1 | h2 | h4 |
----+-----+-----+------+-----+-----
v1 | baz | | foo +| |
| | | quux | |
v2 | | bar | | |
v0 | | | | qux+| qux
| | | | dbl |
(3 rows)
-- error: bad column name
SELECT v,h,c,i FROM ctv_data
\crosstabview v h j
Invalid column name: j
-- error: bad column number
SELECT v,h,i,c FROM ctv_data
\crosstabview 2 1 5
Invalid column number: 5
-- error: same H and V columns
SELECT v,h,i,c FROM ctv_data
\crosstabview 2 h 4
The same column cannot be used for both vertical and horizontal headers
-- error: too many columns
SELECT a,a,1 FROM generate_series(1,3000) AS a
\crosstabview
Maximum number of columns (1600) exceeded
-- error: only one column
SELECT 1 \crosstabview
The query must return at least two columns to be shown in crosstab
DROP TABLE ctv_data;
......@@ -326,6 +326,8 @@ execute q;
deallocate q;
\pset linestyle ascii
prepare q as select ' | = | lkjsafi\\/ /oeu rio)(!@&*#)*(!&@*) \ (&' as " | -- | 012345678 9abc def!*@#&!@(*&*~~_+-=\ \", '11' as "0123456789", 11 as int from generate_series(1,10) as n;
\pset format asciidoc
......@@ -351,6 +353,10 @@ execute q;
deallocate q;
\pset format aligned
\pset expanded off
\pset border 1
-- SHOW_CONTEXT
\set SHOW_CONTEXT never
......@@ -373,3 +379,102 @@ begin
raise notice 'foo';
raise exception 'bar';
end $$;
--
-- \crosstabview
--
CREATE TABLE ctv_data (v, h, c, i, d) AS
VALUES
('v1','h2','foo', 3, '2015-04-01'::date),
('v2','h1','bar', 3, '2015-01-02'),
('v1','h0','baz', NULL, '2015-07-12'),
('v0','h4','qux', 4, '2015-07-15'),
('v0','h4','dbl', -3, '2014-12-15'),
('v0',NULL,'qux', 5, '2014-07-15'),
('v1','h2','quux',7, '2015-04-04');
-- running \crosstabview after query uses query in buffer
SELECT v, EXTRACT(year FROM d), count(*)
FROM ctv_data
GROUP BY 1, 2
ORDER BY 1, 2;
-- basic usage with 3 columns
\crosstabview
-- ordered months in horizontal header, quoted column name
SELECT v, to_char(d, 'Mon') AS "month name", EXTRACT(month FROM d) AS num,
count(*) FROM ctv_data GROUP BY 1,2,3 ORDER BY 1
\crosstabview v "month name":num 4
-- ordered months in vertical header, ordered years in horizontal header
SELECT EXTRACT(year FROM d) AS year, to_char(d,'Mon') AS "month name",
EXTRACT(month FROM d) AS month,
format('sum=%s avg=%s', sum(i), avg(i)::numeric(2,1))
FROM ctv_data
GROUP BY EXTRACT(year FROM d), to_char(d,'Mon'), EXTRACT(month FROM d)
ORDER BY month
\crosstabview "month name" year:year format
-- combine contents vertically into the same cell (V/H duplicates)
SELECT v, h, string_agg(c, E'\n') FROM ctv_data GROUP BY v, h ORDER BY 1,2,3
\crosstabview 1 2 3
-- horizontal ASC order from window function
SELECT v,h, string_agg(c, E'\n') AS c, row_number() OVER(ORDER BY h) AS r
FROM ctv_data GROUP BY v, h ORDER BY 1,3,2
\crosstabview v h:r c
-- horizontal DESC order from window function
SELECT v, h, string_agg(c, E'\n') AS c, row_number() OVER(ORDER BY h DESC) AS r
FROM ctv_data GROUP BY v, h ORDER BY 1,3,2
\crosstabview v h:r c
-- horizontal ASC order from window function, NULLs pushed rightmost
SELECT v,h, string_agg(c, E'\n') AS c, row_number() OVER(ORDER BY h NULLS LAST) AS r
FROM ctv_data GROUP BY v, h ORDER BY 1,3,2
\crosstabview v h:r c
-- only null, no column name, 2 columns: error
SELECT null,null \crosstabview
-- only null, no column name, 3 columns: works
SELECT null,null,null \crosstabview
-- null display
\pset null '#null#'
SELECT v,h, string_agg(i::text, E'\n') AS i FROM ctv_data
GROUP BY v, h ORDER BY h,v
\crosstabview v h i
\pset null ''
-- refer to columns by position
SELECT v,h,string_agg(i::text, E'\n'), string_agg(c, E'\n')
FROM ctv_data GROUP BY v, h ORDER BY h,v
\crosstabview 2 1 4
-- refer to columns by positions and names mixed
SELECT v,h, string_agg(i::text, E'\n') AS i, string_agg(c, E'\n') AS c
FROM ctv_data GROUP BY v, h ORDER BY h,v
\crosstabview 1 "h" 4
-- error: bad column name
SELECT v,h,c,i FROM ctv_data
\crosstabview v h j
-- error: bad column number
SELECT v,h,i,c FROM ctv_data
\crosstabview 2 1 5
-- error: same H and V columns
SELECT v,h,i,c FROM ctv_data
\crosstabview 2 h 4
-- error: too many columns
SELECT a,a,1 FROM generate_series(1,3000) AS a
\crosstabview
-- error: only one column
SELECT 1 \crosstabview
DROP TABLE ctv_data;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment