Commit 3bf6b8f0 authored by Bruce Momjian's avatar Bruce Momjian

Attached is an update to contrib/dblink. Please apply if there are no

objections.

Major changes:
   - removed cursor wrap around input sql to allow for remote
     execution of INSERT/UPDATE/DELETE
   - dblink now returns a resource id instead of a real pointer
   - added several utility functions

I'm still hoping to add explicit cursor open/fetch/close support before
7.3 is released, but I need a bit more time on that.

On a somewhat unrelated topic, I never got any feedback on the
unknownin/out patch and the mb_substring patch. Is there anything else I
need to do to get those applied?

Joe Conway
parent 30571b54
......@@ -3,7 +3,8 @@
*
* Functions returning results from a remote database
*
* Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
* Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
......@@ -25,12 +26,19 @@
*/
Version 0.3 (14 June, 2001):
Function to test returning data set from remote database
Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel
Version 0.4 (7 April, 2002):
Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
various utility functions.
Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel
Release Notes:
Version 0.4
- removed cursor wrap around input sql to allow for remote
execution of INSERT/UPDATE/DELETE
- dblink now returns a resource id instead of a real pointer
- added several utility functions -- see below
Version 0.3
- fixed dblink invalid pointer causing corrupt elog message
- fixed dblink_tok improper handling of null results
......@@ -51,14 +59,36 @@ Installation:
installs following functions into database template1:
dblink() - returns a pointer to results from remote query
dblink_tok() - extracts and returns individual field results
dblink(text,text) RETURNS setof int
- returns a resource id for results from remote query
dblink_tok(int,int) RETURNS text
- extracts and returns individual field results
dblink_strtok(text,text,int) RETURNS text
- extracts and returns individual token from delimited text
dblink_get_pkey(name) RETURNS setof text
- returns the field names of a relation's primary key fields
dblink_last_oid(int) RETURNS oid
- returns the last inserted oid
dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text
- builds an insert statement using a local tuple, replacing the
selection key field values with alternate supplied values
dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text
- builds a delete statement using supplied values for selection
key field values
dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text
- builds an update statement using a local tuple, replacing the
selection key field values with alternate supplied values
dblink_current_query() RETURNS text
- returns the current query string
dblink_replace(text,text,text) RETURNS text
- replace all occurences of substring-a in the input-string
with substring-b
Documentation
==================================================================
Name
dblink -- Returns a pointer to a data set from a remote database
dblink -- Returns a resource id for a data set from a remote database
Synopsis
......@@ -78,7 +108,7 @@ Inputs
Outputs
Returns setof int (pointer)
Returns setof int (res_id)
Example usage
......@@ -94,13 +124,13 @@ dblink_tok -- Returns individual select field results from a dblink remote query
Synopsis
dblink_tok(int pointer, int fnumber)
dblink_tok(int res_id, int fnumber)
Inputs
pointer
res_id
a pointer returned by a call to dblink()
a resource id returned by a call to dblink()
fnumber
......@@ -131,6 +161,255 @@ Then you can simply write:
select f1, f2 from myremotetable where f1 like 'bytea%';
==================================================================
Name
dblink_strtok -- Extracts and returns individual token from delimited text
Synopsis
dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
Inputs
inputstring
any string you want to parse a token out of;
e.g. 'f=1&g=3&h=4'
delimiter
a single character to use as the delimiter;
e.g. '&' or '='
posn
the position of the token of interest, 0 based;
e.g. 1
Outputs
Returns text
Example usage
test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
dblink_strtok
---------------
3
(1 row)
==================================================================
Name
dblink_get_pkey -- returns the field names of a relation's primary
key fields
Synopsis
dblink_get_pkey(name relname) RETURNS setof text
Inputs
relname
any relation name;
e.g. 'foobar'
Outputs
Returns setof text -- one row for each primary key field, in order of
precedence
Example usage
test=# select dblink_get_pkey('foobar');
dblink_get_pkey
-----------------
f1
f2
f3
f4
f5
(5 rows)
==================================================================
Name
dblink_last_oid -- Returns last inserted oid
Synopsis
dblink_last_oid(int res_id) RETURNS oid
Inputs
res_id
any resource id returned by dblink function;
Outputs
Returns oid of last inserted tuple
Example usage
test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
,'insert into mytable (f1, f2) values (1,2)'));
dblink_last_oid
----------------
16553
(1 row)
==================================================================
Name
dblink_build_sql_insert -- builds an insert statement using a local
tuple, replacing the selection key field
values with alternate supplied values
dblink_build_sql_delete -- builds a delete statement using supplied
values for selection key field values
dblink_build_sql_update -- builds an update statement using a local
tuple, replacing the selection key field
values with alternate supplied values
Synopsis
dblink_build_sql_insert(name relname
,int2vector primary_key_attnums
,int2 num_primary_key_atts
,_text src_pk_att_vals_array
,_text tgt_pk_att_vals_array) RETURNS text
dblink_build_sql_delete(name relname
,int2vector primary_key_attnums
,int2 num_primary_key_atts
,_text tgt_pk_att_vals_array) RETURNS text
dblink_build_sql_update(name relname
,int2vector primary_key_attnums
,int2 num_primary_key_atts
,_text src_pk_att_vals_array
,_text tgt_pk_att_vals_array) RETURNS text
Inputs
relname
any relation name;
e.g. 'foobar'
primary_key_attnums
vector of primary key attnums (1 based, see pg_index.indkey);
e.g. '1 2'
num_primary_key_atts
number of primary key attnums in the vector; e.g. 2
src_pk_att_vals_array
array of primary key values, used to look up the local matching
tuple, the values of which are then used to construct the SQL
statement
tgt_pk_att_vals_array
array of primary key values, used to replace the local tuple
values in the SQL statement
Outputs
Returns text -- requested SQL statement
Example usage
test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
dblink_build_sql_insert
--------------------------------------------------
INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
(1 row)
test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
dblink_build_sql_delete
---------------------------------------------
DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
(1 row)
test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
dblink_build_sql_update
-------------------------------------------------------------
UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
(1 row)
==================================================================
Name
dblink_current_query -- returns the current query string
Synopsis
dblink_current_query () RETURNS text
Inputs
None
Outputs
Returns text -- a copy of the currently executing query
Example usage
test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
dblink_current_query
-----------------------------------------------------------------------------------------------------------------------------------------------------
select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
(1 row)
==================================================================
Name
dblink_replace -- replace all occurences of substring-a in the
input-string with substring-b
Synopsis
dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
Inputs
input-string
the starting string, before replacement of substring-a
substring-a
the substring to find and replace
substring-b
the substring to be substituted in place of substring-a
Outputs
Returns text -- a copy of the starting string, but with all occurences of
substring-a replaced with substring-b
Example usage
test=# select dblink_replace('12345678901234567890','56','hello');
dblink_replace
----------------------------
1234hello78901234hello7890
(1 row)
==================================================================
-- Joe Conway
......@@ -3,7 +3,8 @@
*
* Functions returning results from a remote database
*
* Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
* Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
......@@ -26,23 +27,23 @@
#include "dblink.h"
/* Global */
List *res_id = NIL;
int res_id_index = 0;
PG_FUNCTION_INFO_V1(dblink);
Datum
dblink(PG_FUNCTION_ARGS)
{
PGconn *conn = NULL;
PGresult *res = NULL;
dblink_results *results;
char *optstr;
char *sqlstatement;
char *curstr = "DECLARE mycursor CURSOR FOR ";
char *execstatement;
char *msg;
int ntuples = 0;
ReturnSetInfo *rsi;
if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
elog(ERROR, "dblink: NULL arguments are not permitted");
PGconn *conn = NULL;
PGresult *res = NULL;
dblink_results *results;
char *optstr;
char *sqlstatement;
char *execstatement;
char *msg;
int ntuples = 0;
ReturnSetInfo *rsi;
if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
elog(ERROR, "dblink: function called in context that does not accept a set result");
......@@ -61,21 +62,10 @@ dblink(PG_FUNCTION_ARGS)
elog(ERROR, "dblink: connection error: %s", msg);
}
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
elog(ERROR, "dblink: begin error: %s", msg);
}
PQclear(res);
execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
execstatement = (char *) palloc(strlen(sqlstatement) + 1);
if (execstatement != NULL)
{
strcpy(execstatement, curstr);
strcat(execstatement, sqlstatement);
strcpy(execstatement, sqlstatement);
strcat(execstatement, "\0");
}
else
......@@ -94,70 +84,36 @@ dblink(PG_FUNCTION_ARGS)
/*
* got results, start fetching them
*/
PQclear(res);
res = PQexec(conn, "FETCH ALL in mycursor");
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
elog(ERROR, "dblink: sql error: %s", msg);
}
ntuples = PQntuples(res);
if (ntuples > 0)
{
results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
results->tup_num = 0;
results->res = res;
res = NULL;
fcinfo->flinfo->fn_extra = (void *) results;
results = NULL;
results = fcinfo->flinfo->fn_extra;
/* close the cursor */
res = PQexec(conn, "CLOSE mycursor");
PQclear(res);
/* commit the transaction */
res = PQexec(conn, "COMMIT");
PQclear(res);
/* close the connection to the database and cleanup */
PQfinish(conn);
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_POINTER(results);
}
else
{
/*
* increment resource index
*/
res_id_index++;
PQclear(res);
results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
results->tup_num = 0;
results->res_id_index = res_id_index;
results->res = res;
/* close the cursor */
res = PQexec(conn, "CLOSE mycursor");
PQclear(res);
/*
* Append node to res_id to hold pointer to results.
* Needed by dblink_tok to access the data
*/
append_res_ptr(results);
/* commit the transaction */
res = PQexec(conn, "COMMIT");
PQclear(res);
/*
* save pointer to results for the next function manager call
*/
fcinfo->flinfo->fn_extra = (void *) results;
/* close the connection to the database and cleanup */
PQfinish(conn);
/* close the connection to the database and cleanup */
PQfinish(conn);
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_NULL();
}
PG_RETURN_INT32(res_id_index);
}
}
else
......@@ -165,9 +121,10 @@ dblink(PG_FUNCTION_ARGS)
/*
* check for more results
*/
results = fcinfo->flinfo->fn_extra;
results->tup_num++;
res_id_index = results->res_id_index;
ntuples = PQntuples(results->res);
if (results->tup_num < ntuples)
......@@ -179,18 +136,19 @@ dblink(PG_FUNCTION_ARGS)
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_POINTER(results);
PG_RETURN_INT32(res_id_index);
}
else
{
/*
* or if no more, clean things up
*/
results = fcinfo->flinfo->fn_extra;
remove_res_ptr(results);
PQclear(results->res);
pfree(results);
fcinfo->flinfo->fn_extra = NULL;
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
......@@ -214,36 +172,37 @@ Datum
dblink_tok(PG_FUNCTION_ARGS)
{
dblink_results *results;
int fldnum;
text *result_text;
char *result;
int nfields = 0;
int text_len = 0;
if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
elog(ERROR, "dblink: NULL arguments are not permitted");
int fldnum;
text *result_text;
char *result;
int nfields = 0;
int text_len = 0;
results = (dblink_results *) PG_GETARG_POINTER(0);
results = get_res_ptr(PG_GETARG_INT32(0));
if (results == NULL)
elog(ERROR, "dblink: function called with invalid result pointer");
{
if (res_id != NIL)
{
freeList(res_id);
res_id = NIL;
res_id_index = 0;
}
elog(ERROR, "dblink_tok: function called with invalid resource id");
}
fldnum = PG_GETARG_INT32(1);
if (fldnum < 0)
elog(ERROR, "dblink: field number < 0 not permitted");
elog(ERROR, "dblink_tok: field number < 0 not permitted");
nfields = PQnfields(results->res);
if (fldnum > (nfields - 1))
elog(ERROR, "dblink: field number %d does not exist", fldnum);
elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);
if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
{
PG_RETURN_NULL();
}
else
{
text_len = PQgetlength(results->res, results->tup_num, fldnum);
result = (char *) palloc(text_len + 1);
......@@ -259,12 +218,621 @@ dblink_tok(PG_FUNCTION_ARGS)
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
PG_RETURN_TEXT_P(result_text);
}
}
/*
* dblink_strtok
* parse input string
* return ord item (0 based)
* based on provided field separator
*/
PG_FUNCTION_INFO_V1(dblink_strtok);
Datum
dblink_strtok(PG_FUNCTION_ARGS)
{
char *fldtext;
char *fldsep;
int fldnum;
char *buffer;
text *result_text;
fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
fldnum = PG_GETARG_INT32(2);
if (fldtext[0] == '\0')
{
elog(ERROR, "get_strtok: blank list not permitted");
}
if (fldsep[0] == '\0')
{
elog(ERROR, "get_strtok: blank field separator not permitted");
}
buffer = get_strtok(fldtext, fldsep, fldnum);
pfree(fldtext);
pfree(fldsep);
if (buffer == NULL)
{
PG_RETURN_NULL();
}
else
{
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
pfree(buffer);
PG_RETURN_TEXT_P(result_text);
}
}
/*
* dblink_get_pkey
*
* Return comma delimited list of primary key
* fields for the supplied relation,
* or NULL if none exists.
*/
PG_FUNCTION_INFO_V1(dblink_get_pkey);
Datum
dblink_get_pkey(PG_FUNCTION_ARGS)
{
char *relname;
Oid relid;
char **result;
text *result_text;
int16 numatts;
ReturnSetInfo *rsi;
dblink_array_results *ret_set;
if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
elog(ERROR, "dblink: function called in context that does not accept a set result");
if (fcinfo->flinfo->fn_extra == NULL)
{
relname = NameStr(*PG_GETARG_NAME(0));
/*
* Convert relname to rel OID.
*/
relid = get_relid_from_relname(relname);
if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
relname);
/*
* get an array of attnums.
*/
result = get_pkey_attnames(relid, &numatts);
if ((result != NULL) && (numatts > 0))
{
ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
ret_set->elem_num = 0;
ret_set->num_elems = numatts;
ret_set->res = result;
fcinfo->flinfo->fn_extra = (void *) ret_set;
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
PG_RETURN_TEXT_P(result_text);
}
else
{
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
PG_RETURN_NULL();
}
}
else
{
/*
* check for more results
*/
ret_set = fcinfo->flinfo->fn_extra;
ret_set->elem_num++;
result = ret_set->res;
if (ret_set->elem_num < ret_set->num_elems)
{
/*
* fetch next one
*/
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
PG_RETURN_TEXT_P(result_text);
}
else
{
int i;
/*
* or if no more, clean things up
*/
for (i = 0; i < ret_set->num_elems; i++)
pfree(result[i]);
pfree(ret_set->res);
pfree(ret_set);
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
PG_RETURN_NULL();
}
}
PG_RETURN_NULL();
}
/*
* dblink_last_oid
* return last inserted oid
*/
PG_FUNCTION_INFO_V1(dblink_last_oid);
Datum
dblink_last_oid(PG_FUNCTION_ARGS)
{
dblink_results *results;
results = get_res_ptr(PG_GETARG_INT32(0));
if (results == NULL)
{
if (res_id != NIL)
{
freeList(res_id);
res_id = NIL;
res_id_index = 0;
}
elog(ERROR, "dblink_tok: function called with invalid resource id");
}
PG_RETURN_OID(PQoidValue(results->res));
}
/*
* dblink_build_sql_insert
*
* Used to generate an SQL insert statement
* based on an existing tuple in a local relation.
* This is useful for selectively replicating data
* to another server via dblink.
*
* API:
* <relname> - name of local table of interest
* <pkattnums> - an int2vector of attnums which will be used
* to identify the local tuple of interest
* <pknumatts> - number of attnums in pkattnums
* <src_pkattvals_arry> - text array of key values which will be used
* to identify the local tuple of interest
* <tgt_pkattvals_arry> - text array of key values which will be used
* to build the string for execution remotely. These are substituted
* for their counterparts in src_pkattvals_arry
*/
PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS)
{
Oid relid;
char *relname;
int16 *pkattnums;
int16 pknumatts;
char **src_pkattvals;
char **tgt_pkattvals;
ArrayType *src_pkattvals_arry;
ArrayType *tgt_pkattvals_arry;
int src_ndim;
int *src_dim;
int src_nitems;
int tgt_ndim;
int *tgt_dim;
int tgt_nitems;
int i;
char *ptr;
char *sql;
text *sql_text;
relname = NameStr(*PG_GETARG_NAME(0));
/*
* Convert relname to rel OID.
*/
relid = get_relid_from_relname(relname);
if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
relname);
pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts = PG_GETARG_INT16(2);
/*
* There should be at least one key attribute
*/
if (pknumatts == 0)
elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
/*
* Source array is made up of key values that will be used to
* locate the tuple of interest from the local system.
*/
src_ndim = ARR_NDIM(src_pkattvals_arry);
src_dim = ARR_DIMS(src_pkattvals_arry);
src_nitems = ArrayGetNItems(src_ndim, src_dim);
/*
* There should be one source array key value for each key attnum
*/
if (src_nitems != pknumatts)
elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
/*
* get array of pointers to c-strings from the input source array
*/
src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(src_pkattvals_arry);
for (i = 0; i < src_nitems; i++)
{
src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr);
}
/*
* Target array is made up of key values that will be used to
* build the SQL string for use on the remote system.
*/
tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
/*
* There should be one target array key value for each key attnum
*/
if (tgt_nitems != pknumatts)
elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
/*
* get array of pointers to c-strings from the input target array
*/
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++)
{
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr);
}
/*
* Prep work is finally done. Go get the SQL string.
*/
sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
/*
* Make it into TEXT for return to the client
*/
sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
/*
* And send it
*/
PG_RETURN_TEXT_P(sql_text);
}
/*
* dblink_build_sql_delete
*
* Used to generate an SQL delete statement.
* This is useful for selectively replicating a
* delete to another server via dblink.
*
* API:
* <relname> - name of remote table of interest
* <pkattnums> - an int2vector of attnums which will be used
* to identify the remote tuple of interest
* <pknumatts> - number of attnums in pkattnums
* <tgt_pkattvals_arry> - text array of key values which will be used
* to build the string for execution remotely.
*/
PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
Datum
dblink_build_sql_delete(PG_FUNCTION_ARGS)
{
Oid relid;
char *relname;
int16 *pkattnums;
int16 pknumatts;
char **tgt_pkattvals;
ArrayType *tgt_pkattvals_arry;
int tgt_ndim;
int *tgt_dim;
int tgt_nitems;
int i;
char *ptr;
char *sql;
text *sql_text;
relname = NameStr(*PG_GETARG_NAME(0));
/*
* Convert relname to rel OID.
*/
relid = get_relid_from_relname(relname);
if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
relname);
pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts = PG_GETARG_INT16(2);
/*
* There should be at least one key attribute
*/
if (pknumatts == 0)
elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
/*
* Target array is made up of key values that will be used to
* build the SQL string for use on the remote system.
*/
tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
/*
* There should be one target array key value for each key attnum
*/
if (tgt_nitems != pknumatts)
elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
/*
* get array of pointers to c-strings from the input target array
*/
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++)
{
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr);
}
/*
* Prep work is finally done. Go get the SQL string.
*/
sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
/*
* Make it into TEXT for return to the client
*/
sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
/*
* And send it
*/
PG_RETURN_TEXT_P(sql_text);
}
/*
* dblink_build_sql_update
*
* Used to generate an SQL update statement
* based on an existing tuple in a local relation.
* This is useful for selectively replicating data
* to another server via dblink.
*
* API:
* <relname> - name of local table of interest
* <pkattnums> - an int2vector of attnums which will be used
* to identify the local tuple of interest
* <pknumatts> - number of attnums in pkattnums
* <src_pkattvals_arry> - text array of key values which will be used
* to identify the local tuple of interest
* <tgt_pkattvals_arry> - text array of key values which will be used
* to build the string for execution remotely. These are substituted
* for their counterparts in src_pkattvals_arry
*/
PG_FUNCTION_INFO_V1(dblink_build_sql_update);
Datum
dblink_build_sql_update(PG_FUNCTION_ARGS)
{
Oid relid;
char *relname;
int16 *pkattnums;
int16 pknumatts;
char **src_pkattvals;
char **tgt_pkattvals;
ArrayType *src_pkattvals_arry;
ArrayType *tgt_pkattvals_arry;
int src_ndim;
int *src_dim;
int src_nitems;
int tgt_ndim;
int *tgt_dim;
int tgt_nitems;
int i;
char *ptr;
char *sql;
text *sql_text;
relname = NameStr(*PG_GETARG_NAME(0));
/*
* Convert relname to rel OID.
*/
relid = get_relid_from_relname(relname);
if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
relname);
pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts = PG_GETARG_INT16(2);
/*
* There should be one source array key values for each key attnum
*/
if (pknumatts == 0)
elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
/*
* Source array is made up of key values that will be used to
* locate the tuple of interest from the local system.
*/
src_ndim = ARR_NDIM(src_pkattvals_arry);
src_dim = ARR_DIMS(src_pkattvals_arry);
src_nitems = ArrayGetNItems(src_ndim, src_dim);
/*
* There should be one source array key value for each key attnum
*/
if (src_nitems != pknumatts)
elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
/*
* get array of pointers to c-strings from the input source array
*/
src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(src_pkattvals_arry);
for (i = 0; i < src_nitems; i++)
{
src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr);
}
/*
* Target array is made up of key values that will be used to
* build the SQL string for use on the remote system.
*/
tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
/*
* There should be one target array key value for each key attnum
*/
if (tgt_nitems != pknumatts)
elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
/*
* get array of pointers to c-strings from the input target array
*/
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++)
{
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr);
}
/*
* Prep work is finally done. Go get the SQL string.
*/
sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
/*
* Make it into TEXT for return to the client
*/
sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
/*
* And send it
*/
PG_RETURN_TEXT_P(sql_text);
}
/*
* dblink_current_query
* return the current query string
* to allow its use in (among other things)
* rewrite rules
*/
PG_FUNCTION_INFO_V1(dblink_current_query);
Datum
dblink_current_query(PG_FUNCTION_ARGS)
{
text *result_text;
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
PG_RETURN_TEXT_P(result_text);
}
/*
* dblink_replace_text
* replace all occurences of 'old_sub_str' in 'orig_str'
* with 'new_sub_str' to form 'new_str'
*
* returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
* otherwise returns 'new_str'
*/
PG_FUNCTION_INFO_V1(dblink_replace_text);
Datum
dblink_replace_text(PG_FUNCTION_ARGS)
{
text *left_text;
text *right_text;
text *buf_text;
text *ret_text;
char *ret_str;
int curr_posn;
text *src_text = PG_GETARG_TEXT_P(0);
int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
text *from_sub_text = PG_GETARG_TEXT_P(1);
int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
text *to_sub_text = PG_GETARG_TEXT_P(2);
char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
StringInfo str = makeStringInfo();
if (src_text_len == 0 || from_sub_text_len == 0)
PG_RETURN_TEXT_P(src_text);
buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
while (curr_posn > 0)
{
left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1));
appendStringInfo(str, DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
appendStringInfo(str, to_sub_str);
pfree(buf_text);
pfree(left_text);
buf_text = right_text;
curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
}
appendStringInfo(str, DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
pfree(buf_text);
ret_str = pstrdup(str->data);
ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
PG_RETURN_TEXT_P(ret_text);
}
/*************************************************************
* internal functions
*/
......@@ -285,9 +853,556 @@ init_dblink_results(MemoryContext fn_mcxt)
MemSet(retval, 0, sizeof(dblink_results));
retval->tup_num = -1;
retval->res_id_index =-1;
retval->res = NULL;
MemoryContextSwitchTo(oldcontext);
return retval;
}
/*
* init_dblink_array_results
* - create an empty dblink_array_results data structure
*/
dblink_array_results *
init_dblink_array_results(MemoryContext fn_mcxt)
{
MemoryContext oldcontext;
dblink_array_results *retval;
oldcontext = MemoryContextSwitchTo(fn_mcxt);
retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
MemSet(retval, 0, sizeof(dblink_array_results));
retval->elem_num = -1;
retval->num_elems = 0;
retval->res = NULL;
MemoryContextSwitchTo(oldcontext);
return retval;
}
/*
* get_pkey_attnames
*
* Get the primary key attnames for the given relation.
* Return NULL, and set numatts = 0, if no primary key exists.
*/
char **
get_pkey_attnames(Oid relid, int16 *numatts)
{
Relation indexRelation;
ScanKeyData entry;
HeapScanDesc scan;
HeapTuple indexTuple;
int i;
char **result = NULL;
Relation rel;
TupleDesc tupdesc;
/*
* Open relation using relid, get tupdesc
*/
rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att;
/*
* Initialize numatts to 0 in case no primary key
* exists
*/
*numatts = 0;
/*
* Use relid to get all related indexes
*/
indexRelation = heap_openr(IndexRelationName, AccessShareLock);
ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
F_OIDEQ, ObjectIdGetDatum(relid));
scan = heap_beginscan(indexRelation, false, SnapshotNow,
1, &entry);
while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0)))
{
Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
/*
* We're only interested if it is the primary key
*/
if (index->indisprimary == TRUE)
{
i = 0;
while (index->indkey[i++] != 0)
(*numatts)++;
if (*numatts > 0)
{
result = (char **) palloc(*numatts * sizeof(char *));
for (i = 0; i < *numatts; i++)
result[i] = SPI_fname(tupdesc, index->indkey[i]);
}
break;
}
}
heap_endscan(scan);
heap_close(indexRelation, AccessShareLock);
relation_close(rel, AccessShareLock);
return result;
}
/*
* get_strtok
*
* parse input string
* return ord item (0 based)
* based on provided field separator
*/
char *
get_strtok(char *fldtext, char *fldsep, int fldnum)
{
int j = 0;
char *result;
if (fldnum < 0)
{
elog(ERROR, "get_strtok: field number < 0 not permitted");
}
if (fldsep[0] == '\0')
{
elog(ERROR, "get_strtok: blank field separator not permitted");
}
result = strtok(fldtext, fldsep);
for (j = 1; j < fldnum + 1; j++)
{
result = strtok(NULL, fldsep);
if (result == NULL)
return NULL;
}
return pstrdup(result);
}
char *
get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{
Relation rel;
char *relname;
HeapTuple tuple;
TupleDesc tupdesc;
int natts;
StringInfo str = makeStringInfo();
char *sql = NULL;
char *val = NULL;
int16 key;
unsigned int i;
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
relname = RelationGetRelationName(rel);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
for (i = 0; i < natts; i++)
{
if (i > 0)
appendStringInfo(str, ",");
appendStringInfo(str, NameStr(tupdesc->attrs[i]->attname));
}
appendStringInfo(str, ") VALUES(");
/*
* remember attvals are 1 based
*/
for (i = 0; i < natts; i++)
{
if (i > 0)
appendStringInfo(str, ",");
if (tgt_pkattvals != NULL)
key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
else
key = -1;
if (key > -1)
val = pstrdup(tgt_pkattvals[key]);
else
val = SPI_getvalue(tuple, tupdesc, i + 1);
if (val != NULL)
{
appendStringInfo(str, quote_literal_cstr(val));
pfree(val);
}
else
appendStringInfo(str, "NULL");
}
appendStringInfo(str, ")");
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
relation_close(rel, AccessShareLock);
return (sql);
}
char *
get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
{
Relation rel;
char *relname;
TupleDesc tupdesc;
int natts;
StringInfo str = makeStringInfo();
char *sql = NULL;
char *val = NULL;
unsigned int i;
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
relname = RelationGetRelationName(rel);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
for (i = 0; i < pknumatts; i++)
{
int16 pkattnum = pkattnums[i];
if (i > 0)
appendStringInfo(str, " AND ");
appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname));
if (tgt_pkattvals != NULL)
val = pstrdup(tgt_pkattvals[i]);
else
elog(ERROR, "Target key array must not be NULL");
if (val != NULL)
{
appendStringInfo(str, "=");
appendStringInfo(str, quote_literal_cstr(val));
pfree(val);
}
else
appendStringInfo(str, "IS NULL");
}
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
relation_close(rel, AccessShareLock);
return (sql);
}
char *
get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{
Relation rel;
char *relname;
HeapTuple tuple;
TupleDesc tupdesc;
int natts;
StringInfo str = makeStringInfo();
char *sql = NULL;
char *val = NULL;
int16 key;
int i;
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
relname = RelationGetRelationName(rel);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
for (i = 0; i < natts; i++)
{
if (i > 0)
appendStringInfo(str, ",");
appendStringInfo(str, NameStr(tupdesc->attrs[i]->attname));
appendStringInfo(str, "=");
if (tgt_pkattvals != NULL)
key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
else
key = -1;
if (key > -1)
val = pstrdup(tgt_pkattvals[key]);
else
val = SPI_getvalue(tuple, tupdesc, i + 1);
if (val != NULL)
{
appendStringInfo(str, quote_literal_cstr(val));
pfree(val);
}
else
appendStringInfo(str, "NULL");
}
appendStringInfo(str, " WHERE ");
for (i = 0; i < pknumatts; i++)
{
int16 pkattnum = pkattnums[i];
if (i > 0)
appendStringInfo(str, " AND ");
appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname));
if (tgt_pkattvals != NULL)
val = pstrdup(tgt_pkattvals[i]);
else
val = SPI_getvalue(tuple, tupdesc, pkattnum);
if (val != NULL)
{
appendStringInfo(str, "=");
appendStringInfo(str, quote_literal_cstr(val));
pfree(val);
}
else
appendStringInfo(str, "IS NULL");
}
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
relation_close(rel, AccessShareLock);
return (sql);
}
/*
* Return a properly quoted literal value.
* Uses quote_literal in quote.c
*/
static char *
quote_literal_cstr(char *rawstr)
{
text *rawstr_text;
text *result_text;
char *result;
rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
return result;
}
/*
* Return a properly quoted identifier.
* Uses quote_ident in quote.c
*/
static char *
quote_ident_cstr(char *rawstr)
{
text *rawstr_text;
text *result_text;
char *result;
rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
return result;
}
int16
get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
{
int i;
/*
* Not likely a long list anyway, so just scan for
* the value
*/
for (i = 0; i < pknumatts; i++)
if (key == pkattnums[i])
return i;
return -1;
}
HeapTuple
get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
{
Relation rel;
char *relname;
TupleDesc tupdesc;
StringInfo str = makeStringInfo();
char *sql = NULL;
int ret;
HeapTuple tuple;
int i;
char *val = NULL;
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
relname = RelationGetRelationName(rel);
tupdesc = rel->rd_att;
/*
* Connect to SPI manager
*/
if ((ret = SPI_connect()) < 0)
elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret);
/*
* Build sql statement to look up tuple of interest
* Use src_pkattvals as the criteria.
*/
appendStringInfo(str, "SELECT * from %s WHERE ", relname);
for (i = 0; i < pknumatts; i++)
{
int16 pkattnum = pkattnums[i];
if (i > 0)
appendStringInfo(str, " AND ");
appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname));
val = pstrdup(src_pkattvals[i]);
if (val != NULL)
{
appendStringInfo(str, "=");
appendStringInfo(str, quote_literal_cstr(val));
pfree(val);
}
else
appendStringInfo(str, "IS NULL");
}
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
/*
* Retrieve the desired tuple
*/
ret = SPI_exec(sql, 0);
pfree(sql);
/*
* Only allow one qualifying tuple
*/
if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
{
elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record.");
}
else if (ret == SPI_OK_SELECT && SPI_processed == 1)
{
SPITupleTable *tuptable = SPI_tuptable;
tuple = SPI_copytuple(tuptable->vals[0]);
return tuple;
}
else
{
/*
* no qualifying tuples
*/
return NULL;
}
/*
* never reached, but keep compiler quiet
*/
return NULL;
}
Oid
get_relid_from_relname(char *relname)
{
#ifdef NamespaceRelationName
Oid relid;
relid = RelnameGetRelid(relname);
#else
Relation rel;
Oid relid;
rel = relation_openr(relname, AccessShareLock);
relid = RelationGetRelid(rel);
relation_close(rel, AccessShareLock);
#endif /* NamespaceRelationName */
return relid;
}
dblink_results *
get_res_ptr(int32 res_id_index)
{
List *ptr;
/*
* short circuit empty list
*/
if(res_id == NIL)
return NULL;
/*
* OK, should be good to go
*/
foreach(ptr, res_id)
{
dblink_results *this_res_id = (dblink_results *) lfirst(ptr);
if (this_res_id->res_id_index == res_id_index)
return this_res_id;
}
return NULL;
}
/*
* Add node to global List res_id
*/
void
append_res_ptr(dblink_results *results)
{
res_id = lappend(res_id, results);
}
/*
* Remove node from global List
* using res_id_index
*/
void
remove_res_ptr(dblink_results *results)
{
res_id = lremove(results, res_id);
if (res_id == NIL)
res_id_index = 0;
}
......@@ -3,7 +3,8 @@
*
* Functions returning results from a remote database
*
* Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
* Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
......@@ -33,10 +34,31 @@
#include "libpq-int.h"
#include "fmgr.h"
#include "access/tupdesc.h"
#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/pg_index.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/execnodes.h"
#include "nodes/pg_list.h"
#include "parser/parse_type.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/array.h"
#include "utils/syscache.h"
#ifdef NamespaceRelationName
#include "catalog/namespace.h"
#endif /* NamespaceRelationName */
/*
* Max SQL statement size
*/
#define DBLINK_MAX_SQLSTATE_SIZE 16384
/*
* This struct holds the results of the remote query.
......@@ -49,22 +71,75 @@ typedef struct
*/
int tup_num;
/*
* resource index number for this context
*/
int res_id_index;
/*
* the actual query results
*/
PGresult *res;
} dblink_results;
/*
* This struct holds results in the form of an array.
* Use fn_extra to hold a pointer to it across calls
*/
typedef struct
{
/*
* elem being accessed
*/
int elem_num;
/*
* number of elems
*/
int num_elems;
/*
* the actual array
*/
void *res;
} dblink_array_results;
/*
* External declarations
*/
extern Datum dblink(PG_FUNCTION_ARGS);
extern Datum dblink_tok(PG_FUNCTION_ARGS);
extern Datum dblink_strtok(PG_FUNCTION_ARGS);
extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
extern Datum dblink_current_query(PG_FUNCTION_ARGS);
extern Datum dblink_replace_text(PG_FUNCTION_ARGS);
/*
* Internal declarations
*/
dblink_results *init_dblink_results(MemoryContext fn_mcxt);
dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
char **get_pkey_attnames(Oid relid, int16 *numatts);
char *get_strtok(char *fldtext, char *fldsep, int fldnum);
char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber);
char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *quote_literal_cstr(char *rawstr);
static char *quote_ident_cstr(char *rawstr);
int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
Oid get_relid_from_relname(char *relname);
dblink_results *get_res_ptr(int32 res_id_index);
void append_res_ptr(dblink_results *results);
void remove_res_ptr(dblink_results *results);
extern char *debug_query_string;
#endif /* DBLINK_H */
CREATE FUNCTION dblink (text,text) RETURNS setof int
AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c';
CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
WITH (isstrict);
CREATE FUNCTION dblink_tok (int,int) RETURNS text
AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c';
CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text
AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c'
WITH (iscachable, isstrict);
CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text
AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text
AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c'
WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text
AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c'
WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text
AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c'
WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';
CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text
AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c'
WITH (iscachable, isstrict);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment