summaryrefslogtreecommitdiff
path: root/src/interfaces/libpq/fe-protocol2.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/interfaces/libpq/fe-protocol2.c')
-rw-r--r--src/interfaces/libpq/fe-protocol2.c325
1 files changed, 240 insertions, 85 deletions
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c38993b8b..43f9954dd1c 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -49,11 +49,19 @@ static int getNotify(PGconn *conn);
PostgresPollingStatusType
pqSetenvPoll(PGconn *conn)
{
+ PostgresPollingStatusType result;
PGresult *res;
+ PQrowProcessor savedRowProcessor;
+ void *savedRowProcessorParam;
if (conn == NULL || conn->status == CONNECTION_BAD)
return PGRES_POLLING_FAILED;
+ /* Ensure the standard row processor is used to collect any results */
+ savedRowProcessor = conn->rowProcessor;
+ savedRowProcessorParam = conn->rowProcessorParam;
+ PQsetRowProcessor(conn, NULL, NULL);
+
/* Check whether there are any data for us */
switch (conn->setenv_state)
{
@@ -69,7 +77,10 @@ pqSetenvPoll(PGconn *conn)
if (n < 0)
goto error_return;
if (n == 0)
- return PGRES_POLLING_READING;
+ {
+ result = PGRES_POLLING_READING;
+ goto normal_return;
+ }
break;
}
@@ -83,7 +94,8 @@ pqSetenvPoll(PGconn *conn)
/* Should we raise an error if called when not active? */
case SETENV_STATE_IDLE:
- return PGRES_POLLING_OK;
+ result = PGRES_POLLING_OK;
+ goto normal_return;
default:
printfPQExpBuffer(&conn->errorMessage,
@@ -180,7 +192,10 @@ pqSetenvPoll(PGconn *conn)
case SETENV_STATE_CLIENT_ENCODING_WAIT:
{
if (PQisBusy(conn))
- return PGRES_POLLING_READING;
+ {
+ result = PGRES_POLLING_READING;
+ goto normal_return;
+ }
res = PQgetResult(conn);
@@ -205,7 +220,10 @@ pqSetenvPoll(PGconn *conn)
case SETENV_STATE_OPTION_WAIT:
{
if (PQisBusy(conn))
- return PGRES_POLLING_READING;
+ {
+ result = PGRES_POLLING_READING;
+ goto normal_return;
+ }
res = PQgetResult(conn);
@@ -244,13 +262,17 @@ pqSetenvPoll(PGconn *conn)
goto error_return;
conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
- return PGRES_POLLING_READING;
+ result = PGRES_POLLING_READING;
+ goto normal_return;
}
case SETENV_STATE_QUERY1_WAIT:
{
if (PQisBusy(conn))
- return PGRES_POLLING_READING;
+ {
+ result = PGRES_POLLING_READING;
+ goto normal_return;
+ }
res = PQgetResult(conn);
@@ -327,13 +349,17 @@ pqSetenvPoll(PGconn *conn)
goto error_return;
conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
- return PGRES_POLLING_READING;
+ result = PGRES_POLLING_READING;
+ goto normal_return;
}
case SETENV_STATE_QUERY2_WAIT:
{
if (PQisBusy(conn))
- return PGRES_POLLING_READING;
+ {
+ result = PGRES_POLLING_READING;
+ goto normal_return;
+ }
res = PQgetResult(conn);
@@ -380,7 +406,8 @@ pqSetenvPoll(PGconn *conn)
{
/* Query finished, so we're done */
conn->setenv_state = SETENV_STATE_IDLE;
- return PGRES_POLLING_OK;
+ result = PGRES_POLLING_OK;
+ goto normal_return;
}
break;
}
@@ -398,7 +425,12 @@ pqSetenvPoll(PGconn *conn)
error_return:
conn->setenv_state = SETENV_STATE_IDLE;
- return PGRES_POLLING_FAILED;
+ result = PGRES_POLLING_FAILED;
+
+normal_return:
+ conn->rowProcessor = savedRowProcessor;
+ conn->rowProcessorParam = savedRowProcessorParam;
+ return result;
}
@@ -406,6 +438,9 @@ error_return:
* parseInput: if appropriate, parse input data from backend
* until input is exhausted or a stopping state is reached.
* Note that this function will NOT attempt to read more data from the backend.
+ *
+ * Note: callers of parseInput must be prepared for a longjmp exit when we are
+ * in PGASYNC_BUSY state, since an external row processor might do that.
*/
void
pqParseInput2(PGconn *conn)
@@ -549,6 +584,8 @@ pqParseInput2(PGconn *conn)
/* First 'T' in a query sequence */
if (getRowDescriptions(conn))
return;
+ /* getRowDescriptions() moves inStart itself */
+ continue;
}
else
{
@@ -569,6 +606,8 @@ pqParseInput2(PGconn *conn)
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, FALSE))
return;
+ /* getAnotherTuple() moves inStart itself */
+ continue;
}
else
{
@@ -585,6 +624,8 @@ pqParseInput2(PGconn *conn)
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, TRUE))
return;
+ /* getAnotherTuple() moves inStart itself */
+ continue;
}
else
{
@@ -627,27 +668,32 @@ pqParseInput2(PGconn *conn)
/*
* parseInput subroutine to read a 'T' (row descriptions) message.
* We build a PGresult structure containing the attribute data.
- * Returns: 0 if completed message, EOF if not enough data yet.
+ * Returns: 0 if completed message, EOF if error or not enough data
+ * received yet.
*
- * Note that if we run out of data, we have to release the partially
- * constructed PGresult, and rebuild it again next time. Fortunately,
- * that shouldn't happen often, since 'T' messages usually fit in a packet.
+ * Note that if we run out of data, we have to suspend and reprocess
+ * the message after more data is received. Otherwise, conn->inStart
+ * must get advanced past the processed data.
*/
static int
getRowDescriptions(PGconn *conn)
{
- PGresult *result = NULL;
+ PGresult *result;
int nfields;
+ const char *errmsg;
int i;
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
if (!result)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
/* parseInput already read the 'T' label. */
/* the next two bytes are the number of fields */
if (pqGetInt(&(result->numAttributes), 2, conn))
- goto failure;
+ goto EOFexit;
nfields = result->numAttributes;
/* allocate space for the attribute descriptors */
@@ -656,7 +702,10 @@ getRowDescriptions(PGconn *conn)
result->attDescs = (PGresAttDesc *)
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
if (!result->attDescs)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
}
@@ -671,7 +720,7 @@ getRowDescriptions(PGconn *conn)
pqGetInt(&typid, 4, conn) ||
pqGetInt(&typlen, 2, conn) ||
pqGetInt(&atttypmod, 4, conn))
- goto failure;
+ goto EOFexit;
/*
* Since pqGetInt treats 2-byte integers as unsigned, we need to
@@ -682,7 +731,10 @@ getRowDescriptions(PGconn *conn)
result->attDescs[i].name = pqResultStrdup(result,
conn->workBuffer.data);
if (!result->attDescs[i].name)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
result->attDescs[i].tableid = 0;
result->attDescs[i].columnid = 0;
result->attDescs[i].format = 0;
@@ -693,30 +745,90 @@ getRowDescriptions(PGconn *conn)
/* Success! */
conn->result = result;
- return 0;
-failure:
- if (result)
+ /*
+ * Advance inStart to show that the "T" message has been processed. We
+ * must do this before calling the row processor, in case it longjmps.
+ */
+ conn->inStart = conn->inCursor;
+
+ /* Give the row processor a chance to initialize for new result set */
+ errmsg = NULL;
+ switch ((*conn->rowProcessor) (result, NULL, &errmsg,
+ conn->rowProcessorParam))
+ {
+ case 1:
+ /* everything is good */
+ return 0;
+
+ case -1:
+ /* error, report the errmsg below */
+ break;
+
+ default:
+ /* unrecognized return code */
+ errmsg = libpq_gettext("unrecognized return value from row processor");
+ break;
+ }
+ goto set_error_result;
+
+advance_and_error:
+ /*
+ * Discard the failed message. Unfortunately we don't know for sure
+ * where the end is, so just throw away everything in the input buffer.
+ * This is not very desirable but it's the best we can do in protocol v2.
+ */
+ conn->inStart = conn->inEnd;
+
+set_error_result:
+
+ /*
+ * Replace partially constructed result with an error result. First
+ * discard the old result to try to win back some memory.
+ */
+ pqClearAsyncResult(conn);
+
+ /*
+ * If row processor didn't provide an error message, assume "out of
+ * memory" was meant. The advantage of having this special case is that
+ * freeing the old result first greatly improves the odds that gettext()
+ * will succeed in providing a translation.
+ */
+ if (!errmsg)
+ errmsg = libpq_gettext("out of memory for query result");
+
+ printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+
+ /*
+ * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
+ * do to recover...
+ */
+ conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
+ conn->asyncStatus = PGASYNC_READY;
+
+EOFexit:
+ if (result && result != conn->result)
PQclear(result);
return EOF;
}
/*
* parseInput subroutine to read a 'B' or 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
- * Returns: 0 if completed message, EOF if error or not enough data yet.
+ * We fill rowbuf with column pointers and then call the row processor.
+ * Returns: 0 if completed message, EOF if error or not enough data
+ * received yet.
*
* Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received. We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received. Otherwise, conn->inStart
+ * must get advanced past the processed data.
*/
static int
getAnotherTuple(PGconn *conn, bool binary)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
- PGresAttValue *tup;
-
+ const char *errmsg;
+ PGdataValue *rowbuf;
/* the backend sends us a bitmap of which attributes are null */
char std_bitmap[64]; /* used unless it doesn't fit */
char *bitmap = std_bitmap;
@@ -727,28 +839,33 @@ getAnotherTuple(PGconn *conn, bool binary)
int bitcnt; /* number of bits examined in current byte */
int vlen; /* length of the current field value */
- result->binary = binary;
-
- /* Allocate tuple space if first time for this data message */
- if (conn->curTuple == NULL)
+ /* Resize row buffer if needed */
+ rowbuf = conn->rowBuf;
+ if (nfields > conn->rowBufLen)
{
- conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- if (conn->curTuple == NULL)
- goto outOfMemory;
- MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
- /*
- * If it's binary, fix the column format indicators. We assume the
- * backend will consistently send either B or D, not a mix.
- */
- if (binary)
+ rowbuf = (PGdataValue *) realloc(rowbuf,
+ nfields * sizeof(PGdataValue));
+ if (!rowbuf)
{
- for (i = 0; i < nfields; i++)
- result->attDescs[i].format = 1;
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
}
+ conn->rowBuf = rowbuf;
+ conn->rowBufLen = nfields;
+ }
+
+ /* Save format specifier */
+ result->binary = binary;
+
+ /*
+ * If it's binary, fix the column format indicators. We assume the
+ * backend will consistently send either B or D, not a mix.
+ */
+ if (binary)
+ {
+ for (i = 0; i < nfields; i++)
+ result->attDescs[i].format = 1;
}
- tup = conn->curTuple;
/* Get the null-value bitmap */
nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@@ -757,7 +874,10 @@ getAnotherTuple(PGconn *conn, bool binary)
{
bitmap = (char *) malloc(nbytes);
if (!bitmap)
- goto outOfMemory;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
}
if (pqGetnchar(bitmap, nbytes, conn))
@@ -770,35 +890,34 @@ getAnotherTuple(PGconn *conn, bool binary)
for (i = 0; i < nfields; i++)
{
+ /* get the value length */
if (!(bmap & 0200))
- {
- /* if the field value is absent, make it a null string */
- tup[i].value = result->null_field;
- tup[i].len = NULL_LEN;
- }
+ vlen = NULL_LEN;
+ else if (pqGetInt(&vlen, 4, conn))
+ goto EOFexit;
else
{
- /* get the value length (the first four bytes are for length) */
- if (pqGetInt(&vlen, 4, conn))
- goto EOFexit;
if (!binary)
vlen = vlen - 4;
if (vlen < 0)
vlen = 0;
- if (tup[i].value == NULL)
- {
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
- if (tup[i].value == NULL)
- goto outOfMemory;
- }
- tup[i].len = vlen;
- /* read in the value */
- if (vlen > 0)
- if (pqGetnchar((char *) (tup[i].value), vlen, conn))
- goto EOFexit;
- /* we have to terminate this ourselves */
- tup[i].value[vlen] = '\0';
}
+ rowbuf[i].len = vlen;
+
+ /*
+ * rowbuf[i].value always points to the next address in the data
+ * buffer even if the value is NULL. This allows row processors to
+ * estimate data sizes more easily.
+ */
+ rowbuf[i].value = conn->inBuffer + conn->inCursor;
+
+ /* Skip over the data value */
+ if (vlen > 0)
+ {
+ if (pqSkipnchar(vlen, conn))
+ goto EOFexit;
+ }
+
/* advance the bitmap stuff */
bitcnt++;
if (bitcnt == BITS_PER_BYTE)
@@ -811,26 +930,63 @@ getAnotherTuple(PGconn *conn, bool binary)
bmap <<= 1;
}
- /* Success! Store the completed tuple in the result */
- if (!pqAddTuple(result, tup))
- goto outOfMemory;
- /* and reset for a new message */
- conn->curTuple = NULL;
-
+ /* Release bitmap now if we allocated it */
if (bitmap != std_bitmap)
free(bitmap);
- return 0;
+ bitmap = NULL;
+
+ /*
+ * Advance inStart to show that the "D" message has been processed. We
+ * must do this before calling the row processor, in case it longjmps.
+ */
+ conn->inStart = conn->inCursor;
+
+ /* Pass the completed row values to rowProcessor */
+ errmsg = NULL;
+ switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
+ conn->rowProcessorParam))
+ {
+ case 1:
+ /* everything is good */
+ return 0;
+
+ case -1:
+ /* error, report the errmsg below */
+ break;
-outOfMemory:
- /* Replace partially constructed result with an error result */
+ default:
+ /* unrecognized return code */
+ errmsg = libpq_gettext("unrecognized return value from row processor");
+ break;
+ }
+ goto set_error_result;
+
+advance_and_error:
+ /*
+ * Discard the failed message. Unfortunately we don't know for sure
+ * where the end is, so just throw away everything in the input buffer.
+ * This is not very desirable but it's the best we can do in protocol v2.
+ */
+ conn->inStart = conn->inEnd;
+
+set_error_result:
/*
- * we do NOT use pqSaveErrorResult() here, because of the likelihood that
- * there's not enough memory to concatenate messages...
+ * Replace partially constructed result with an error result. First
+ * discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("out of memory for query result\n"));
+
+ /*
+ * If row processor didn't provide an error message, assume "out of
+ * memory" was meant. The advantage of having this special case is that
+ * freeing the old result first greatly improves the odds that gettext()
+ * will succeed in providing a translation.
+ */
+ if (!errmsg)
+ errmsg = libpq_gettext("out of memory for query result");
+
+ printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
/*
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
@@ -838,8 +994,6 @@ outOfMemory:
*/
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
conn->asyncStatus = PGASYNC_READY;
- /* Discard the failed message --- good idea? */
- conn->inStart = conn->inEnd;
EOFexit:
if (bitmap != NULL && bitmap != std_bitmap)
@@ -1122,7 +1276,8 @@ pqGetline2(PGconn *conn, char *s, int maxlen)
{
int result = 1; /* return value if buffer overflows */
- if (conn->sock < 0)
+ if (conn->sock < 0 ||
+ conn->asyncStatus != PGASYNC_COPY_OUT)
{
*s = '\0';
return EOF;