Commit 8f337e86 authored by Bruce Momjian's avatar Bruce Momjian

Please apply attached patch to contrib/dblink. It adds named persistent

connections to dblink.

Shridhar Daithanka
parent 92798de0
...@@ -4,8 +4,11 @@ ...@@ -4,8 +4,11 @@
* Functions returning results from a remote database * Functions returning results from a remote database
* *
* Joe Conway <mail@joeconway.com> * Joe Conway <mail@joeconway.com>
* And contributors:
* Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
* *
* Copyright (c) 2001, 2002 by PostgreSQL Global Development Group * Copyright (c) 2001, 2002, 2003 by PostgreSQL Global Development Group
* ALL RIGHTS RESERVED; * ALL RIGHTS RESERVED;
* *
* Permission to use, copy, modify, and distribute this software and its * Permission to use, copy, modify, and distribute this software and its
...@@ -27,14 +30,16 @@ ...@@ -27,14 +30,16 @@
* *
*/ */
Version 0.5 (25 August, 2002): Version 0.6 (14 June, 2003):
Major overhaul to work with new backend "table function" capability. Removed Completely removed previously deprecated functions. Added ability
dblink_strtok() and dblink_replace() functions because they are now to create "named" persistent connections in addition to the single global
available as backend functions (split() and replace() respectively). "unnamed" persistent connection.
Tested under Linux (Red Hat 7.3) and PostgreSQL 7.3devel. This version Tested under Linux (Red Hat 9) and PostgreSQL 7.4devel.
is no longer backwards portable to PostgreSQL 7.2.
Release Notes: Release Notes:
Version 0.6
- functions deprecated in 0.5 have been removed
- added ability to create "named" persistent connections
Version 0.5 Version 0.5
- dblink now supports use directly as a table function; this is the new - dblink now supports use directly as a table function; this is the new
preferred usage going forward preferred usage going forward
...@@ -87,35 +92,51 @@ Installation: ...@@ -87,35 +92,51 @@ Installation:
connection connection
------------ ------------
dblink_connect(text) RETURNS text dblink_connect(text) RETURNS text
- opens a connection that will persist for duration of current - opens an unnamed connection that will persist for duration of
current backend or until it is disconnected
dblink_connect(text,text) RETURNS text
- opens a named connection that will persist for duration of current
backend or until it is disconnected backend or until it is disconnected
dblink_disconnect() RETURNS text dblink_disconnect() RETURNS text
- disconnects a persistent connection - disconnects the unnamed persistent connection
dblink_disconnect(text) RETURNS text
- disconnects a named persistent connection
cursor cursor
------------ ------------
dblink_open(text,text) RETURNS text dblink_open(text,text) RETURNS text
- opens a cursor using connection already opened with dblink_connect() - opens a cursor using unnamed connection already opened with
that will persist for duration of current backend or until it is dblink_connect() that will persist for duration of current backend
closed or until it is closed
dblink_open(text,text,text) RETURNS text
- opens a cursor using a named connection already opened with
dblink_connect() that will persist for duration of current backend
or until it is closed
dblink_fetch(text, int) RETURNS setof record dblink_fetch(text, int) RETURNS setof record
- fetches data from an already opened cursor - fetches data from an already opened cursor on the unnamed connection
dblink_fetch(text, text, int) RETURNS setof record
- fetches data from an already opened cursor on a named connection
dblink_close(text) RETURNS text dblink_close(text) RETURNS text
- closes a cursor - closes a cursor on the unnamed connection
dblink_close(text,text) RETURNS text
- closes a cursor on a named connection
query query
------------ ------------
dblink(text,text) RETURNS setof record dblink(text,text) RETURNS setof record
- returns a set of results from remote SELECT query - returns a set of results from remote SELECT query; the first argument
(Note: comment out in dblink.sql to use deprecated version) is either a connection string, or the name of an already opened
persistant connection
dblink(text) RETURNS setof record dblink(text) RETURNS setof record
- returns a set of results from remote SELECT query, using connection - returns a set of results from remote SELECT query, using the unnamed
already opened with dblink_connect() connection already opened with dblink_connect()
execute execute
------------ ------------
dblink_exec(text, text) RETURNS text dblink_exec(text, text) RETURNS text
- executes an INSERT/UPDATE/DELETE query remotely - executes an INSERT/UPDATE/DELETE query remotely; the first argument
is either a connection string, or the name of an already opened
persistant connection
dblink_exec(text) RETURNS text dblink_exec(text) RETURNS text
- executes an INSERT/UPDATE/DELETE query remotely, using connection - executes an INSERT/UPDATE/DELETE query remotely, using connection
already opened with dblink_connect() already opened with dblink_connect()
...@@ -136,19 +157,6 @@ Installation: ...@@ -136,19 +157,6 @@ Installation:
- builds an update statement using a local tuple, replacing the - builds an update statement using a local tuple, replacing the
selection key field values with alternate supplied values selection key field values with alternate supplied values
Not installed by default
deprecated
------------
dblink(text,text) RETURNS setof int
- *DEPRECATED* returns a resource id for results from remote query
(Note: must uncomment in dblink.sql to use)
dblink_tok(int,int) RETURNS text
- *DEPRECATED* extracts and returns individual field results; used
only in conjunction with the *DEPRECATED* form of dblink
(Note: must uncomment in dblink.sql to use)
dblink_last_oid(int) RETURNS oid
- *DEPRECATED* returns the last inserted oid
Documentation: Documentation:
Note: Parameters representing relation names must include double Note: Parameters representing relation names must include double
......
...@@ -4,8 +4,11 @@ ...@@ -4,8 +4,11 @@
* Functions returning results from a remote database * Functions returning results from a remote database
* *
* Joe Conway <mail@joeconway.com> * Joe Conway <mail@joeconway.com>
* And contributors:
* Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
* *
* Copyright (c) 2001, 2002 by PostgreSQL Global Development Group * Copyright (c) 2001, 2002, 2003 by PostgreSQL Global Development Group
* ALL RIGHTS RESERVED; * ALL RIGHTS RESERVED;
* *
* Permission to use, copy, modify, and distribute this software and its * Permission to use, copy, modify, and distribute this software and its
...@@ -27,9 +30,7 @@ ...@@ -27,9 +30,7 @@
* *
*/ */
#include "postgres.h" #include "postgres.h"
#include "libpq-fe.h" #include "libpq-fe.h"
#include "fmgr.h" #include "fmgr.h"
#include "funcapi.h" #include "funcapi.h"
#include "access/tupdesc.h" #include "access/tupdesc.h"
...@@ -51,13 +52,27 @@ ...@@ -51,13 +52,27 @@
#include "utils/array.h" #include "utils/array.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/palloc.h"
#include "utils/dynahash.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "dblink.h" #include "dblink.h"
typedef struct remoteConn
{
PGconn *con; /* Hold the remote connection */
bool remoteTrFlag; /* Indicates whether or not a transaction
* on remote database is in progress*/
} remoteConn;
/* /*
* Internal declarations * Internal declarations
*/ */
static dblink_results *init_dblink_results(MemoryContext fn_mcxt); static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static bool createNewConnection(const char *name,remoteConn *con);
static void deleteConnection(const char *name);
static char **get_pkey_attnames(Oid relid, int16 *numatts); static char **get_pkey_attnames(Oid relid, int16 *numatts);
static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals); static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
...@@ -67,17 +82,32 @@ static char *quote_ident_cstr(char *rawstr); ...@@ -67,17 +82,32 @@ static char *quote_ident_cstr(char *rawstr);
static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key); static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals); static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
static Oid get_relid_from_relname(text *relname_text); static Oid get_relid_from_relname(text *relname_text);
static dblink_results *get_res_ptr(int32 res_id_index);
static void append_res_ptr(dblink_results * results);
static void remove_res_ptr(dblink_results * results);
static TupleDesc pgresultGetTupleDesc(PGresult *res); static TupleDesc pgresultGetTupleDesc(PGresult *res);
static char *generate_relation_name(Oid relid); static char *generate_relation_name(Oid relid);
/* Global */ /* Global */
List *res_id = NIL; List *res_id = NIL;
int res_id_index = 0; int res_id_index = 0;
PGconn *persistent_conn = NULL; PGconn *persistent_conn = NULL;
static HTAB *remoteConnHash=NULL;
/*
Following is list that holds multiple remote connections.
Calling convention of each dblink function changes to accept
connection name as the first parameter. The connection list is
much like ecpg e.g. a mapping between a name and a PGconn object.
*/
typedef struct remoteConnHashEnt
{
char name[NAMEDATALEN];
remoteConn *rcon;
} remoteConnHashEnt;
/* initial number of connection hashes */
#define NUMCONN 16
/* general utility */
#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp))) #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
#define xpfree(var_) \ #define xpfree(var_) \
...@@ -88,6 +118,41 @@ PGconn *persistent_conn = NULL; ...@@ -88,6 +118,41 @@ PGconn *persistent_conn = NULL;
var_ = NULL; \ var_ = NULL; \
} \ } \
} while (0) } while (0)
#define DBLINK_RES_ERROR(p1, p2) \
do { \
msg = pstrdup(PQerrorMessage(conn)); \
if (res) \
PQclear(res); \
elog(ERROR, "%s: %s: %s", p1, p2, msg); \
} while (0)
#define DBLINK_CONN_NOT_AVAIL(p1) \
do { \
if(conname) \
elog(ERROR, "%s: connection %s not available", p1, conname); \
else \
elog(ERROR, "%s: connection not available", p1); \
} while (0)
#define DBLINK_GET_CONN(p1) \
do { \
char *conname_or_str = GET_STR(PG_GETARG_TEXT_P(0)); \
rcon = getConnectionByName(conname_or_str); \
if(rcon) \
{ \
conn = rcon->con; \
freeconn = false; \
} \
else \
{ \
connstr = conname_or_str; \
conn = PQconnectdb(connstr); \
if (PQstatus(conn) == CONNECTION_BAD) \
{ \
msg = pstrdup(PQerrorMessage(conn)); \
PQfinish(conn); \
elog(ERROR, "%s: connection error: %s", p1, msg); \
} \
} \
} while (0)
/* /*
...@@ -97,28 +162,52 @@ PG_FUNCTION_INFO_V1(dblink_connect); ...@@ -97,28 +162,52 @@ PG_FUNCTION_INFO_V1(dblink_connect);
Datum Datum
dblink_connect(PG_FUNCTION_ARGS) dblink_connect(PG_FUNCTION_ARGS)
{ {
char *connstr = GET_STR(PG_GETARG_TEXT_P(0)); char *connstr = NULL;
char *connname = NULL;
char *msg; char *msg;
text *result_text;
MemoryContext oldcontext; MemoryContext oldcontext;
PGconn *conn = NULL;
remoteConn *rcon = NULL;
if (persistent_conn != NULL) if(PG_NARGS()==2)
PQfinish(persistent_conn); {
connstr = GET_STR(PG_GETARG_TEXT_P(1));
connname = GET_STR(PG_GETARG_TEXT_P(0));
}
else if(PG_NARGS()==1)
connstr = GET_STR(PG_GETARG_TEXT_P(0));
oldcontext = MemoryContextSwitchTo(TopMemoryContext); oldcontext = MemoryContextSwitchTo(TopMemoryContext);
persistent_conn = PQconnectdb(connstr);
if(connname)
rcon=(remoteConn *) palloc(sizeof(remoteConn));
conn = PQconnectdb(connstr);
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
if (PQstatus(persistent_conn) == CONNECTION_BAD) if (PQstatus(conn) == CONNECTION_BAD)
{ {
msg = pstrdup(PQerrorMessage(persistent_conn)); msg = pstrdup(PQerrorMessage(conn));
PQfinish(persistent_conn); PQfinish(conn);
persistent_conn = NULL; if(rcon)
pfree(rcon);
elog(ERROR, "dblink_connect: connection error: %s", msg); elog(ERROR, "dblink_connect: connection error: %s", msg);
} }
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); if(connname)
PG_RETURN_TEXT_P(result_text); {
rcon->con = conn;
if(createNewConnection(connname, rcon) == false)
{
PQfinish(conn);
pfree(rcon);
elog(ERROR, "dblink_connect: cannot save named connection");
}
}
else
persistent_conn = conn;
PG_RETURN_TEXT_P(GET_TEXT("OK"));
} }
/* /*
...@@ -128,15 +217,37 @@ PG_FUNCTION_INFO_V1(dblink_disconnect); ...@@ -128,15 +217,37 @@ PG_FUNCTION_INFO_V1(dblink_disconnect);
Datum Datum
dblink_disconnect(PG_FUNCTION_ARGS) dblink_disconnect(PG_FUNCTION_ARGS)
{ {
text *result_text; char *str = NULL;
remoteConn *rcon = NULL;
PGconn *conn = NULL;
if (PG_NARGS() ==1 )
{
str = GET_STR(PG_GETARG_TEXT_P(0));
rcon = getConnectionByName(str);
if (rcon)
conn = rcon->con;
}
else
conn = persistent_conn;
if (persistent_conn != NULL) if (!conn)
PQfinish(persistent_conn); {
if (str)
elog(ERROR,"dblink_disconnect: connection named \"%s\" not found",
str);
else
elog(ERROR,"dblink_disconnect: connection not found");
}
persistent_conn = NULL; PQfinish(conn);
if (rcon)
{
deleteConnection(str);
pfree(rcon);
}
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); PG_RETURN_TEXT_P(GET_TEXT("OK"));
PG_RETURN_TEXT_P(result_text);
} }
/* /*
...@@ -149,27 +260,35 @@ dblink_open(PG_FUNCTION_ARGS) ...@@ -149,27 +260,35 @@ dblink_open(PG_FUNCTION_ARGS)
char *msg; char *msg;
PGresult *res = NULL; PGresult *res = NULL;
PGconn *conn = NULL; PGconn *conn = NULL;
text *result_text; char *curname = NULL;
char *curname = GET_STR(PG_GETARG_TEXT_P(0)); char *sql = NULL;
char *sql = GET_STR(PG_GETARG_TEXT_P(1)); char *conname = NULL;
StringInfo str = makeStringInfo(); StringInfo str = makeStringInfo();
remoteConn *rcon = NULL;
if (persistent_conn != NULL) if(PG_NARGS() == 2)
{
curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
conn = persistent_conn; conn = persistent_conn;
else }
elog(ERROR, "dblink_open: no connection available"); else if(PG_NARGS() == 3)
{
conname = GET_STR(PG_GETARG_TEXT_P(0));
curname = GET_STR(PG_GETARG_TEXT_P(1));
sql = GET_STR(PG_GETARG_TEXT_P(2));
rcon = getConnectionByName(conname);
if (rcon)
conn = rcon->con;
}
if (!conn)
DBLINK_CONN_NOT_AVAIL("dblink_open");
res = PQexec(conn, "BEGIN"); res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
{ DBLINK_RES_ERROR("dblink_open", "begin error");
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
persistent_conn = NULL;
elog(ERROR, "dblink_open: begin error: %s", msg);
}
PQclear(res); PQclear(res);
appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql); appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
...@@ -177,19 +296,11 @@ dblink_open(PG_FUNCTION_ARGS) ...@@ -177,19 +296,11 @@ dblink_open(PG_FUNCTION_ARGS)
if (!res || if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK && (PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK)) PQresultStatus(res) != PGRES_TUPLES_OK))
{ DBLINK_RES_ERROR("dblink_open", "sql error");
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
persistent_conn = NULL;
elog(ERROR, "dblink: sql error: %s", msg); PQclear(res);
}
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); PG_RETURN_TEXT_P(GET_TEXT("OK"));
PG_RETURN_TEXT_P(result_text);
} }
/* /*
...@@ -201,49 +312,46 @@ dblink_close(PG_FUNCTION_ARGS) ...@@ -201,49 +312,46 @@ dblink_close(PG_FUNCTION_ARGS)
{ {
PGconn *conn = NULL; PGconn *conn = NULL;
PGresult *res = NULL; PGresult *res = NULL;
char *curname = GET_STR(PG_GETARG_TEXT_P(0)); char *curname = NULL;
char *conname = NULL;
StringInfo str = makeStringInfo(); StringInfo str = makeStringInfo();
text *result_text;
char *msg; char *msg;
remoteConn *rcon = NULL;
if (persistent_conn != NULL) if (PG_NARGS() == 1)
{
curname = GET_STR(PG_GETARG_TEXT_P(0));
conn = persistent_conn; conn = persistent_conn;
else }
elog(ERROR, "dblink_close: no connection available"); else if (PG_NARGS()==2)
{
conname = GET_STR(PG_GETARG_TEXT_P(0));
curname = GET_STR(PG_GETARG_TEXT_P(1));
rcon = getConnectionByName(conname);
if(rcon)
conn = rcon->con;
}
if (!conn)
DBLINK_CONN_NOT_AVAIL("dblink_close");
appendStringInfo(str, "CLOSE %s", curname); appendStringInfo(str, "CLOSE %s", curname);
/* close the cursor */ /* close the cursor */
res = PQexec(conn, str->data); res = PQexec(conn, str->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{ DBLINK_RES_ERROR("dblink_close", "sql error");
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(persistent_conn);
persistent_conn = NULL;
elog(ERROR, "dblink_close: sql error: %s", msg);
}
PQclear(res); PQclear(res);
/* commit the transaction */ /* commit the transaction */
res = PQexec(conn, "COMMIT"); res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
{ DBLINK_RES_ERROR("dblink_close", "commit error");
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(persistent_conn);
persistent_conn = NULL;
elog(ERROR, "dblink_close: commit error: %s", msg);
}
PQclear(res); PQclear(res);
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); PG_RETURN_TEXT_P(GET_TEXT("OK"));
PG_RETURN_TEXT_P(result_text);
} }
/* /*
...@@ -262,6 +370,8 @@ dblink_fetch(PG_FUNCTION_ARGS) ...@@ -262,6 +370,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
char *msg; char *msg;
PGresult *res = NULL; PGresult *res = NULL;
MemoryContext oldcontext; MemoryContext oldcontext;
char *conname = NULL;
remoteConn *rcon=NULL;
/* stuff done only on the first call of the function */ /* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL()) if (SRF_IS_FIRSTCALL())
...@@ -271,8 +381,28 @@ dblink_fetch(PG_FUNCTION_ARGS) ...@@ -271,8 +381,28 @@ dblink_fetch(PG_FUNCTION_ARGS)
Oid funcid = fcinfo->flinfo->fn_oid; Oid funcid = fcinfo->flinfo->fn_oid;
PGconn *conn = NULL; PGconn *conn = NULL;
StringInfo str = makeStringInfo(); StringInfo str = makeStringInfo();
char *curname = GET_STR(PG_GETARG_TEXT_P(0)); char *curname = NULL;
int howmany = PG_GETARG_INT32(1); int howmany = 0;
if (PG_NARGS() == 3)
{
conname = GET_STR(PG_GETARG_TEXT_P(0));
curname = GET_STR(PG_GETARG_TEXT_P(1));
howmany = PG_GETARG_INT32(2);
rcon = getConnectionByName(conname);
if(rcon)
conn = rcon->con;
}
else if (PG_NARGS() == 2)
{
curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1);
conn = persistent_conn;
}
if(!conn)
DBLINK_CONN_NOT_AVAIL("dblink_fetch");
/* create a function context for cross-call persistence */ /* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT(); funcctx = SRF_FIRSTCALL_INIT();
...@@ -283,11 +413,6 @@ dblink_fetch(PG_FUNCTION_ARGS) ...@@ -283,11 +413,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
*/ */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
if (persistent_conn != NULL)
conn = persistent_conn;
else
elog(ERROR, "dblink_fetch: no connection available");
appendStringInfo(str, "FETCH %d FROM %s", howmany, curname); appendStringInfo(str, "FETCH %d FROM %s", howmany, curname);
res = PQexec(conn, str->data); res = PQexec(conn, str->data);
...@@ -295,19 +420,13 @@ dblink_fetch(PG_FUNCTION_ARGS) ...@@ -295,19 +420,13 @@ dblink_fetch(PG_FUNCTION_ARGS)
(PQresultStatus(res) != PGRES_COMMAND_OK && (PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK)) PQresultStatus(res) != PGRES_TUPLES_OK))
{ {
msg = pstrdup(PQerrorMessage(conn)); DBLINK_RES_ERROR("dblink_fetch", "sql error");
PQclear(res);
PQfinish(persistent_conn);
persistent_conn = NULL;
elog(ERROR, "dblink_fetch: sql error: %s", msg);
} }
else if (PQresultStatus(res) == PGRES_COMMAND_OK) else if (PQresultStatus(res) == PGRES_COMMAND_OK)
{ {
/* cursor does not exist - closed already or bad name */ /* cursor does not exist - closed already or bad name */
PQclear(res); PQclear(res);
elog(ERROR, "dblink_fetch: cursor %s does not exist", curname); elog(ERROR, "dblink_fetch: cursor not found: %s", curname);
} }
funcctx->max_calls = PQntuples(res); funcctx->max_calls = PQntuples(res);
...@@ -380,8 +499,8 @@ dblink_fetch(PG_FUNCTION_ARGS) ...@@ -380,8 +499,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
SRF_RETURN_NEXT(funcctx, result); SRF_RETURN_NEXT(funcctx, result);
} }
else else
/* do when there is no more left */
{ {
/* do when there is no more left */
PQclear(res); PQclear(res);
SRF_RETURN_DONE(funcctx); SRF_RETURN_DONE(funcctx);
} }
...@@ -405,6 +524,7 @@ dblink_record(PG_FUNCTION_ARGS) ...@@ -405,6 +524,7 @@ dblink_record(PG_FUNCTION_ARGS)
bool is_sql_cmd = false; bool is_sql_cmd = false;
char *sql_cmd_status = NULL; char *sql_cmd_status = NULL;
MemoryContext oldcontext; MemoryContext oldcontext;
bool freeconn = true;
/* stuff done only on the first call of the function */ /* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL()) if (SRF_IS_FIRSTCALL())
...@@ -415,6 +535,8 @@ dblink_record(PG_FUNCTION_ARGS) ...@@ -415,6 +535,8 @@ dblink_record(PG_FUNCTION_ARGS)
PGconn *conn = NULL; PGconn *conn = NULL;
char *connstr = NULL; char *connstr = NULL;
char *sql = NULL; char *sql = NULL;
char *conname = NULL;
remoteConn *rcon=NULL;
/* create a function context for cross-call persistence */ /* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT(); funcctx = SRF_FIRSTCALL_INIT();
...@@ -425,70 +547,51 @@ dblink_record(PG_FUNCTION_ARGS) ...@@ -425,70 +547,51 @@ dblink_record(PG_FUNCTION_ARGS)
*/ */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
if (fcinfo->nargs == 2) if (PG_NARGS() == 2)
{ {
connstr = GET_STR(PG_GETARG_TEXT_P(0)); DBLINK_GET_CONN("dblink");
sql = GET_STR(PG_GETARG_TEXT_P(1)); sql = GET_STR(PG_GETARG_TEXT_P(1));
conn = PQconnectdb(connstr);
if (PQstatus(conn) == CONNECTION_BAD)
{
msg = pstrdup(PQerrorMessage(conn));
PQfinish(conn);
elog(ERROR, "dblink: connection error: %s", msg);
}
} }
else if (fcinfo->nargs == 1) else if (PG_NARGS() == 1)
{ {
conn = persistent_conn;
sql = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(0));
if (persistent_conn != NULL)
conn = persistent_conn;
else
elog(ERROR, "dblink: no connection available");
} }
else else
elog(ERROR, "dblink: wrong number of arguments"); elog(ERROR, "dblink: wrong number of arguments");
if(!conn)
DBLINK_CONN_NOT_AVAIL("dblink_record");
res = PQexec(conn, sql); res = PQexec(conn, sql);
if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
DBLINK_RES_ERROR("dblink", "sql error");
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{ {
msg = pstrdup(PQerrorMessage(conn)); is_sql_cmd = true;
PQclear(res);
PQfinish(conn); /* need a tuple descriptor representing one TEXT column */
if (fcinfo->nargs == 1) tupdesc = CreateTemplateTupleDesc(1, false);
persistent_conn = NULL; TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0, false);
elog(ERROR, "dblink: sql error: %s", msg); /*
* and save a copy of the command status string to return
* as our result tuple
*/
sql_cmd_status = PQcmdStatus(res);
funcctx->max_calls = 1;
} }
else else
{ funcctx->max_calls = PQntuples(res);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
is_sql_cmd = true;
/* need a tuple descriptor representing one TEXT column */
tupdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0, false);
/*
* and save a copy of the command status string to return
* as our result tuple
*/
sql_cmd_status = PQcmdStatus(res);
funcctx->max_calls = 1;
}
else
funcctx->max_calls = PQntuples(res);
/* got results, keep track of them */ /* got results, keep track of them */
funcctx->user_fctx = res; funcctx->user_fctx = res;
/* if needed, close the connection to the database and cleanup */ /* if needed, close the connection to the database and cleanup */
if (fcinfo->nargs == 2) if (freeconn && PG_NARGS() == 2)
PQfinish(conn); PQfinish(conn);
}
/* fast track when no results */ /* fast track when no results */
if (funcctx->max_calls < 1) if (funcctx->max_calls < 1)
...@@ -567,8 +670,8 @@ dblink_record(PG_FUNCTION_ARGS) ...@@ -567,8 +670,8 @@ dblink_record(PG_FUNCTION_ARGS)
SRF_RETURN_NEXT(funcctx, result); SRF_RETURN_NEXT(funcctx, result);
} }
else else
/* do when there is no more left */
{ {
/* do when there is no more left */
PQclear(res); PQclear(res);
SRF_RETURN_DONE(funcctx); SRF_RETURN_DONE(funcctx);
} }
...@@ -583,272 +686,62 @@ dblink_exec(PG_FUNCTION_ARGS) ...@@ -583,272 +686,62 @@ dblink_exec(PG_FUNCTION_ARGS)
{ {
char *msg; char *msg;
PGresult *res = NULL; PGresult *res = NULL;
char *sql_cmd_status = NULL; text *sql_cmd_status = NULL;
TupleDesc tupdesc = NULL; TupleDesc tupdesc = NULL;
text *result_text;
PGconn *conn = NULL; PGconn *conn = NULL;
char *connstr = NULL; char *connstr = NULL;
char *sql = NULL; char *sql = NULL;
char *conname = NULL;
remoteConn *rcon=NULL;
bool freeconn = true;
if (fcinfo->nargs == 2) if (PG_NARGS() == 2)
{ {
connstr = GET_STR(PG_GETARG_TEXT_P(0)); DBLINK_GET_CONN("dblink_exec");
sql = GET_STR(PG_GETARG_TEXT_P(1)); sql = GET_STR(PG_GETARG_TEXT_P(1));
conn = PQconnectdb(connstr);
if (PQstatus(conn) == CONNECTION_BAD)
{
msg = pstrdup(PQerrorMessage(conn));
PQfinish(conn);
elog(ERROR, "dblink_exec: connection error: %s", msg);
}
} }
else if (fcinfo->nargs == 1) else if (PG_NARGS() == 1)
{ {
conn = persistent_conn;
sql = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(0));
if (persistent_conn != NULL)
conn = persistent_conn;
else
elog(ERROR, "dblink_exec: no connection available");
} }
else else
elog(ERROR, "dblink_exec: wrong number of arguments"); elog(ERROR, "dblink_exec: wrong number of arguments");
if(!conn)
DBLINK_CONN_NOT_AVAIL("dblink_exec");
res = PQexec(conn, sql); res = PQexec(conn, sql);
if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
DBLINK_RES_ERROR("dblink_exec", "sql error");
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{ {
msg = pstrdup(PQerrorMessage(conn)); /* need a tuple descriptor representing one TEXT column */
PQclear(res); tupdesc = CreateTemplateTupleDesc(1, false);
PQfinish(conn); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
if (fcinfo->nargs == 1) TEXTOID, -1, 0, false);
persistent_conn = NULL;
elog(ERROR, "dblink_exec: sql error: %s", msg); /*
* and save a copy of the command status string to return as
* our result tuple
*/
sql_cmd_status = GET_TEXT(PQcmdStatus(res));
} }
else else
{ elog(ERROR, "dblink_exec: queries returning results not allowed");
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/* need a tuple descriptor representing one TEXT column */
tupdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0, false);
/*
* and save a copy of the command status string to return as
* our result tuple
*/
sql_cmd_status = PQcmdStatus(res);
}
else
elog(ERROR, "dblink_exec: queries returning results not allowed");
}
PQclear(res); PQclear(res);
/* if needed, close the connection to the database and cleanup */ /* if needed, close the connection to the database and cleanup */
if (fcinfo->nargs == 2) if (freeconn && fcinfo->nargs == 2)
PQfinish(conn); PQfinish(conn);
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status))); PG_RETURN_TEXT_P(sql_cmd_status);
PG_RETURN_TEXT_P(result_text);
} }
/*
* Note: this original version of dblink is DEPRECATED;
* it *will* be removed in favor of the new version on next release
*/
PG_FUNCTION_INFO_V1(dblink);
Datum
dblink(PG_FUNCTION_ARGS)
{
PGconn *conn = NULL;
PGresult *res = NULL;
dblink_results *results;
char *optstr;
char *sqlstatement;
char *execstatement;
char *msg;
int ntuples = 0;
ReturnSetInfo *rsi;
if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
elog(ERROR, "dblink: function called in context that does not accept a set result");
optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
if (fcinfo->flinfo->fn_extra == NULL)
{
conn = PQconnectdb(optstr);
if (PQstatus(conn) == CONNECTION_BAD)
{
msg = pstrdup(PQerrorMessage(conn));
PQfinish(conn);
elog(ERROR, "dblink: connection error: %s", msg);
}
execstatement = (char *) palloc(strlen(sqlstatement) + 1);
if (execstatement != NULL)
{
strcpy(execstatement, sqlstatement);
strcat(execstatement, "\0");
}
else
elog(ERROR, "dblink: insufficient memory");
res = PQexec(conn, execstatement);
if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
elog(ERROR, "dblink: sql error: %s", msg);
}
else
{
/*
* got results, start fetching them
*/
ntuples = PQntuples(res);
/*
* increment resource index
*/
res_id_index++;
results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
results->tup_num = 0;
results->res_id_index = res_id_index;
results->res = res;
/*
* Append node to res_id to hold pointer to results. Needed by
* dblink_tok to access the data
*/
append_res_ptr(results);
/*
* save pointer to results for the next function manager call
*/
fcinfo->flinfo->fn_extra = (void *) results;
/* close the connection to the database and cleanup */
PQfinish(conn);
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_INT32(res_id_index);
}
}
else
{
/*
* check for more results
*/
results = fcinfo->flinfo->fn_extra;
results->tup_num++;
res_id_index = results->res_id_index;
ntuples = PQntuples(results->res);
if (results->tup_num < ntuples)
{
/*
* fetch them if available
*/
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_INT32(res_id_index);
}
else
{
/*
* or if no more, clean things up
*/
results = fcinfo->flinfo->fn_extra;
remove_res_ptr(results);
PQclear(results->res);
pfree(results);
fcinfo->flinfo->fn_extra = NULL;
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
PG_RETURN_NULL();
}
}
PG_RETURN_NULL();
}
/*
* Note: dblink_tok is DEPRECATED;
* it *will* be removed in favor of the new version on next release
*
* dblink_tok
* parse dblink output string
* return fldnum item (0 based)
* based on provided field separator
*/
PG_FUNCTION_INFO_V1(dblink_tok);
Datum
dblink_tok(PG_FUNCTION_ARGS)
{
dblink_results *results;
int fldnum;
text *result_text;
char *result;
int nfields = 0;
int text_len = 0;
results = get_res_ptr(PG_GETARG_INT32(0));
if (results == NULL)
{
if (res_id != NIL)
{
freeList(res_id);
res_id = NIL;
res_id_index = 0;
}
elog(ERROR, "dblink_tok: function called with invalid resource id");
}
fldnum = PG_GETARG_INT32(1);
if (fldnum < 0)
elog(ERROR, "dblink_tok: field number < 0 not permitted");
nfields = PQnfields(results->res);
if (fldnum > (nfields - 1))
elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);
if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
PG_RETURN_NULL();
else
{
text_len = PQgetlength(results->res, results->tup_num, fldnum);
result = (char *) palloc(text_len + 1);
if (result != NULL)
{
strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum));
strcat(result, "\0");
}
else
elog(ERROR, "dblink: insufficient memory");
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
PG_RETURN_TEXT_P(result_text);
}
}
/* /*
* dblink_get_pkey * dblink_get_pkey
...@@ -923,7 +816,7 @@ dblink_get_pkey(PG_FUNCTION_ARGS) ...@@ -923,7 +816,7 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
funcctx->user_fctx = results; funcctx->user_fctx = results;
} }
else else
/* fast track when no results */ /* fast track when no results */
SRF_RETURN_DONE(funcctx); SRF_RETURN_DONE(funcctx);
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
...@@ -965,37 +858,10 @@ dblink_get_pkey(PG_FUNCTION_ARGS) ...@@ -965,37 +858,10 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
SRF_RETURN_NEXT(funcctx, result); SRF_RETURN_NEXT(funcctx, result);
} }
else else
/* do when there is no more left */
SRF_RETURN_DONE(funcctx);
}
/*
* Note: dblink_last_oid is DEPRECATED;
* it *will* be removed on next release
*
* dblink_last_oid
* return last inserted oid
*/
PG_FUNCTION_INFO_V1(dblink_last_oid);
Datum
dblink_last_oid(PG_FUNCTION_ARGS)
{
dblink_results *results;
results = get_res_ptr(PG_GETARG_INT32(0));
if (results == NULL)
{ {
if (res_id != NIL) /* do when there is no more left */
{ SRF_RETURN_DONE(funcctx);
freeList(res_id);
res_id = NIL;
res_id_index = 0;
}
elog(ERROR, "dblink_tok: function called with invalid resource id");
} }
PG_RETURN_OID(PQoidValue(results->res));
} }
...@@ -1043,7 +909,6 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) ...@@ -1043,7 +909,6 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
int i; int i;
char *ptr; char *ptr;
char *sql; char *sql;
text *sql_text;
int16 typlen; int16 typlen;
bool typbyval; bool typbyval;
char typalign; char typalign;
...@@ -1138,15 +1003,10 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) ...@@ -1138,15 +1003,10 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
*/ */
sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
/*
* Make it into TEXT for return to the client
*/
sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
/* /*
* And send it * And send it
*/ */
PG_RETURN_TEXT_P(sql_text); PG_RETURN_TEXT_P(GET_TEXT(sql));
} }
...@@ -1182,7 +1042,6 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) ...@@ -1182,7 +1042,6 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
int i; int i;
char *ptr; char *ptr;
char *sql; char *sql;
text *sql_text;
int16 typlen; int16 typlen;
bool typbyval; bool typbyval;
char typalign; char typalign;
...@@ -1246,15 +1105,10 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) ...@@ -1246,15 +1105,10 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
*/ */
sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals); sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
/*
* Make it into TEXT for return to the client
*/
sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
/* /*
* And send it * And send it
*/ */
PG_RETURN_TEXT_P(sql_text); PG_RETURN_TEXT_P(GET_TEXT(sql));
} }
...@@ -1299,7 +1153,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) ...@@ -1299,7 +1153,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
int i; int i;
char *ptr; char *ptr;
char *sql; char *sql;
text *sql_text;
int16 typlen; int16 typlen;
bool typbyval; bool typbyval;
char typalign; char typalign;
...@@ -1394,15 +1247,10 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) ...@@ -1394,15 +1247,10 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
*/ */
sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
/*
* Make it into TEXT for return to the client
*/
sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
/* /*
* And send it * And send it
*/ */
PG_RETURN_TEXT_P(sql_text); PG_RETURN_TEXT_P(GET_TEXT(sql));
} }
/* /*
...@@ -1415,10 +1263,7 @@ PG_FUNCTION_INFO_V1(dblink_current_query); ...@@ -1415,10 +1263,7 @@ PG_FUNCTION_INFO_V1(dblink_current_query);
Datum Datum
dblink_current_query(PG_FUNCTION_ARGS) dblink_current_query(PG_FUNCTION_ARGS)
{ {
text *result_text; PG_RETURN_TEXT_P(GET_TEXT(debug_query_string));
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
PG_RETURN_TEXT_P(result_text);
} }
...@@ -1427,29 +1272,6 @@ dblink_current_query(PG_FUNCTION_ARGS) ...@@ -1427,29 +1272,6 @@ dblink_current_query(PG_FUNCTION_ARGS)
*/ */
/*
* init_dblink_results
* - create an empty dblink_results data structure
*/
static dblink_results *
init_dblink_results(MemoryContext fn_mcxt)
{
MemoryContext oldcontext;
dblink_results *retval;
oldcontext = MemoryContextSwitchTo(fn_mcxt);
retval = (dblink_results *) palloc0(sizeof(dblink_results));
retval->tup_num = -1;
retval->res_id_index = -1;
retval->res = NULL;
MemoryContextSwitchTo(oldcontext);
return retval;
}
/* /*
* get_pkey_attnames * get_pkey_attnames
* *
...@@ -1488,7 +1310,10 @@ get_pkey_attnames(Oid relid, int16 *numatts) ...@@ -1488,7 +1310,10 @@ get_pkey_attnames(Oid relid, int16 *numatts)
/* we're only interested if it is the primary key */ /* we're only interested if it is the primary key */
if (index->indisprimary == TRUE) if (index->indisprimary == TRUE)
{ {
*numatts = index->indnatts; i = 0;
while (index->indkey[i++] != 0)
(*numatts)++;
if (*numatts > 0) if (*numatts > 0)
{ {
result = (char **) palloc(*numatts * sizeof(char *)); result = (char **) palloc(*numatts * sizeof(char *));
...@@ -1907,52 +1732,6 @@ get_relid_from_relname(text *relname_text) ...@@ -1907,52 +1732,6 @@ get_relid_from_relname(text *relname_text)
return relid; return relid;
} }
static dblink_results *
get_res_ptr(int32 res_id_index)
{
List *ptr;
/*
* short circuit empty list
*/
if (res_id == NIL)
return NULL;
/*
* OK, should be good to go
*/
foreach(ptr, res_id)
{
dblink_results *this_res_id = (dblink_results *) lfirst(ptr);
if (this_res_id->res_id_index == res_id_index)
return this_res_id;
}
return NULL;
}
/*
* Add node to global List res_id
*/
static void
append_res_ptr(dblink_results * results)
{
res_id = lappend(res_id, results);
}
/*
* Remove node from global List
* using res_id_index
*/
static void
remove_res_ptr(dblink_results * results)
{
res_id = lremove(results, res_id);
if (res_id == NIL)
res_id_index = 0;
}
static TupleDesc static TupleDesc
pgresultGetTupleDesc(PGresult *res) pgresultGetTupleDesc(PGresult *res)
{ {
...@@ -2039,3 +1818,91 @@ generate_relation_name(Oid relid) ...@@ -2039,3 +1818,91 @@ generate_relation_name(Oid relid)
return result; return result;
} }
static remoteConn *
getConnectionByName(const char *name)
{
remoteConnHashEnt *hentry;
char key[NAMEDATALEN];
if(!remoteConnHash)
remoteConnHash=createConnHash();
MemSet(key, 0, NAMEDATALEN);
snprintf(key, NAMEDATALEN - 1, "%s", name);
hentry = (remoteConnHashEnt*) hash_search(remoteConnHash,
key, HASH_FIND, NULL);
if(hentry)
return(hentry->rcon);
return(NULL);
}
static HTAB *
createConnHash(void)
{
HASHCTL ctl;
HTAB *ptr;
ctl.keysize = NAMEDATALEN;
ctl.entrysize = sizeof(remoteConnHashEnt);
ptr=hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
if(!ptr)
elog(ERROR,"Can not create connections hash table. Out of memory");
return(ptr);
}
static bool
createNewConnection(const char *name, remoteConn *con)
{
remoteConnHashEnt *hentry;
bool found;
char key[NAMEDATALEN];
if(!remoteConnHash)
remoteConnHash=createConnHash();
MemSet(key, 0, NAMEDATALEN);
snprintf(key, NAMEDATALEN - 1, "%s", name);
hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
HASH_ENTER, &found);
if(!hentry)
elog(ERROR, "failed to create connection");
if(found)
{
elog(NOTICE, "cannot use a connection name more than once");
return false;
}
hentry->rcon = con;
strncpy(hentry->name, name, NAMEDATALEN - 1);
return true;
}
static void
deleteConnection(const char *name)
{
remoteConnHashEnt *hentry;
bool found;
char key[NAMEDATALEN];
if(!remoteConnHash)
remoteConnHash=createConnHash();
MemSet(key, 0, NAMEDATALEN);
snprintf(key, NAMEDATALEN - 1, "%s", name);
hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
key, HASH_REMOVE, &found);
if(!hentry)
elog(WARNING,"Trying to delete a connection that does not exist");
}
...@@ -4,8 +4,11 @@ ...@@ -4,8 +4,11 @@
* Functions returning results from a remote database * Functions returning results from a remote database
* *
* Joe Conway <mail@joeconway.com> * Joe Conway <mail@joeconway.com>
* And contributors:
* Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
* *
* Copyright (c) 2001, 2002 by PostgreSQL Global Development Group * Copyright (c) 2001, 2002, 2003 by PostgreSQL Global Development Group
* ALL RIGHTS RESERVED; * ALL RIGHTS RESERVED;
* *
* Permission to use, copy, modify, and distribute this software and its * Permission to use, copy, modify, and distribute this software and its
...@@ -30,36 +33,9 @@ ...@@ -30,36 +33,9 @@
#ifndef DBLINK_H #ifndef DBLINK_H
#define DBLINK_H #define DBLINK_H
/*
* This struct holds the results of the remote query.
* Use fn_extra to hold a pointer to it across calls
*/
typedef struct
{
/*
* last tuple number accessed
*/
int tup_num;
/*
* resource index number for this context
*/
int res_id_index;
/*
* the actual query results
*/
PGresult *res;
} dblink_results;
/* /*
* External declarations * External declarations
*/ */
/* deprecated */
extern Datum dblink(PG_FUNCTION_ARGS);
extern Datum dblink_tok(PG_FUNCTION_ARGS);
/* supported */
extern Datum dblink_connect(PG_FUNCTION_ARGS); extern Datum dblink_connect(PG_FUNCTION_ARGS);
extern Datum dblink_disconnect(PG_FUNCTION_ARGS); extern Datum dblink_disconnect(PG_FUNCTION_ARGS);
extern Datum dblink_open(PG_FUNCTION_ARGS); extern Datum dblink_open(PG_FUNCTION_ARGS);
...@@ -68,7 +44,6 @@ extern Datum dblink_fetch(PG_FUNCTION_ARGS); ...@@ -68,7 +44,6 @@ extern Datum dblink_fetch(PG_FUNCTION_ARGS);
extern Datum dblink_record(PG_FUNCTION_ARGS); extern Datum dblink_record(PG_FUNCTION_ARGS);
extern Datum dblink_exec(PG_FUNCTION_ARGS); extern Datum dblink_exec(PG_FUNCTION_ARGS);
extern Datum dblink_get_pkey(PG_FUNCTION_ARGS); extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
......
--
-- Uncomment the following commented lines to use original DEPRECATED functions
--
--CREATE OR REPLACE FUNCTION dblink (text,text)
--RETURNS setof int
--AS 'MODULE_PATHNAME','dblink'
--LANGUAGE 'C' WITH (isstrict);
--CREATE OR REPLACE FUNCTION dblink_tok (int,int)
--RETURNS text
--AS 'MODULE_PATHNAME','dblink_tok'
--LANGUAGE 'C' WITH (isstrict);
--CREATE OR REPLACE FUNCTION dblink_last_oid (int)
--RETURNS oid
--AS 'MODULE_PATHNAME','dblink_last_oid'
--LANGUAGE 'C' WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_connect (text) CREATE OR REPLACE FUNCTION dblink_connect (text)
RETURNS text RETURNS text
AS 'MODULE_PATHNAME','dblink_connect' AS 'MODULE_PATHNAME','dblink_connect'
LANGUAGE 'C' WITH (isstrict); LANGUAGE 'C' WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_connect (text, text)
RETURNS text
AS 'MODULE_PATHNAME','dblink_connect'
LANGUAGE 'C' WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_disconnect () CREATE OR REPLACE FUNCTION dblink_disconnect ()
RETURNS text RETURNS text
AS 'MODULE_PATHNAME','dblink_disconnect' AS 'MODULE_PATHNAME','dblink_disconnect'
LANGUAGE 'C' WITH (isstrict); LANGUAGE 'C' WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_disconnect (text)
RETURNS text
AS 'MODULE_PATHNAME','dblink_disconnect'
LANGUAGE 'C' WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_open (text,text) CREATE OR REPLACE FUNCTION dblink_open (text,text)
RETURNS text RETURNS text
AS 'MODULE_PATHNAME','dblink_open' AS 'MODULE_PATHNAME','dblink_open'
LANGUAGE 'C' WITH (isstrict); LANGUAGE 'C' WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_open (text,text,text)
RETURNS text
AS 'MODULE_PATHNAME','dblink_open'
LANGUAGE 'C' WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_fetch (text,int) CREATE OR REPLACE FUNCTION dblink_fetch (text,int)
RETURNS setof record RETURNS setof record
AS 'MODULE_PATHNAME','dblink_fetch' AS 'MODULE_PATHNAME','dblink_fetch'
LANGUAGE 'C' WITH (isstrict); LANGUAGE 'C' WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_fetch (text,text,int)
RETURNS setof record
AS 'MODULE_PATHNAME','dblink_fetch'
LANGUAGE 'C' WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_close (text) CREATE OR REPLACE FUNCTION dblink_close (text)
RETURNS text RETURNS text
AS 'MODULE_PATHNAME','dblink_close' AS 'MODULE_PATHNAME','dblink_close'
LANGUAGE 'C' WITH (isstrict); LANGUAGE 'C' WITH (isstrict);
-- Note: if this is not a first time install of dblink, uncomment the CREATE OR REPLACE FUNCTION dblink_close (text,text)
-- following DROP which prepares the database for the new, non-deprecated RETURNS text
-- version. AS 'MODULE_PATHNAME','dblink_close'
--DROP FUNCTION dblink (text,text); LANGUAGE 'C' WITH (isstrict);
-- Comment out the following 3 lines if the DEPRECATED functions are used.
CREATE OR REPLACE FUNCTION dblink (text,text) CREATE OR REPLACE FUNCTION dblink (text,text)
RETURNS setof record RETURNS setof record
AS 'MODULE_PATHNAME','dblink_record' AS 'MODULE_PATHNAME','dblink_record'
......
...@@ -6,21 +6,35 @@ dblink_connect -- Opens a persistent connection to a remote database ...@@ -6,21 +6,35 @@ dblink_connect -- Opens a persistent connection to a remote database
Synopsis Synopsis
dblink_connect(text connstr) dblink_connect(text connstr)
dblink_connect(text connname, text connstr)
Inputs Inputs
connname
if 2 arguments are given, the first is used as a name for a persistent
connection
connstr connstr
standard libpq format connection string, standard libpq format connection string,
e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd" e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd"
if only one argument is given, the connection is unnamed; only one unnamed
connection can exist at a time
Outputs Outputs
Returns status = "OK" Returns status = "OK"
Example usage Example usage
test=# select dblink_connect('dbname=template1'); select dblink_connect('dbname=template1');
dblink_connect
----------------
OK
(1 row)
select dblink_connect('myconn','dbname=template1');
dblink_connect dblink_connect
---------------- ----------------
OK OK
...@@ -29,15 +43,18 @@ test=# select dblink_connect('dbname=template1'); ...@@ -29,15 +43,18 @@ test=# select dblink_connect('dbname=template1');
================================================================== ==================================================================
Name Name
dblink_disconnect -- Closes the persistent connection to a remote database dblink_disconnect -- Closes a persistent connection to a remote database
Synopsis Synopsis
dblink_disconnect() dblink_disconnect()
dblink_disconnect(text connname)
Inputs Inputs
none connname
if an argument is given, it is used as a name for a persistent
connection to close; otherwiase the unnamed connection is closed
Outputs Outputs
...@@ -51,3 +68,8 @@ test=# select dblink_disconnect(); ...@@ -51,3 +68,8 @@ test=# select dblink_disconnect();
OK OK
(1 row) (1 row)
select dblink_disconnect('myconn');
dblink_disconnect
-------------------
OK
(1 row)
...@@ -6,9 +6,14 @@ dblink_open -- Opens a cursor on a remote database ...@@ -6,9 +6,14 @@ dblink_open -- Opens a cursor on a remote database
Synopsis Synopsis
dblink_open(text cursorname, text sql) dblink_open(text cursorname, text sql)
dblink_open(text connname, text cursorname, text sql)
Inputs Inputs
connname
if three arguments are present, the first is taken as the specific
connection name to use; otherwise the unnamed connection is assumed
cursorname cursorname
a reference name for the cursor a reference name for the cursor
...@@ -52,9 +57,14 @@ dblink_fetch -- Returns a set from an open cursor on a remote database ...@@ -52,9 +57,14 @@ dblink_fetch -- Returns a set from an open cursor on a remote database
Synopsis Synopsis
dblink_fetch(text cursorname, int32 howmany) dblink_fetch(text cursorname, int32 howmany)
dblink_fetch(text connname, text cursorname, int32 howmany)
Inputs Inputs
connname
if three arguments are present, the first is taken as the specific
connection name to use; otherwise the unnamed connection is assumed
cursorname cursorname
The reference name for the cursor The reference name for the cursor
...@@ -123,9 +133,14 @@ dblink_close -- Closes a cursor on a remote database ...@@ -123,9 +133,14 @@ dblink_close -- Closes a cursor on a remote database
Synopsis Synopsis
dblink_close(text cursorname) dblink_close(text cursorname)
dblink_close(text connname, text cursorname)
Inputs Inputs
connname
if two arguments are present, the first is taken as the specific
connection name to use; otherwise the unnamed connection is assumed
cursorname cursorname
a reference name for the cursor a reference name for the cursor
...@@ -135,7 +150,8 @@ Outputs ...@@ -135,7 +150,8 @@ Outputs
Returns status = "OK" Returns status = "OK"
Note Note
dblink_connect(text connstr) must be executed first. dblink_connect(text connstr) or dblink_connect(text connname, text connstr)
must be executed first.
Example usage Example usage
...@@ -157,3 +173,20 @@ test=# select dblink_close('foo'); ...@@ -157,3 +173,20 @@ test=# select dblink_close('foo');
OK OK
(1 row) (1 row)
select dblink_connect('myconn','dbname=regression');
dblink_connect
----------------
OK
(1 row)
select dblink_open('myconn','foo','select proname, prosrc from pg_proc');
dblink_open
-------------
OK
(1 row)
select dblink_close('myconn','foo');
dblink_close
--------------
OK
(1 row)
...@@ -6,22 +6,23 @@ dblink_exec -- Executes an UPDATE/INSERT/DELETE on a remote database ...@@ -6,22 +6,23 @@ dblink_exec -- Executes an UPDATE/INSERT/DELETE on a remote database
Synopsis Synopsis
dblink_exec(text connstr, text sql) dblink_exec(text connstr, text sql)
- or - dblink_exec(text connname, text sql)
dblink_exec(text sql) dblink_exec(text sql)
Inputs Inputs
connname
connstr connstr
If two arguments are present, the first is first assumed to be a specific
connection name to use. If the name is not found, the argument is then
assumed to be a valid connection string, of standard libpq format,
e.g.: "hostaddr=127.0.0.1 dbname=mydb user=postgres password=mypasswd"
standard libpq format connection string, If only one argument is used, then the unnamed connection is used.
e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd"
If the second form is used, then the dblink_connect(text connstr) must be
executed first.
sql sql
sql statement that you wish to execute on the remote host, e.g.: sql statement that you wish to execute on the remote host, e.g.:
insert into foo values(0,'a','{"a0","b0","c0"}'); insert into foo values(0,'a','{"a0","b0","c0"}');
Outputs Outputs
...@@ -36,14 +37,26 @@ Notes ...@@ -36,14 +37,26 @@ Notes
Example usage Example usage
test=# select dblink_connect('dbname=dblink_test_slave'); select dblink_connect('dbname=dblink_test_slave');
dblink_connect dblink_connect
---------------- ----------------
OK OK
(1 row) (1 row)
test=# select dblink_exec('insert into foo values(21,''z'',''{"a0","b0","c0"}'');'); select dblink_exec('insert into foo values(21,''z'',''{"a0","b0","c0"}'');');
dblink_exec dblink_exec
----------------- -----------------
INSERT 943366 1 INSERT 943366 1
(1 row) (1 row)
select dblink_connect('myconn','dbname=regression');
dblink_connect
----------------
OK
(1 row)
select dblink_exec('myconn','insert into foo values(21,''z'',''{"a0","b0","c0"}'');');
dblink_exec
------------------
INSERT 6432584 1
(1 row)
...@@ -6,17 +6,19 @@ dblink -- Returns a set from a remote database ...@@ -6,17 +6,19 @@ dblink -- Returns a set from a remote database
Synopsis Synopsis
dblink(text connstr, text sql) dblink(text connstr, text sql)
- or - dblink(text connname, text sql)
dblink(text sql) dblink(text sql)
Inputs Inputs
connname
connstr connstr
If two arguments are present, the first is first assumed to be a specific
connection name to use. If the name is not found, the argument is then
assumed to be a valid connection string, of standard libpq format,
e.g.: "hostaddr=127.0.0.1 dbname=mydb user=postgres password=mypasswd"
standard libpq format connection string, If only one argument is used, then the unnamed connection is used.
e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd"
If the second form is used, then the dblink_connect(text connstr) must be
executed first.
sql sql
...@@ -29,7 +31,7 @@ Outputs ...@@ -29,7 +31,7 @@ Outputs
Example usage Example usage
test=# select * from dblink('dbname=template1','select proname, prosrc from pg_proc') select * from dblink('dbname=template1','select proname, prosrc from pg_proc')
as t1(proname name, prosrc text) where proname like 'bytea%'; as t1(proname name, prosrc text) where proname like 'bytea%';
proname | prosrc proname | prosrc
------------+------------ ------------+------------
...@@ -47,13 +49,13 @@ test=# select * from dblink('dbname=template1','select proname, prosrc from pg_p ...@@ -47,13 +49,13 @@ test=# select * from dblink('dbname=template1','select proname, prosrc from pg_p
byteaout | byteaout byteaout | byteaout
(12 rows) (12 rows)
test=# select dblink_connect('dbname=template1'); select dblink_connect('dbname=template1');
dblink_connect dblink_connect
---------------- ----------------
OK OK
(1 row) (1 row)
test=# select * from dblink('select proname, prosrc from pg_proc') select * from dblink('select proname, prosrc from pg_proc')
as t1(proname name, prosrc text) where proname like 'bytea%'; as t1(proname name, prosrc text) where proname like 'bytea%';
proname | prosrc proname | prosrc
------------+------------ ------------+------------
...@@ -71,6 +73,33 @@ test=# select * from dblink('select proname, prosrc from pg_proc') ...@@ -71,6 +73,33 @@ test=# select * from dblink('select proname, prosrc from pg_proc')
byteaout | byteaout byteaout | byteaout
(12 rows) (12 rows)
select dblink_connect('myconn','dbname=regression');
dblink_connect
----------------
OK
(1 row)
select * from dblink('myconn','select proname, prosrc from pg_proc')
as t1(proname name, prosrc text) where proname like 'bytea%';
proname | prosrc
------------+------------
bytearecv | bytearecv
byteasend | byteasend
byteale | byteale
byteagt | byteagt
byteage | byteage
byteane | byteane
byteacmp | byteacmp
bytealike | bytealike
byteanlike | byteanlike
byteacat | byteacat
byteaeq | byteaeq
bytealt | bytealt
byteain | byteain
byteaout | byteaout
(14 rows)
================================================================== ==================================================================
A more convenient way to use dblink may be to create a view: A more convenient way to use dblink may be to create a view:
......
...@@ -106,11 +106,11 @@ WHERE t.a > 7; ...@@ -106,11 +106,11 @@ WHERE t.a > 7;
9 | j | {a9,b9,c9} 9 | j | {a9,b9,c9}
(2 rows) (2 rows)
-- should generate "no connection available" error -- should generate "connection not available" error
SELECT * SELECT *
FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]) FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7; WHERE t.a > 7;
ERROR: dblink: no connection available ERROR: dblink_record: connection not available
-- create a persistent connection -- create a persistent connection
SELECT dblink_connect('dbname=regression'); SELECT dblink_connect('dbname=regression');
dblink_connect dblink_connect
...@@ -172,10 +172,10 @@ SELECT dblink_close('rmt_foo_cursor'); ...@@ -172,10 +172,10 @@ SELECT dblink_close('rmt_foo_cursor');
OK OK
(1 row) (1 row)
-- should generate "cursor rmt_foo_cursor does not exist" error -- should generate "cursor not found: rmt_foo_cursor" error
SELECT * SELECT *
FROM dblink_fetch('rmt_foo_cursor',4) AS t(a int, b text, c text[]); FROM dblink_fetch('rmt_foo_cursor',4) AS t(a int, b text, c text[]);
ERROR: dblink_fetch: cursor rmt_foo_cursor does not exist ERROR: dblink_fetch: cursor not found: rmt_foo_cursor
-- close the persistent connection -- close the persistent connection
SELECT dblink_disconnect(); SELECT dblink_disconnect();
dblink_disconnect dblink_disconnect
...@@ -183,11 +183,12 @@ SELECT dblink_disconnect(); ...@@ -183,11 +183,12 @@ SELECT dblink_disconnect();
OK OK
(1 row) (1 row)
-- should generate "no connection available" error -- should generate "no connection to the server" error
SELECT * SELECT *
FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]) FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7; WHERE t.a > 7;
ERROR: dblink: no connection available ERROR: dblink: sql error: no connection to the server
-- put more data into our slave table, first using arbitrary connection syntax -- put more data into our slave table, first using arbitrary connection syntax
-- but truncate the actual return value so we can use diff to check for success -- but truncate the actual return value so we can use diff to check for success
SELECT substr(dblink_exec('dbname=regression','INSERT INTO foo VALUES(10,''k'',''{"a10","b10","c10"}'')'),1,6); SELECT substr(dblink_exec('dbname=regression','INSERT INTO foo VALUES(10,''k'',''{"a10","b10","c10"}'')'),1,6);
...@@ -268,3 +269,198 @@ SELECT dblink_disconnect(); ...@@ -268,3 +269,198 @@ SELECT dblink_disconnect();
OK OK
(1 row) (1 row)
--
-- tests for the new named persistent connection syntax
--
-- should generate "missing "=" after "myconn" in connection info string" error
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7;
ERROR: dblink: connection error: missing "=" after "myconn" in connection info string
-- create a named persistent connection
SELECT dblink_connect('myconn','dbname=regression');
dblink_connect
----------------
OK
(1 row)
-- use the named persistent connection
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7;
a | b | c
----+---+---------------
8 | i | {a8,b8,c8}
9 | j | {a9,b9,c9}
10 | k | {a10,b10,c10}
(3 rows)
-- create a second named persistent connection
-- should error with "cannot save named connection"
SELECT dblink_connect('myconn','dbname=regression');
NOTICE: cannot use a connection name more than once
ERROR: dblink_connect: cannot save named connection
-- create a second named persistent connection with a new name
SELECT dblink_connect('myconn2','dbname=regression');
dblink_connect
----------------
OK
(1 row)
-- use the second named persistent connection
SELECT *
FROM dblink('myconn2','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7;
a | b | c
----+---+---------------
8 | i | {a8,b8,c8}
9 | j | {a9,b9,c9}
10 | k | {a10,b10,c10}
(3 rows)
-- close the second named persistent connection
SELECT dblink_disconnect('myconn2');
dblink_disconnect
-------------------
OK
(1 row)
-- open a cursor
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
dblink_open
-------------
OK
(1 row)
-- fetch some data
SELECT *
FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]);
a | b | c
---+---+------------
0 | a | {a0,b0,c0}
1 | b | {a1,b1,c1}
2 | c | {a2,b2,c2}
3 | d | {a3,b3,c3}
(4 rows)
SELECT *
FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]);
a | b | c
---+---+------------
4 | e | {a4,b4,c4}
5 | f | {a5,b5,c5}
6 | g | {a6,b6,c6}
7 | h | {a7,b7,c7}
(4 rows)
-- this one only finds three rows left
SELECT *
FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]);
a | b | c
----+---+---------------
8 | i | {a8,b8,c8}
9 | j | {a9,b9,c9}
10 | k | {a10,b10,c10}
(3 rows)
-- close the cursor
SELECT dblink_close('myconn','rmt_foo_cursor');
dblink_close
--------------
OK
(1 row)
-- should generate "cursor not found: rmt_foo_cursor" error
SELECT *
FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]);
ERROR: dblink_fetch: cursor not found: rmt_foo_cursor
-- close the named persistent connection
SELECT dblink_disconnect('myconn');
dblink_disconnect
-------------------
OK
(1 row)
-- should generate "missing "=" after "myconn" in connection info string" error
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7;
ERROR: dblink: connection error: missing "=" after "myconn" in connection info string
-- create a named persistent connection
SELECT dblink_connect('myconn','dbname=regression');
dblink_connect
----------------
OK
(1 row)
-- put more data into our slave table, using named persistent connection syntax
-- but truncate the actual return value so we can use diff to check for success
SELECT substr(dblink_exec('myconn','INSERT INTO foo VALUES(11,''l'',''{"a11","b11","c11"}'')'),1,6);
substr
--------
INSERT
(1 row)
-- let's see it
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
a | b | c
----+---+---------------
0 | a | {a0,b0,c0}
1 | b | {a1,b1,c1}
2 | c | {a2,b2,c2}
3 | d | {a3,b3,c3}
4 | e | {a4,b4,c4}
5 | f | {a5,b5,c5}
6 | g | {a6,b6,c6}
7 | h | {a7,b7,c7}
8 | i | {a8,b8,c8}
9 | j | {a9,b9,c9}
10 | k | {a10,b10,c10}
11 | l | {a11,b11,c11}
(12 rows)
-- change some data
SELECT dblink_exec('myconn','UPDATE foo SET f3[2] = ''b99'' WHERE f1 = 11');
dblink_exec
-------------
UPDATE 1
(1 row)
-- let's see it
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE a = 11;
a | b | c
----+---+---------------
11 | l | {a11,b99,c11}
(1 row)
-- delete some data
SELECT dblink_exec('myconn','DELETE FROM foo WHERE f1 = 11');
dblink_exec
-------------
DELETE 1
(1 row)
-- let's see it
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE a = 11;
a | b | c
---+---+---
(0 rows)
-- close the named persistent connection
SELECT dblink_disconnect('myconn');
dblink_disconnect
-------------------
OK
(1 row)
-- close the named persistent connection again
-- should get "connection named "myconn" not found" error
SELECT dblink_disconnect('myconn');
ERROR: dblink_disconnect: connection named "myconn" not found
...@@ -68,7 +68,7 @@ SELECT * ...@@ -68,7 +68,7 @@ SELECT *
FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c text[]) FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7; WHERE t.a > 7;
-- should generate "no connection available" error -- should generate "connection not available" error
SELECT * SELECT *
FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]) FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7; WHERE t.a > 7;
...@@ -98,14 +98,14 @@ FROM dblink_fetch('rmt_foo_cursor',4) AS t(a int, b text, c text[]); ...@@ -98,14 +98,14 @@ FROM dblink_fetch('rmt_foo_cursor',4) AS t(a int, b text, c text[]);
-- close the cursor -- close the cursor
SELECT dblink_close('rmt_foo_cursor'); SELECT dblink_close('rmt_foo_cursor');
-- should generate "cursor rmt_foo_cursor does not exist" error -- should generate "cursor not found: rmt_foo_cursor" error
SELECT * SELECT *
FROM dblink_fetch('rmt_foo_cursor',4) AS t(a int, b text, c text[]); FROM dblink_fetch('rmt_foo_cursor',4) AS t(a int, b text, c text[]);
-- close the persistent connection -- close the persistent connection
SELECT dblink_disconnect(); SELECT dblink_disconnect();
-- should generate "no connection available" error -- should generate "no connection to the server" error
SELECT * SELECT *
FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]) FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7; WHERE t.a > 7;
...@@ -143,3 +143,98 @@ WHERE a = 11; ...@@ -143,3 +143,98 @@ WHERE a = 11;
-- close the persistent connection -- close the persistent connection
SELECT dblink_disconnect(); SELECT dblink_disconnect();
--
-- tests for the new named persistent connection syntax
--
-- should generate "missing "=" after "myconn" in connection info string" error
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7;
-- create a named persistent connection
SELECT dblink_connect('myconn','dbname=regression');
-- use the named persistent connection
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7;
-- create a second named persistent connection
-- should error with "cannot save named connection"
SELECT dblink_connect('myconn','dbname=regression');
-- create a second named persistent connection with a new name
SELECT dblink_connect('myconn2','dbname=regression');
-- use the second named persistent connection
SELECT *
FROM dblink('myconn2','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7;
-- close the second named persistent connection
SELECT dblink_disconnect('myconn2');
-- open a cursor
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
-- fetch some data
SELECT *
FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]);
SELECT *
FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]);
-- this one only finds three rows left
SELECT *
FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]);
-- close the cursor
SELECT dblink_close('myconn','rmt_foo_cursor');
-- should generate "cursor not found: rmt_foo_cursor" error
SELECT *
FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]);
-- close the named persistent connection
SELECT dblink_disconnect('myconn');
-- should generate "missing "=" after "myconn" in connection info string" error
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE t.a > 7;
-- create a named persistent connection
SELECT dblink_connect('myconn','dbname=regression');
-- put more data into our slave table, using named persistent connection syntax
-- but truncate the actual return value so we can use diff to check for success
SELECT substr(dblink_exec('myconn','INSERT INTO foo VALUES(11,''l'',''{"a11","b11","c11"}'')'),1,6);
-- let's see it
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
-- change some data
SELECT dblink_exec('myconn','UPDATE foo SET f3[2] = ''b99'' WHERE f1 = 11');
-- let's see it
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE a = 11;
-- delete some data
SELECT dblink_exec('myconn','DELETE FROM foo WHERE f1 = 11');
-- let's see it
SELECT *
FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
WHERE a = 11;
-- close the named persistent connection
SELECT dblink_disconnect('myconn');
-- close the named persistent connection again
-- should get "connection named "myconn" not found" error
SELECT dblink_disconnect('myconn');
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment