summaryrefslogtreecommitdiff
path: root/src/interfaces/libpq/fe-exec.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/interfaces/libpq/fe-exec.c')
-rw-r--r--src/interfaces/libpq/fe-exec.c219
1 files changed, 102 insertions, 117 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index badc0b32a8e..53516db7234 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -38,7 +38,8 @@ char *const pgresStatus[] = {
"PGRES_BAD_RESPONSE",
"PGRES_NONFATAL_ERROR",
"PGRES_FATAL_ERROR",
- "PGRES_COPY_BOTH"
+ "PGRES_COPY_BOTH",
+ "PGRES_SINGLE_TUPLE"
};
/*
@@ -51,8 +52,6 @@ static bool static_std_strings = false;
static PGEvent *dupEvents(PGEvent *events, int count);
static bool pqAddTuple(PGresult *res, PGresAttValue *tup);
-static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
- const char **errmsgp, void *param);
static bool PQsendQueryStart(PGconn *conn);
static int PQsendQueryGuts(PGconn *conn,
const char *command,
@@ -64,8 +63,6 @@ static int PQsendQueryGuts(PGconn *conn,
const int *paramFormats,
int resultFormat);
static void parseInput(PGconn *conn);
-static int dummyRowProcessor(PGresult *res, const PGdataValue *columns,
- const char **errmsgp, void *param);
static bool PQexecStart(PGconn *conn);
static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
@@ -181,6 +178,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
+ case PGRES_SINGLE_TUPLE:
/* non-error cases */
break;
default:
@@ -698,6 +696,8 @@ PQclear(PGresult *res)
/*
* Handy subroutine to deallocate any partially constructed async result.
+ *
+ * Any "next" result gets cleared too.
*/
void
pqClearAsyncResult(PGconn *conn)
@@ -705,6 +705,9 @@ pqClearAsyncResult(PGconn *conn)
if (conn->result)
PQclear(conn->result);
conn->result = NULL;
+ if (conn->next_result)
+ PQclear(conn->next_result);
+ conn->next_result = NULL;
}
/*
@@ -758,7 +761,6 @@ pqPrepareAsyncResult(PGconn *conn)
* conn->errorMessage.
*/
res = conn->result;
- conn->result = NULL; /* handing over ownership to caller */
if (!res)
res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
else
@@ -771,6 +773,16 @@ pqPrepareAsyncResult(PGconn *conn)
appendPQExpBufferStr(&conn->errorMessage,
PQresultErrorMessage(res));
}
+
+ /*
+ * Replace conn->result with next_result, if any. In the normal case
+ * there isn't a next result and we're just dropping ownership of the
+ * current result. In single-row mode this restores the situation to what
+ * it was before we created the current single-row result.
+ */
+ conn->result = conn->next_result;
+ conn->next_result = NULL;
+
return res;
}
@@ -981,85 +993,55 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
/*
- * PQsetRowProcessor
- * Set function that copies row data out from the network buffer,
- * along with a passthrough parameter for it.
- */
-void
-PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
-{
- if (!conn)
- return;
-
- if (func)
- {
- /* set custom row processor */
- conn->rowProcessor = func;
- conn->rowProcessorParam = param;
- }
- else
- {
- /* set default row processor */
- conn->rowProcessor = pqStdRowProcessor;
- conn->rowProcessorParam = conn;
- }
-}
-
-/*
- * PQgetRowProcessor
- * Get current row processor of PGconn.
- * If param is not NULL, also store the passthrough parameter at *param.
- */
-PQrowProcessor
-PQgetRowProcessor(const PGconn *conn, void **param)
-{
- if (!conn)
- {
- if (param)
- *param = NULL;
- return NULL;
- }
-
- if (param)
- *param = conn->rowProcessorParam;
- return conn->rowProcessor;
-}
-
-/*
- * pqStdRowProcessor
- * Add the received row to the PGresult structure
- * Returns 1 if OK, -1 if error occurred.
+ * pqRowProcessor
+ * Add the received row to the current async result (conn->result).
+ * Returns 1 if OK, 0 if error occurred.
+ *
+ * On error, *errmsgp can be set to an error string to be returned.
+ * If it is left NULL, the error is presumed to be "out of memory".
*
- * Note: "param" should point to the PGconn, but we don't actually need that
- * as of the current coding.
+ * In single-row mode, we create a new result holding just the current row,
+ * stashing the previous result in conn->next_result so that it becomes
+ * active again after pqPrepareAsyncResult(). This allows the result metadata
+ * (column descriptions) to be carried forward to each result row.
*/
-static int
-pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
- const char **errmsgp, void *param)
+int
+pqRowProcessor(PGconn *conn, const char **errmsgp)
{
+ PGresult *res = conn->result;
int nfields = res->numAttributes;
+ const PGdataValue *columns = conn->rowBuf;
PGresAttValue *tup;
int i;
- if (columns == NULL)
+ /*
+ * In single-row mode, make a new PGresult that will hold just this one
+ * row; the original conn->result is left unchanged so that it can be used
+ * again as the template for future rows.
+ */
+ if (conn->singleRowMode)
{
- /* New result set ... we have nothing to do in this function. */
- return 1;
+ /* Copy everything that should be in the result at this point */
+ res = PQcopyResult(res,
+ PG_COPYRES_ATTRS | PG_COPYRES_EVENTS |
+ PG_COPYRES_NOTICEHOOKS);
+ if (!res)
+ return 0;
}
/*
* Basically we just allocate space in the PGresult for each field and
* copy the data over.
*
- * Note: on malloc failure, we return -1 leaving *errmsgp still NULL,
- * which caller will take to mean "out of memory". This is preferable to
- * trying to set up such a message here, because evidently there's not
- * enough memory for gettext() to do anything.
+ * Note: on malloc failure, we return 0 leaving *errmsgp still NULL, which
+ * caller will take to mean "out of memory". This is preferable to trying
+ * to set up such a message here, because evidently there's not enough
+ * memory for gettext() to do anything.
*/
tup = (PGresAttValue *)
pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
if (tup == NULL)
- return -1;
+ goto fail;
for (i = 0; i < nfields; i++)
{
@@ -1078,7 +1060,7 @@ pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
val = (char *) pqResultAlloc(res, clen + 1, isbinary);
if (val == NULL)
- return -1;
+ goto fail;
/* copy and zero-terminate the data (even if it's binary) */
memcpy(val, columns[i].value, clen);
@@ -1091,10 +1073,30 @@ pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
/* And add the tuple to the PGresult's tuple array */
if (!pqAddTuple(res, tup))
- return -1;
+ goto fail;
+
+ /*
+ * Success. In single-row mode, make the result available to the client
+ * immediately.
+ */
+ if (conn->singleRowMode)
+ {
+ /* Change result status to special single-row value */
+ res->resultStatus = PGRES_SINGLE_TUPLE;
+ /* Stash old result for re-use later */
+ conn->next_result = conn->result;
+ conn->result = res;
+ /* And mark the result ready to return */
+ conn->asyncStatus = PGASYNC_READY;
+ }
- /* Success */
return 1;
+
+fail:
+ /* release locally allocated PGresult, if we made one */
+ if (res != conn->result)
+ PQclear(res);
+ return 0;
}
@@ -1343,6 +1345,10 @@ PQsendQueryStart(PGconn *conn)
/* initialize async result-accumulation state */
conn->result = NULL;
+ conn->next_result = NULL;
+
+ /* reset single-row processing mode */
+ conn->singleRowMode = false;
/* ready to send command message */
return true;
@@ -1548,6 +1554,31 @@ pqHandleSendFailure(PGconn *conn)
}
/*
+ * Select row-by-row processing mode
+ */
+int
+PQsetSingleRowMode(PGconn *conn)
+{
+ /*
+ * Only allow setting the flag when we have launched a query and not yet
+ * received any results.
+ */
+ if (!conn)
+ return 0;
+ if (conn->asyncStatus != PGASYNC_BUSY)
+ return 0;
+ if (conn->queryclass != PGQUERY_SIMPLE &&
+ conn->queryclass != PGQUERY_EXTENDED)
+ return 0;
+ if (conn->result)
+ return 0;
+
+ /* OK, set flag */
+ conn->singleRowMode = true;
+ return 1;
+}
+
+/*
* Consume any available input from the backend
* 0 return: some kind of trouble
* 1 return: no problem
@@ -1587,9 +1618,6 @@ PQconsumeInput(PGconn *conn)
* parseInput: if appropriate, parse input data from backend
* until input is exhausted or a stopping state is reached.
* Note that this function will NOT attempt to read more data from the backend.
- *
- * Note: callers of parseInput must be prepared for a longjmp exit when we are
- * in PGASYNC_BUSY state, since an external row processor might do that.
*/
static void
parseInput(PGconn *conn)
@@ -1737,49 +1765,6 @@ PQgetResult(PGconn *conn)
return res;
}
-/*
- * PQskipResult
- * Get the next PGresult produced by a query, but discard any data rows.
- *
- * This is mainly useful for cleaning up after a longjmp out of a row
- * processor, when resuming processing of the current query result isn't
- * wanted. Note that this is of little value in an async-style application,
- * since any preceding calls to PQisBusy would have already called the regular
- * row processor.
- */
-PGresult *
-PQskipResult(PGconn *conn)
-{
- PGresult *res;
- PQrowProcessor savedRowProcessor;
-
- if (!conn)
- return NULL;
-
- /* temporarily install dummy row processor */
- savedRowProcessor = conn->rowProcessor;
- conn->rowProcessor = dummyRowProcessor;
- /* no need to save/change rowProcessorParam */
-
- /* fetch the next result */
- res = PQgetResult(conn);
-
- /* restore previous row processor */
- conn->rowProcessor = savedRowProcessor;
-
- return res;
-}
-
-/*
- * Do-nothing row processor for PQskipResult
- */
-static int
-dummyRowProcessor(PGresult *res, const PGdataValue *columns,
- const char **errmsgp, void *param)
-{
- return 1;
-}
-
/*
* PQexec
@@ -1886,7 +1871,7 @@ PQexecStart(PGconn *conn)
* Silently discard any prior query result that application didn't eat.
* This is probably poor design, but it's here for backward compatibility.
*/
- while ((result = PQskipResult(conn)) != NULL)
+ while ((result = PQgetResult(conn)) != NULL)
{
ExecStatusType resultStatus = result->resultStatus;