Commit 7b8eb0b4 authored by Bruce Momjian's avatar Bruce Momjian

Attached is a fairly sizeable update to contrib/dblink. I'd love to get

review/feedback if anyone is interested and can spend the time. But I'd
also love to get this committed and address changes as incremental
patches ;-), so if there are no objections, please apply.

Below I'll give a synopsis of the changes. More detailed descriptions
are now in a new doc directory under contrib/dblink. There is also a new

dblink.test.sql file which will give a pretty good overview of the
functions and their use.

Joe Conway
parent a12b4e27
...@@ -48,7 +48,7 @@ dbase - ...@@ -48,7 +48,7 @@ dbase -
dblink - dblink -
Allows remote query execution Allows remote query execution
by Joe Conway <joe.conway@mail.com> by Joe Conway <mail@joeconway.com>
dbmirror - dbmirror -
Replication server Replication server
...@@ -73,7 +73,7 @@ fulltextindex - ...@@ -73,7 +73,7 @@ fulltextindex -
fuzzystrmatch - fuzzystrmatch -
Levenshtein, metaphone, and soundex fuzzy string matching Levenshtein, metaphone, and soundex fuzzy string matching
by Joe Conway <joseph.conway@home.com>, Joel Burton <jburton@scw.org> by Joe Conway <mail@joeconway.com>, Joel Burton <jburton@scw.org>
intagg - intagg -
Integer aggregator Integer aggregator
......
...@@ -3,7 +3,9 @@ ...@@ -3,7 +3,9 @@
* *
* Functions returning results from a remote database * Functions returning results from a remote database
* *
* Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002, * Joe Conway <mail@joeconway.com>
*
* Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
* ALL RIGHTS RESERVED; * ALL RIGHTS RESERVED;
* *
* Permission to use, copy, modify, and distribute this software and its * Permission to use, copy, modify, and distribute this software and its
...@@ -25,13 +27,36 @@ ...@@ -25,13 +27,36 @@
* *
*/ */
Version 0.5 (25 August, 2002):
Version 0.4 (7 April, 2002): Major overhaul to work with new backend "table function" capability. Removed
Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and dblink_strtok() and dblink_replace() functions because they are now
various utility functions. available as backend functions (split() and replace() respectively).
Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel Tested under Linux (Red Hat 7.3) and PostgreSQL 7.3devel. This version
is no longer backwards portable to PostgreSQL 7.2.
Release Notes: Release Notes:
Version 0.5
- dblink now supports use directly as a table function; this is the new
preferred usage going forward
- Use of dblink_tok is now deprecated; original form of dblink is also
deprecated. They _will_ be removed in the next version.
- dblink_last_oid is also deprecated; use dblink_exec() which returns
the command status as a single row, single column result.
- Original dblink, dblink_tok, and dblink_last_oid are commented out in
dblink.sql; remove the comments to use the deprecated functions.
- dblink_strtok() and dblink_replace() functions were removed. Use
split() and replace() respectively (new backend functions in
PostgreSQL 7.3) instead.
- New functions: dblink_exec() for non-SELECT queries; dblink_connect()
opens connection that persists for duration of a backend;
dblink_disconnect() closes a persistent connection; dblink_open()
opens a cursor; dblink_fetch() fetches results from an open cursor.
dblink_close() closes a cursor.
- New test suite: dblink_check.sh, dblink.test.sql,
dblink.test.expected.out. Execute dblink_check.sh from the same
directory as the other two files. Output is dblink.test.out and
dblink.test.diff. Note that dblink.test.sql is a good source
of example usage.
Version 0.4 Version 0.4
- removed cursor wrap around input sql to allow for remote - removed cursor wrap around input sql to allow for remote
...@@ -59,16 +84,48 @@ Installation: ...@@ -59,16 +84,48 @@ Installation:
installs following functions into database template1: installs following functions into database template1:
dblink(text,text) RETURNS setof int connection
- returns a resource id for results from remote query ------------
dblink_tok(int,int) RETURNS text dblink_connect(text) RETURNS text
- extracts and returns individual field results - opens a connection that will persist for duration of current
dblink_strtok(text,text,int) RETURNS text backend or until it is disconnected
- extracts and returns individual token from delimited text dblink_disconnect() RETURNS text
- disconnects a persistent connection
cursor
------------
dblink_open(text,text) RETURNS text
- opens a cursor using connection already opened with dblink_connect()
that will persist for duration of current backend or until it is
closed
dblink_fetch(text, int) RETURNS setof record
- fetches data from an already opened cursor
dblink_close(text) RETURNS text
- closes a cursor
query
------------
dblink(text,text) RETURNS setof record
- returns a set of results from remote SELECT query
(Note: comment out in dblink.sql to use deprecated version)
dblink(text) RETURNS setof record
- returns a set of results from remote SELECT query, using connection
already opened with dblink_connect()
execute
------------
dblink_exec(text, text) RETURNS text
- executes an INSERT/UPDATE/DELETE query remotely
dblink_exec(text) RETURNS text
- executes an INSERT/UPDATE/DELETE query remotely, using connection
already opened with dblink_connect()
misc
------------
dblink_current_query() RETURNS text
- returns the current query string
dblink_get_pkey(text) RETURNS setof text dblink_get_pkey(text) RETURNS setof text
- returns the field names of a relation's primary key fields - returns the field names of a relation's primary key fields
dblink_last_oid(int) RETURNS oid
- returns the last inserted oid
dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text
- builds an insert statement using a local tuple, replacing the - builds an insert statement using a local tuple, replacing the
selection key field values with alternate supplied values selection key field values with alternate supplied values
...@@ -78,338 +135,30 @@ Installation: ...@@ -78,338 +135,30 @@ Installation:
dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text
- builds an update statement using a local tuple, replacing the - builds an update statement using a local tuple, replacing the
selection key field values with alternate supplied values selection key field values with alternate supplied values
dblink_current_query() RETURNS text
- returns the current query string
dblink_replace(text,text,text) RETURNS text
- replace all occurences of substring-a in the input-string
with substring-b
Documentation
==================================================================
Name
dblink -- Returns a resource id for a data set from a remote database
Synopsis
dblink(text connstr, text sql)
Inputs
connstr
standard libpq format connection srting,
e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd"
sql
sql statement that you wish to execute on the remote host
e.g. "select * from pg_class"
Outputs
Returns setof int (res_id)
Example usage
select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
,'select f1, f2 from mytable');
==================================================================
Name
dblink_tok -- Returns individual select field results from a dblink remote query
Synopsis
dblink_tok(int res_id, int fnumber)
Inputs
res_id
a resource id returned by a call to dblink()
fnumber
the ordinal position (zero based) of the field to be returned from the dblink result set
Outputs
Returns text
Example usage
select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2
from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
,'select f1, f2 from mytable') as dblink_p) as t1;
==================================================================
A more convenient way to use dblink may be to create a view:
create view myremotetable as
select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2
from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=template1 user=postgres password=postgres'
,'select proname, prosrc from pg_proc') as dblink_p) as t1;
Then you can simply write:
select f1, f2 from myremotetable where f1 like 'bytea%';
==================================================================
Name
dblink_strtok -- Extracts and returns individual token from delimited text
Synopsis
dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
Inputs
inputstring
any string you want to parse a token out of;
e.g. 'f=1&g=3&h=4'
delimiter
a single character to use as the delimiter;
e.g. '&' or '='
posn
the position of the token of interest, 0 based;
e.g. 1
Outputs
Returns text
Example usage
test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
dblink_strtok
---------------
3
(1 row)
==================================================================
Name
dblink_get_pkey -- returns the field names of a relation's primary
key fields
Synopsis
dblink_get_pkey(text relname) RETURNS setof text
Inputs
relname
any relation name;
e.g. 'foobar'
Outputs
Returns setof text -- one row for each primary key field, in order of
precedence
Example usage Not installed by default
deprecated
test=# select dblink_get_pkey('foobar'); ------------
dblink_get_pkey dblink(text,text) RETURNS setof int
----------------- - *DEPRECATED* returns a resource id for results from remote query
f1 (Note: must uncomment in dblink.sql to use)
f2 dblink_tok(int,int) RETURNS text
f3 - *DEPRECATED* extracts and returns individual field results; used
f4 only in conjunction with the *DEPRECATED* form of dblink
f5 (Note: must uncomment in dblink.sql to use)
(5 rows) dblink_last_oid(int) RETURNS oid
- *DEPRECATED* returns the last inserted oid
==================================================================
Name
dblink_last_oid -- Returns last inserted oid
Synopsis
dblink_last_oid(int res_id) RETURNS oid
Inputs
res_id
any resource id returned by dblink function;
Outputs
Returns oid of last inserted tuple
Example usage
test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
,'insert into mytable (f1, f2) values (1,2)'));
dblink_last_oid
----------------
16553
(1 row)
==================================================================
Name
dblink_build_sql_insert -- builds an insert statement using a local
tuple, replacing the selection key field
values with alternate supplied values
dblink_build_sql_delete -- builds a delete statement using supplied
values for selection key field values
dblink_build_sql_update -- builds an update statement using a local
tuple, replacing the selection key field
values with alternate supplied values
Synopsis
dblink_build_sql_insert(text relname
,int2vector primary_key_attnums
,int2 num_primary_key_atts
,_text src_pk_att_vals_array
,_text tgt_pk_att_vals_array) RETURNS text
dblink_build_sql_delete(text relname
,int2vector primary_key_attnums
,int2 num_primary_key_atts
,_text tgt_pk_att_vals_array) RETURNS text
dblink_build_sql_update(text relname
,int2vector primary_key_attnums
,int2 num_primary_key_atts
,_text src_pk_att_vals_array
,_text tgt_pk_att_vals_array) RETURNS text
Inputs
relname
any relation name;
e.g. 'foobar'
primary_key_attnums
vector of primary key attnums (1 based, see pg_index.indkey);
e.g. '1 2'
num_primary_key_atts
number of primary key attnums in the vector; e.g. 2
src_pk_att_vals_array
array of primary key values, used to look up the local matching
tuple, the values of which are then used to construct the SQL
statement
tgt_pk_att_vals_array
array of primary key values, used to replace the local tuple
values in the SQL statement
Outputs
Returns text -- requested SQL statement
Example usage
test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
dblink_build_sql_insert
--------------------------------------------------
INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
(1 row)
test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
dblink_build_sql_delete
---------------------------------------------
DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
(1 row)
test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
dblink_build_sql_update
-------------------------------------------------------------
UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
(1 row)
==================================================================
Name
dblink_current_query -- returns the current query string
Synopsis
dblink_current_query () RETURNS text
Inputs
None
Outputs
Returns text -- a copy of the currently executing query
Example usage
test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
dblink_current_query
-----------------------------------------------------------------------------------------------------------------------------------------------------
select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
(1 row)
==================================================================
Name
dblink_replace -- replace all occurences of substring-a in the
input-string with substring-b
Synopsis
dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
Inputs
input-string
the starting string, before replacement of substring-a
substring-a
the substring to find and replace
substring-b
the substring to be substituted in place of substring-a
Outputs
Returns text -- a copy of the starting string, but with all occurences of
substring-a replaced with substring-b
Example usage Documentation:
test=# select dblink_replace('12345678901234567890','56','hello'); See the following files:
dblink_replace doc/connection
---------------------------- doc/cursor
1234hello78901234hello7890 doc/query
(1 row) doc/execute
doc/misc
doc/deprecated
================================================================== ==================================================================
-- Joe Conway -- Joe Conway
...@@ -3,7 +3,9 @@ ...@@ -3,7 +3,9 @@
* *
* Functions returning results from a remote database * Functions returning results from a remote database
* *
* Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002, * Joe Conway <mail@joeconway.com>
*
* Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
* ALL RIGHTS RESERVED; * ALL RIGHTS RESERVED;
* *
* Permission to use, copy, modify, and distribute this software and its * Permission to use, copy, modify, and distribute this software and its
...@@ -25,16 +27,39 @@ ...@@ -25,16 +27,39 @@
* *
*/ */
#include "dblink.h" #include <string.h>
#include "postgres.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "fmgr.h"
#include "funcapi.h"
#include "access/tupdesc.h"
#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/namespace.h"
#include "catalog/pg_index.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/execnodes.h"
#include "nodes/pg_list.h"
#include "parser/parse_type.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/array.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "dblink.h"
/* /*
* Internal declarations * Internal declarations
*/ */
static dblink_results *init_dblink_results(MemoryContext fn_mcxt); static dblink_results *init_dblink_results(MemoryContext fn_mcxt);
static dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
static char **get_pkey_attnames(Oid relid, int16 *numatts); static char **get_pkey_attnames(Oid relid, int16 *numatts);
static char *get_strtok(char *fldtext, char *fldsep, int fldnum);
static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals); static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
...@@ -43,14 +68,593 @@ static char *quote_ident_cstr(char *rawstr); ...@@ -43,14 +68,593 @@ static char *quote_ident_cstr(char *rawstr);
static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key); static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals); static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
static Oid get_relid_from_relname(text *relname_text); static Oid get_relid_from_relname(text *relname_text);
static dblink_results *get_res_ptr(int32 res_id_index); static dblink_results *get_res_ptr(int32 res_id_index);
static void append_res_ptr(dblink_results *results); static void append_res_ptr(dblink_results *results);
static void remove_res_ptr(dblink_results *results); static void remove_res_ptr(dblink_results *results);
static TupleDesc pgresultGetTupleDesc(PGresult *res);
/* Global */ /* Global */
List *res_id = NIL; List *res_id = NIL;
int res_id_index = 0; int res_id_index = 0;
PGconn *persistent_conn = NULL;
#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
#define xpfree(var_) \
do { \
if (var_ != NULL) \
{ \
pfree(var_); \
var_ = NULL; \
} \
} while (0)
/*
* Create a persistent connection to another database
*/
PG_FUNCTION_INFO_V1(dblink_connect);
Datum
dblink_connect(PG_FUNCTION_ARGS)
{
char *connstr = GET_STR(PG_GETARG_TEXT_P(0));
char *msg;
text *result_text;
MemoryContext oldcontext;
if (persistent_conn != NULL)
PQfinish(persistent_conn);
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
persistent_conn = PQconnectdb(connstr);
MemoryContextSwitchTo(oldcontext);
if (PQstatus(persistent_conn) == CONNECTION_BAD)
{
msg = pstrdup(PQerrorMessage(persistent_conn));
PQfinish(persistent_conn);
persistent_conn = NULL;
elog(ERROR, "dblink_connect: connection error: %s", msg);
}
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
PG_RETURN_TEXT_P(result_text);
}
/*
* Clear a persistent connection to another database
*/
PG_FUNCTION_INFO_V1(dblink_disconnect);
Datum
dblink_disconnect(PG_FUNCTION_ARGS)
{
text *result_text;
if (persistent_conn != NULL)
PQfinish(persistent_conn);
persistent_conn = NULL;
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
PG_RETURN_TEXT_P(result_text);
}
/*
* opens a cursor using a persistent connection
*/
PG_FUNCTION_INFO_V1(dblink_open);
Datum
dblink_open(PG_FUNCTION_ARGS)
{
char *msg;
PGresult *res = NULL;
PGconn *conn = NULL;
text *result_text;
char *curname = GET_STR(PG_GETARG_TEXT_P(0));
char *sql = GET_STR(PG_GETARG_TEXT_P(1));
StringInfo str = makeStringInfo();
if (persistent_conn != NULL)
conn = persistent_conn;
else
elog(ERROR, "dblink_open: no connection available");
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
persistent_conn = NULL;
elog(ERROR, "dblink_open: begin error: %s", msg);
}
PQclear(res);
appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql);
res = PQexec(conn, str->data);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
persistent_conn = NULL;
elog(ERROR, "dblink: sql error: %s", msg);
}
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
PG_RETURN_TEXT_P(result_text);
}
/*
* closes a cursor
*/
PG_FUNCTION_INFO_V1(dblink_close);
Datum
dblink_close(PG_FUNCTION_ARGS)
{
PGconn *conn = NULL;
PGresult *res = NULL;
char *curname = GET_STR(PG_GETARG_TEXT_P(0));
StringInfo str = makeStringInfo();
text *result_text;
char *msg;
if (persistent_conn != NULL)
conn = persistent_conn;
else
elog(ERROR, "dblink_close: no connection available");
appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname));
/* close the cursor */
res = PQexec(conn, str->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(persistent_conn);
persistent_conn = NULL;
elog(ERROR, "dblink_close: sql error: %s", msg);
}
PQclear(res);
/* commit the transaction */
res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(persistent_conn);
persistent_conn = NULL;
elog(ERROR, "dblink_close: commit error: %s", msg);
}
PQclear(res);
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
PG_RETURN_TEXT_P(result_text);
}
/*
* Fetch results from an open cursor
*/
PG_FUNCTION_INFO_V1(dblink_fetch);
Datum
dblink_fetch(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
TupleDesc tupdesc = NULL;
int call_cntr;
int max_calls;
TupleTableSlot *slot;
AttInMetadata *attinmeta;
char *msg;
PGresult *res = NULL;
MemoryContext oldcontext;
/* stuff done only on the first call of the function */
if(SRF_IS_FIRSTCALL())
{
Oid functypeid;
char functyptype;
Oid funcid = fcinfo->flinfo->fn_oid;
PGconn *conn = NULL;
StringInfo str = makeStringInfo();
char *curname = GET_STR(PG_GETARG_TEXT_P(0));
int howmany = PG_GETARG_INT32(1);
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
if (persistent_conn != NULL)
conn = persistent_conn;
else
elog(ERROR, "dblink_fetch: no connection available");
appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname));
res = PQexec(conn, str->data);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(persistent_conn);
persistent_conn = NULL;
elog(ERROR, "dblink_fetch: sql error: %s", msg);
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/* cursor does not exist - closed already or bad name */
PQclear(res);
elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname));
}
funcctx->max_calls = PQntuples(res);
/* got results, keep track of them */
funcctx->user_fctx = res;
/* fast track when no results */
if (funcctx->max_calls < 1)
SRF_RETURN_DONE(funcctx);
/* check typtype to see if we have a predetermined return type */
functypeid = get_func_rettype(funcid);
functyptype = get_typtype(functypeid);
if (functyptype == 'c')
tupdesc = TypeGetTupleDesc(functypeid, NIL);
else if (functyptype == 'p' && functypeid == RECORDOID)
tupdesc = pgresultGetTupleDesc(res);
else if (functyptype == 'b')
elog(ERROR, "dblink_fetch: invalid kind of return type specified for function");
else
elog(ERROR, "dblink_fetch: unknown kind of return type specified for function");
/* store needed metadata for subsequent calls */
slot = TupleDescGetSlot(tupdesc);
funcctx->slot = slot;
attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
/*
* initialize per-call variables
*/
call_cntr = funcctx->call_cntr;
max_calls = funcctx->max_calls;
slot = funcctx->slot;
res = (PGresult *) funcctx->user_fctx;
attinmeta = funcctx->attinmeta;
tupdesc = attinmeta->tupdesc;
if (call_cntr < max_calls) /* do when there is more left to send */
{
char **values;
HeapTuple tuple;
Datum result;
int i;
int nfields = PQnfields(res);
values = (char **) palloc(nfields * sizeof(char *));
for (i = 0; i < nfields; i++)
{
if (PQgetisnull(res, call_cntr, i) == 0)
values[i] = PQgetvalue(res, call_cntr, i);
else
values[i] = NULL;
}
/* build the tuple */
tuple = BuildTupleFromCStrings(attinmeta, values);
/* make the tuple into a datum */
result = TupleGetDatum(slot, tuple);
SRF_RETURN_NEXT(funcctx, result);
}
else /* do when there is no more left */
{
PQclear(res);
SRF_RETURN_DONE(funcctx);
}
}
/*
* Note: this is the new preferred version of dblink
*/
PG_FUNCTION_INFO_V1(dblink_record);
Datum
dblink_record(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
TupleDesc tupdesc = NULL;
int call_cntr;
int max_calls;
TupleTableSlot *slot;
AttInMetadata *attinmeta;
char *msg;
PGresult *res = NULL;
bool is_sql_cmd = false;
char *sql_cmd_status = NULL;
MemoryContext oldcontext;
/* stuff done only on the first call of the function */
if(SRF_IS_FIRSTCALL())
{
Oid functypeid;
char functyptype;
Oid funcid = fcinfo->flinfo->fn_oid;
PGconn *conn = NULL;
char *connstr = NULL;
char *sql = NULL;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
if (fcinfo->nargs == 2)
{
connstr = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
conn = PQconnectdb(connstr);
if (PQstatus(conn) == CONNECTION_BAD)
{
msg = pstrdup(PQerrorMessage(conn));
PQfinish(conn);
elog(ERROR, "dblink: connection error: %s", msg);
}
}
else if (fcinfo->nargs == 1)
{
sql = GET_STR(PG_GETARG_TEXT_P(0));
if (persistent_conn != NULL)
conn = persistent_conn;
else
elog(ERROR, "dblink: no connection available");
}
else
elog(ERROR, "dblink: wrong number of arguments");
res = PQexec(conn, sql);
if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
if (fcinfo->nargs == 1)
persistent_conn = NULL;
elog(ERROR, "dblink: sql error: %s", msg);
}
else
{
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
is_sql_cmd = true;
/* need a tuple descriptor representing one TEXT column */
tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0, false);
/*
* and save a copy of the command status string to return
* as our result tuple
*/
sql_cmd_status = PQcmdStatus(res);
funcctx->max_calls = 1;
}
else
funcctx->max_calls = PQntuples(res);
/* got results, keep track of them */
funcctx->user_fctx = res;
/* if needed, close the connection to the database and cleanup */
if (fcinfo->nargs == 2)
PQfinish(conn);
}
/* fast track when no results */
if (funcctx->max_calls < 1)
SRF_RETURN_DONE(funcctx);
/* check typtype to see if we have a predetermined return type */
functypeid = get_func_rettype(funcid);
functyptype = get_typtype(functypeid);
if (!is_sql_cmd)
{
if (functyptype == 'c')
tupdesc = TypeGetTupleDesc(functypeid, NIL);
else if (functyptype == 'p' && functypeid == RECORDOID)
tupdesc = pgresultGetTupleDesc(res);
else if (functyptype == 'b')
elog(ERROR, "Invalid kind of return type specified for function");
else
elog(ERROR, "Unknown kind of return type specified for function");
}
/* store needed metadata for subsequent calls */
slot = TupleDescGetSlot(tupdesc);
funcctx->slot = slot;
attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
/*
* initialize per-call variables
*/
call_cntr = funcctx->call_cntr;
max_calls = funcctx->max_calls;
slot = funcctx->slot;
res = (PGresult *) funcctx->user_fctx;
attinmeta = funcctx->attinmeta;
tupdesc = attinmeta->tupdesc;
if (call_cntr < max_calls) /* do when there is more left to send */
{
char **values;
HeapTuple tuple;
Datum result;
if (!is_sql_cmd)
{
int i;
int nfields = PQnfields(res);
values = (char **) palloc(nfields * sizeof(char *));
for (i = 0; i < nfields; i++)
{
if (PQgetisnull(res, call_cntr, i) == 0)
values[i] = PQgetvalue(res, call_cntr, i);
else
values[i] = NULL;
}
}
else
{
values = (char **) palloc(1 * sizeof(char *));
values[0] = sql_cmd_status;
}
/* build the tuple */
tuple = BuildTupleFromCStrings(attinmeta, values);
/* make the tuple into a datum */
result = TupleGetDatum(slot, tuple);
SRF_RETURN_NEXT(funcctx, result);
}
else /* do when there is no more left */
{
PQclear(res);
SRF_RETURN_DONE(funcctx);
}
}
/*
* Execute an SQL non-SELECT command
*/
PG_FUNCTION_INFO_V1(dblink_exec);
Datum
dblink_exec(PG_FUNCTION_ARGS)
{
char *msg;
PGresult *res = NULL;
char *sql_cmd_status = NULL;
TupleDesc tupdesc = NULL;
text *result_text;
PGconn *conn = NULL;
char *connstr = NULL;
char *sql = NULL;
if (fcinfo->nargs == 2)
{
connstr = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
conn = PQconnectdb(connstr);
if (PQstatus(conn) == CONNECTION_BAD)
{
msg = pstrdup(PQerrorMessage(conn));
PQfinish(conn);
elog(ERROR, "dblink_exec: connection error: %s", msg);
}
}
else if (fcinfo->nargs == 1)
{
sql = GET_STR(PG_GETARG_TEXT_P(0));
if (persistent_conn != NULL)
conn = persistent_conn;
else
elog(ERROR, "dblink_exec: no connection available");
}
else
elog(ERROR, "dblink_exec: wrong number of arguments");
res = PQexec(conn, sql);
if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
if (fcinfo->nargs == 1)
persistent_conn = NULL;
elog(ERROR, "dblink_exec: sql error: %s", msg);
}
else
{
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/* need a tuple descriptor representing one TEXT column */
tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0, false);
/*
* and save a copy of the command status string to return
* as our result tuple
*/
sql_cmd_status = PQcmdStatus(res);
}
else
elog(ERROR, "dblink_exec: queries returning results not allowed");
}
PQclear(res);
/* if needed, close the connection to the database and cleanup */
if (fcinfo->nargs == 2)
PQfinish(conn);
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status)));
PG_RETURN_TEXT_P(result_text);
}
/*
* Note: this original version of dblink is DEPRECATED;
* it *will* be removed in favor of the new version on next release
*/
PG_FUNCTION_INFO_V1(dblink); PG_FUNCTION_INFO_V1(dblink);
Datum Datum
dblink(PG_FUNCTION_ARGS) dblink(PG_FUNCTION_ARGS)
...@@ -179,14 +783,15 @@ dblink(PG_FUNCTION_ARGS) ...@@ -179,14 +783,15 @@ dblink(PG_FUNCTION_ARGS)
PG_RETURN_NULL(); PG_RETURN_NULL();
} }
/* /*
* Note: dblink_tok is DEPRECATED;
* it *will* be removed in favor of the new version on next release
*
* dblink_tok * dblink_tok
* parse dblink output string * parse dblink output string
* return fldnum item (0 based) * return fldnum item (0 based)
* based on provided field separator * based on provided field separator
*/ */
PG_FUNCTION_INFO_V1(dblink_tok); PG_FUNCTION_INFO_V1(dblink_tok);
Datum Datum
dblink_tok(PG_FUNCTION_ARGS) dblink_tok(PG_FUNCTION_ARGS)
...@@ -241,162 +846,121 @@ dblink_tok(PG_FUNCTION_ARGS) ...@@ -241,162 +846,121 @@ dblink_tok(PG_FUNCTION_ARGS)
} }
} }
/*
* dblink_strtok
* parse input string
* return ord item (0 based)
* based on provided field separator
*/
PG_FUNCTION_INFO_V1(dblink_strtok);
Datum
dblink_strtok(PG_FUNCTION_ARGS)
{
char *fldtext;
char *fldsep;
int fldnum;
char *buffer;
text *result_text;
fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
fldnum = PG_GETARG_INT32(2);
if (fldtext[0] == '\0')
{
elog(ERROR, "get_strtok: blank list not permitted");
}
if (fldsep[0] == '\0')
{
elog(ERROR, "get_strtok: blank field separator not permitted");
}
buffer = get_strtok(fldtext, fldsep, fldnum);
pfree(fldtext);
pfree(fldsep);
if (buffer == NULL)
{
PG_RETURN_NULL();
}
else
{
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
pfree(buffer);
PG_RETURN_TEXT_P(result_text);
}
}
/* /*
* dblink_get_pkey * dblink_get_pkey
* *
* Return comma delimited list of primary key * Return list of primary key fields for the supplied relation,
* fields for the supplied relation,
* or NULL if none exists. * or NULL if none exists.
*/ */
PG_FUNCTION_INFO_V1(dblink_get_pkey); PG_FUNCTION_INFO_V1(dblink_get_pkey);
Datum Datum
dblink_get_pkey(PG_FUNCTION_ARGS) dblink_get_pkey(PG_FUNCTION_ARGS)
{ {
text *relname_text; int16 numatts;
Oid relid; Oid relid;
char **result; char **results;
text *result_text; FuncCallContext *funcctx;
int16 numatts; int32 call_cntr;
ReturnSetInfo *rsi; int32 max_calls;
dblink_array_results *ret_set; TupleTableSlot *slot;
AttInMetadata *attinmeta;
MemoryContext oldcontext;
/* stuff done only on the first call of the function */
if(SRF_IS_FIRSTCALL())
{
TupleDesc tupdesc = NULL;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* convert relname to rel Oid */
relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation does not exist");
if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) /* need a tuple descriptor representing one INT and one TEXT column */
elog(ERROR, "dblink: function called in context that does not accept a set result"); tupdesc = CreateTemplateTupleDesc(2, WITHOUTOID);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
INT4OID, -1, 0, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
TEXTOID, -1, 0, false);
if (fcinfo->flinfo->fn_extra == NULL) /* allocate a slot for a tuple with this tupdesc */
{ slot = TupleDescGetSlot(tupdesc);
relname_text = PG_GETARG_TEXT_P(0);
/* /* assign slot to function context */
* Convert relname to rel OID. funcctx->slot = slot;
*/
relid = get_relid_from_relname(relname_text);
if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation does not exist");
/* /*
* get an array of attnums. * Generate attribute metadata needed later to produce tuples from raw
* C strings
*/ */
result = get_pkey_attnames(relid, &numatts); attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
/* get an array of attnums */
results = get_pkey_attnames(relid, &numatts);
if ((result != NULL) && (numatts > 0)) if ((results != NULL) && (numatts > 0))
{ {
ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt); funcctx->max_calls = numatts;
ret_set->elem_num = 0; /* got results, keep track of them */
ret_set->num_elems = numatts; funcctx->user_fctx = results;
ret_set->res = result; }
else /* fast track when no results */
SRF_RETURN_DONE(funcctx);
fcinfo->flinfo->fn_extra = (void *) ret_set; MemoryContextSwitchTo(oldcontext);
}
rsi = (ReturnSetInfo *) fcinfo->resultinfo; /* stuff done on every call of the function */
rsi->isDone = ExprMultipleResult; funcctx = SRF_PERCALL_SETUP();
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); /*
* initialize per-call variables
*/
call_cntr = funcctx->call_cntr;
max_calls = funcctx->max_calls;
PG_RETURN_TEXT_P(result_text); slot = funcctx->slot;
}
else
{
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
PG_RETURN_NULL(); results = (char **) funcctx->user_fctx;
} attinmeta = funcctx->attinmeta;
}
else
{
/*
* check for more results
*/
ret_set = fcinfo->flinfo->fn_extra;
ret_set->elem_num++;
result = ret_set->res;
if (ret_set->elem_num < ret_set->num_elems) if (call_cntr < max_calls) /* do when there is more left to send */
{ {
/* char **values;
* fetch next one HeapTuple tuple;
*/ Datum result;
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); values = (char **) palloc(2 * sizeof(char *));
PG_RETURN_TEXT_P(result_text); values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */
}
else
{
int i;
/* sprintf(values[0], "%d", call_cntr + 1);
* or if no more, clean things up
*/
for (i = 0; i < ret_set->num_elems; i++)
pfree(result[i]);
pfree(ret_set->res); values[1] = results[call_cntr];
pfree(ret_set);
rsi = (ReturnSetInfo *) fcinfo->resultinfo; /* build the tuple */
rsi->isDone = ExprEndResult; tuple = BuildTupleFromCStrings(attinmeta, values);
PG_RETURN_NULL(); /* make the tuple into a datum */
} result = TupleGetDatum(slot, tuple);
SRF_RETURN_NEXT(funcctx, result);
} }
PG_RETURN_NULL(); else /* do when there is no more left */
SRF_RETURN_DONE(funcctx);
} }
/* /*
* Note: dblink_last_oid is DEPRECATED;
* it *will* be removed on next release
*
* dblink_last_oid * dblink_last_oid
* return last inserted oid * return last inserted oid
*/ */
...@@ -447,23 +1011,26 @@ Datum ...@@ -447,23 +1011,26 @@ Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS) dblink_build_sql_insert(PG_FUNCTION_ARGS)
{ {
Oid relid; Oid relid;
text *relname_text; text *relname_text;
int16 *pkattnums; int16 *pkattnums;
int16 pknumatts; int16 pknumatts;
char **src_pkattvals; char **src_pkattvals;
char **tgt_pkattvals; char **tgt_pkattvals;
ArrayType *src_pkattvals_arry; ArrayType *src_pkattvals_arry;
ArrayType *tgt_pkattvals_arry; ArrayType *tgt_pkattvals_arry;
int src_ndim; int src_ndim;
int *src_dim; int *src_dim;
int src_nitems; int src_nitems;
int tgt_ndim; int tgt_ndim;
int *tgt_dim; int *tgt_dim;
int tgt_nitems; int tgt_nitems;
int i; int i;
char *ptr; char *ptr;
char *sql; char *sql;
text *sql_text; text *sql_text;
int16 typlen;
bool typbyval;
char typalign;
relname_text = PG_GETARG_TEXT_P(0); relname_text = PG_GETARG_TEXT_P(0);
...@@ -503,12 +1070,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) ...@@ -503,12 +1070,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
* get array of pointers to c-strings from the input source array * get array of pointers to c-strings from the input source array
*/ */
Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID); Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry),
&typlen, &typbyval, &typalign);
src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(src_pkattvals_arry); ptr = ARR_DATA_PTR(src_pkattvals_arry);
for (i = 0; i < src_nitems; i++) for (i = 0; i < src_nitems; i++)
{ {
src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr); ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
ptr = (char *) att_align(ptr, typalign);
} }
/* /*
...@@ -529,12 +1100,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) ...@@ -529,12 +1100,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
* get array of pointers to c-strings from the input target array * get array of pointers to c-strings from the input target array
*/ */
Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
&typlen, &typbyval, &typalign);
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry); ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++) for (i = 0; i < tgt_nitems; i++)
{ {
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr); ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
ptr = (char *) att_align(ptr, typalign);
} }
/* /*
...@@ -586,6 +1161,9 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) ...@@ -586,6 +1161,9 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
char *ptr; char *ptr;
char *sql; char *sql;
text *sql_text; text *sql_text;
int16 typlen;
bool typbyval;
char typalign;
relname_text = PG_GETARG_TEXT_P(0); relname_text = PG_GETARG_TEXT_P(0);
...@@ -624,12 +1202,16 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) ...@@ -624,12 +1202,16 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
* get array of pointers to c-strings from the input target array * get array of pointers to c-strings from the input target array
*/ */
Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
&typlen, &typbyval, &typalign);
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry); ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++) for (i = 0; i < tgt_nitems; i++)
{ {
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr); ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
ptr = (char *) att_align(ptr, typalign);
} }
/* /*
...@@ -690,6 +1272,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) ...@@ -690,6 +1272,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
char *ptr; char *ptr;
char *sql; char *sql;
text *sql_text; text *sql_text;
int16 typlen;
bool typbyval;
char typalign;
relname_text = PG_GETARG_TEXT_P(0); relname_text = PG_GETARG_TEXT_P(0);
...@@ -729,12 +1314,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) ...@@ -729,12 +1314,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
* get array of pointers to c-strings from the input source array * get array of pointers to c-strings from the input source array
*/ */
Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID); Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry),
&typlen, &typbyval, &typalign);
src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(src_pkattvals_arry); ptr = ARR_DATA_PTR(src_pkattvals_arry);
for (i = 0; i < src_nitems; i++) for (i = 0; i < src_nitems; i++)
{ {
src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr); ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
ptr = (char *) att_align(ptr, typalign);
} }
/* /*
...@@ -755,12 +1344,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) ...@@ -755,12 +1344,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
* get array of pointers to c-strings from the input target array * get array of pointers to c-strings from the input target array
*/ */
Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
&typlen, &typbyval, &typalign);
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry); ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++) for (i = 0; i < tgt_nitems; i++)
{ {
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr); ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
ptr = (char *) att_align(ptr, typalign);
} }
/* /*
...@@ -779,7 +1372,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) ...@@ -779,7 +1372,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
PG_RETURN_TEXT_P(sql_text); PG_RETURN_TEXT_P(sql_text);
} }
/* /*
* dblink_current_query * dblink_current_query
* return the current query string * return the current query string
...@@ -797,64 +1389,6 @@ dblink_current_query(PG_FUNCTION_ARGS) ...@@ -797,64 +1389,6 @@ dblink_current_query(PG_FUNCTION_ARGS)
} }
/*
* dblink_replace_text
* replace all occurences of 'old_sub_str' in 'orig_str'
* with 'new_sub_str' to form 'new_str'
*
* returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
* otherwise returns 'new_str'
*/
PG_FUNCTION_INFO_V1(dblink_replace_text);
Datum
dblink_replace_text(PG_FUNCTION_ARGS)
{
text *left_text;
text *right_text;
text *buf_text;
text *ret_text;
char *ret_str;
int curr_posn;
text *src_text = PG_GETARG_TEXT_P(0);
int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
text *from_sub_text = PG_GETARG_TEXT_P(1);
int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
text *to_sub_text = PG_GETARG_TEXT_P(2);
char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
StringInfo str = makeStringInfo();
if (src_text_len == 0 || from_sub_text_len == 0)
PG_RETURN_TEXT_P(src_text);
buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
while (curr_posn > 0)
{
left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1));
appendStringInfo(str, "%s",
DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
appendStringInfo(str, "%s", to_sub_str);
pfree(buf_text);
pfree(left_text);
buf_text = right_text;
curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
}
appendStringInfo(str, "%s",
DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
pfree(buf_text);
ret_str = pstrdup(str->data);
ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
PG_RETURN_TEXT_P(ret_text);
}
/************************************************************* /*************************************************************
* internal functions * internal functions
*/ */
...@@ -884,31 +1418,6 @@ init_dblink_results(MemoryContext fn_mcxt) ...@@ -884,31 +1418,6 @@ init_dblink_results(MemoryContext fn_mcxt)
return retval; return retval;
} }
/*
* init_dblink_array_results
* - create an empty dblink_array_results data structure
*/
static dblink_array_results *
init_dblink_array_results(MemoryContext fn_mcxt)
{
MemoryContext oldcontext;
dblink_array_results *retval;
oldcontext = MemoryContextSwitchTo(fn_mcxt);
retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
MemSet(retval, 0, sizeof(dblink_array_results));
retval->elem_num = -1;
retval->num_elems = 0;
retval->res = NULL;
MemoryContextSwitchTo(oldcontext);
return retval;
}
/* /*
* get_pkey_attnames * get_pkey_attnames
* *
...@@ -927,21 +1436,14 @@ get_pkey_attnames(Oid relid, int16 *numatts) ...@@ -927,21 +1436,14 @@ get_pkey_attnames(Oid relid, int16 *numatts)
Relation rel; Relation rel;
TupleDesc tupdesc; TupleDesc tupdesc;
/* /* open relation using relid, get tupdesc */
* Open relation using relid, get tupdesc
*/
rel = relation_open(relid, AccessShareLock); rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att; tupdesc = rel->rd_att;
/* /* initialize numatts to 0 in case no primary key exists */
* Initialize numatts to 0 in case no primary key
* exists
*/
*numatts = 0; *numatts = 0;
/* /* use relid to get all related indexes */
* Use relid to get all related indexes
*/
indexRelation = heap_openr(IndexRelationName, AccessShareLock); indexRelation = heap_openr(IndexRelationName, AccessShareLock);
ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
F_OIDEQ, ObjectIdGetDatum(relid)); F_OIDEQ, ObjectIdGetDatum(relid));
...@@ -951,9 +1453,7 @@ get_pkey_attnames(Oid relid, int16 *numatts) ...@@ -951,9 +1453,7 @@ get_pkey_attnames(Oid relid, int16 *numatts)
{ {
Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple); Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
/* /* we're only interested if it is the primary key */
* We're only interested if it is the primary key
*/
if (index->indisprimary == TRUE) if (index->indisprimary == TRUE)
{ {
i = 0; i = 0;
...@@ -963,6 +1463,7 @@ get_pkey_attnames(Oid relid, int16 *numatts) ...@@ -963,6 +1463,7 @@ get_pkey_attnames(Oid relid, int16 *numatts)
if (*numatts > 0) if (*numatts > 0)
{ {
result = (char **) palloc(*numatts * sizeof(char *)); result = (char **) palloc(*numatts * sizeof(char *));
for (i = 0; i < *numatts; i++) for (i = 0; i < *numatts; i++)
result[i] = SPI_fname(tupdesc, index->indkey[i]); result[i] = SPI_fname(tupdesc, index->indkey[i]);
} }
...@@ -976,41 +1477,6 @@ get_pkey_attnames(Oid relid, int16 *numatts) ...@@ -976,41 +1477,6 @@ get_pkey_attnames(Oid relid, int16 *numatts)
return result; return result;
} }
/*
* get_strtok
*
* parse input string
* return ord item (0 based)
* based on provided field separator
*/
static char *
get_strtok(char *fldtext, char *fldsep, int fldnum)
{
int j = 0;
char *result;
if (fldnum < 0)
{
elog(ERROR, "get_strtok: field number < 0 not permitted");
}
if (fldsep[0] == '\0')
{
elog(ERROR, "get_strtok: blank field separator not permitted");
}
result = strtok(fldtext, fldsep);
for (j = 1; j < fldnum + 1; j++)
{
result = strtok(NULL, fldsep);
if (result == NULL)
return NULL;
}
return pstrdup(result);
}
static char * static char *
get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{ {
...@@ -1035,6 +1501,8 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval ...@@ -1035,6 +1501,8 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
natts = tupdesc->natts; natts = tupdesc->natts;
tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
if (!tuple)
elog(ERROR, "dblink_build_sql_insert: row not found");
appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname)); appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
...@@ -1175,6 +1643,8 @@ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval ...@@ -1175,6 +1643,8 @@ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
natts = tupdesc->natts; natts = tupdesc->natts;
tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
if (!tuple)
elog(ERROR, "dblink_build_sql_update: row not found");
appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname)); appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
...@@ -1314,7 +1784,8 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p ...@@ -1314,7 +1784,8 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
*/ */
rel = relation_open(relid, AccessShareLock); rel = relation_open(relid, AccessShareLock);
relname = RelationGetRelationName(rel); relname = RelationGetRelationName(rel);
tupdesc = rel->rd_att; tupdesc = CreateTupleDescCopy(rel->rd_att);
relation_close(rel, AccessShareLock);
/* /*
* Connect to SPI manager * Connect to SPI manager
...@@ -1388,7 +1859,6 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p ...@@ -1388,7 +1859,6 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
static Oid static Oid
get_relid_from_relname(text *relname_text) get_relid_from_relname(text *relname_text)
{ {
#ifdef NamespaceRelationName
RangeVar *relvar; RangeVar *relvar;
Relation rel; Relation rel;
Oid relid; Oid relid;
...@@ -1397,16 +1867,6 @@ get_relid_from_relname(text *relname_text) ...@@ -1397,16 +1867,6 @@ get_relid_from_relname(text *relname_text)
rel = heap_openrv(relvar, AccessShareLock); rel = heap_openrv(relvar, AccessShareLock);
relid = RelationGetRelid(rel); relid = RelationGetRelid(rel);
relation_close(rel, AccessShareLock); relation_close(rel, AccessShareLock);
#else
char *relname;
Relation rel;
Oid relid;
relname = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(relname_text)));
rel = relation_openr(relname, AccessShareLock);
relid = RelationGetRelid(rel);
relation_close(rel, AccessShareLock);
#endif /* NamespaceRelationName */
return relid; return relid;
} }
...@@ -1456,3 +1916,55 @@ remove_res_ptr(dblink_results *results) ...@@ -1456,3 +1916,55 @@ remove_res_ptr(dblink_results *results)
res_id_index = 0; res_id_index = 0;
} }
static TupleDesc
pgresultGetTupleDesc(PGresult *res)
{
int natts;
AttrNumber attnum;
TupleDesc desc;
char *attname;
int32 atttypmod;
int attdim;
bool attisset;
Oid atttypid;
int i;
/*
* allocate a new tuple descriptor
*/
natts = PQnfields(res);
if (natts < 1)
elog(ERROR, "cannot create a description for empty results");
desc = CreateTemplateTupleDesc(natts, WITHOUTOID);
attnum = 0;
for (i = 0; i < natts; i++)
{
/*
* for each field, get the name and type information from the query
* result and have TupleDescInitEntry fill in the attribute
* information we need.
*/
attnum++;
attname = PQfname(res, i);
atttypid = PQftype(res, i);
atttypmod = PQfmod(res, i);
if (PQfsize(res, i) != get_typlen(atttypid))
elog(ERROR, "Size of remote field \"%s\" does not match size "
"of local type \"%s\"",
attname,
format_type_with_typemod(atttypid, atttypmod));
attdim = 0;
attisset = false;
TupleDescInitEntry(desc, attnum, attname, atttypid,
atttypmod, attdim, attisset);
}
return desc;
}
...@@ -3,7 +3,9 @@ ...@@ -3,7 +3,9 @@
* *
* Functions returning results from a remote database * Functions returning results from a remote database
* *
* Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002, * Joe Conway <mail@joeconway.com>
*
* Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
* ALL RIGHTS RESERVED; * ALL RIGHTS RESERVED;
* *
* Permission to use, copy, modify, and distribute this software and its * Permission to use, copy, modify, and distribute this software and its
...@@ -28,38 +30,6 @@ ...@@ -28,38 +30,6 @@
#ifndef DBLINK_H #ifndef DBLINK_H
#define DBLINK_H #define DBLINK_H
#include <string.h>
#include "postgres.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "fmgr.h"
#include "access/tupdesc.h"
#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/pg_index.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/execnodes.h"
#include "nodes/pg_list.h"
#include "parser/parse_type.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/array.h"
#include "utils/syscache.h"
#ifdef NamespaceRelationName
#include "catalog/namespace.h"
#endif /* NamespaceRelationName */
/*
* Max SQL statement size
*/
#define DBLINK_MAX_SQLSTATE_SIZE 16384
/* /*
* This struct holds the results of the remote query. * This struct holds the results of the remote query.
* Use fn_extra to hold a pointer to it across calls * Use fn_extra to hold a pointer to it across calls
...@@ -82,43 +52,27 @@ typedef struct ...@@ -82,43 +52,27 @@ typedef struct
PGresult *res; PGresult *res;
} dblink_results; } dblink_results;
/*
* This struct holds results in the form of an array.
* Use fn_extra to hold a pointer to it across calls
*/
typedef struct
{
/*
* elem being accessed
*/
int elem_num;
/*
* number of elems
*/
int num_elems;
/*
* the actual array
*/
void *res;
} dblink_array_results;
/* /*
* External declarations * External declarations
*/ */
/* deprecated */
extern Datum dblink(PG_FUNCTION_ARGS); extern Datum dblink(PG_FUNCTION_ARGS);
extern Datum dblink_tok(PG_FUNCTION_ARGS); extern Datum dblink_tok(PG_FUNCTION_ARGS);
extern Datum dblink_strtok(PG_FUNCTION_ARGS);
/* supported */
extern Datum dblink_connect(PG_FUNCTION_ARGS);
extern Datum dblink_disconnect(PG_FUNCTION_ARGS);
extern Datum dblink_open(PG_FUNCTION_ARGS);
extern Datum dblink_close(PG_FUNCTION_ARGS);
extern Datum dblink_fetch(PG_FUNCTION_ARGS);
extern Datum dblink_record(PG_FUNCTION_ARGS);
extern Datum dblink_exec(PG_FUNCTION_ARGS);
extern Datum dblink_get_pkey(PG_FUNCTION_ARGS); extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
extern Datum dblink_last_oid(PG_FUNCTION_ARGS); extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
extern Datum dblink_current_query(PG_FUNCTION_ARGS); extern Datum dblink_current_query(PG_FUNCTION_ARGS);
extern Datum dblink_replace_text(PG_FUNCTION_ARGS);
extern char *debug_query_string; extern char *debug_query_string;
......
CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int -- Uncomment the following 9 lines to use original DEPRECATED functions
AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c' --CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
-- AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
-- WITH (isstrict);
--CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
-- AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
-- WITH (isstrict);
--CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
-- AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
-- WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_connect (text) RETURNS text
AS 'MODULE_PATHNAME','dblink_connect' LANGUAGE 'c'
WITH (isstrict); WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text CREATE OR REPLACE FUNCTION dblink_disconnect () RETURNS text
AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c' AS 'MODULE_PATHNAME','dblink_disconnect' LANGUAGE 'c'
WITH (isstrict); WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text CREATE OR REPLACE FUNCTION dblink_open (text,text) RETURNS text
AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c' AS 'MODULE_PATHNAME','dblink_open' LANGUAGE 'c'
WITH (iscachable, isstrict); WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_get_pkey (text) RETURNS setof text CREATE OR REPLACE FUNCTION dblink_fetch (text,int) RETURNS setof record
AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c' AS 'MODULE_PATHNAME','dblink_fetch' LANGUAGE 'c'
WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_close (text) RETURNS text
AS 'MODULE_PATHNAME','dblink_close' LANGUAGE 'c'
WITH (isstrict);
-- Note: if this is a first time install of dblink, the following DROP
-- FUNCTION line is expected to fail.
-- Comment out the following 4 lines if the DEPRECATED functions are used.
DROP FUNCTION dblink (text,text);
CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof record
AS 'MODULE_PATHNAME','dblink_record' LANGUAGE 'c'
WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink (text) RETURNS setof record
AS 'MODULE_PATHNAME','dblink_record' LANGUAGE 'c'
WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_exec (text,text) RETURNS text
AS 'MODULE_PATHNAME','dblink_exec' LANGUAGE 'c'
WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_exec (text) RETURNS text
AS 'MODULE_PATHNAME','dblink_exec' LANGUAGE 'c'
WITH (isstrict); WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid CREATE TYPE dblink_pkey_results AS (position int4, colname text);
AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
CREATE OR REPLACE FUNCTION dblink_get_pkey (text) RETURNS setof dblink_pkey_results
AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
WITH (isstrict); WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_build_sql_insert (text, int2vector, int2, _text, _text) RETURNS text CREATE OR REPLACE FUNCTION dblink_build_sql_insert (text, int2vector, int2, _text, _text) RETURNS text
...@@ -32,7 +69,3 @@ CREATE OR REPLACE FUNCTION dblink_build_sql_update (text, int2vector, int2, _tex ...@@ -32,7 +69,3 @@ CREATE OR REPLACE FUNCTION dblink_build_sql_update (text, int2vector, int2, _tex
CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c'; AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';
CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text
AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c'
WITH (iscachable, isstrict);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment