diff options
Diffstat (limited to 'src/interfaces/libpq/fe-exec.c')
-rw-r--r-- | src/interfaces/libpq/fe-exec.c | 219 |
1 files changed, 102 insertions, 117 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index badc0b32a8e..53516db7234 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -38,7 +38,8 @@ char *const pgresStatus[] = { "PGRES_BAD_RESPONSE", "PGRES_NONFATAL_ERROR", "PGRES_FATAL_ERROR", - "PGRES_COPY_BOTH" + "PGRES_COPY_BOTH", + "PGRES_SINGLE_TUPLE" }; /* @@ -51,8 +52,6 @@ static bool static_std_strings = false; static PGEvent *dupEvents(PGEvent *events, int count); static bool pqAddTuple(PGresult *res, PGresAttValue *tup); -static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param); static bool PQsendQueryStart(PGconn *conn); static int PQsendQueryGuts(PGconn *conn, const char *command, @@ -64,8 +63,6 @@ static int PQsendQueryGuts(PGconn *conn, const int *paramFormats, int resultFormat); static void parseInput(PGconn *conn); -static int dummyRowProcessor(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param); static bool PQexecStart(PGconn *conn); static PGresult *PQexecFinish(PGconn *conn); static int PQsendDescribe(PGconn *conn, char desc_type, @@ -181,6 +178,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) case PGRES_COPY_OUT: case PGRES_COPY_IN: case PGRES_COPY_BOTH: + case PGRES_SINGLE_TUPLE: /* non-error cases */ break; default: @@ -698,6 +696,8 @@ PQclear(PGresult *res) /* * Handy subroutine to deallocate any partially constructed async result. + * + * Any "next" result gets cleared too. */ void pqClearAsyncResult(PGconn *conn) @@ -705,6 +705,9 @@ pqClearAsyncResult(PGconn *conn) if (conn->result) PQclear(conn->result); conn->result = NULL; + if (conn->next_result) + PQclear(conn->next_result); + conn->next_result = NULL; } /* @@ -758,7 +761,6 @@ pqPrepareAsyncResult(PGconn *conn) * conn->errorMessage. */ res = conn->result; - conn->result = NULL; /* handing over ownership to caller */ if (!res) res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); else @@ -771,6 +773,16 @@ pqPrepareAsyncResult(PGconn *conn) appendPQExpBufferStr(&conn->errorMessage, PQresultErrorMessage(res)); } + + /* + * Replace conn->result with next_result, if any. In the normal case + * there isn't a next result and we're just dropping ownership of the + * current result. In single-row mode this restores the situation to what + * it was before we created the current single-row result. + */ + conn->result = conn->next_result; + conn->next_result = NULL; + return res; } @@ -981,85 +993,55 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value) /* - * PQsetRowProcessor - * Set function that copies row data out from the network buffer, - * along with a passthrough parameter for it. - */ -void -PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param) -{ - if (!conn) - return; - - if (func) - { - /* set custom row processor */ - conn->rowProcessor = func; - conn->rowProcessorParam = param; - } - else - { - /* set default row processor */ - conn->rowProcessor = pqStdRowProcessor; - conn->rowProcessorParam = conn; - } -} - -/* - * PQgetRowProcessor - * Get current row processor of PGconn. - * If param is not NULL, also store the passthrough parameter at *param. - */ -PQrowProcessor -PQgetRowProcessor(const PGconn *conn, void **param) -{ - if (!conn) - { - if (param) - *param = NULL; - return NULL; - } - - if (param) - *param = conn->rowProcessorParam; - return conn->rowProcessor; -} - -/* - * pqStdRowProcessor - * Add the received row to the PGresult structure - * Returns 1 if OK, -1 if error occurred. + * pqRowProcessor + * Add the received row to the current async result (conn->result). + * Returns 1 if OK, 0 if error occurred. + * + * On error, *errmsgp can be set to an error string to be returned. + * If it is left NULL, the error is presumed to be "out of memory". * - * Note: "param" should point to the PGconn, but we don't actually need that - * as of the current coding. + * In single-row mode, we create a new result holding just the current row, + * stashing the previous result in conn->next_result so that it becomes + * active again after pqPrepareAsyncResult(). This allows the result metadata + * (column descriptions) to be carried forward to each result row. */ -static int -pqStdRowProcessor(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param) +int +pqRowProcessor(PGconn *conn, const char **errmsgp) { + PGresult *res = conn->result; int nfields = res->numAttributes; + const PGdataValue *columns = conn->rowBuf; PGresAttValue *tup; int i; - if (columns == NULL) + /* + * In single-row mode, make a new PGresult that will hold just this one + * row; the original conn->result is left unchanged so that it can be used + * again as the template for future rows. + */ + if (conn->singleRowMode) { - /* New result set ... we have nothing to do in this function. */ - return 1; + /* Copy everything that should be in the result at this point */ + res = PQcopyResult(res, + PG_COPYRES_ATTRS | PG_COPYRES_EVENTS | + PG_COPYRES_NOTICEHOOKS); + if (!res) + return 0; } /* * Basically we just allocate space in the PGresult for each field and * copy the data over. * - * Note: on malloc failure, we return -1 leaving *errmsgp still NULL, - * which caller will take to mean "out of memory". This is preferable to - * trying to set up such a message here, because evidently there's not - * enough memory for gettext() to do anything. + * Note: on malloc failure, we return 0 leaving *errmsgp still NULL, which + * caller will take to mean "out of memory". This is preferable to trying + * to set up such a message here, because evidently there's not enough + * memory for gettext() to do anything. */ tup = (PGresAttValue *) pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE); if (tup == NULL) - return -1; + goto fail; for (i = 0; i < nfields; i++) { @@ -1078,7 +1060,7 @@ pqStdRowProcessor(PGresult *res, const PGdataValue *columns, val = (char *) pqResultAlloc(res, clen + 1, isbinary); if (val == NULL) - return -1; + goto fail; /* copy and zero-terminate the data (even if it's binary) */ memcpy(val, columns[i].value, clen); @@ -1091,10 +1073,30 @@ pqStdRowProcessor(PGresult *res, const PGdataValue *columns, /* And add the tuple to the PGresult's tuple array */ if (!pqAddTuple(res, tup)) - return -1; + goto fail; + + /* + * Success. In single-row mode, make the result available to the client + * immediately. + */ + if (conn->singleRowMode) + { + /* Change result status to special single-row value */ + res->resultStatus = PGRES_SINGLE_TUPLE; + /* Stash old result for re-use later */ + conn->next_result = conn->result; + conn->result = res; + /* And mark the result ready to return */ + conn->asyncStatus = PGASYNC_READY; + } - /* Success */ return 1; + +fail: + /* release locally allocated PGresult, if we made one */ + if (res != conn->result) + PQclear(res); + return 0; } @@ -1343,6 +1345,10 @@ PQsendQueryStart(PGconn *conn) /* initialize async result-accumulation state */ conn->result = NULL; + conn->next_result = NULL; + + /* reset single-row processing mode */ + conn->singleRowMode = false; /* ready to send command message */ return true; @@ -1548,6 +1554,31 @@ pqHandleSendFailure(PGconn *conn) } /* + * Select row-by-row processing mode + */ +int +PQsetSingleRowMode(PGconn *conn) +{ + /* + * Only allow setting the flag when we have launched a query and not yet + * received any results. + */ + if (!conn) + return 0; + if (conn->asyncStatus != PGASYNC_BUSY) + return 0; + if (conn->queryclass != PGQUERY_SIMPLE && + conn->queryclass != PGQUERY_EXTENDED) + return 0; + if (conn->result) + return 0; + + /* OK, set flag */ + conn->singleRowMode = true; + return 1; +} + +/* * Consume any available input from the backend * 0 return: some kind of trouble * 1 return: no problem @@ -1587,9 +1618,6 @@ PQconsumeInput(PGconn *conn) * parseInput: if appropriate, parse input data from backend * until input is exhausted or a stopping state is reached. * Note that this function will NOT attempt to read more data from the backend. - * - * Note: callers of parseInput must be prepared for a longjmp exit when we are - * in PGASYNC_BUSY state, since an external row processor might do that. */ static void parseInput(PGconn *conn) @@ -1737,49 +1765,6 @@ PQgetResult(PGconn *conn) return res; } -/* - * PQskipResult - * Get the next PGresult produced by a query, but discard any data rows. - * - * This is mainly useful for cleaning up after a longjmp out of a row - * processor, when resuming processing of the current query result isn't - * wanted. Note that this is of little value in an async-style application, - * since any preceding calls to PQisBusy would have already called the regular - * row processor. - */ -PGresult * -PQskipResult(PGconn *conn) -{ - PGresult *res; - PQrowProcessor savedRowProcessor; - - if (!conn) - return NULL; - - /* temporarily install dummy row processor */ - savedRowProcessor = conn->rowProcessor; - conn->rowProcessor = dummyRowProcessor; - /* no need to save/change rowProcessorParam */ - - /* fetch the next result */ - res = PQgetResult(conn); - - /* restore previous row processor */ - conn->rowProcessor = savedRowProcessor; - - return res; -} - -/* - * Do-nothing row processor for PQskipResult - */ -static int -dummyRowProcessor(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param) -{ - return 1; -} - /* * PQexec @@ -1886,7 +1871,7 @@ PQexecStart(PGconn *conn) * Silently discard any prior query result that application didn't eat. * This is probably poor design, but it's here for backward compatibility. */ - while ((result = PQskipResult(conn)) != NULL) + while ((result = PQgetResult(conn)) != NULL) { ExecStatusType resultStatus = result->resultStatus; |