summaryrefslogtreecommitdiff
path: root/contrib/dblink/dblink.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/dblink/dblink.c')
-rw-r--r--contrib/dblink/dblink.c164
1 files changed, 99 insertions, 65 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 1e62d8091a9..a67a836eba1 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -70,6 +70,9 @@ typedef struct storeInfo
AttInMetadata *attinmeta;
MemoryContext tmpcontext;
char **cstrs;
+ /* temp storage for results to avoid leaks on exception */
+ PGresult *last_res;
+ PGresult *cur_res;
} storeInfo;
/*
@@ -83,8 +86,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo,
const char *conname,
const char *sql,
bool fail);
-static int storeHandler(PGresult *res, const PGdataValue *columns,
- const char **errmsgp, void *param);
+static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
+static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
@@ -630,7 +633,7 @@ dblink_send_query(PG_FUNCTION_ARGS)
/* async query send */
retval = PQsendQuery(conn, sql);
if (retval != 1)
- elog(NOTICE, "%s", PQerrorMessage(conn));
+ elog(NOTICE, "could not send query: %s", PQerrorMessage(conn));
PG_RETURN_INT32(retval);
}
@@ -927,8 +930,10 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
/*
* Execute the given SQL command and store its results into a tuplestore
* to be returned as the result of the current function.
+ *
* This is equivalent to PQexec followed by materializeResult, but we make
- * use of libpq's "row processor" API to reduce per-row overhead.
+ * use of libpq's single-row mode to avoid accumulating the whole result
+ * inside libpq before it gets transferred to the tuplestore.
*/
static void
materializeQueryResult(FunctionCallInfo fcinfo,
@@ -944,19 +949,14 @@ materializeQueryResult(FunctionCallInfo fcinfo,
/* prepTuplestoreResult must have been called previously */
Assert(rsinfo->returnMode == SFRM_Materialize);
+ /* initialize storeInfo to empty */
+ memset(&sinfo, 0, sizeof(sinfo));
+ sinfo.fcinfo = fcinfo;
+
PG_TRY();
{
- /* initialize storeInfo to empty */
- memset(&sinfo, 0, sizeof(sinfo));
- sinfo.fcinfo = fcinfo;
-
- /* We'll collect tuples using storeHandler */
- PQsetRowProcessor(conn, storeHandler, &sinfo);
-
- res = PQexec(conn, sql);
-
- /* We don't keep the custom row processor installed permanently */
- PQsetRowProcessor(conn, NULL, NULL);
+ /* execute query, collecting any tuples into the tuplestore */
+ res = storeQueryResult(&sinfo, conn, sql);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
@@ -975,8 +975,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/*
- * storeHandler didn't get called, so we need to convert the
- * command status string to a tuple manually
+ * storeRow didn't get called, so we need to convert the command
+ * status string to a tuple manually
*/
TupleDesc tupdesc;
AttInMetadata *attinmeta;
@@ -1008,25 +1008,30 @@ materializeQueryResult(FunctionCallInfo fcinfo,
tuplestore_puttuple(tupstore, tuple);
PQclear(res);
+ res = NULL;
}
else
{
Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
- /* storeHandler should have created a tuplestore */
+ /* storeRow should have created a tuplestore */
Assert(rsinfo->setResult != NULL);
PQclear(res);
+ res = NULL;
}
+ PQclear(sinfo.last_res);
+ sinfo.last_res = NULL;
+ PQclear(sinfo.cur_res);
+ sinfo.cur_res = NULL;
}
PG_CATCH();
{
- /* be sure to unset the custom row processor */
- PQsetRowProcessor(conn, NULL, NULL);
/* be sure to release any libpq result we collected */
- if (res)
- PQclear(res);
+ PQclear(res);
+ PQclear(sinfo.last_res);
+ PQclear(sinfo.cur_res);
/* and clear out any pending data in libpq */
- while ((res = PQskipResult(conn)) != NULL)
+ while ((res = PQgetResult(conn)) != NULL)
PQclear(res);
PG_RE_THROW();
}
@@ -1034,23 +1039,72 @@ materializeQueryResult(FunctionCallInfo fcinfo,
}
/*
- * Custom row processor for materializeQueryResult.
- * Prototype of this function must match PQrowProcessor.
+ * Execute query, and send any result rows to sinfo->tuplestore.
*/
-static int
-storeHandler(PGresult *res, const PGdataValue *columns,
- const char **errmsgp, void *param)
+static PGresult *
+storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
+{
+ bool first = true;
+ PGresult *res;
+
+ if (!PQsendQuery(conn, sql))
+ elog(ERROR, "could not send query: %s", PQerrorMessage(conn));
+
+ if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
+ elog(ERROR, "failed to set single-row mode for dblink query");
+
+ for (;;)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ sinfo->cur_res = PQgetResult(conn);
+ if (!sinfo->cur_res)
+ break;
+
+ if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
+ {
+ /* got one row from possibly-bigger resultset */
+ storeRow(sinfo, sinfo->cur_res, first);
+
+ PQclear(sinfo->cur_res);
+ sinfo->cur_res = NULL;
+ first = false;
+ }
+ else
+ {
+ /* if empty resultset, fill tuplestore header */
+ if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
+ storeRow(sinfo, sinfo->cur_res, first);
+
+ /* store completed result at last_res */
+ PQclear(sinfo->last_res);
+ sinfo->last_res = sinfo->cur_res;
+ sinfo->cur_res = NULL;
+ first = true;
+ }
+ }
+
+ /* return last_res */
+ res = sinfo->last_res;
+ sinfo->last_res = NULL;
+ return res;
+}
+
+/*
+ * Send single row to sinfo->tuplestore.
+ *
+ * If "first" is true, create the tuplestore using PGresult's metadata
+ * (in this case the PGresult might contain either zero or one row).
+ */
+static void
+storeRow(storeInfo *sinfo, PGresult *res, bool first)
{
- storeInfo *sinfo = (storeInfo *) param;
int nfields = PQnfields(res);
- char **cstrs = sinfo->cstrs;
HeapTuple tuple;
- char *pbuf;
- int pbuflen;
int i;
MemoryContext oldcontext;
- if (columns == NULL)
+ if (first)
{
/* Prepare for new result set */
ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
@@ -1098,13 +1152,16 @@ storeHandler(PGresult *res, const PGdataValue *columns,
sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
/* Create a new, empty tuplestore */
- oldcontext = MemoryContextSwitchTo(
- rsinfo->econtext->ecxt_per_query_memory);
+ oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->setResult = sinfo->tuplestore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
+ /* Done if empty resultset */
+ if (PQntuples(res) == 0)
+ return;
+
/*
* Set up sufficiently-wide string pointers array; this won't change
* in size so it's easy to preallocate.
@@ -1121,11 +1178,10 @@ storeHandler(PGresult *res, const PGdataValue *columns,
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
-
- return 1;
}
- CHECK_FOR_INTERRUPTS();
+ /* Should have a single-row result if we get here */
+ Assert(PQntuples(res) == 1);
/*
* Do the following work in a temp context that we reset after each tuple.
@@ -1135,46 +1191,24 @@ storeHandler(PGresult *res, const PGdataValue *columns,
oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
/*
- * The strings passed to us are not null-terminated, but the datatype
- * input functions we're about to call require null termination. Copy the
- * strings and add null termination. As a micro-optimization, allocate
- * all the strings with one palloc.
+ * Fill cstrs with null-terminated strings of column values.
*/
- pbuflen = nfields; /* count the null terminators themselves */
for (i = 0; i < nfields; i++)
{
- int len = columns[i].len;
-
- if (len > 0)
- pbuflen += len;
- }
- pbuf = (char *) palloc(pbuflen);
-
- for (i = 0; i < nfields; i++)
- {
- int len = columns[i].len;
-
- if (len < 0)
- cstrs[i] = NULL;
+ if (PQgetisnull(res, 0, i))
+ sinfo->cstrs[i] = NULL;
else
- {
- cstrs[i] = pbuf;
- memcpy(pbuf, columns[i].value, len);
- pbuf += len;
- *pbuf++ = '\0';
- }
+ sinfo->cstrs[i] = PQgetvalue(res, 0, i);
}
/* Convert row to a tuple, and add it to the tuplestore */
- tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+ tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
tuplestore_puttuple(sinfo->tuplestore, tuple);
/* Clean up */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(sinfo->tmpcontext);
-
- return 1;
}
/*