diff options
Diffstat (limited to 'contrib/dblink/dblink.c')
-rw-r--r-- | contrib/dblink/dblink.c | 164 |
1 files changed, 99 insertions, 65 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 1e62d8091a9..a67a836eba1 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -70,6 +70,9 @@ typedef struct storeInfo AttInMetadata *attinmeta; MemoryContext tmpcontext; char **cstrs; + /* temp storage for results to avoid leaks on exception */ + PGresult *last_res; + PGresult *cur_res; } storeInfo; /* @@ -83,8 +86,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo, const char *conname, const char *sql, bool fail); -static int storeHandler(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param); +static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql); +static void storeRow(storeInfo *sinfo, PGresult *res, bool first); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn *rconn); @@ -630,7 +633,7 @@ dblink_send_query(PG_FUNCTION_ARGS) /* async query send */ retval = PQsendQuery(conn, sql); if (retval != 1) - elog(NOTICE, "%s", PQerrorMessage(conn)); + elog(NOTICE, "could not send query: %s", PQerrorMessage(conn)); PG_RETURN_INT32(retval); } @@ -927,8 +930,10 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res) /* * Execute the given SQL command and store its results into a tuplestore * to be returned as the result of the current function. + * * This is equivalent to PQexec followed by materializeResult, but we make - * use of libpq's "row processor" API to reduce per-row overhead. + * use of libpq's single-row mode to avoid accumulating the whole result + * inside libpq before it gets transferred to the tuplestore. */ static void materializeQueryResult(FunctionCallInfo fcinfo, @@ -944,19 +949,14 @@ materializeQueryResult(FunctionCallInfo fcinfo, /* prepTuplestoreResult must have been called previously */ Assert(rsinfo->returnMode == SFRM_Materialize); + /* initialize storeInfo to empty */ + memset(&sinfo, 0, sizeof(sinfo)); + sinfo.fcinfo = fcinfo; + PG_TRY(); { - /* initialize storeInfo to empty */ - memset(&sinfo, 0, sizeof(sinfo)); - sinfo.fcinfo = fcinfo; - - /* We'll collect tuples using storeHandler */ - PQsetRowProcessor(conn, storeHandler, &sinfo); - - res = PQexec(conn, sql); - - /* We don't keep the custom row processor installed permanently */ - PQsetRowProcessor(conn, NULL, NULL); + /* execute query, collecting any tuples into the tuplestore */ + res = storeQueryResult(&sinfo, conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && @@ -975,8 +975,8 @@ materializeQueryResult(FunctionCallInfo fcinfo, else if (PQresultStatus(res) == PGRES_COMMAND_OK) { /* - * storeHandler didn't get called, so we need to convert the - * command status string to a tuple manually + * storeRow didn't get called, so we need to convert the command + * status string to a tuple manually */ TupleDesc tupdesc; AttInMetadata *attinmeta; @@ -1008,25 +1008,30 @@ materializeQueryResult(FunctionCallInfo fcinfo, tuplestore_puttuple(tupstore, tuple); PQclear(res); + res = NULL; } else { Assert(PQresultStatus(res) == PGRES_TUPLES_OK); - /* storeHandler should have created a tuplestore */ + /* storeRow should have created a tuplestore */ Assert(rsinfo->setResult != NULL); PQclear(res); + res = NULL; } + PQclear(sinfo.last_res); + sinfo.last_res = NULL; + PQclear(sinfo.cur_res); + sinfo.cur_res = NULL; } PG_CATCH(); { - /* be sure to unset the custom row processor */ - PQsetRowProcessor(conn, NULL, NULL); /* be sure to release any libpq result we collected */ - if (res) - PQclear(res); + PQclear(res); + PQclear(sinfo.last_res); + PQclear(sinfo.cur_res); /* and clear out any pending data in libpq */ - while ((res = PQskipResult(conn)) != NULL) + while ((res = PQgetResult(conn)) != NULL) PQclear(res); PG_RE_THROW(); } @@ -1034,23 +1039,72 @@ materializeQueryResult(FunctionCallInfo fcinfo, } /* - * Custom row processor for materializeQueryResult. - * Prototype of this function must match PQrowProcessor. + * Execute query, and send any result rows to sinfo->tuplestore. */ -static int -storeHandler(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param) +static PGresult * +storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql) +{ + bool first = true; + PGresult *res; + + if (!PQsendQuery(conn, sql)) + elog(ERROR, "could not send query: %s", PQerrorMessage(conn)); + + if (!PQsetSingleRowMode(conn)) /* shouldn't fail */ + elog(ERROR, "failed to set single-row mode for dblink query"); + + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + sinfo->cur_res = PQgetResult(conn); + if (!sinfo->cur_res) + break; + + if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE) + { + /* got one row from possibly-bigger resultset */ + storeRow(sinfo, sinfo->cur_res, first); + + PQclear(sinfo->cur_res); + sinfo->cur_res = NULL; + first = false; + } + else + { + /* if empty resultset, fill tuplestore header */ + if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK) + storeRow(sinfo, sinfo->cur_res, first); + + /* store completed result at last_res */ + PQclear(sinfo->last_res); + sinfo->last_res = sinfo->cur_res; + sinfo->cur_res = NULL; + first = true; + } + } + + /* return last_res */ + res = sinfo->last_res; + sinfo->last_res = NULL; + return res; +} + +/* + * Send single row to sinfo->tuplestore. + * + * If "first" is true, create the tuplestore using PGresult's metadata + * (in this case the PGresult might contain either zero or one row). + */ +static void +storeRow(storeInfo *sinfo, PGresult *res, bool first) { - storeInfo *sinfo = (storeInfo *) param; int nfields = PQnfields(res); - char **cstrs = sinfo->cstrs; HeapTuple tuple; - char *pbuf; - int pbuflen; int i; MemoryContext oldcontext; - if (columns == NULL) + if (first) { /* Prepare for new result set */ ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo; @@ -1098,13 +1152,16 @@ storeHandler(PGresult *res, const PGdataValue *columns, sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); /* Create a new, empty tuplestore */ - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); + oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); rsinfo->setResult = sinfo->tuplestore; rsinfo->setDesc = tupdesc; MemoryContextSwitchTo(oldcontext); + /* Done if empty resultset */ + if (PQntuples(res) == 0) + return; + /* * Set up sufficiently-wide string pointers array; this won't change * in size so it's easy to preallocate. @@ -1121,11 +1178,10 @@ storeHandler(PGresult *res, const PGdataValue *columns, ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - - return 1; } - CHECK_FOR_INTERRUPTS(); + /* Should have a single-row result if we get here */ + Assert(PQntuples(res) == 1); /* * Do the following work in a temp context that we reset after each tuple. @@ -1135,46 +1191,24 @@ storeHandler(PGresult *res, const PGdataValue *columns, oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext); /* - * The strings passed to us are not null-terminated, but the datatype - * input functions we're about to call require null termination. Copy the - * strings and add null termination. As a micro-optimization, allocate - * all the strings with one palloc. + * Fill cstrs with null-terminated strings of column values. */ - pbuflen = nfields; /* count the null terminators themselves */ for (i = 0; i < nfields; i++) { - int len = columns[i].len; - - if (len > 0) - pbuflen += len; - } - pbuf = (char *) palloc(pbuflen); - - for (i = 0; i < nfields; i++) - { - int len = columns[i].len; - - if (len < 0) - cstrs[i] = NULL; + if (PQgetisnull(res, 0, i)) + sinfo->cstrs[i] = NULL; else - { - cstrs[i] = pbuf; - memcpy(pbuf, columns[i].value, len); - pbuf += len; - *pbuf++ = '\0'; - } + sinfo->cstrs[i] = PQgetvalue(res, 0, i); } /* Convert row to a tuple, and add it to the tuplestore */ - tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs); tuplestore_puttuple(sinfo->tuplestore, tuple); /* Clean up */ MemoryContextSwitchTo(oldcontext); MemoryContextReset(sinfo->tmpcontext); - - return 1; } /* |