diff options
Diffstat (limited to 'contrib/dblink/dblink.c')
-rw-r--r-- | contrib/dblink/dblink.c | 1094 |
1 files changed, 803 insertions, 291 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 0401e06f4f1..a6ede5ae1c1 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -3,7 +3,9 @@ * * Functions returning results from a remote database * - * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002, + * Joe Conway <mail@joeconway.com> + * + * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -25,16 +27,39 @@ * */ -#include "dblink.h" +#include <string.h> +#include "postgres.h" +#include "libpq-fe.h" +#include "libpq-int.h" +#include "fmgr.h" +#include "funcapi.h" +#include "access/tupdesc.h" +#include "access/heapam.h" +#include "catalog/catname.h" +#include "catalog/namespace.h" +#include "catalog/pg_index.h" +#include "catalog/pg_type.h" +#include "executor/executor.h" +#include "executor/spi.h" +#include "lib/stringinfo.h" +#include "nodes/nodes.h" +#include "nodes/execnodes.h" +#include "nodes/pg_list.h" +#include "parser/parse_type.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/array.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" +#include "dblink.h" /* * Internal declarations */ static dblink_results *init_dblink_results(MemoryContext fn_mcxt); -static dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt); static char **get_pkey_attnames(Oid relid, int16 *numatts); -static char *get_strtok(char *fldtext, char *fldsep, int fldnum); static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals); static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); @@ -43,14 +68,593 @@ static char *quote_ident_cstr(char *rawstr); static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key); static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals); static Oid get_relid_from_relname(text *relname_text); -static dblink_results *get_res_ptr(int32 res_id_index); +static dblink_results *get_res_ptr(int32 res_id_index); static void append_res_ptr(dblink_results *results); static void remove_res_ptr(dblink_results *results); +static TupleDesc pgresultGetTupleDesc(PGresult *res); /* Global */ -List *res_id = NIL; -int res_id_index = 0; +List *res_id = NIL; +int res_id_index = 0; +PGconn *persistent_conn = NULL; + +#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp))) +#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) +#define xpfree(var_) \ + do { \ + if (var_ != NULL) \ + { \ + pfree(var_); \ + var_ = NULL; \ + } \ + } while (0) + + +/* + * Create a persistent connection to another database + */ +PG_FUNCTION_INFO_V1(dblink_connect); +Datum +dblink_connect(PG_FUNCTION_ARGS) +{ + char *connstr = GET_STR(PG_GETARG_TEXT_P(0)); + char *msg; + text *result_text; + MemoryContext oldcontext; + + if (persistent_conn != NULL) + PQfinish(persistent_conn); + + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + persistent_conn = PQconnectdb(connstr); + MemoryContextSwitchTo(oldcontext); + + if (PQstatus(persistent_conn) == CONNECTION_BAD) + { + msg = pstrdup(PQerrorMessage(persistent_conn)); + PQfinish(persistent_conn); + persistent_conn = NULL; + elog(ERROR, "dblink_connect: connection error: %s", msg); + } + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); + PG_RETURN_TEXT_P(result_text); +} + +/* + * Clear a persistent connection to another database + */ +PG_FUNCTION_INFO_V1(dblink_disconnect); +Datum +dblink_disconnect(PG_FUNCTION_ARGS) +{ + text *result_text; + + if (persistent_conn != NULL) + PQfinish(persistent_conn); + + persistent_conn = NULL; + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); + PG_RETURN_TEXT_P(result_text); +} + +/* + * opens a cursor using a persistent connection + */ +PG_FUNCTION_INFO_V1(dblink_open); +Datum +dblink_open(PG_FUNCTION_ARGS) +{ + char *msg; + PGresult *res = NULL; + PGconn *conn = NULL; + text *result_text; + char *curname = GET_STR(PG_GETARG_TEXT_P(0)); + char *sql = GET_STR(PG_GETARG_TEXT_P(1)); + StringInfo str = makeStringInfo(); + + if (persistent_conn != NULL) + conn = persistent_conn; + else + elog(ERROR, "dblink_open: no connection available"); + + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + + PQfinish(conn); + persistent_conn = NULL; + + elog(ERROR, "dblink_open: begin error: %s", msg); + } + PQclear(res); + + appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql); + res = PQexec(conn, str->data); + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + msg = pstrdup(PQerrorMessage(conn)); + + PQclear(res); + + PQfinish(conn); + persistent_conn = NULL; + + elog(ERROR, "dblink: sql error: %s", msg); + } + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); + PG_RETURN_TEXT_P(result_text); +} +/* + * closes a cursor + */ +PG_FUNCTION_INFO_V1(dblink_close); +Datum +dblink_close(PG_FUNCTION_ARGS) +{ + PGconn *conn = NULL; + PGresult *res = NULL; + char *curname = GET_STR(PG_GETARG_TEXT_P(0)); + StringInfo str = makeStringInfo(); + text *result_text; + char *msg; + + if (persistent_conn != NULL) + conn = persistent_conn; + else + elog(ERROR, "dblink_close: no connection available"); + + appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname)); + + /* close the cursor */ + res = PQexec(conn, str->data); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + + PQfinish(persistent_conn); + persistent_conn = NULL; + + elog(ERROR, "dblink_close: sql error: %s", msg); + } + + PQclear(res); + + /* commit the transaction */ + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + + PQfinish(persistent_conn); + persistent_conn = NULL; + + elog(ERROR, "dblink_close: commit error: %s", msg); + } + PQclear(res); + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); + PG_RETURN_TEXT_P(result_text); +} + +/* + * Fetch results from an open cursor + */ +PG_FUNCTION_INFO_V1(dblink_fetch); +Datum +dblink_fetch(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + TupleDesc tupdesc = NULL; + int call_cntr; + int max_calls; + TupleTableSlot *slot; + AttInMetadata *attinmeta; + char *msg; + PGresult *res = NULL; + MemoryContext oldcontext; + + /* stuff done only on the first call of the function */ + if(SRF_IS_FIRSTCALL()) + { + Oid functypeid; + char functyptype; + Oid funcid = fcinfo->flinfo->fn_oid; + PGconn *conn = NULL; + StringInfo str = makeStringInfo(); + char *curname = GET_STR(PG_GETARG_TEXT_P(0)); + int howmany = PG_GETARG_INT32(1); + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + if (persistent_conn != NULL) + conn = persistent_conn; + else + elog(ERROR, "dblink_fetch: no connection available"); + + appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname)); + + res = PQexec(conn, str->data); + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + + PQfinish(persistent_conn); + persistent_conn = NULL; + + elog(ERROR, "dblink_fetch: sql error: %s", msg); + } + else if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + /* cursor does not exist - closed already or bad name */ + PQclear(res); + elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname)); + } + + funcctx->max_calls = PQntuples(res); + + /* got results, keep track of them */ + funcctx->user_fctx = res; + + /* fast track when no results */ + if (funcctx->max_calls < 1) + SRF_RETURN_DONE(funcctx); + + /* check typtype to see if we have a predetermined return type */ + functypeid = get_func_rettype(funcid); + functyptype = get_typtype(functypeid); + + if (functyptype == 'c') + tupdesc = TypeGetTupleDesc(functypeid, NIL); + else if (functyptype == 'p' && functypeid == RECORDOID) + tupdesc = pgresultGetTupleDesc(res); + else if (functyptype == 'b') + elog(ERROR, "dblink_fetch: invalid kind of return type specified for function"); + else + elog(ERROR, "dblink_fetch: unknown kind of return type specified for function"); + + /* store needed metadata for subsequent calls */ + slot = TupleDescGetSlot(tupdesc); + funcctx->slot = slot; + attinmeta = TupleDescGetAttInMetadata(tupdesc); + funcctx->attinmeta = attinmeta; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + + /* + * initialize per-call variables + */ + call_cntr = funcctx->call_cntr; + max_calls = funcctx->max_calls; + + slot = funcctx->slot; + + res = (PGresult *) funcctx->user_fctx; + attinmeta = funcctx->attinmeta; + tupdesc = attinmeta->tupdesc; + + if (call_cntr < max_calls) /* do when there is more left to send */ + { + char **values; + HeapTuple tuple; + Datum result; + int i; + int nfields = PQnfields(res); + + values = (char **) palloc(nfields * sizeof(char *)); + for (i = 0; i < nfields; i++) + { + if (PQgetisnull(res, call_cntr, i) == 0) + values[i] = PQgetvalue(res, call_cntr, i); + else + values[i] = NULL; + } + + /* build the tuple */ + tuple = BuildTupleFromCStrings(attinmeta, values); + + /* make the tuple into a datum */ + result = TupleGetDatum(slot, tuple); + + SRF_RETURN_NEXT(funcctx, result); + } + else /* do when there is no more left */ + { + PQclear(res); + SRF_RETURN_DONE(funcctx); + } +} + +/* + * Note: this is the new preferred version of dblink + */ +PG_FUNCTION_INFO_V1(dblink_record); +Datum +dblink_record(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + TupleDesc tupdesc = NULL; + int call_cntr; + int max_calls; + TupleTableSlot *slot; + AttInMetadata *attinmeta; + char *msg; + PGresult *res = NULL; + bool is_sql_cmd = false; + char *sql_cmd_status = NULL; + MemoryContext oldcontext; + + /* stuff done only on the first call of the function */ + if(SRF_IS_FIRSTCALL()) + { + Oid functypeid; + char functyptype; + Oid funcid = fcinfo->flinfo->fn_oid; + PGconn *conn = NULL; + char *connstr = NULL; + char *sql = NULL; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + if (fcinfo->nargs == 2) + { + connstr = GET_STR(PG_GETARG_TEXT_P(0)); + sql = GET_STR(PG_GETARG_TEXT_P(1)); + + conn = PQconnectdb(connstr); + if (PQstatus(conn) == CONNECTION_BAD) + { + msg = pstrdup(PQerrorMessage(conn)); + PQfinish(conn); + elog(ERROR, "dblink: connection error: %s", msg); + } + } + else if (fcinfo->nargs == 1) + { + sql = GET_STR(PG_GETARG_TEXT_P(0)); + + if (persistent_conn != NULL) + conn = persistent_conn; + else + elog(ERROR, "dblink: no connection available"); + } + else + elog(ERROR, "dblink: wrong number of arguments"); + + res = PQexec(conn, sql); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + PQfinish(conn); + if (fcinfo->nargs == 1) + persistent_conn = NULL; + + elog(ERROR, "dblink: sql error: %s", msg); + } + else + { + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + is_sql_cmd = true; + + /* need a tuple descriptor representing one TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0, false); + + /* + * and save a copy of the command status string to return + * as our result tuple + */ + sql_cmd_status = PQcmdStatus(res); + funcctx->max_calls = 1; + } + else + funcctx->max_calls = PQntuples(res); + + /* got results, keep track of them */ + funcctx->user_fctx = res; + + /* if needed, close the connection to the database and cleanup */ + if (fcinfo->nargs == 2) + PQfinish(conn); + } + + /* fast track when no results */ + if (funcctx->max_calls < 1) + SRF_RETURN_DONE(funcctx); + + /* check typtype to see if we have a predetermined return type */ + functypeid = get_func_rettype(funcid); + functyptype = get_typtype(functypeid); + + if (!is_sql_cmd) + { + if (functyptype == 'c') + tupdesc = TypeGetTupleDesc(functypeid, NIL); + else if (functyptype == 'p' && functypeid == RECORDOID) + tupdesc = pgresultGetTupleDesc(res); + else if (functyptype == 'b') + elog(ERROR, "Invalid kind of return type specified for function"); + else + elog(ERROR, "Unknown kind of return type specified for function"); + } + + /* store needed metadata for subsequent calls */ + slot = TupleDescGetSlot(tupdesc); + funcctx->slot = slot; + attinmeta = TupleDescGetAttInMetadata(tupdesc); + funcctx->attinmeta = attinmeta; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + + /* + * initialize per-call variables + */ + call_cntr = funcctx->call_cntr; + max_calls = funcctx->max_calls; + + slot = funcctx->slot; + + res = (PGresult *) funcctx->user_fctx; + attinmeta = funcctx->attinmeta; + tupdesc = attinmeta->tupdesc; + + if (call_cntr < max_calls) /* do when there is more left to send */ + { + char **values; + HeapTuple tuple; + Datum result; + + if (!is_sql_cmd) + { + int i; + int nfields = PQnfields(res); + + values = (char **) palloc(nfields * sizeof(char *)); + for (i = 0; i < nfields; i++) + { + if (PQgetisnull(res, call_cntr, i) == 0) + values[i] = PQgetvalue(res, call_cntr, i); + else + values[i] = NULL; + } + } + else + { + values = (char **) palloc(1 * sizeof(char *)); + values[0] = sql_cmd_status; + } + + /* build the tuple */ + tuple = BuildTupleFromCStrings(attinmeta, values); + + /* make the tuple into a datum */ + result = TupleGetDatum(slot, tuple); + + SRF_RETURN_NEXT(funcctx, result); + } + else /* do when there is no more left */ + { + PQclear(res); + SRF_RETURN_DONE(funcctx); + } +} + +/* + * Execute an SQL non-SELECT command + */ +PG_FUNCTION_INFO_V1(dblink_exec); +Datum +dblink_exec(PG_FUNCTION_ARGS) +{ + char *msg; + PGresult *res = NULL; + char *sql_cmd_status = NULL; + TupleDesc tupdesc = NULL; + text *result_text; + PGconn *conn = NULL; + char *connstr = NULL; + char *sql = NULL; + + if (fcinfo->nargs == 2) + { + connstr = GET_STR(PG_GETARG_TEXT_P(0)); + sql = GET_STR(PG_GETARG_TEXT_P(1)); + + conn = PQconnectdb(connstr); + if (PQstatus(conn) == CONNECTION_BAD) + { + msg = pstrdup(PQerrorMessage(conn)); + PQfinish(conn); + elog(ERROR, "dblink_exec: connection error: %s", msg); + } + } + else if (fcinfo->nargs == 1) + { + sql = GET_STR(PG_GETARG_TEXT_P(0)); + + if (persistent_conn != NULL) + conn = persistent_conn; + else + elog(ERROR, "dblink_exec: no connection available"); + } + else + elog(ERROR, "dblink_exec: wrong number of arguments"); + + + res = PQexec(conn, sql); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + PQfinish(conn); + if (fcinfo->nargs == 1) + persistent_conn = NULL; + + elog(ERROR, "dblink_exec: sql error: %s", msg); + } + else + { + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + /* need a tuple descriptor representing one TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0, false); + + /* + * and save a copy of the command status string to return + * as our result tuple + */ + sql_cmd_status = PQcmdStatus(res); + } + else + elog(ERROR, "dblink_exec: queries returning results not allowed"); + } + PQclear(res); + + /* if needed, close the connection to the database and cleanup */ + if (fcinfo->nargs == 2) + PQfinish(conn); + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status))); + PG_RETURN_TEXT_P(result_text); +} + +/* + * Note: this original version of dblink is DEPRECATED; + * it *will* be removed in favor of the new version on next release + */ PG_FUNCTION_INFO_V1(dblink); Datum dblink(PG_FUNCTION_ARGS) @@ -179,14 +783,15 @@ dblink(PG_FUNCTION_ARGS) PG_RETURN_NULL(); } - /* + * Note: dblink_tok is DEPRECATED; + * it *will* be removed in favor of the new version on next release + * * dblink_tok * parse dblink output string * return fldnum item (0 based) * based on provided field separator */ - PG_FUNCTION_INFO_V1(dblink_tok); Datum dblink_tok(PG_FUNCTION_ARGS) @@ -241,162 +846,121 @@ dblink_tok(PG_FUNCTION_ARGS) } } - -/* - * dblink_strtok - * parse input string - * return ord item (0 based) - * based on provided field separator - */ -PG_FUNCTION_INFO_V1(dblink_strtok); -Datum -dblink_strtok(PG_FUNCTION_ARGS) -{ - char *fldtext; - char *fldsep; - int fldnum; - char *buffer; - text *result_text; - - fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); - fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); - fldnum = PG_GETARG_INT32(2); - - if (fldtext[0] == '\0') - { - elog(ERROR, "get_strtok: blank list not permitted"); - } - if (fldsep[0] == '\0') - { - elog(ERROR, "get_strtok: blank field separator not permitted"); - } - - buffer = get_strtok(fldtext, fldsep, fldnum); - - pfree(fldtext); - pfree(fldsep); - - if (buffer == NULL) - { - PG_RETURN_NULL(); - } - else - { - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer))); - pfree(buffer); - - PG_RETURN_TEXT_P(result_text); - } -} - - /* * dblink_get_pkey * - * Return comma delimited list of primary key - * fields for the supplied relation, + * Return list of primary key fields for the supplied relation, * or NULL if none exists. */ PG_FUNCTION_INFO_V1(dblink_get_pkey); Datum dblink_get_pkey(PG_FUNCTION_ARGS) { - text *relname_text; - Oid relid; - char **result; - text *result_text; - int16 numatts; - ReturnSetInfo *rsi; - dblink_array_results *ret_set; + int16 numatts; + Oid relid; + char **results; + FuncCallContext *funcctx; + int32 call_cntr; + int32 max_calls; + TupleTableSlot *slot; + AttInMetadata *attinmeta; + MemoryContext oldcontext; + + /* stuff done only on the first call of the function */ + if(SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc = NULL; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* convert relname to rel Oid */ + relid = get_relid_from_relname(PG_GETARG_TEXT_P(0)); + if (!OidIsValid(relid)) + elog(ERROR, "dblink_get_pkey: relation does not exist"); - if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) - elog(ERROR, "dblink: function called in context that does not accept a set result"); + /* need a tuple descriptor representing one INT and one TEXT column */ + tupdesc = CreateTemplateTupleDesc(2, WITHOUTOID); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position", + INT4OID, -1, 0, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname", + TEXTOID, -1, 0, false); - if (fcinfo->flinfo->fn_extra == NULL) - { - relname_text = PG_GETARG_TEXT_P(0); + /* allocate a slot for a tuple with this tupdesc */ + slot = TupleDescGetSlot(tupdesc); - /* - * Convert relname to rel OID. - */ - relid = get_relid_from_relname(relname_text); - if (!OidIsValid(relid)) - elog(ERROR, "dblink_get_pkey: relation does not exist"); + /* assign slot to function context */ + funcctx->slot = slot; /* - * get an array of attnums. + * Generate attribute metadata needed later to produce tuples from raw + * C strings */ - result = get_pkey_attnames(relid, &numatts); + attinmeta = TupleDescGetAttInMetadata(tupdesc); + funcctx->attinmeta = attinmeta; + + /* get an array of attnums */ + results = get_pkey_attnames(relid, &numatts); - if ((result != NULL) && (numatts > 0)) + if ((results != NULL) && (numatts > 0)) { - ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt); + funcctx->max_calls = numatts; - ret_set->elem_num = 0; - ret_set->num_elems = numatts; - ret_set->res = result; + /* got results, keep track of them */ + funcctx->user_fctx = results; + } + else /* fast track when no results */ + SRF_RETURN_DONE(funcctx); - fcinfo->flinfo->fn_extra = (void *) ret_set; + MemoryContextSwitchTo(oldcontext); + } - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprMultipleResult; + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); + /* + * initialize per-call variables + */ + call_cntr = funcctx->call_cntr; + max_calls = funcctx->max_calls; - PG_RETURN_TEXT_P(result_text); - } - else - { - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprEndResult; + slot = funcctx->slot; - PG_RETURN_NULL(); - } - } - else - { - /* - * check for more results - */ - ret_set = fcinfo->flinfo->fn_extra; - ret_set->elem_num++; - result = ret_set->res; + results = (char **) funcctx->user_fctx; + attinmeta = funcctx->attinmeta; - if (ret_set->elem_num < ret_set->num_elems) - { - /* - * fetch next one - */ - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprMultipleResult; + if (call_cntr < max_calls) /* do when there is more left to send */ + { + char **values; + HeapTuple tuple; + Datum result; - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); - PG_RETURN_TEXT_P(result_text); - } - else - { - int i; + values = (char **) palloc(2 * sizeof(char *)); + values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */ - /* - * or if no more, clean things up - */ - for (i = 0; i < ret_set->num_elems; i++) - pfree(result[i]); + sprintf(values[0], "%d", call_cntr + 1); - pfree(ret_set->res); - pfree(ret_set); + values[1] = results[call_cntr]; - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprEndResult; + /* build the tuple */ + tuple = BuildTupleFromCStrings(attinmeta, values); - PG_RETURN_NULL(); - } + /* make the tuple into a datum */ + result = TupleGetDatum(slot, tuple); + + SRF_RETURN_NEXT(funcctx, result); } - PG_RETURN_NULL(); + else /* do when there is no more left */ + SRF_RETURN_DONE(funcctx); } - /* + * Note: dblink_last_oid is DEPRECATED; + * it *will* be removed on next release + * * dblink_last_oid * return last inserted oid */ @@ -447,23 +1011,26 @@ Datum dblink_build_sql_insert(PG_FUNCTION_ARGS) { Oid relid; - text *relname_text; - int16 *pkattnums; + text *relname_text; + int16 *pkattnums; int16 pknumatts; - char **src_pkattvals; - char **tgt_pkattvals; - ArrayType *src_pkattvals_arry; - ArrayType *tgt_pkattvals_arry; + char **src_pkattvals; + char **tgt_pkattvals; + ArrayType *src_pkattvals_arry; + ArrayType *tgt_pkattvals_arry; int src_ndim; - int *src_dim; + int *src_dim; int src_nitems; int tgt_ndim; int *tgt_dim; int tgt_nitems; int i; - char *ptr; - char *sql; - text *sql_text; + char *ptr; + char *sql; + text *sql_text; + int16 typlen; + bool typbyval; + char typalign; relname_text = PG_GETARG_TEXT_P(0); @@ -503,12 +1070,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) * get array of pointers to c-strings from the input source array */ Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID); + get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry), + &typlen, &typbyval, &typalign); + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(src_pkattvals_arry); for (i = 0; i < src_nitems; i++) { src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); - ptr += INTALIGN(*(int32 *) ptr); + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); } /* @@ -529,12 +1100,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) * get array of pointers to c-strings from the input target array */ Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); + get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry), + &typlen, &typbyval, &typalign); + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(tgt_pkattvals_arry); for (i = 0; i < tgt_nitems; i++) { tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); - ptr += INTALIGN(*(int32 *) ptr); + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); } /* @@ -586,6 +1161,9 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) char *ptr; char *sql; text *sql_text; + int16 typlen; + bool typbyval; + char typalign; relname_text = PG_GETARG_TEXT_P(0); @@ -624,12 +1202,16 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) * get array of pointers to c-strings from the input target array */ Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); + get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry), + &typlen, &typbyval, &typalign); + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(tgt_pkattvals_arry); for (i = 0; i < tgt_nitems; i++) { tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); - ptr += INTALIGN(*(int32 *) ptr); + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); } /* @@ -690,6 +1272,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) char *ptr; char *sql; text *sql_text; + int16 typlen; + bool typbyval; + char typalign; relname_text = PG_GETARG_TEXT_P(0); @@ -729,12 +1314,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) * get array of pointers to c-strings from the input source array */ Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID); + get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry), + &typlen, &typbyval, &typalign); + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(src_pkattvals_arry); for (i = 0; i < src_nitems; i++) { src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); - ptr += INTALIGN(*(int32 *) ptr); + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); } /* @@ -755,12 +1344,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) * get array of pointers to c-strings from the input target array */ Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); + get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry), + &typlen, &typbyval, &typalign); + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(tgt_pkattvals_arry); for (i = 0; i < tgt_nitems; i++) { tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); - ptr += INTALIGN(*(int32 *) ptr); + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); } /* @@ -779,7 +1372,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(sql_text); } - /* * dblink_current_query * return the current query string @@ -797,64 +1389,6 @@ dblink_current_query(PG_FUNCTION_ARGS) } -/* - * dblink_replace_text - * replace all occurences of 'old_sub_str' in 'orig_str' - * with 'new_sub_str' to form 'new_str' - * - * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == '' - * otherwise returns 'new_str' - */ -PG_FUNCTION_INFO_V1(dblink_replace_text); -Datum -dblink_replace_text(PG_FUNCTION_ARGS) -{ - text *left_text; - text *right_text; - text *buf_text; - text *ret_text; - char *ret_str; - int curr_posn; - text *src_text = PG_GETARG_TEXT_P(0); - int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text))); - text *from_sub_text = PG_GETARG_TEXT_P(1); - int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text))); - text *to_sub_text = PG_GETARG_TEXT_P(2); - char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text))); - StringInfo str = makeStringInfo(); - - if (src_text_len == 0 || from_sub_text_len == 0) - PG_RETURN_TEXT_P(src_text); - - buf_text = DatumGetTextPCopy(PointerGetDatum(src_text)); - curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))); - - while (curr_posn > 0) - { - left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1)); - right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1)); - - appendStringInfo(str, "%s", - DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text)))); - appendStringInfo(str, "%s", to_sub_str); - - pfree(buf_text); - pfree(left_text); - buf_text = right_text; - curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))); - } - - appendStringInfo(str, "%s", - DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text)))); - pfree(buf_text); - - ret_str = pstrdup(str->data); - ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str))); - - PG_RETURN_TEXT_P(ret_text); -} - - /************************************************************* * internal functions */ @@ -884,31 +1418,6 @@ init_dblink_results(MemoryContext fn_mcxt) return retval; } - -/* - * init_dblink_array_results - * - create an empty dblink_array_results data structure - */ -static dblink_array_results * -init_dblink_array_results(MemoryContext fn_mcxt) -{ - MemoryContext oldcontext; - dblink_array_results *retval; - - oldcontext = MemoryContextSwitchTo(fn_mcxt); - - retval = (dblink_array_results *) palloc(sizeof(dblink_array_results)); - MemSet(retval, 0, sizeof(dblink_array_results)); - - retval->elem_num = -1; - retval->num_elems = 0; - retval->res = NULL; - - MemoryContextSwitchTo(oldcontext); - - return retval; -} - /* * get_pkey_attnames * @@ -927,21 +1436,14 @@ get_pkey_attnames(Oid relid, int16 *numatts) Relation rel; TupleDesc tupdesc; - /* - * Open relation using relid, get tupdesc - */ + /* open relation using relid, get tupdesc */ rel = relation_open(relid, AccessShareLock); tupdesc = rel->rd_att; - /* - * Initialize numatts to 0 in case no primary key - * exists - */ + /* initialize numatts to 0 in case no primary key exists */ *numatts = 0; - /* - * Use relid to get all related indexes - */ + /* use relid to get all related indexes */ indexRelation = heap_openr(IndexRelationName, AccessShareLock); ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, F_OIDEQ, ObjectIdGetDatum(relid)); @@ -951,9 +1453,7 @@ get_pkey_attnames(Oid relid, int16 *numatts) { Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple); - /* - * We're only interested if it is the primary key - */ + /* we're only interested if it is the primary key */ if (index->indisprimary == TRUE) { i = 0; @@ -963,6 +1463,7 @@ get_pkey_attnames(Oid relid, int16 *numatts) if (*numatts > 0) { result = (char **) palloc(*numatts * sizeof(char *)); + for (i = 0; i < *numatts; i++) result[i] = SPI_fname(tupdesc, index->indkey[i]); } @@ -976,41 +1477,6 @@ get_pkey_attnames(Oid relid, int16 *numatts) return result; } - -/* - * get_strtok - * - * parse input string - * return ord item (0 based) - * based on provided field separator - */ -static char * -get_strtok(char *fldtext, char *fldsep, int fldnum) -{ - int j = 0; - char *result; - - if (fldnum < 0) - { - elog(ERROR, "get_strtok: field number < 0 not permitted"); - } - - if (fldsep[0] == '\0') - { - elog(ERROR, "get_strtok: blank field separator not permitted"); - } - - result = strtok(fldtext, fldsep); - for (j = 1; j < fldnum + 1; j++) - { - result = strtok(NULL, fldsep); - if (result == NULL) - return NULL; - } - - return pstrdup(result); -} - static char * get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) { @@ -1035,6 +1501,8 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval natts = tupdesc->natts; tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); + if (!tuple) + elog(ERROR, "dblink_build_sql_insert: row not found"); appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname)); @@ -1175,6 +1643,8 @@ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval natts = tupdesc->natts; tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); + if (!tuple) + elog(ERROR, "dblink_build_sql_update: row not found"); appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname)); @@ -1314,7 +1784,8 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p */ rel = relation_open(relid, AccessShareLock); relname = RelationGetRelationName(rel); - tupdesc = rel->rd_att; + tupdesc = CreateTupleDescCopy(rel->rd_att); + relation_close(rel, AccessShareLock); /* * Connect to SPI manager @@ -1388,7 +1859,6 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p static Oid get_relid_from_relname(text *relname_text) { -#ifdef NamespaceRelationName RangeVar *relvar; Relation rel; Oid relid; @@ -1397,16 +1867,6 @@ get_relid_from_relname(text *relname_text) rel = heap_openrv(relvar, AccessShareLock); relid = RelationGetRelid(rel); relation_close(rel, AccessShareLock); -#else - char *relname; - Relation rel; - Oid relid; - - relname = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(relname_text))); - rel = relation_openr(relname, AccessShareLock); - relid = RelationGetRelid(rel); - relation_close(rel, AccessShareLock); -#endif /* NamespaceRelationName */ return relid; } @@ -1456,3 +1916,55 @@ remove_res_ptr(dblink_results *results) res_id_index = 0; } +static TupleDesc +pgresultGetTupleDesc(PGresult *res) +{ + int natts; + AttrNumber attnum; + TupleDesc desc; + char *attname; + int32 atttypmod; + int attdim; + bool attisset; + Oid atttypid; + int i; + + /* + * allocate a new tuple descriptor + */ + natts = PQnfields(res); + if (natts < 1) + elog(ERROR, "cannot create a description for empty results"); + + desc = CreateTemplateTupleDesc(natts, WITHOUTOID); + + attnum = 0; + + for (i = 0; i < natts; i++) + { + /* + * for each field, get the name and type information from the query + * result and have TupleDescInitEntry fill in the attribute + * information we need. + */ + attnum++; + + attname = PQfname(res, i); + atttypid = PQftype(res, i); + atttypmod = PQfmod(res, i); + + if (PQfsize(res, i) != get_typlen(atttypid)) + elog(ERROR, "Size of remote field \"%s\" does not match size " + "of local type \"%s\"", + attname, + format_type_with_typemod(atttypid, atttypmod)); + + attdim = 0; + attisset = false; + + TupleDescInitEntry(desc, attnum, attname, atttypid, + atttypmod, attdim, attisset); + } + + return desc; +} |