Commit d15b1e17 authored by Tom Lane's avatar Tom Lane

Update dblink to work with qualified relation names.

From Joe Conway.
parent 3212cf94
...@@ -65,17 +65,17 @@ Installation: ...@@ -65,17 +65,17 @@ Installation:
- extracts and returns individual field results - extracts and returns individual field results
dblink_strtok(text,text,int) RETURNS text dblink_strtok(text,text,int) RETURNS text
- extracts and returns individual token from delimited text - extracts and returns individual token from delimited text
dblink_get_pkey(name) RETURNS setof text dblink_get_pkey(text) RETURNS setof text
- returns the field names of a relation's primary key fields - returns the field names of a relation's primary key fields
dblink_last_oid(int) RETURNS oid dblink_last_oid(int) RETURNS oid
- returns the last inserted oid - returns the last inserted oid
dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text
- builds an insert statement using a local tuple, replacing the - builds an insert statement using a local tuple, replacing the
selection key field values with alternate supplied values selection key field values with alternate supplied values
dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text dblink_build_sql_delete(text,int2vector,int2,_text) RETURNS text
- builds a delete statement using supplied values for selection - builds a delete statement using supplied values for selection
key field values key field values
dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text
- builds an update statement using a local tuple, replacing the - builds an update statement using a local tuple, replacing the
selection key field values with alternate supplied values selection key field values with alternate supplied values
dblink_current_query() RETURNS text dblink_current_query() RETURNS text
...@@ -206,7 +206,7 @@ dblink_get_pkey -- returns the field names of a relation's primary ...@@ -206,7 +206,7 @@ dblink_get_pkey -- returns the field names of a relation's primary
Synopsis Synopsis
dblink_get_pkey(name relname) RETURNS setof text dblink_get_pkey(text relname) RETURNS setof text
Inputs Inputs
...@@ -278,16 +278,16 @@ dblink_build_sql_update -- builds an update statement using a local ...@@ -278,16 +278,16 @@ dblink_build_sql_update -- builds an update statement using a local
Synopsis Synopsis
dblink_build_sql_insert(name relname dblink_build_sql_insert(text relname
,int2vector primary_key_attnums ,int2vector primary_key_attnums
,int2 num_primary_key_atts ,int2 num_primary_key_atts
,_text src_pk_att_vals_array ,_text src_pk_att_vals_array
,_text tgt_pk_att_vals_array) RETURNS text ,_text tgt_pk_att_vals_array) RETURNS text
dblink_build_sql_delete(name relname dblink_build_sql_delete(text relname
,int2vector primary_key_attnums ,int2vector primary_key_attnums
,int2 num_primary_key_atts ,int2 num_primary_key_atts
,_text tgt_pk_att_vals_array) RETURNS text ,_text tgt_pk_att_vals_array) RETURNS text
dblink_build_sql_update(name relname dblink_build_sql_update(text relname
,int2vector primary_key_attnums ,int2vector primary_key_attnums
,int2 num_primary_key_atts ,int2 num_primary_key_atts
,_text src_pk_att_vals_array ,_text src_pk_att_vals_array
......
...@@ -27,6 +27,26 @@ ...@@ -27,6 +27,26 @@
#include "dblink.h" #include "dblink.h"
/*
* Internal declarations
*/
static dblink_results *init_dblink_results(MemoryContext fn_mcxt);
static dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
static char **get_pkey_attnames(Oid relid, int16 *numatts);
static char *get_strtok(char *fldtext, char *fldsep, int fldnum);
static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *quote_literal_cstr(char *rawstr);
static char *quote_ident_cstr(char *rawstr);
static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
static Oid get_relid_from_relname(text *relname_text);
static dblink_results *get_res_ptr(int32 res_id_index);
static void append_res_ptr(dblink_results *results);
static void remove_res_ptr(dblink_results *results);
/* Global */ /* Global */
List *res_id = NIL; List *res_id = NIL;
int res_id_index = 0; int res_id_index = 0;
...@@ -281,7 +301,7 @@ PG_FUNCTION_INFO_V1(dblink_get_pkey); ...@@ -281,7 +301,7 @@ PG_FUNCTION_INFO_V1(dblink_get_pkey);
Datum Datum
dblink_get_pkey(PG_FUNCTION_ARGS) dblink_get_pkey(PG_FUNCTION_ARGS)
{ {
char *relname; text *relname_text;
Oid relid; Oid relid;
char **result; char **result;
text *result_text; text *result_text;
...@@ -294,15 +314,14 @@ dblink_get_pkey(PG_FUNCTION_ARGS) ...@@ -294,15 +314,14 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
if (fcinfo->flinfo->fn_extra == NULL) if (fcinfo->flinfo->fn_extra == NULL)
{ {
relname = NameStr(*PG_GETARG_NAME(0)); relname_text = PG_GETARG_TEXT_P(0);
/* /*
* Convert relname to rel OID. * Convert relname to rel OID.
*/ */
relid = get_relid_from_relname(relname); relid = get_relid_from_relname(relname_text);
if (!OidIsValid(relid)) if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", elog(ERROR, "dblink_get_pkey: relation does not exist");
relname);
/* /*
* get an array of attnums. * get an array of attnums.
...@@ -428,7 +447,7 @@ Datum ...@@ -428,7 +447,7 @@ Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS) dblink_build_sql_insert(PG_FUNCTION_ARGS)
{ {
Oid relid; Oid relid;
char *relname; text *relname_text;
int16 *pkattnums; int16 *pkattnums;
int16 pknumatts; int16 pknumatts;
char **src_pkattvals; char **src_pkattvals;
...@@ -446,15 +465,14 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) ...@@ -446,15 +465,14 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
char *sql; char *sql;
text *sql_text; text *sql_text;
relname = NameStr(*PG_GETARG_NAME(0)); relname_text = PG_GETARG_TEXT_P(0);
/* /*
* Convert relname to rel OID. * Convert relname to rel OID.
*/ */
relid = get_relid_from_relname(relname); relid = get_relid_from_relname(relname_text);
if (!OidIsValid(relid)) if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", elog(ERROR, "dblink_build_sql_insert: relation does not exist");
relname);
pkattnums = (int16 *) PG_GETARG_POINTER(1); pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts = PG_GETARG_INT16(2); pknumatts = PG_GETARG_INT16(2);
...@@ -554,7 +572,7 @@ Datum ...@@ -554,7 +572,7 @@ Datum
dblink_build_sql_delete(PG_FUNCTION_ARGS) dblink_build_sql_delete(PG_FUNCTION_ARGS)
{ {
Oid relid; Oid relid;
char *relname; text *relname_text;
int16 *pkattnums; int16 *pkattnums;
int16 pknumatts; int16 pknumatts;
char **tgt_pkattvals; char **tgt_pkattvals;
...@@ -567,15 +585,14 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) ...@@ -567,15 +585,14 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
char *sql; char *sql;
text *sql_text; text *sql_text;
relname = NameStr(*PG_GETARG_NAME(0)); relname_text = PG_GETARG_TEXT_P(0);
/* /*
* Convert relname to rel OID. * Convert relname to rel OID.
*/ */
relid = get_relid_from_relname(relname); relid = get_relid_from_relname(relname_text);
if (!OidIsValid(relid)) if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", elog(ERROR, "dblink_build_sql_delete: relation does not exist");
relname);
pkattnums = (int16 *) PG_GETARG_POINTER(1); pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts = PG_GETARG_INT16(2); pknumatts = PG_GETARG_INT16(2);
...@@ -653,7 +670,7 @@ Datum ...@@ -653,7 +670,7 @@ Datum
dblink_build_sql_update(PG_FUNCTION_ARGS) dblink_build_sql_update(PG_FUNCTION_ARGS)
{ {
Oid relid; Oid relid;
char *relname; text *relname_text;
int16 *pkattnums; int16 *pkattnums;
int16 pknumatts; int16 pknumatts;
char **src_pkattvals; char **src_pkattvals;
...@@ -671,15 +688,14 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) ...@@ -671,15 +688,14 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
char *sql; char *sql;
text *sql_text; text *sql_text;
relname = NameStr(*PG_GETARG_NAME(0)); relname_text = PG_GETARG_TEXT_P(0);
/* /*
* Convert relname to rel OID. * Convert relname to rel OID.
*/ */
relid = get_relid_from_relname(relname); relid = get_relid_from_relname(relname_text);
if (!OidIsValid(relid)) if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", elog(ERROR, "dblink_build_sql_update: relation does not exist");
relname);
pkattnums = (int16 *) PG_GETARG_POINTER(1); pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts = PG_GETARG_INT16(2); pknumatts = PG_GETARG_INT16(2);
...@@ -841,7 +857,7 @@ dblink_replace_text(PG_FUNCTION_ARGS) ...@@ -841,7 +857,7 @@ dblink_replace_text(PG_FUNCTION_ARGS)
* init_dblink_results * init_dblink_results
* - create an empty dblink_results data structure * - create an empty dblink_results data structure
*/ */
dblink_results * static dblink_results *
init_dblink_results(MemoryContext fn_mcxt) init_dblink_results(MemoryContext fn_mcxt)
{ {
MemoryContext oldcontext; MemoryContext oldcontext;
...@@ -866,7 +882,7 @@ init_dblink_results(MemoryContext fn_mcxt) ...@@ -866,7 +882,7 @@ init_dblink_results(MemoryContext fn_mcxt)
* init_dblink_array_results * init_dblink_array_results
* - create an empty dblink_array_results data structure * - create an empty dblink_array_results data structure
*/ */
dblink_array_results * static dblink_array_results *
init_dblink_array_results(MemoryContext fn_mcxt) init_dblink_array_results(MemoryContext fn_mcxt)
{ {
MemoryContext oldcontext; MemoryContext oldcontext;
...@@ -892,7 +908,7 @@ init_dblink_array_results(MemoryContext fn_mcxt) ...@@ -892,7 +908,7 @@ init_dblink_array_results(MemoryContext fn_mcxt)
* Get the primary key attnames for the given relation. * Get the primary key attnames for the given relation.
* Return NULL, and set numatts = 0, if no primary key exists. * Return NULL, and set numatts = 0, if no primary key exists.
*/ */
char ** static char **
get_pkey_attnames(Oid relid, int16 *numatts) get_pkey_attnames(Oid relid, int16 *numatts)
{ {
Relation indexRelation; Relation indexRelation;
...@@ -961,7 +977,7 @@ get_pkey_attnames(Oid relid, int16 *numatts) ...@@ -961,7 +977,7 @@ get_pkey_attnames(Oid relid, int16 *numatts)
* return ord item (0 based) * return ord item (0 based)
* based on provided field separator * based on provided field separator
*/ */
char * static char *
get_strtok(char *fldtext, char *fldsep, int fldnum) get_strtok(char *fldtext, char *fldsep, int fldnum)
{ {
int j = 0; int j = 0;
...@@ -988,7 +1004,7 @@ get_strtok(char *fldtext, char *fldsep, int fldnum) ...@@ -988,7 +1004,7 @@ get_strtok(char *fldtext, char *fldsep, int fldnum)
return pstrdup(result); return pstrdup(result);
} }
char * static char *
get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{ {
Relation rel; Relation rel;
...@@ -1059,7 +1075,7 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval ...@@ -1059,7 +1075,7 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
return (sql); return (sql);
} }
char * static char *
get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals) get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
{ {
Relation rel; Relation rel;
...@@ -1112,7 +1128,7 @@ get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattval ...@@ -1112,7 +1128,7 @@ get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattval
return (sql); return (sql);
} }
char * static char *
get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{ {
Relation rel; Relation rel;
...@@ -1235,7 +1251,7 @@ quote_ident_cstr(char *rawstr) ...@@ -1235,7 +1251,7 @@ quote_ident_cstr(char *rawstr)
return result; return result;
} }
int16 static int16
get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key) get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
{ {
int i; int i;
...@@ -1251,7 +1267,7 @@ get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key) ...@@ -1251,7 +1267,7 @@ get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
return -1; return -1;
} }
HeapTuple static HeapTuple
get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals) get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
{ {
Relation rel; Relation rel;
...@@ -1340,17 +1356,24 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p ...@@ -1340,17 +1356,24 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
return NULL; return NULL;
} }
Oid static Oid
get_relid_from_relname(char *relname) get_relid_from_relname(text *relname_text)
{ {
#ifdef NamespaceRelationName #ifdef NamespaceRelationName
Oid relid; RangeVar *relvar;
Relation rel;
Oid relid;
relid = RelnameGetRelid(relname); relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text, "get_relid_from_relname"));
rel = heap_openrv(relvar, AccessShareLock);
relid = RelationGetRelid(rel);
relation_close(rel, AccessShareLock);
#else #else
Relation rel; char *relname;
Oid relid; Relation rel;
Oid relid;
relname = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(relname_text)));
rel = relation_openr(relname, AccessShareLock); rel = relation_openr(relname, AccessShareLock);
relid = RelationGetRelid(rel); relid = RelationGetRelid(rel);
relation_close(rel, AccessShareLock); relation_close(rel, AccessShareLock);
...@@ -1359,7 +1382,7 @@ get_relid_from_relname(char *relname) ...@@ -1359,7 +1382,7 @@ get_relid_from_relname(char *relname)
return relid; return relid;
} }
dblink_results * static dblink_results *
get_res_ptr(int32 res_id_index) get_res_ptr(int32 res_id_index)
{ {
List *ptr; List *ptr;
...@@ -1385,7 +1408,7 @@ get_res_ptr(int32 res_id_index) ...@@ -1385,7 +1408,7 @@ get_res_ptr(int32 res_id_index)
/* /*
* Add node to global List res_id * Add node to global List res_id
*/ */
void static void
append_res_ptr(dblink_results *results) append_res_ptr(dblink_results *results)
{ {
res_id = lappend(res_id, results); res_id = lappend(res_id, results);
...@@ -1395,7 +1418,7 @@ append_res_ptr(dblink_results *results) ...@@ -1395,7 +1418,7 @@ append_res_ptr(dblink_results *results)
* Remove node from global List * Remove node from global List
* using res_id_index * using res_id_index
*/ */
void static void
remove_res_ptr(dblink_results *results) remove_res_ptr(dblink_results *results)
{ {
res_id = lremove(results, res_id); res_id = lremove(results, res_id);
...@@ -1404,4 +1427,3 @@ remove_res_ptr(dblink_results *results) ...@@ -1404,4 +1427,3 @@ remove_res_ptr(dblink_results *results)
res_id_index = 0; res_id_index = 0;
} }
...@@ -120,26 +120,6 @@ extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS); ...@@ -120,26 +120,6 @@ extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
extern Datum dblink_current_query(PG_FUNCTION_ARGS); extern Datum dblink_current_query(PG_FUNCTION_ARGS);
extern Datum dblink_replace_text(PG_FUNCTION_ARGS); extern Datum dblink_replace_text(PG_FUNCTION_ARGS);
/*
* Internal declarations
*/
dblink_results *init_dblink_results(MemoryContext fn_mcxt);
dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
char **get_pkey_attnames(Oid relid, int16 *numatts);
char *get_strtok(char *fldtext, char *fldsep, int fldnum);
char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber);
char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *quote_literal_cstr(char *rawstr);
static char *quote_ident_cstr(char *rawstr);
int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
Oid get_relid_from_relname(char *relname);
dblink_results *get_res_ptr(int32 res_id_index);
void append_res_ptr(dblink_results *results);
void remove_res_ptr(dblink_results *results);
extern char *debug_query_string; extern char *debug_query_string;
#endif /* DBLINK_H */ #endif /* DBLINK_H */
...@@ -10,7 +10,7 @@ CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text ...@@ -10,7 +10,7 @@ CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text
AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c' AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c'
WITH (iscachable, isstrict); WITH (iscachable, isstrict);
CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text CREATE OR REPLACE FUNCTION dblink_get_pkey (text) RETURNS setof text
AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c' AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
WITH (isstrict); WITH (isstrict);
...@@ -18,15 +18,15 @@ CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid ...@@ -18,15 +18,15 @@ CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c' AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
WITH (isstrict); WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text CREATE OR REPLACE FUNCTION dblink_build_sql_insert (text, int2vector, int2, _text, _text) RETURNS text
AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c' AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c'
WITH (isstrict); WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text CREATE OR REPLACE FUNCTION dblink_build_sql_delete (text, int2vector, int2, _text) RETURNS text
AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c' AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c'
WITH (isstrict); WITH (isstrict);
CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text CREATE OR REPLACE FUNCTION dblink_build_sql_update (text, int2vector, int2, _text, _text) RETURNS text
AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c' AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c'
WITH (isstrict); WITH (isstrict);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment