Commit 9d984866 authored by Peter Eisentraut's avatar Peter Eisentraut

Split the plpython regression test into test cases arranged by topic, instead

of the previous monolithic setup-create-run sequence, that was apparently
inherited from a previous test infrastructure, but makes working with the
tests and adding new ones weird.
parent ef7574eb
# $PostgreSQL: pgsql/src/pl/plpython/Makefile,v 1.32 2009/01/15 13:49:56 petere Exp $
# $PostgreSQL: pgsql/src/pl/plpython/Makefile,v 1.33 2009/08/12 16:37:25 petere Exp $
subdir = src/pl/plpython
top_builddir = ../../..
......@@ -57,7 +57,22 @@ endif
SHLIB_LINK = $(python_libspec) $(python_additional_libs) $(filter -lintl,$(LIBS))
REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-language=plpythonu
REGRESS = plpython_schema plpython_populate plpython_function plpython_test plpython_error plpython_drop
REGRESS = \
plpython_schema \
plpython_populate \
plpython_test \
plpython_global \
plpython_import \
plpython_spi \
plpython_newline \
plpython_void \
plpython_params \
plpython_setof \
plpython_record \
plpython_trigger \
plpython_error \
plpython_unicode \
plpython_drop
# where to find psql for running the tests
PSQLDIR = $(bindir)
......
Guide to alternative expected files:
plpython_error_2.out Python 2.2, 2.3, 2.4
plpython_error.out Python 2.5, 2.6
plpython_unicode_2.out Python 2.2
plpython_unicode_3.out Python 2.3, 2.4
plpython_unicode.out Python 2.5, 2.6
-- test error handling, i forgot to restore Warn_restart in
-- the trigger handler once. the errors and subsequent core dump were
-- interesting.
/* Flat out syntax error
*/
CREATE FUNCTION sql_syntax_error() RETURNS text
AS
'plpy.execute("syntax error")'
LANGUAGE plpythonu;
SELECT sql_syntax_error();
WARNING: PL/Python: <class 'plpy.SPIError'>: unrecognized error in PLy_spi_execute_query
CONTEXT: PL/Python function "sql_syntax_error"
ERROR: syntax error at or near "syntax"
LINE 1: syntax error
^
QUERY: syntax error
CONTEXT: PL/Python function "sql_syntax_error"
/* check the handling of uncaught python exceptions
*/
CREATE FUNCTION exception_index_invalid(text) RETURNS text
AS
'return args[1]'
LANGUAGE plpythonu;
SELECT exception_index_invalid('test');
ERROR: PL/Python: PL/Python function "exception_index_invalid" failed
DETAIL: <type 'exceptions.IndexError'>: list index out of range
CONTEXT: PL/Python function "exception_index_invalid"
/* check handling of nested exceptions
*/
CREATE FUNCTION exception_index_invalid_nested() RETURNS text
AS
'rv = plpy.execute("SELECT test5(''foo'')")
return rv[0]'
LANGUAGE plpythonu;
SELECT exception_index_invalid_nested();
WARNING: PL/Python: <class 'plpy.SPIError'>: unrecognized error in PLy_spi_execute_query
CONTEXT: PL/Python function "exception_index_invalid_nested"
ERROR: function test5(unknown) does not exist
LINE 1: SELECT test5('foo')
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
QUERY: SELECT test5('foo')
CONTEXT: PL/Python function "exception_index_invalid_nested"
/* a typo
*/
CREATE FUNCTION invalid_type_uncaught(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
SD["plan"] = plpy.prepare(q, [ "test" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT invalid_type_uncaught('rick');
WARNING: PL/Python: plpy.SPIError: unrecognized error in PLy_spi_prepare
WARNING: PL/Python: <class 'plpy.SPIError'>: unrecognized error in PLy_spi_prepare
CONTEXT: PL/Python function "invalid_type_uncaught"
ERROR: type "test" does not exist
CONTEXT: PL/Python function "invalid_type_uncaught"
/* for what it's worth catch the exception generated by
* the typo, and return None
*/
CREATE FUNCTION invalid_type_caught(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
try:
SD["plan"] = plpy.prepare(q, [ "test" ])
except plpy.SPIError, ex:
plpy.notice(str(ex))
return None
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT invalid_type_caught('rick');
WARNING: PL/Python: plpy.SPIError: unrecognized error in PLy_spi_prepare
WARNING: PL/Python: <class 'plpy.SPIError'>: unrecognized error in PLy_spi_prepare
CONTEXT: PL/Python function "invalid_type_caught"
ERROR: type "test" does not exist
CONTEXT: PL/Python function "invalid_type_caught"
/* for what it's worth catch the exception generated by
* the typo, and reraise it as a plain error
*/
CREATE FUNCTION invalid_type_reraised(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
try:
SD["plan"] = plpy.prepare(q, [ "test" ])
except plpy.SPIError, ex:
plpy.error(str(ex))
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT invalid_type_reraised('rick');
WARNING: PL/Python: plpy.SPIError: unrecognized error in PLy_spi_prepare
WARNING: PL/Python: <class 'plpy.SPIError'>: unrecognized error in PLy_spi_prepare
CONTEXT: PL/Python function "invalid_type_reraised"
ERROR: type "test" does not exist
CONTEXT: PL/Python function "invalid_type_reraised"
/* no typo no messing about
*/
CREATE FUNCTION valid_type(a text) RETURNS text
AS
'if not SD.has_key("plan"):
SD["plan"] = plpy.prepare("SELECT fname FROM users WHERE lname = $1", [ "text" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT valid_type('rick');
valid_type
------------
(1 row)
--
-- Test Unicode error handling.
--
SELECT unicode_return_error();
ERROR: PL/Python: could not create string representation of Python object, while creating return value
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character u'\x80' in position 0: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_return_error"
INSERT INTO unicode_test (testvalue) VALUES ('test');
ERROR: PL/Python: could not compute string representation of Python object, while modifying trigger row
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character u'\x80' in position 0: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_trigger_error"
SELECT unicode_plan_error1();
WARNING: PL/Python: plpy.Error: unrecognized error in PLy_spi_execute_plan
CONTEXT: PL/Python function "unicode_plan_error1"
ERROR: PL/Python: could not execute plan
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character u'\x80' in position 0: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_plan_error1"
SELECT unicode_plan_error2();
ERROR: PL/Python: could not execute plan
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character u'\x80' in position 0: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_plan_error2"
-- test error handling, i forgot to restore Warn_restart in
-- the trigger handler once. the errors and subsequent core dump were
-- interesting.
SELECT invalid_type_uncaught('rick');
WARNING: PL/Python: in PL/Python function "invalid_type_uncaught"
DETAIL: plpy.SPIError: unrecognized error in PLy_spi_prepare
ERROR: type "test" does not exist
SELECT invalid_type_caught('rick');
WARNING: PL/Python: in PL/Python function "invalid_type_caught"
DETAIL: plpy.SPIError: unrecognized error in PLy_spi_prepare
ERROR: type "test" does not exist
SELECT invalid_type_reraised('rick');
WARNING: PL/Python: in PL/Python function "invalid_type_reraised"
DETAIL: plpy.SPIError: unrecognized error in PLy_spi_prepare
ERROR: type "test" does not exist
SELECT valid_type('rick');
valid_type
------------
(1 row)
--
-- Test Unicode error handling.
--
SELECT unicode_return_error();
ERROR: PL/Python: could not create string representation of Python object in PL/Python function "unicode_return_error" while creating return value
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character '\u80' in position 0: ordinal not in range(128)
INSERT INTO unicode_test (testvalue) VALUES ('test');
ERROR: PL/Python: could not compute string representation of Python object in PL/Python function "unicode_trigger_error" while modifying trigger row
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character '\u80' in position 0: ordinal not in range(128)
SELECT unicode_plan_error1();
WARNING: PL/Python: in PL/Python function "unicode_plan_error1"
DETAIL: plpy.Error: unrecognized error in PLy_spi_execute_plan
ERROR: PL/Python: PL/Python function "unicode_plan_error1" could not execute plan
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character '\u80' in position 0: ordinal not in range(128)
SELECT unicode_plan_error2();
ERROR: PL/Python: PL/Python function "unicode_plan_error2" could not execute plan
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character '\u80' in position 0: ordinal not in range(128)
-- test error handling, i forgot to restore Warn_restart in
-- the trigger handler once. the errors and subsequent core dump were
-- interesting.
/* Flat out syntax error
*/
CREATE FUNCTION sql_syntax_error() RETURNS text
AS
'plpy.execute("syntax error")'
LANGUAGE plpythonu;
SELECT sql_syntax_error();
WARNING: PL/Python: plpy.SPIError: unrecognized error in PLy_spi_execute_query
CONTEXT: PL/Python function "sql_syntax_error"
ERROR: syntax error at or near "syntax"
LINE 1: syntax error
^
QUERY: syntax error
CONTEXT: PL/Python function "sql_syntax_error"
/* check the handling of uncaught python exceptions
*/
CREATE FUNCTION exception_index_invalid(text) RETURNS text
AS
'return args[1]'
LANGUAGE plpythonu;
SELECT exception_index_invalid('test');
ERROR: PL/Python: PL/Python function "exception_index_invalid" failed
DETAIL: exceptions.IndexError: list index out of range
CONTEXT: PL/Python function "exception_index_invalid"
/* check handling of nested exceptions
*/
CREATE FUNCTION exception_index_invalid_nested() RETURNS text
AS
'rv = plpy.execute("SELECT test5(''foo'')")
return rv[0]'
LANGUAGE plpythonu;
SELECT exception_index_invalid_nested();
WARNING: PL/Python: plpy.SPIError: unrecognized error in PLy_spi_execute_query
CONTEXT: PL/Python function "exception_index_invalid_nested"
ERROR: function test5(unknown) does not exist
LINE 1: SELECT test5('foo')
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
QUERY: SELECT test5('foo')
CONTEXT: PL/Python function "exception_index_invalid_nested"
/* a typo
*/
CREATE FUNCTION invalid_type_uncaught(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
SD["plan"] = plpy.prepare(q, [ "test" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT invalid_type_uncaught('rick');
WARNING: plpython: in function invalid_type_uncaught:
DETAIL: plpy.SPIError: Unknown error in PLy_spi_prepare
WARNING: PL/Python: plpy.SPIError: unrecognized error in PLy_spi_prepare
CONTEXT: PL/Python function "invalid_type_uncaught"
ERROR: type "test" does not exist
CONTEXT: PL/Python function "invalid_type_uncaught"
/* for what it's worth catch the exception generated by
* the typo, and return None
*/
CREATE FUNCTION invalid_type_caught(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
try:
SD["plan"] = plpy.prepare(q, [ "test" ])
except plpy.SPIError, ex:
plpy.notice(str(ex))
return None
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT invalid_type_caught('rick');
WARNING: plpython: in function invalid_type_caught:
DETAIL: plpy.SPIError: Unknown error in PLy_spi_prepare
WARNING: PL/Python: plpy.SPIError: unrecognized error in PLy_spi_prepare
CONTEXT: PL/Python function "invalid_type_caught"
ERROR: type "test" does not exist
CONTEXT: PL/Python function "invalid_type_caught"
/* for what it's worth catch the exception generated by
* the typo, and reraise it as a plain error
*/
CREATE FUNCTION invalid_type_reraised(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
try:
SD["plan"] = plpy.prepare(q, [ "test" ])
except plpy.SPIError, ex:
plpy.error(str(ex))
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT invalid_type_reraised('rick');
WARNING: plpython: in function invalid_type_reraised:
DETAIL: plpy.SPIError: Unknown error in PLy_spi_prepare
WARNING: PL/Python: plpy.SPIError: unrecognized error in PLy_spi_prepare
CONTEXT: PL/Python function "invalid_type_reraised"
ERROR: type "test" does not exist
CONTEXT: PL/Python function "invalid_type_reraised"
/* no typo no messing about
*/
CREATE FUNCTION valid_type(a text) RETURNS text
AS
'if not SD.has_key("plan"):
SD["plan"] = plpy.prepare("SELECT fname FROM users WHERE lname = $1", [ "text" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT valid_type('rick');
valid_type
------------
(1 row)
--
-- Test Unicode error handling.
--
SELECT unicode_return_error();
ERROR: plpython: function "unicode_return_error" could not create return value
DETAIL: exceptions.UnicodeError: ASCII encoding error: ordinal not in range(128)
INSERT INTO unicode_test (testvalue) VALUES ('test');
ERROR: plpython: function "unicode_trigger_error" could not modify tuple
DETAIL: exceptions.UnicodeError: ASCII encoding error: ordinal not in range(128)
SELECT unicode_plan_error1();
WARNING: plpython: in function unicode_plan_error1:
DETAIL: plpy.Error: Unknown error in PLy_spi_execute_plan
ERROR: plpython: function "unicode_plan_error1" could not execute plan
DETAIL: exceptions.UnicodeError: ASCII encoding error: ordinal not in range(128)
SELECT unicode_plan_error2();
ERROR: plpython: function "unicode_plan_error2" could not execute plan
DETAIL: exceptions.UnicodeError: ASCII encoding error: ordinal not in range(128)
--
-- check static and global data (SD and GD)
--
CREATE FUNCTION global_test_one() returns text
AS
'if not SD.has_key("global_test"):
SD["global_test"] = "set by global_test_one"
if not GD.has_key("global_test"):
GD["global_test"] = "set by global_test_one"
return "SD: " + SD["global_test"] + ", GD: " + GD["global_test"]'
LANGUAGE plpythonu;
CREATE FUNCTION global_test_two() returns text
AS
'if not SD.has_key("global_test"):
SD["global_test"] = "set by global_test_two"
if not GD.has_key("global_test"):
GD["global_test"] = "set by global_test_two"
return "SD: " + SD["global_test"] + ", GD: " + GD["global_test"]'
LANGUAGE plpythonu;
CREATE FUNCTION static_test() returns int4
AS
'if SD.has_key("call"):
SD["call"] = SD["call"] + 1
else:
SD["call"] = 1
return SD["call"]
'
LANGUAGE plpythonu;
SELECT static_test();
static_test
-------------
1
(1 row)
SELECT static_test();
static_test
-------------
2
(1 row)
SELECT global_test_one();
global_test_one
--------------------------------------------------------
SD: set by global_test_one, GD: set by global_test_one
(1 row)
SELECT global_test_two();
global_test_two
--------------------------------------------------------
SD: set by global_test_two, GD: set by global_test_one
(1 row)
-- import python modules
CREATE FUNCTION import_fail() returns text
AS
'try:
import foosocket
except Exception, ex:
plpy.notice("import socket failed -- %s" % str(ex))
return "failed as expected"
return "succeeded, that wasn''t supposed to happen"'
LANGUAGE plpythonu;
CREATE FUNCTION import_succeed() returns text
AS
'try:
import array
import bisect
import calendar
import cmath
import errno
import math
import md5
import operator
import random
import re
import sha
import string
import time
except Exception, ex:
plpy.notice("import failed -- %s" % str(ex))
return "failed, that wasn''t supposed to happen"
return "succeeded, as expected"'
LANGUAGE plpythonu;
CREATE FUNCTION import_test_one(p text) RETURNS text
AS
'import sha
digest = sha.new(p)
return digest.hexdigest()'
LANGUAGE plpythonu;
CREATE FUNCTION import_test_two(u users) RETURNS text
AS
'import sha
plain = u["fname"] + u["lname"]
digest = sha.new(plain);
return "sha hash of " + plain + " is " + digest.hexdigest()'
LANGUAGE plpythonu;
-- import python modules
--
SELECT import_fail();
NOTICE: ('import socket failed -- No module named foosocket',)
CONTEXT: PL/Python function "import_fail"
import_fail
--------------------
failed as expected
(1 row)
SELECT import_succeed();
import_succeed
------------------------
succeeded, as expected
(1 row)
-- test import and simple argument handling
--
SELECT import_test_one('sha hash of this string');
import_test_one
------------------------------------------
a04e23cb9b1a09cd1051a04a7c571aae0f90346c
(1 row)
-- test import and tuple argument handling
--
select import_test_two(users) from users where fname = 'willem';
import_test_two
-------------------------------------------------------------------
sha hash of willemdoe is 3cde6b574953b0ca937b4d76ebc40d534d910759
(1 row)
--
-- Universal Newline Support
--
CREATE OR REPLACE FUNCTION newline_lf() RETURNS integer AS
E'x = 100\ny = 23\nreturn x + y\n'
LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION newline_cr() RETURNS integer AS
E'x = 100\ry = 23\rreturn x + y\r'
LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION newline_crlf() RETURNS integer AS
E'x = 100\r\ny = 23\r\nreturn x + y\r\n'
LANGUAGE plpythonu;
SELECT newline_lf();
newline_lf
------------
123
(1 row)
SELECT newline_cr();
newline_cr
------------
123
(1 row)
SELECT newline_crlf();
newline_crlf
--------------
123
(1 row)
--
-- Test named and nameless parameters
--
CREATE FUNCTION test_param_names0(integer, integer) RETURNS int AS $$
return args[0] + args[1]
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_param_names1(a0 integer, a1 text) RETURNS boolean AS $$
assert a0 == args[0]
assert a1 == args[1]
return True
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_param_names2(u users) RETURNS text AS $$
assert u == args[0]
return str(u)
$$ LANGUAGE plpythonu;
-- use deliberately wrong parameter names
CREATE FUNCTION test_param_names3(a0 integer) RETURNS boolean AS $$
try:
assert a1 == args[0]
return False
except NameError, e:
assert e.args[0].find("a1") > -1
return True
$$ LANGUAGE plpythonu;
SELECT test_param_names0(2,7);
test_param_names0
-------------------
9
(1 row)
SELECT test_param_names1(1,'text');
test_param_names1
-------------------
t
(1 row)
SELECT test_param_names2(users) from users;
test_param_names2
-----------------------------------------------------------------------
{'lname': 'doe', 'username': 'j_doe', 'userid': 1, 'fname': 'jane'}
{'lname': 'doe', 'username': 'johnd', 'userid': 2, 'fname': 'john'}
{'lname': 'doe', 'username': 'w_doe', 'userid': 3, 'fname': 'willem'}
{'lname': 'smith', 'username': 'slash', 'userid': 4, 'fname': 'rick'}
(4 rows)
SELECT test_param_names3(1);
test_param_names3
-------------------
t
(1 row)
--
-- Test returning tuples
--
CREATE FUNCTION test_table_record_as(typ text, first text, second integer, retnull boolean) RETURNS table_record AS $$
if retnull:
return None
if typ == 'dict':
return { 'first': first, 'second': second, 'additionalfield': 'must not cause trouble' }
elif typ == 'tuple':
return ( first, second )
elif typ == 'list':
return [ first, second ]
elif typ == 'obj':
class type_record: pass
type_record.first = first
type_record.second = second
return type_record
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_type_record_as(typ text, first text, second integer, retnull boolean) RETURNS type_record AS $$
if retnull:
return None
if typ == 'dict':
return { 'first': first, 'second': second, 'additionalfield': 'must not cause trouble' }
elif typ == 'tuple':
return ( first, second )
elif typ == 'list':
return [ first, second ]
elif typ == 'obj':
class type_record: pass
type_record.first = first
type_record.second = second
return type_record
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_in_out_params(first in text, second out text) AS $$
return first + '_in_to_out';
$$ LANGUAGE plpythonu;
-- this doesn't work yet :-(
CREATE FUNCTION test_in_out_params_multi(first in text,
second out text, third out text) AS $$
return first + '_record_in_to_out';
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_inout_params(first inout text) AS $$
return first + '_inout';
$$ LANGUAGE plpythonu;
-- Test tuple returning functions
SELECT * FROM test_table_record_as('dict', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('dict', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_table_record_as('dict', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_table_record_as('dict', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_table_record_as('dict', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('tuple', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('tuple', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_table_record_as('tuple', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_table_record_as('tuple', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_table_record_as('tuple', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('list', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('list', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_table_record_as('list', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_table_record_as('list', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_table_record_as('list', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('obj', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('obj', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_table_record_as('obj', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_table_record_as('obj', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_table_record_as('obj', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('dict', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('dict', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_type_record_as('dict', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_type_record_as('dict', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_type_record_as('dict', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('tuple', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('tuple', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_type_record_as('tuple', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_type_record_as('tuple', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_type_record_as('tuple', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('list', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('list', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_type_record_as('list', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_type_record_as('list', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_type_record_as('list', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('obj', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('obj', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_type_record_as('obj', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_type_record_as('obj', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_type_record_as('obj', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_in_out_params('test_in');
second
-------------------
test_in_in_to_out
(1 row)
-- this doesn't work yet :-(
SELECT * FROM test_in_out_params_multi('test_in');
ERROR: PL/Python functions cannot return type record
SELECT * FROM test_inout_params('test_in');
first
---------------
test_in_inout
(1 row)
......@@ -41,9 +41,6 @@ CREATE TABLE xsequences (
sequence text not null
) ;
CREATE INDEX xsequences_pid_idx ON xsequences(pid) ;
CREATE TABLE unicode_test (
testvalue text NOT NULL
);
CREATE TABLE table_record (
first text,
second int4
......
--
-- Test returning SETOF
--
CREATE FUNCTION test_setof_as_list(count integer, content text) RETURNS SETOF text AS $$
return [ content ]*count
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_setof_as_tuple(count integer, content text) RETURNS SETOF text AS $$
t = ()
for i in xrange(count):
t += ( content, )
return t
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_setof_as_iterator(count integer, content text) RETURNS SETOF text AS $$
class producer:
def __init__ (self, icount, icontent):
self.icontent = icontent
self.icount = icount
def __iter__ (self):
return self
def next (self):
if self.icount == 0:
raise StopIteration
self.icount -= 1
return self.icontent
return producer(count, content)
$$ LANGUAGE plpythonu;
-- Test set returning functions
SELECT test_setof_as_list(0, 'list');
test_setof_as_list
--------------------
(0 rows)
SELECT test_setof_as_list(1, 'list');
test_setof_as_list
--------------------
list
(1 row)
SELECT test_setof_as_list(2, 'list');
test_setof_as_list
--------------------
list
list
(2 rows)
SELECT test_setof_as_list(2, null);
test_setof_as_list
--------------------
(2 rows)
SELECT test_setof_as_tuple(0, 'tuple');
test_setof_as_tuple
---------------------
(0 rows)
SELECT test_setof_as_tuple(1, 'tuple');
test_setof_as_tuple
---------------------
tuple
(1 row)
SELECT test_setof_as_tuple(2, 'tuple');
test_setof_as_tuple
---------------------
tuple
tuple
(2 rows)
SELECT test_setof_as_tuple(2, null);
test_setof_as_tuple
---------------------
(2 rows)
SELECT test_setof_as_iterator(0, 'list');
test_setof_as_iterator
------------------------
(0 rows)
SELECT test_setof_as_iterator(1, 'list');
test_setof_as_iterator
------------------------
list
(1 row)
SELECT test_setof_as_iterator(2, 'list');
test_setof_as_iterator
------------------------
list
list
(2 rows)
SELECT test_setof_as_iterator(2, null);
test_setof_as_iterator
------------------------
(2 rows)
-- nested calls
--
CREATE FUNCTION nested_call_one(a text) RETURNS text
AS
'q = "SELECT nested_call_two(''%s'')" % a
r = plpy.execute(q)
return r[0]'
LANGUAGE plpythonu ;
CREATE FUNCTION nested_call_two(a text) RETURNS text
AS
'q = "SELECT nested_call_three(''%s'')" % a
r = plpy.execute(q)
return r[0]'
LANGUAGE plpythonu ;
CREATE FUNCTION nested_call_three(a text) RETURNS text
AS
'return a'
LANGUAGE plpythonu ;
-- some spi stuff
CREATE FUNCTION spi_prepared_plan_test_one(a text) RETURNS text
AS
'if not SD.has_key("myplan"):
q = "SELECT count(*) FROM users WHERE lname = $1"
SD["myplan"] = plpy.prepare(q, [ "text" ])
try:
rv = plpy.execute(SD["myplan"], [a])
return "there are " + str(rv[0]["count"]) + " " + str(a) + "s"
except Exception, ex:
plpy.error(str(ex))
return None
'
LANGUAGE plpythonu;
CREATE FUNCTION spi_prepared_plan_test_nested(a text) RETURNS text
AS
'if not SD.has_key("myplan"):
q = "SELECT spi_prepared_plan_test_one(''%s'') as count" % a
SD["myplan"] = plpy.prepare(q)
try:
rv = plpy.execute(SD["myplan"])
if len(rv):
return rv[0]["count"]
except Exception, ex:
plpy.error(str(ex))
return None
'
LANGUAGE plpythonu;
CREATE FUNCTION join_sequences(s sequences) RETURNS text
AS
'if not s["multipart"]:
return s["sequence"]
q = "SELECT sequence FROM xsequences WHERE pid = ''%s''" % s["pid"]
rv = plpy.execute(q)
seq = s["sequence"]
for r in rv:
seq = seq + r["sequence"]
return seq
'
LANGUAGE plpythonu;
-- spi and nested calls
--
select nested_call_one('pass this along');
nested_call_one
-----------------------------------------------------------------
{'nested_call_two': "{'nested_call_three': 'pass this along'}"}
(1 row)
select spi_prepared_plan_test_one('doe');
spi_prepared_plan_test_one
----------------------------
there are 3 does
(1 row)
select spi_prepared_plan_test_one('smith');
spi_prepared_plan_test_one
----------------------------
there are 1 smiths
(1 row)
select spi_prepared_plan_test_nested('smith');
spi_prepared_plan_test_nested
-------------------------------
there are 1 smiths
(1 row)
SELECT join_sequences(sequences) FROM sequences;
join_sequences
----------------
ABCDEFGHIJKL
ABCDEF
ABCDEF
ABCDEF
ABCDEF
ABCDEF
(6 rows)
SELECT join_sequences(sequences) FROM sequences
WHERE join_sequences(sequences) ~* '^A';
join_sequences
----------------
ABCDEFGHIJKL
ABCDEF
ABCDEF
ABCDEF
ABCDEF
ABCDEF
(6 rows)
SELECT join_sequences(sequences) FROM sequences
WHERE join_sequences(sequences) ~* '^B';
join_sequences
----------------
(0 rows)
-- first some tests of basic functionality
--
-- better succeed
--
-- really stupid function just to get the module loaded
CREATE FUNCTION stupid() RETURNS text AS 'return "zarkon"' LANGUAGE plpythonu;
select stupid();
stupid
--------
zarkon
(1 row)
-- check static and global data
--
SELECT static_test();
static_test
-------------
1
(1 row)
SELECT static_test();
static_test
-------------
2
(1 row)
SELECT global_test_one();
global_test_one
--------------------------------------------------------
SD: set by global_test_one, GD: set by global_test_one
(1 row)
SELECT global_test_two();
global_test_two
--------------------------------------------------------
SD: set by global_test_two, GD: set by global_test_one
(1 row)
-- import python modules
--
SELECT import_fail();
NOTICE: ('import socket failed -- No module named foosocket',)
CONTEXT: PL/Python function "import_fail"
import_fail
--------------------
failed as expected
(1 row)
SELECT import_succeed();
import_succeed
------------------------
succeeded, as expected
(1 row)
-- test import and simple argument handling
--
SELECT import_test_one('sha hash of this string');
import_test_one
------------------------------------------
a04e23cb9b1a09cd1051a04a7c571aae0f90346c
(1 row)
-- test import and tuple argument handling
--
select import_test_two(users) from users where fname = 'willem';
import_test_two
-------------------------------------------------------------------
sha hash of willemdoe is 3cde6b574953b0ca937b4d76ebc40d534d910759
(1 row)
-- test multiple arguments
--
CREATE FUNCTION argument_test_one(u users, a1 text, a2 text) RETURNS text
AS
'keys = u.keys()
keys.sort()
out = []
for key in keys:
out.append("%s: %s" % (key, u[key]))
words = a1 + " " + a2 + " => {" + ", ".join(out) + "}"
return words'
LANGUAGE plpythonu;
select argument_test_one(users, fname, lname) from users where lname = 'doe' order by 1;
argument_test_one
-----------------------------------------------------------------------
......@@ -76,488 +26,3 @@ select argument_test_one(users, fname, lname) from users where lname = 'doe' ord
willem doe => {fname: willem, lname: doe, userid: 3, username: w_doe}
(3 rows)
-- spi and nested calls
--
select nested_call_one('pass this along');
nested_call_one
-----------------------------------------------------------------
{'nested_call_two': "{'nested_call_three': 'pass this along'}"}
(1 row)
select spi_prepared_plan_test_one('doe');
spi_prepared_plan_test_one
----------------------------
there are 3 does
(1 row)
select spi_prepared_plan_test_one('smith');
spi_prepared_plan_test_one
----------------------------
there are 1 smiths
(1 row)
select spi_prepared_plan_test_nested('smith');
spi_prepared_plan_test_nested
-------------------------------
there are 1 smiths
(1 row)
-- quick peek at the table
--
SELECT * FROM users;
fname | lname | username | userid
--------+-------+----------+--------
jane | doe | j_doe | 1
john | doe | johnd | 2
willem | doe | w_doe | 3
rick | smith | slash | 4
(4 rows)
-- should fail
--
UPDATE users SET fname = 'william' WHERE fname = 'willem';
-- should modify william to willem and create username
--
INSERT INTO users (fname, lname) VALUES ('william', 'smith');
INSERT INTO users (fname, lname, username) VALUES ('charles', 'darwin', 'beagle');
SELECT * FROM users;
fname | lname | username | userid
---------+--------+----------+--------
jane | doe | j_doe | 1
john | doe | johnd | 2
willem | doe | w_doe | 3
rick | smith | slash | 4
willem | smith | w_smith | 5
charles | darwin | beagle | 6
(6 rows)
SELECT join_sequences(sequences) FROM sequences;
join_sequences
----------------
ABCDEFGHIJKL
ABCDEF
ABCDEF
ABCDEF
ABCDEF
ABCDEF
(6 rows)
SELECT join_sequences(sequences) FROM sequences
WHERE join_sequences(sequences) ~* '^A';
join_sequences
----------------
ABCDEFGHIJKL
ABCDEF
ABCDEF
ABCDEF
ABCDEF
ABCDEF
(6 rows)
SELECT join_sequences(sequences) FROM sequences
WHERE join_sequences(sequences) ~* '^B';
join_sequences
----------------
(0 rows)
-- error in trigger
--
--
-- Check Universal Newline Support
--
SELECT newline_lf();
newline_lf
------------
123
(1 row)
SELECT newline_cr();
newline_cr
------------
123
(1 row)
SELECT newline_crlf();
newline_crlf
--------------
123
(1 row)
-- Tests for functions returning void
SELECT test_void_func1(), test_void_func1() IS NULL AS "is null";
test_void_func1 | is null
-----------------+---------
| f
(1 row)
SELECT test_void_func2(); -- should fail
ERROR: PL/Python function with return type "void" did not return None
CONTEXT: PL/Python function "test_void_func2"
SELECT test_return_none(), test_return_none() IS NULL AS "is null";
test_return_none | is null
------------------+---------
| t
(1 row)
-- Test for functions with named and nameless parameters
SELECT test_param_names0(2,7);
test_param_names0
-------------------
9
(1 row)
SELECT test_param_names1(1,'text');
test_param_names1
-------------------
t
(1 row)
SELECT test_param_names2(users) from users;
test_param_names2
----------------------------------------------------------------------------
{'lname': 'doe', 'username': 'j_doe', 'userid': 1, 'fname': 'jane'}
{'lname': 'doe', 'username': 'johnd', 'userid': 2, 'fname': 'john'}
{'lname': 'doe', 'username': 'w_doe', 'userid': 3, 'fname': 'willem'}
{'lname': 'smith', 'username': 'slash', 'userid': 4, 'fname': 'rick'}
{'lname': 'smith', 'username': 'w_smith', 'userid': 5, 'fname': 'willem'}
{'lname': 'darwin', 'username': 'beagle', 'userid': 6, 'fname': 'charles'}
(6 rows)
SELECT test_param_names3(1);
test_param_names3
-------------------
t
(1 row)
-- Test set returning functions
SELECT test_setof_as_list(0, 'list');
test_setof_as_list
--------------------
(0 rows)
SELECT test_setof_as_list(1, 'list');
test_setof_as_list
--------------------
list
(1 row)
SELECT test_setof_as_list(2, 'list');
test_setof_as_list
--------------------
list
list
(2 rows)
SELECT test_setof_as_list(2, null);
test_setof_as_list
--------------------
(2 rows)
SELECT test_setof_as_tuple(0, 'tuple');
test_setof_as_tuple
---------------------
(0 rows)
SELECT test_setof_as_tuple(1, 'tuple');
test_setof_as_tuple
---------------------
tuple
(1 row)
SELECT test_setof_as_tuple(2, 'tuple');
test_setof_as_tuple
---------------------
tuple
tuple
(2 rows)
SELECT test_setof_as_tuple(2, null);
test_setof_as_tuple
---------------------
(2 rows)
SELECT test_setof_as_iterator(0, 'list');
test_setof_as_iterator
------------------------
(0 rows)
SELECT test_setof_as_iterator(1, 'list');
test_setof_as_iterator
------------------------
list
(1 row)
SELECT test_setof_as_iterator(2, 'list');
test_setof_as_iterator
------------------------
list
list
(2 rows)
SELECT test_setof_as_iterator(2, null);
test_setof_as_iterator
------------------------
(2 rows)
-- Test tuple returning functions
SELECT * FROM test_table_record_as('dict', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('dict', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_table_record_as('dict', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_table_record_as('dict', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_table_record_as('dict', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('tuple', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('tuple', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_table_record_as('tuple', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_table_record_as('tuple', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_table_record_as('tuple', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('list', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('list', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_table_record_as('list', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_table_record_as('list', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_table_record_as('list', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('obj', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_table_record_as('obj', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_table_record_as('obj', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_table_record_as('obj', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_table_record_as('obj', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('dict', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('dict', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_type_record_as('dict', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_type_record_as('dict', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_type_record_as('dict', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('tuple', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('tuple', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_type_record_as('tuple', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_type_record_as('tuple', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_type_record_as('tuple', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('list', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('list', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_type_record_as('list', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_type_record_as('list', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_type_record_as('list', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('obj', null, null, false);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_type_record_as('obj', 'one', null, false);
first | second
-------+--------
one |
(1 row)
SELECT * FROM test_type_record_as('obj', null, 2, false);
first | second
-------+--------
| 2
(1 row)
SELECT * FROM test_type_record_as('obj', 'three', 3, false);
first | second
-------+--------
three | 3
(1 row)
SELECT * FROM test_type_record_as('obj', null, null, true);
first | second
-------+--------
|
(1 row)
SELECT * FROM test_in_out_params('test_in');
second
-------------------
test_in_in_to_out
(1 row)
-- this doesn't work yet :-(
SELECT * FROM test_in_out_params_multi('test_in');
ERROR: PL/Python functions cannot return type record
SELECT * FROM test_inout_params('test_in');
first
---------------
test_in_inout
(1 row)
CREATE FUNCTION global_test_one() returns text
AS
'if not SD.has_key("global_test"):
SD["global_test"] = "set by global_test_one"
if not GD.has_key("global_test"):
GD["global_test"] = "set by global_test_one"
return "SD: " + SD["global_test"] + ", GD: " + GD["global_test"]'
LANGUAGE plpythonu;
CREATE FUNCTION global_test_two() returns text
AS
'if not SD.has_key("global_test"):
SD["global_test"] = "set by global_test_two"
if not GD.has_key("global_test"):
GD["global_test"] = "set by global_test_two"
return "SD: " + SD["global_test"] + ", GD: " + GD["global_test"]'
LANGUAGE plpythonu;
CREATE FUNCTION static_test() returns int4
AS
'if SD.has_key("call"):
SD["call"] = SD["call"] + 1
else:
SD["call"] = 1
return SD["call"]
'
LANGUAGE plpythonu;
-- import python modules
CREATE FUNCTION import_fail() returns text
AS
'try:
import foosocket
except Exception, ex:
plpy.notice("import socket failed -- %s" % str(ex))
return "failed as expected"
return "succeeded, that wasn''t supposed to happen"'
LANGUAGE plpythonu;
CREATE FUNCTION import_succeed() returns text
AS
'try:
import array
import bisect
import calendar
import cmath
import errno
import math
import md5
import operator
import random
import re
import sha
import string
import time
except Exception, ex:
plpy.notice("import failed -- %s" % str(ex))
return "failed, that wasn''t supposed to happen"
return "succeeded, as expected"'
LANGUAGE plpythonu;
CREATE FUNCTION import_test_one(p text) RETURNS text
AS
'import sha
digest = sha.new(p)
return digest.hexdigest()'
LANGUAGE plpythonu;
CREATE FUNCTION import_test_two(u users) RETURNS text
AS
'import sha
plain = u["fname"] + u["lname"]
digest = sha.new(plain);
return "sha hash of " + plain + " is " + digest.hexdigest()'
LANGUAGE plpythonu;
CREATE FUNCTION argument_test_one(u users, a1 text, a2 text) RETURNS text
AS
'keys = u.keys()
keys.sort()
out = []
for key in keys:
out.append("%s: %s" % (key, u[key]))
words = a1 + " " + a2 + " => {" + ", ".join(out) + "}"
return words'
LANGUAGE plpythonu;
-- these triggers are dedicated to HPHC of RI who
-- decided that my kid's name was william not willem, and
-- vigorously resisted all efforts at correction. they have
......@@ -114,6 +35,35 @@ CREATE TRIGGER users_update_trig BEFORE UPDATE ON users FOR EACH ROW
EXECUTE PROCEDURE users_update ('willem');
CREATE TRIGGER users_delete_trig BEFORE DELETE ON users FOR EACH ROW
EXECUTE PROCEDURE users_delete ('willem');
-- quick peek at the table
--
SELECT * FROM users;
fname | lname | username | userid
--------+-------+----------+--------
jane | doe | j_doe | 1
john | doe | johnd | 2
willem | doe | w_doe | 3
rick | smith | slash | 4
(4 rows)
-- should fail
--
UPDATE users SET fname = 'william' WHERE fname = 'willem';
-- should modify william to willem and create username
--
INSERT INTO users (fname, lname) VALUES ('william', 'smith');
INSERT INTO users (fname, lname, username) VALUES ('charles', 'darwin', 'beagle');
SELECT * FROM users;
fname | lname | username | userid
---------+--------+----------+--------
jane | doe | j_doe | 1
john | doe | johnd | 2
willem | doe | w_doe | 3
rick | smith | slash | 4
willem | smith | w_smith | 5
charles | darwin | beagle | 6
(6 rows)
-- dump trigger data
CREATE TABLE trigger_test
(i int, v text );
......@@ -201,282 +151,3 @@ CONTEXT: PL/Python function "trigger_data"
DROP TRIGGER show_trigger_data_trig on trigger_test;
DROP FUNCTION trigger_data();
-- nested calls
--
CREATE FUNCTION nested_call_one(a text) RETURNS text
AS
'q = "SELECT nested_call_two(''%s'')" % a
r = plpy.execute(q)
return r[0]'
LANGUAGE plpythonu ;
CREATE FUNCTION nested_call_two(a text) RETURNS text
AS
'q = "SELECT nested_call_three(''%s'')" % a
r = plpy.execute(q)
return r[0]'
LANGUAGE plpythonu ;
CREATE FUNCTION nested_call_three(a text) RETURNS text
AS
'return a'
LANGUAGE plpythonu ;
-- some spi stuff
CREATE FUNCTION spi_prepared_plan_test_one(a text) RETURNS text
AS
'if not SD.has_key("myplan"):
q = "SELECT count(*) FROM users WHERE lname = $1"
SD["myplan"] = plpy.prepare(q, [ "text" ])
try:
rv = plpy.execute(SD["myplan"], [a])
return "there are " + str(rv[0]["count"]) + " " + str(a) + "s"
except Exception, ex:
plpy.error(str(ex))
return None
'
LANGUAGE plpythonu;
CREATE FUNCTION spi_prepared_plan_test_nested(a text) RETURNS text
AS
'if not SD.has_key("myplan"):
q = "SELECT spi_prepared_plan_test_one(''%s'') as count" % a
SD["myplan"] = plpy.prepare(q)
try:
rv = plpy.execute(SD["myplan"])
if len(rv):
return rv[0]["count"]
except Exception, ex:
plpy.error(str(ex))
return None
'
LANGUAGE plpythonu;
/* really stupid function just to get the module loaded
*/
CREATE FUNCTION stupid() RETURNS text AS 'return "zarkon"' LANGUAGE plpythonu;
/* a typo
*/
CREATE FUNCTION invalid_type_uncaught(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
SD["plan"] = plpy.prepare(q, [ "test" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
/* for what it's worth catch the exception generated by
* the typo, and return None
*/
CREATE FUNCTION invalid_type_caught(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
try:
SD["plan"] = plpy.prepare(q, [ "test" ])
except plpy.SPIError, ex:
plpy.notice(str(ex))
return None
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
/* for what it's worth catch the exception generated by
* the typo, and reraise it as a plain error
*/
CREATE FUNCTION invalid_type_reraised(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
try:
SD["plan"] = plpy.prepare(q, [ "test" ])
except plpy.SPIError, ex:
plpy.error(str(ex))
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
/* no typo no messing about
*/
CREATE FUNCTION valid_type(a text) RETURNS text
AS
'if not SD.has_key("plan"):
SD["plan"] = plpy.prepare("SELECT fname FROM users WHERE lname = $1", [ "text" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
/* Flat out syntax error
*/
CREATE FUNCTION sql_syntax_error() RETURNS text
AS
'plpy.execute("syntax error")'
LANGUAGE plpythonu;
/* check the handling of uncaught python exceptions
*/
CREATE FUNCTION exception_index_invalid(text) RETURNS text
AS
'return args[1]'
LANGUAGE plpythonu;
/* check handling of nested exceptions
*/
CREATE FUNCTION exception_index_invalid_nested() RETURNS text
AS
'rv = plpy.execute("SELECT test5(''foo'')")
return rv[0]'
LANGUAGE plpythonu;
CREATE FUNCTION join_sequences(s sequences) RETURNS text
AS
'if not s["multipart"]:
return s["sequence"]
q = "SELECT sequence FROM xsequences WHERE pid = ''%s''" % s["pid"]
rv = plpy.execute(q)
seq = s["sequence"]
for r in rv:
seq = seq + r["sequence"]
return seq
'
LANGUAGE plpythonu;
--
-- Universal Newline Support
--
CREATE OR REPLACE FUNCTION newline_lf() RETURNS integer AS
E'x = 100\ny = 23\nreturn x + y\n'
LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION newline_cr() RETURNS integer AS
E'x = 100\ry = 23\rreturn x + y\r'
LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION newline_crlf() RETURNS integer AS
E'x = 100\r\ny = 23\r\nreturn x + y\r\n'
LANGUAGE plpythonu;
--
-- Unicode error handling
--
CREATE FUNCTION unicode_return_error() RETURNS text AS E'
return u"\\x80"
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_trigger_error() RETURNS trigger AS E'
TD["new"]["testvalue"] = u"\\x80"
return "MODIFY"
' LANGUAGE plpythonu;
CREATE TRIGGER unicode_test_bi BEFORE INSERT ON unicode_test
FOR EACH ROW EXECUTE PROCEDURE unicode_trigger_error();
CREATE FUNCTION unicode_plan_error1() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue", ["text"])
rv = plpy.execute(plan, [u"\\x80"], 1)
return rv[0]["testvalue"]
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_plan_error2() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue1, $2 AS testvalue2", ["text", "text"])
rv = plpy.execute(plan, u"\\x80", 1)
return rv[0]["testvalue1"]
' LANGUAGE plpythonu;
-- Tests for functions that return void
CREATE FUNCTION test_void_func1() RETURNS void AS $$
x = 10
$$ LANGUAGE plpythonu;
-- illegal: can't return non-None value in void-returning func
CREATE FUNCTION test_void_func2() RETURNS void AS $$
return 10
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_return_none() RETURNS int AS $$
None
$$ LANGUAGE plpythonu;
--
-- Test named and nameless parameters
--
CREATE FUNCTION test_param_names0(integer, integer) RETURNS int AS $$
return args[0] + args[1]
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_param_names1(a0 integer, a1 text) RETURNS boolean AS $$
assert a0 == args[0]
assert a1 == args[1]
return True
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_param_names2(u users) RETURNS text AS $$
assert u == args[0]
return str(u)
$$ LANGUAGE plpythonu;
-- use deliberately wrong parameter names
CREATE FUNCTION test_param_names3(a0 integer) RETURNS boolean AS $$
try:
assert a1 == args[0]
return False
except NameError, e:
assert e.args[0].find("a1") > -1
return True
$$ LANGUAGE plpythonu;
--
-- Test returning SETOF
--
CREATE FUNCTION test_setof_as_list(count integer, content text) RETURNS SETOF text AS $$
return [ content ]*count
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_setof_as_tuple(count integer, content text) RETURNS SETOF text AS $$
t = ()
for i in xrange(count):
t += ( content, )
return t
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_setof_as_iterator(count integer, content text) RETURNS SETOF text AS $$
class producer:
def __init__ (self, icount, icontent):
self.icontent = icontent
self.icount = icount
def __iter__ (self):
return self
def next (self):
if self.icount == 0:
raise StopIteration
self.icount -= 1
return self.icontent
return producer(count, content)
$$ LANGUAGE plpythonu;
--
-- Test returning tuples
--
CREATE FUNCTION test_table_record_as(typ text, first text, second integer, retnull boolean) RETURNS table_record AS $$
if retnull:
return None
if typ == 'dict':
return { 'first': first, 'second': second, 'additionalfield': 'must not cause trouble' }
elif typ == 'tuple':
return ( first, second )
elif typ == 'list':
return [ first, second ]
elif typ == 'obj':
class type_record: pass
type_record.first = first
type_record.second = second
return type_record
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_type_record_as(typ text, first text, second integer, retnull boolean) RETURNS type_record AS $$
if retnull:
return None
if typ == 'dict':
return { 'first': first, 'second': second, 'additionalfield': 'must not cause trouble' }
elif typ == 'tuple':
return ( first, second )
elif typ == 'list':
return [ first, second ]
elif typ == 'obj':
class type_record: pass
type_record.first = first
type_record.second = second
return type_record
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_in_out_params(first in text, second out text) AS $$
return first + '_in_to_out';
$$ LANGUAGE plpythonu;
-- this doesn't work yet :-(
CREATE FUNCTION test_in_out_params_multi(first in text,
second out text, third out text) AS $$
return first + '_record_in_to_out';
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_inout_params(first inout text) AS $$
return first + '_inout';
$$ LANGUAGE plpythonu;
-- test error handling, i forgot to restore Warn_restart in
-- the trigger handler once. the errors and subsequent core dump were
-- interesting.
SELECT invalid_type_uncaught('rick');
WARNING: PL/Python: <class 'plpy.SPIError'>: unrecognized error in PLy_spi_prepare
CONTEXT: PL/Python function "invalid_type_uncaught"
ERROR: type "test" does not exist
CONTEXT: PL/Python function "invalid_type_uncaught"
SELECT invalid_type_caught('rick');
WARNING: PL/Python: <class 'plpy.SPIError'>: unrecognized error in PLy_spi_prepare
CONTEXT: PL/Python function "invalid_type_caught"
ERROR: type "test" does not exist
CONTEXT: PL/Python function "invalid_type_caught"
SELECT invalid_type_reraised('rick');
WARNING: PL/Python: <class 'plpy.SPIError'>: unrecognized error in PLy_spi_prepare
CONTEXT: PL/Python function "invalid_type_reraised"
ERROR: type "test" does not exist
CONTEXT: PL/Python function "invalid_type_reraised"
SELECT valid_type('rick');
valid_type
------------
(1 row)
--
-- Test Unicode error handling.
-- Unicode handling
--
CREATE TABLE unicode_test (
testvalue text NOT NULL
);
CREATE FUNCTION unicode_return_error() RETURNS text AS E'
return u"\\x80"
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_trigger_error() RETURNS trigger AS E'
TD["new"]["testvalue"] = u"\\x80"
return "MODIFY"
' LANGUAGE plpythonu;
CREATE TRIGGER unicode_test_bi BEFORE INSERT ON unicode_test
FOR EACH ROW EXECUTE PROCEDURE unicode_trigger_error();
CREATE FUNCTION unicode_plan_error1() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue", ["text"])
rv = plpy.execute(plan, [u"\\x80"], 1)
return rv[0]["testvalue"]
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_plan_error2() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue1, $2 AS testvalue2", ["text", "text"])
rv = plpy.execute(plan, u"\\x80", 1)
return rv[0]["testvalue1"]
' LANGUAGE plpythonu;
SELECT unicode_return_error();
ERROR: PL/Python: could not create string representation of Python object, while creating return value
DETAIL: <type 'exceptions.UnicodeEncodeError'>: 'ascii' codec can't encode character u'\x80' in position 0: ordinal not in range(128)
......
--
-- Unicode handling
--
CREATE TABLE unicode_test (
testvalue text NOT NULL
);
CREATE FUNCTION unicode_return_error() RETURNS text AS E'
return u"\\x80"
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_trigger_error() RETURNS trigger AS E'
TD["new"]["testvalue"] = u"\\x80"
return "MODIFY"
' LANGUAGE plpythonu;
CREATE TRIGGER unicode_test_bi BEFORE INSERT ON unicode_test
FOR EACH ROW EXECUTE PROCEDURE unicode_trigger_error();
CREATE FUNCTION unicode_plan_error1() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue", ["text"])
rv = plpy.execute(plan, [u"\\x80"], 1)
return rv[0]["testvalue"]
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_plan_error2() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue1, $2 AS testvalue2", ["text", "text"])
rv = plpy.execute(plan, u"\\x80", 1)
return rv[0]["testvalue1"]
' LANGUAGE plpythonu;
SELECT unicode_return_error();
ERROR: PL/Python: could not create string representation of Python object, while creating return value
DETAIL: exceptions.UnicodeError: ASCII encoding error: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_return_error"
INSERT INTO unicode_test (testvalue) VALUES ('test');
ERROR: PL/Python: could not compute string representation of Python object, while modifying trigger row
DETAIL: exceptions.UnicodeError: ASCII encoding error: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_trigger_error"
SELECT unicode_plan_error1();
WARNING: PL/Python: plpy.Error: unrecognized error in PLy_spi_execute_plan
CONTEXT: PL/Python function "unicode_plan_error1"
ERROR: PL/Python: could not execute plan
DETAIL: exceptions.UnicodeError: ASCII encoding error: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_plan_error1"
SELECT unicode_plan_error2();
ERROR: PL/Python: could not execute plan
DETAIL: exceptions.UnicodeError: ASCII encoding error: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_plan_error2"
--
-- Unicode handling
--
CREATE TABLE unicode_test (
testvalue text NOT NULL
);
CREATE FUNCTION unicode_return_error() RETURNS text AS E'
return u"\\x80"
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_trigger_error() RETURNS trigger AS E'
TD["new"]["testvalue"] = u"\\x80"
return "MODIFY"
' LANGUAGE plpythonu;
CREATE TRIGGER unicode_test_bi BEFORE INSERT ON unicode_test
FOR EACH ROW EXECUTE PROCEDURE unicode_trigger_error();
CREATE FUNCTION unicode_plan_error1() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue", ["text"])
rv = plpy.execute(plan, [u"\\x80"], 1)
return rv[0]["testvalue"]
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_plan_error2() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue1, $2 AS testvalue2", ["text", "text"])
rv = plpy.execute(plan, u"\\x80", 1)
return rv[0]["testvalue1"]
' LANGUAGE plpythonu;
SELECT unicode_return_error();
ERROR: PL/Python: could not create string representation of Python object, while creating return value
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character u'\x80' in position 0: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_return_error"
INSERT INTO unicode_test (testvalue) VALUES ('test');
ERROR: PL/Python: could not compute string representation of Python object, while modifying trigger row
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character u'\x80' in position 0: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_trigger_error"
SELECT unicode_plan_error1();
WARNING: PL/Python: plpy.Error: unrecognized error in PLy_spi_execute_plan
CONTEXT: PL/Python function "unicode_plan_error1"
ERROR: PL/Python: could not execute plan
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character u'\x80' in position 0: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_plan_error1"
SELECT unicode_plan_error2();
ERROR: PL/Python: could not execute plan
DETAIL: exceptions.UnicodeEncodeError: 'ascii' codec can't encode character u'\x80' in position 0: ordinal not in range(128)
CONTEXT: PL/Python function "unicode_plan_error2"
--
-- Tests for functions that return void
--
CREATE FUNCTION test_void_func1() RETURNS void AS $$
x = 10
$$ LANGUAGE plpythonu;
-- illegal: can't return non-None value in void-returning func
CREATE FUNCTION test_void_func2() RETURNS void AS $$
return 10
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_return_none() RETURNS int AS $$
None
$$ LANGUAGE plpythonu;
-- Tests for functions returning void
SELECT test_void_func1(), test_void_func1() IS NULL AS "is null";
test_void_func1 | is null
-----------------+---------
| f
(1 row)
SELECT test_void_func2(); -- should fail
ERROR: PL/Python function with return type "void" did not return None
CONTEXT: PL/Python function "test_void_func2"
SELECT test_return_none(), test_return_none() IS NULL AS "is null";
test_return_none | is null
------------------+---------
| t
(1 row)
-- test error handling, i forgot to restore Warn_restart in
-- the trigger handler once. the errors and subsequent core dump were
-- interesting.
/* Flat out syntax error
*/
CREATE FUNCTION sql_syntax_error() RETURNS text
AS
'plpy.execute("syntax error")'
LANGUAGE plpythonu;
SELECT sql_syntax_error();
/* check the handling of uncaught python exceptions
*/
CREATE FUNCTION exception_index_invalid(text) RETURNS text
AS
'return args[1]'
LANGUAGE plpythonu;
SELECT exception_index_invalid('test');
/* check handling of nested exceptions
*/
CREATE FUNCTION exception_index_invalid_nested() RETURNS text
AS
'rv = plpy.execute("SELECT test5(''foo'')")
return rv[0]'
LANGUAGE plpythonu;
SELECT exception_index_invalid_nested();
/* a typo
*/
CREATE FUNCTION invalid_type_uncaught(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
SD["plan"] = plpy.prepare(q, [ "test" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT invalid_type_uncaught('rick');
/* for what it's worth catch the exception generated by
* the typo, and return None
*/
CREATE FUNCTION invalid_type_caught(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
try:
SD["plan"] = plpy.prepare(q, [ "test" ])
except plpy.SPIError, ex:
plpy.notice(str(ex))
return None
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT invalid_type_caught('rick');
/* for what it's worth catch the exception generated by
* the typo, and reraise it as a plain error
*/
CREATE FUNCTION invalid_type_reraised(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
try:
SD["plan"] = plpy.prepare(q, [ "test" ])
except plpy.SPIError, ex:
plpy.error(str(ex))
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT invalid_type_reraised('rick');
SELECT valid_type('rick');
--
-- Test Unicode error handling.
--
SELECT unicode_return_error();
INSERT INTO unicode_test (testvalue) VALUES ('test');
SELECT unicode_plan_error1();
SELECT unicode_plan_error2();
/* no typo no messing about
*/
CREATE FUNCTION valid_type(a text) RETURNS text
AS
'if not SD.has_key("plan"):
SD["plan"] = plpy.prepare("SELECT fname FROM users WHERE lname = $1", [ "text" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
SELECT valid_type('rick');
CREATE FUNCTION global_test_one() returns text
AS
'if not SD.has_key("global_test"):
SD["global_test"] = "set by global_test_one"
if not GD.has_key("global_test"):
GD["global_test"] = "set by global_test_one"
return "SD: " + SD["global_test"] + ", GD: " + GD["global_test"]'
LANGUAGE plpythonu;
CREATE FUNCTION global_test_two() returns text
AS
'if not SD.has_key("global_test"):
SD["global_test"] = "set by global_test_two"
if not GD.has_key("global_test"):
GD["global_test"] = "set by global_test_two"
return "SD: " + SD["global_test"] + ", GD: " + GD["global_test"]'
LANGUAGE plpythonu;
CREATE FUNCTION static_test() returns int4
AS
'if SD.has_key("call"):
SD["call"] = SD["call"] + 1
else:
SD["call"] = 1
return SD["call"]
'
LANGUAGE plpythonu;
-- import python modules
CREATE FUNCTION import_fail() returns text
AS
'try:
import foosocket
except Exception, ex:
plpy.notice("import socket failed -- %s" % str(ex))
return "failed as expected"
return "succeeded, that wasn''t supposed to happen"'
LANGUAGE plpythonu;
CREATE FUNCTION import_succeed() returns text
AS
'try:
import array
import bisect
import calendar
import cmath
import errno
import math
import md5
import operator
import random
import re
import sha
import string
import time
except Exception, ex:
plpy.notice("import failed -- %s" % str(ex))
return "failed, that wasn''t supposed to happen"
return "succeeded, as expected"'
LANGUAGE plpythonu;
CREATE FUNCTION import_test_one(p text) RETURNS text
AS
'import sha
digest = sha.new(p)
return digest.hexdigest()'
LANGUAGE plpythonu;
CREATE FUNCTION import_test_two(u users) RETURNS text
AS
'import sha
plain = u["fname"] + u["lname"]
digest = sha.new(plain);
return "sha hash of " + plain + " is " + digest.hexdigest()'
LANGUAGE plpythonu;
CREATE FUNCTION argument_test_one(u users, a1 text, a2 text) RETURNS text
AS
'keys = u.keys()
keys.sort()
out = []
for key in keys:
out.append("%s: %s" % (key, u[key]))
words = a1 + " " + a2 + " => {" + ", ".join(out) + "}"
return words'
LANGUAGE plpythonu;
-- these triggers are dedicated to HPHC of RI who
-- decided that my kid's name was william not willem, and
-- vigorously resisted all efforts at correction. they have
-- since gone bankrupt...
CREATE FUNCTION users_insert() returns trigger
AS
'if TD["new"]["fname"] == None or TD["new"]["lname"] == None:
return "SKIP"
if TD["new"]["username"] == None:
TD["new"]["username"] = TD["new"]["fname"][:1] + "_" + TD["new"]["lname"]
rv = "MODIFY"
else:
rv = None
if TD["new"]["fname"] == "william":
TD["new"]["fname"] = TD["args"][0]
rv = "MODIFY"
return rv'
LANGUAGE plpythonu;
CREATE FUNCTION users_update() returns trigger
AS
'if TD["event"] == "UPDATE":
if TD["old"]["fname"] != TD["new"]["fname"] and TD["old"]["fname"] == TD["args"][0]:
return "SKIP"
return None'
LANGUAGE plpythonu;
CREATE FUNCTION users_delete() RETURNS trigger
AS
'if TD["old"]["fname"] == TD["args"][0]:
return "SKIP"
return None'
LANGUAGE plpythonu;
CREATE TRIGGER users_insert_trig BEFORE INSERT ON users FOR EACH ROW
EXECUTE PROCEDURE users_insert ('willem');
CREATE TRIGGER users_update_trig BEFORE UPDATE ON users FOR EACH ROW
EXECUTE PROCEDURE users_update ('willem');
CREATE TRIGGER users_delete_trig BEFORE DELETE ON users FOR EACH ROW
EXECUTE PROCEDURE users_delete ('willem');
-- dump trigger data
CREATE TABLE trigger_test
(i int, v text );
CREATE FUNCTION trigger_data() returns trigger language plpythonu as $$
if TD.has_key('relid'):
TD['relid'] = "bogus:12345"
skeys = TD.keys()
skeys.sort()
for key in skeys:
val = TD[key]
plpy.notice("TD[" + key + "] => " + str(val))
return None
$$;
CREATE TRIGGER show_trigger_data_trig
BEFORE INSERT OR UPDATE OR DELETE ON trigger_test
FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo');
insert into trigger_test values(1,'insert');
update trigger_test set v = 'update' where i = 1;
delete from trigger_test;
DROP TRIGGER show_trigger_data_trig on trigger_test;
DROP FUNCTION trigger_data();
-- nested calls
--
CREATE FUNCTION nested_call_one(a text) RETURNS text
AS
'q = "SELECT nested_call_two(''%s'')" % a
r = plpy.execute(q)
return r[0]'
LANGUAGE plpythonu ;
CREATE FUNCTION nested_call_two(a text) RETURNS text
AS
'q = "SELECT nested_call_three(''%s'')" % a
r = plpy.execute(q)
return r[0]'
LANGUAGE plpythonu ;
CREATE FUNCTION nested_call_three(a text) RETURNS text
AS
'return a'
LANGUAGE plpythonu ;
-- some spi stuff
CREATE FUNCTION spi_prepared_plan_test_one(a text) RETURNS text
AS
'if not SD.has_key("myplan"):
q = "SELECT count(*) FROM users WHERE lname = $1"
SD["myplan"] = plpy.prepare(q, [ "text" ])
try:
rv = plpy.execute(SD["myplan"], [a])
return "there are " + str(rv[0]["count"]) + " " + str(a) + "s"
except Exception, ex:
plpy.error(str(ex))
return None
'
LANGUAGE plpythonu;
CREATE FUNCTION spi_prepared_plan_test_nested(a text) RETURNS text
AS
'if not SD.has_key("myplan"):
q = "SELECT spi_prepared_plan_test_one(''%s'') as count" % a
SD["myplan"] = plpy.prepare(q)
try:
rv = plpy.execute(SD["myplan"])
if len(rv):
return rv[0]["count"]
except Exception, ex:
plpy.error(str(ex))
return None
'
LANGUAGE plpythonu;
/* really stupid function just to get the module loaded
*/
CREATE FUNCTION stupid() RETURNS text AS 'return "zarkon"' LANGUAGE plpythonu;
/* a typo
*/
CREATE FUNCTION invalid_type_uncaught(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
SD["plan"] = plpy.prepare(q, [ "test" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
/* for what it's worth catch the exception generated by
* the typo, and return None
*/
CREATE FUNCTION invalid_type_caught(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
try:
SD["plan"] = plpy.prepare(q, [ "test" ])
except plpy.SPIError, ex:
plpy.notice(str(ex))
return None
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
/* for what it's worth catch the exception generated by
* the typo, and reraise it as a plain error
*/
CREATE FUNCTION invalid_type_reraised(a text) RETURNS text
AS
'if not SD.has_key("plan"):
q = "SELECT fname FROM users WHERE lname = $1"
try:
SD["plan"] = plpy.prepare(q, [ "test" ])
except plpy.SPIError, ex:
plpy.error(str(ex))
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
/* no typo no messing about
*/
CREATE FUNCTION valid_type(a text) RETURNS text
AS
'if not SD.has_key("plan"):
SD["plan"] = plpy.prepare("SELECT fname FROM users WHERE lname = $1", [ "text" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
return rv[0]["fname"]
return None
'
LANGUAGE plpythonu;
/* Flat out syntax error
*/
CREATE FUNCTION sql_syntax_error() RETURNS text
AS
'plpy.execute("syntax error")'
LANGUAGE plpythonu;
/* check the handling of uncaught python exceptions
*/
CREATE FUNCTION exception_index_invalid(text) RETURNS text
AS
'return args[1]'
LANGUAGE plpythonu;
/* check handling of nested exceptions
*/
CREATE FUNCTION exception_index_invalid_nested() RETURNS text
AS
'rv = plpy.execute("SELECT test5(''foo'')")
return rv[0]'
LANGUAGE plpythonu;
CREATE FUNCTION join_sequences(s sequences) RETURNS text
AS
'if not s["multipart"]:
return s["sequence"]
q = "SELECT sequence FROM xsequences WHERE pid = ''%s''" % s["pid"]
rv = plpy.execute(q)
seq = s["sequence"]
for r in rv:
seq = seq + r["sequence"]
return seq
'
LANGUAGE plpythonu;
--
-- Universal Newline Support
--
CREATE OR REPLACE FUNCTION newline_lf() RETURNS integer AS
E'x = 100\ny = 23\nreturn x + y\n'
LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION newline_cr() RETURNS integer AS
E'x = 100\ry = 23\rreturn x + y\r'
LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION newline_crlf() RETURNS integer AS
E'x = 100\r\ny = 23\r\nreturn x + y\r\n'
LANGUAGE plpythonu;
--
-- Unicode error handling
--
CREATE FUNCTION unicode_return_error() RETURNS text AS E'
return u"\\x80"
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_trigger_error() RETURNS trigger AS E'
TD["new"]["testvalue"] = u"\\x80"
return "MODIFY"
' LANGUAGE plpythonu;
CREATE TRIGGER unicode_test_bi BEFORE INSERT ON unicode_test
FOR EACH ROW EXECUTE PROCEDURE unicode_trigger_error();
CREATE FUNCTION unicode_plan_error1() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue", ["text"])
rv = plpy.execute(plan, [u"\\x80"], 1)
return rv[0]["testvalue"]
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_plan_error2() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue1, $2 AS testvalue2", ["text", "text"])
rv = plpy.execute(plan, u"\\x80", 1)
return rv[0]["testvalue1"]
' LANGUAGE plpythonu;
-- Tests for functions that return void
CREATE FUNCTION test_void_func1() RETURNS void AS $$
x = 10
$$ LANGUAGE plpythonu;
-- illegal: can't return non-None value in void-returning func
CREATE FUNCTION test_void_func2() RETURNS void AS $$
return 10
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_return_none() RETURNS int AS $$
None
$$ LANGUAGE plpythonu;
--
-- Test named and nameless parameters
--
CREATE FUNCTION test_param_names0(integer, integer) RETURNS int AS $$
return args[0] + args[1]
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_param_names1(a0 integer, a1 text) RETURNS boolean AS $$
assert a0 == args[0]
assert a1 == args[1]
return True
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_param_names2(u users) RETURNS text AS $$
assert u == args[0]
return str(u)
$$ LANGUAGE plpythonu;
-- use deliberately wrong parameter names
CREATE FUNCTION test_param_names3(a0 integer) RETURNS boolean AS $$
try:
assert a1 == args[0]
return False
except NameError, e:
assert e.args[0].find("a1") > -1
return True
$$ LANGUAGE plpythonu;
--
-- Test returning SETOF
--
CREATE FUNCTION test_setof_as_list(count integer, content text) RETURNS SETOF text AS $$
return [ content ]*count
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_setof_as_tuple(count integer, content text) RETURNS SETOF text AS $$
t = ()
for i in xrange(count):
t += ( content, )
return t
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_setof_as_iterator(count integer, content text) RETURNS SETOF text AS $$
class producer:
def __init__ (self, icount, icontent):
self.icontent = icontent
self.icount = icount
def __iter__ (self):
return self
def next (self):
if self.icount == 0:
raise StopIteration
self.icount -= 1
return self.icontent
return producer(count, content)
$$ LANGUAGE plpythonu;
--
-- Test returning tuples
--
CREATE FUNCTION test_table_record_as(typ text, first text, second integer, retnull boolean) RETURNS table_record AS $$
if retnull:
return None
if typ == 'dict':
return { 'first': first, 'second': second, 'additionalfield': 'must not cause trouble' }
elif typ == 'tuple':
return ( first, second )
elif typ == 'list':
return [ first, second ]
elif typ == 'obj':
class type_record: pass
type_record.first = first
type_record.second = second
return type_record
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_type_record_as(typ text, first text, second integer, retnull boolean) RETURNS type_record AS $$
if retnull:
return None
if typ == 'dict':
return { 'first': first, 'second': second, 'additionalfield': 'must not cause trouble' }
elif typ == 'tuple':
return ( first, second )
elif typ == 'list':
return [ first, second ]
elif typ == 'obj':
class type_record: pass
type_record.first = first
type_record.second = second
return type_record
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_in_out_params(first in text, second out text) AS $$
return first + '_in_to_out';
$$ LANGUAGE plpythonu;
-- this doesn't work yet :-(
CREATE FUNCTION test_in_out_params_multi(first in text,
second out text, third out text) AS $$
return first + '_record_in_to_out';
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_inout_params(first inout text) AS $$
return first + '_inout';
$$ LANGUAGE plpythonu;
--
-- check static and global data (SD and GD)
--
CREATE FUNCTION global_test_one() returns text
AS
'if not SD.has_key("global_test"):
SD["global_test"] = "set by global_test_one"
if not GD.has_key("global_test"):
GD["global_test"] = "set by global_test_one"
return "SD: " + SD["global_test"] + ", GD: " + GD["global_test"]'
LANGUAGE plpythonu;
CREATE FUNCTION global_test_two() returns text
AS
'if not SD.has_key("global_test"):
SD["global_test"] = "set by global_test_two"
if not GD.has_key("global_test"):
GD["global_test"] = "set by global_test_two"
return "SD: " + SD["global_test"] + ", GD: " + GD["global_test"]'
LANGUAGE plpythonu;
CREATE FUNCTION static_test() returns int4
AS
'if SD.has_key("call"):
SD["call"] = SD["call"] + 1
else:
SD["call"] = 1
return SD["call"]
'
LANGUAGE plpythonu;
SELECT static_test();
SELECT static_test();
SELECT global_test_one();
SELECT global_test_two();
-- import python modules
CREATE FUNCTION import_fail() returns text
AS
'try:
import foosocket
except Exception, ex:
plpy.notice("import socket failed -- %s" % str(ex))
return "failed as expected"
return "succeeded, that wasn''t supposed to happen"'
LANGUAGE plpythonu;
CREATE FUNCTION import_succeed() returns text
AS
'try:
import array
import bisect
import calendar
import cmath
import errno
import math
import md5
import operator
import random
import re
import sha
import string
import time
except Exception, ex:
plpy.notice("import failed -- %s" % str(ex))
return "failed, that wasn''t supposed to happen"
return "succeeded, as expected"'
LANGUAGE plpythonu;
CREATE FUNCTION import_test_one(p text) RETURNS text
AS
'import sha
digest = sha.new(p)
return digest.hexdigest()'
LANGUAGE plpythonu;
CREATE FUNCTION import_test_two(u users) RETURNS text
AS
'import sha
plain = u["fname"] + u["lname"]
digest = sha.new(plain);
return "sha hash of " + plain + " is " + digest.hexdigest()'
LANGUAGE plpythonu;
-- import python modules
--
SELECT import_fail();
SELECT import_succeed();
-- test import and simple argument handling
--
SELECT import_test_one('sha hash of this string');
-- test import and tuple argument handling
--
select import_test_two(users) from users where fname = 'willem';
--
-- Universal Newline Support
--
CREATE OR REPLACE FUNCTION newline_lf() RETURNS integer AS
E'x = 100\ny = 23\nreturn x + y\n'
LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION newline_cr() RETURNS integer AS
E'x = 100\ry = 23\rreturn x + y\r'
LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION newline_crlf() RETURNS integer AS
E'x = 100\r\ny = 23\r\nreturn x + y\r\n'
LANGUAGE plpythonu;
SELECT newline_lf();
SELECT newline_cr();
SELECT newline_crlf();
--
-- Test named and nameless parameters
--
CREATE FUNCTION test_param_names0(integer, integer) RETURNS int AS $$
return args[0] + args[1]
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_param_names1(a0 integer, a1 text) RETURNS boolean AS $$
assert a0 == args[0]
assert a1 == args[1]
return True
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_param_names2(u users) RETURNS text AS $$
assert u == args[0]
return str(u)
$$ LANGUAGE plpythonu;
-- use deliberately wrong parameter names
CREATE FUNCTION test_param_names3(a0 integer) RETURNS boolean AS $$
try:
assert a1 == args[0]
return False
except NameError, e:
assert e.args[0].find("a1") > -1
return True
$$ LANGUAGE plpythonu;
SELECT test_param_names0(2,7);
SELECT test_param_names1(1,'text');
SELECT test_param_names2(users) from users;
SELECT test_param_names3(1);
--
-- Test returning tuples
--
CREATE FUNCTION test_table_record_as(typ text, first text, second integer, retnull boolean) RETURNS table_record AS $$
if retnull:
return None
if typ == 'dict':
return { 'first': first, 'second': second, 'additionalfield': 'must not cause trouble' }
elif typ == 'tuple':
return ( first, second )
elif typ == 'list':
return [ first, second ]
elif typ == 'obj':
class type_record: pass
type_record.first = first
type_record.second = second
return type_record
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_type_record_as(typ text, first text, second integer, retnull boolean) RETURNS type_record AS $$
if retnull:
return None
if typ == 'dict':
return { 'first': first, 'second': second, 'additionalfield': 'must not cause trouble' }
elif typ == 'tuple':
return ( first, second )
elif typ == 'list':
return [ first, second ]
elif typ == 'obj':
class type_record: pass
type_record.first = first
type_record.second = second
return type_record
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_in_out_params(first in text, second out text) AS $$
return first + '_in_to_out';
$$ LANGUAGE plpythonu;
-- this doesn't work yet :-(
CREATE FUNCTION test_in_out_params_multi(first in text,
second out text, third out text) AS $$
return first + '_record_in_to_out';
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_inout_params(first inout text) AS $$
return first + '_inout';
$$ LANGUAGE plpythonu;
-- Test tuple returning functions
SELECT * FROM test_table_record_as('dict', null, null, false);
SELECT * FROM test_table_record_as('dict', 'one', null, false);
SELECT * FROM test_table_record_as('dict', null, 2, false);
SELECT * FROM test_table_record_as('dict', 'three', 3, false);
SELECT * FROM test_table_record_as('dict', null, null, true);
SELECT * FROM test_table_record_as('tuple', null, null, false);
SELECT * FROM test_table_record_as('tuple', 'one', null, false);
SELECT * FROM test_table_record_as('tuple', null, 2, false);
SELECT * FROM test_table_record_as('tuple', 'three', 3, false);
SELECT * FROM test_table_record_as('tuple', null, null, true);
SELECT * FROM test_table_record_as('list', null, null, false);
SELECT * FROM test_table_record_as('list', 'one', null, false);
SELECT * FROM test_table_record_as('list', null, 2, false);
SELECT * FROM test_table_record_as('list', 'three', 3, false);
SELECT * FROM test_table_record_as('list', null, null, true);
SELECT * FROM test_table_record_as('obj', null, null, false);
SELECT * FROM test_table_record_as('obj', 'one', null, false);
SELECT * FROM test_table_record_as('obj', null, 2, false);
SELECT * FROM test_table_record_as('obj', 'three', 3, false);
SELECT * FROM test_table_record_as('obj', null, null, true);
SELECT * FROM test_type_record_as('dict', null, null, false);
SELECT * FROM test_type_record_as('dict', 'one', null, false);
SELECT * FROM test_type_record_as('dict', null, 2, false);
SELECT * FROM test_type_record_as('dict', 'three', 3, false);
SELECT * FROM test_type_record_as('dict', null, null, true);
SELECT * FROM test_type_record_as('tuple', null, null, false);
SELECT * FROM test_type_record_as('tuple', 'one', null, false);
SELECT * FROM test_type_record_as('tuple', null, 2, false);
SELECT * FROM test_type_record_as('tuple', 'three', 3, false);
SELECT * FROM test_type_record_as('tuple', null, null, true);
SELECT * FROM test_type_record_as('list', null, null, false);
SELECT * FROM test_type_record_as('list', 'one', null, false);
SELECT * FROM test_type_record_as('list', null, 2, false);
SELECT * FROM test_type_record_as('list', 'three', 3, false);
SELECT * FROM test_type_record_as('list', null, null, true);
SELECT * FROM test_type_record_as('obj', null, null, false);
SELECT * FROM test_type_record_as('obj', 'one', null, false);
SELECT * FROM test_type_record_as('obj', null, 2, false);
SELECT * FROM test_type_record_as('obj', 'three', 3, false);
SELECT * FROM test_type_record_as('obj', null, null, true);
SELECT * FROM test_in_out_params('test_in');
-- this doesn't work yet :-(
SELECT * FROM test_in_out_params_multi('test_in');
SELECT * FROM test_inout_params('test_in');
......@@ -39,10 +39,6 @@ CREATE TABLE xsequences (
) ;
CREATE INDEX xsequences_pid_idx ON xsequences(pid) ;
CREATE TABLE unicode_test (
testvalue text NOT NULL
);
CREATE TABLE table_record (
first text,
second int4
......
--
-- Test returning SETOF
--
CREATE FUNCTION test_setof_as_list(count integer, content text) RETURNS SETOF text AS $$
return [ content ]*count
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_setof_as_tuple(count integer, content text) RETURNS SETOF text AS $$
t = ()
for i in xrange(count):
t += ( content, )
return t
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_setof_as_iterator(count integer, content text) RETURNS SETOF text AS $$
class producer:
def __init__ (self, icount, icontent):
self.icontent = icontent
self.icount = icount
def __iter__ (self):
return self
def next (self):
if self.icount == 0:
raise StopIteration
self.icount -= 1
return self.icontent
return producer(count, content)
$$ LANGUAGE plpythonu;
-- Test set returning functions
SELECT test_setof_as_list(0, 'list');
SELECT test_setof_as_list(1, 'list');
SELECT test_setof_as_list(2, 'list');
SELECT test_setof_as_list(2, null);
SELECT test_setof_as_tuple(0, 'tuple');
SELECT test_setof_as_tuple(1, 'tuple');
SELECT test_setof_as_tuple(2, 'tuple');
SELECT test_setof_as_tuple(2, null);
SELECT test_setof_as_iterator(0, 'list');
SELECT test_setof_as_iterator(1, 'list');
SELECT test_setof_as_iterator(2, 'list');
SELECT test_setof_as_iterator(2, null);
-- nested calls
--
CREATE FUNCTION nested_call_one(a text) RETURNS text
AS
'q = "SELECT nested_call_two(''%s'')" % a
r = plpy.execute(q)
return r[0]'
LANGUAGE plpythonu ;
CREATE FUNCTION nested_call_two(a text) RETURNS text
AS
'q = "SELECT nested_call_three(''%s'')" % a
r = plpy.execute(q)
return r[0]'
LANGUAGE plpythonu ;
CREATE FUNCTION nested_call_three(a text) RETURNS text
AS
'return a'
LANGUAGE plpythonu ;
-- some spi stuff
CREATE FUNCTION spi_prepared_plan_test_one(a text) RETURNS text
AS
'if not SD.has_key("myplan"):
q = "SELECT count(*) FROM users WHERE lname = $1"
SD["myplan"] = plpy.prepare(q, [ "text" ])
try:
rv = plpy.execute(SD["myplan"], [a])
return "there are " + str(rv[0]["count"]) + " " + str(a) + "s"
except Exception, ex:
plpy.error(str(ex))
return None
'
LANGUAGE plpythonu;
CREATE FUNCTION spi_prepared_plan_test_nested(a text) RETURNS text
AS
'if not SD.has_key("myplan"):
q = "SELECT spi_prepared_plan_test_one(''%s'') as count" % a
SD["myplan"] = plpy.prepare(q)
try:
rv = plpy.execute(SD["myplan"])
if len(rv):
return rv[0]["count"]
except Exception, ex:
plpy.error(str(ex))
return None
'
LANGUAGE plpythonu;
CREATE FUNCTION join_sequences(s sequences) RETURNS text
AS
'if not s["multipart"]:
return s["sequence"]
q = "SELECT sequence FROM xsequences WHERE pid = ''%s''" % s["pid"]
rv = plpy.execute(q)
seq = s["sequence"]
for r in rv:
seq = seq + r["sequence"]
return seq
'
LANGUAGE plpythonu;
-- spi and nested calls
--
select nested_call_one('pass this along');
select spi_prepared_plan_test_one('doe');
select spi_prepared_plan_test_one('smith');
select spi_prepared_plan_test_nested('smith');
SELECT join_sequences(sequences) FROM sequences;
SELECT join_sequences(sequences) FROM sequences
WHERE join_sequences(sequences) ~* '^A';
SELECT join_sequences(sequences) FROM sequences
WHERE join_sequences(sequences) ~* '^B';
-- first some tests of basic functionality
--
-- better succeed
--
select stupid();
-- check static and global data
--
SELECT static_test();
SELECT static_test();
SELECT global_test_one();
SELECT global_test_two();
-- import python modules
--
SELECT import_fail();
SELECT import_succeed();
-- really stupid function just to get the module loaded
CREATE FUNCTION stupid() RETURNS text AS 'return "zarkon"' LANGUAGE plpythonu;
-- test import and simple argument handling
--
SELECT import_test_one('sha hash of this string');
select stupid();
-- test import and tuple argument handling
--
select import_test_two(users) from users where fname = 'willem';
-- test multiple arguments
--
select argument_test_one(users, fname, lname) from users where lname = 'doe' order by 1;
-- spi and nested calls
--
select nested_call_one('pass this along');
select spi_prepared_plan_test_one('doe');
select spi_prepared_plan_test_one('smith');
select spi_prepared_plan_test_nested('smith');
-- quick peek at the table
--
SELECT * FROM users;
-- should fail
--
UPDATE users SET fname = 'william' WHERE fname = 'willem';
-- should modify william to willem and create username
--
INSERT INTO users (fname, lname) VALUES ('william', 'smith');
INSERT INTO users (fname, lname, username) VALUES ('charles', 'darwin', 'beagle');
SELECT * FROM users;
SELECT join_sequences(sequences) FROM sequences;
SELECT join_sequences(sequences) FROM sequences
WHERE join_sequences(sequences) ~* '^A';
SELECT join_sequences(sequences) FROM sequences
WHERE join_sequences(sequences) ~* '^B';
-- error in trigger
--
--
-- Check Universal Newline Support
--
SELECT newline_lf();
SELECT newline_cr();
SELECT newline_crlf();
CREATE FUNCTION argument_test_one(u users, a1 text, a2 text) RETURNS text
AS
'keys = u.keys()
keys.sort()
out = []
for key in keys:
out.append("%s: %s" % (key, u[key]))
words = a1 + " " + a2 + " => {" + ", ".join(out) + "}"
return words'
LANGUAGE plpythonu;
-- Tests for functions returning void
SELECT test_void_func1(), test_void_func1() IS NULL AS "is null";
SELECT test_void_func2(); -- should fail
SELECT test_return_none(), test_return_none() IS NULL AS "is null";
-- Test for functions with named and nameless parameters
SELECT test_param_names0(2,7);
SELECT test_param_names1(1,'text');
SELECT test_param_names2(users) from users;
SELECT test_param_names3(1);
-- Test set returning functions
SELECT test_setof_as_list(0, 'list');
SELECT test_setof_as_list(1, 'list');
SELECT test_setof_as_list(2, 'list');
SELECT test_setof_as_list(2, null);
SELECT test_setof_as_tuple(0, 'tuple');
SELECT test_setof_as_tuple(1, 'tuple');
SELECT test_setof_as_tuple(2, 'tuple');
SELECT test_setof_as_tuple(2, null);
SELECT test_setof_as_iterator(0, 'list');
SELECT test_setof_as_iterator(1, 'list');
SELECT test_setof_as_iterator(2, 'list');
SELECT test_setof_as_iterator(2, null);
-- Test tuple returning functions
SELECT * FROM test_table_record_as('dict', null, null, false);
SELECT * FROM test_table_record_as('dict', 'one', null, false);
SELECT * FROM test_table_record_as('dict', null, 2, false);
SELECT * FROM test_table_record_as('dict', 'three', 3, false);
SELECT * FROM test_table_record_as('dict', null, null, true);
SELECT * FROM test_table_record_as('tuple', null, null, false);
SELECT * FROM test_table_record_as('tuple', 'one', null, false);
SELECT * FROM test_table_record_as('tuple', null, 2, false);
SELECT * FROM test_table_record_as('tuple', 'three', 3, false);
SELECT * FROM test_table_record_as('tuple', null, null, true);
SELECT * FROM test_table_record_as('list', null, null, false);
SELECT * FROM test_table_record_as('list', 'one', null, false);
SELECT * FROM test_table_record_as('list', null, 2, false);
SELECT * FROM test_table_record_as('list', 'three', 3, false);
SELECT * FROM test_table_record_as('list', null, null, true);
SELECT * FROM test_table_record_as('obj', null, null, false);
SELECT * FROM test_table_record_as('obj', 'one', null, false);
SELECT * FROM test_table_record_as('obj', null, 2, false);
SELECT * FROM test_table_record_as('obj', 'three', 3, false);
SELECT * FROM test_table_record_as('obj', null, null, true);
SELECT * FROM test_type_record_as('dict', null, null, false);
SELECT * FROM test_type_record_as('dict', 'one', null, false);
SELECT * FROM test_type_record_as('dict', null, 2, false);
SELECT * FROM test_type_record_as('dict', 'three', 3, false);
SELECT * FROM test_type_record_as('dict', null, null, true);
SELECT * FROM test_type_record_as('tuple', null, null, false);
SELECT * FROM test_type_record_as('tuple', 'one', null, false);
SELECT * FROM test_type_record_as('tuple', null, 2, false);
SELECT * FROM test_type_record_as('tuple', 'three', 3, false);
SELECT * FROM test_type_record_as('tuple', null, null, true);
SELECT * FROM test_type_record_as('list', null, null, false);
SELECT * FROM test_type_record_as('list', 'one', null, false);
SELECT * FROM test_type_record_as('list', null, 2, false);
SELECT * FROM test_type_record_as('list', 'three', 3, false);
SELECT * FROM test_type_record_as('list', null, null, true);
SELECT * FROM test_type_record_as('obj', null, null, false);
SELECT * FROM test_type_record_as('obj', 'one', null, false);
SELECT * FROM test_type_record_as('obj', null, 2, false);
SELECT * FROM test_type_record_as('obj', 'three', 3, false);
SELECT * FROM test_type_record_as('obj', null, null, true);
SELECT * FROM test_in_out_params('test_in');
-- this doesn't work yet :-(
SELECT * FROM test_in_out_params_multi('test_in');
SELECT * FROM test_inout_params('test_in');
select argument_test_one(users, fname, lname) from users where lname = 'doe' order by 1;
-- these triggers are dedicated to HPHC of RI who
-- decided that my kid's name was william not willem, and
-- vigorously resisted all efforts at correction. they have
-- since gone bankrupt...
CREATE FUNCTION users_insert() returns trigger
AS
'if TD["new"]["fname"] == None or TD["new"]["lname"] == None:
return "SKIP"
if TD["new"]["username"] == None:
TD["new"]["username"] = TD["new"]["fname"][:1] + "_" + TD["new"]["lname"]
rv = "MODIFY"
else:
rv = None
if TD["new"]["fname"] == "william":
TD["new"]["fname"] = TD["args"][0]
rv = "MODIFY"
return rv'
LANGUAGE plpythonu;
CREATE FUNCTION users_update() returns trigger
AS
'if TD["event"] == "UPDATE":
if TD["old"]["fname"] != TD["new"]["fname"] and TD["old"]["fname"] == TD["args"][0]:
return "SKIP"
return None'
LANGUAGE plpythonu;
CREATE FUNCTION users_delete() RETURNS trigger
AS
'if TD["old"]["fname"] == TD["args"][0]:
return "SKIP"
return None'
LANGUAGE plpythonu;
CREATE TRIGGER users_insert_trig BEFORE INSERT ON users FOR EACH ROW
EXECUTE PROCEDURE users_insert ('willem');
CREATE TRIGGER users_update_trig BEFORE UPDATE ON users FOR EACH ROW
EXECUTE PROCEDURE users_update ('willem');
CREATE TRIGGER users_delete_trig BEFORE DELETE ON users FOR EACH ROW
EXECUTE PROCEDURE users_delete ('willem');
-- quick peek at the table
--
SELECT * FROM users;
-- should fail
--
UPDATE users SET fname = 'william' WHERE fname = 'willem';
-- should modify william to willem and create username
--
INSERT INTO users (fname, lname) VALUES ('william', 'smith');
INSERT INTO users (fname, lname, username) VALUES ('charles', 'darwin', 'beagle');
SELECT * FROM users;
-- dump trigger data
CREATE TABLE trigger_test
(i int, v text );
CREATE FUNCTION trigger_data() returns trigger language plpythonu as $$
if TD.has_key('relid'):
TD['relid'] = "bogus:12345"
skeys = TD.keys()
skeys.sort()
for key in skeys:
val = TD[key]
plpy.notice("TD[" + key + "] => " + str(val))
return None
$$;
CREATE TRIGGER show_trigger_data_trig
BEFORE INSERT OR UPDATE OR DELETE ON trigger_test
FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo');
insert into trigger_test values(1,'insert');
update trigger_test set v = 'update' where i = 1;
delete from trigger_test;
DROP TRIGGER show_trigger_data_trig on trigger_test;
DROP FUNCTION trigger_data();
--
-- Unicode handling
--
CREATE TABLE unicode_test (
testvalue text NOT NULL
);
CREATE FUNCTION unicode_return_error() RETURNS text AS E'
return u"\\x80"
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_trigger_error() RETURNS trigger AS E'
TD["new"]["testvalue"] = u"\\x80"
return "MODIFY"
' LANGUAGE plpythonu;
CREATE TRIGGER unicode_test_bi BEFORE INSERT ON unicode_test
FOR EACH ROW EXECUTE PROCEDURE unicode_trigger_error();
CREATE FUNCTION unicode_plan_error1() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue", ["text"])
rv = plpy.execute(plan, [u"\\x80"], 1)
return rv[0]["testvalue"]
' LANGUAGE plpythonu;
CREATE FUNCTION unicode_plan_error2() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue1, $2 AS testvalue2", ["text", "text"])
rv = plpy.execute(plan, u"\\x80", 1)
return rv[0]["testvalue1"]
' LANGUAGE plpythonu;
SELECT unicode_return_error();
INSERT INTO unicode_test (testvalue) VALUES ('test');
SELECT unicode_plan_error1();
SELECT unicode_plan_error2();
--
-- Tests for functions that return void
--
CREATE FUNCTION test_void_func1() RETURNS void AS $$
x = 10
$$ LANGUAGE plpythonu;
-- illegal: can't return non-None value in void-returning func
CREATE FUNCTION test_void_func2() RETURNS void AS $$
return 10
$$ LANGUAGE plpythonu;
CREATE FUNCTION test_return_none() RETURNS int AS $$
None
$$ LANGUAGE plpythonu;
-- Tests for functions returning void
SELECT test_void_func1(), test_void_func1() IS NULL AS "is null";
SELECT test_void_func2(); -- should fail
SELECT test_return_none(), test_return_none() IS NULL AS "is null";
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment