diff options
| author | Tom Lane <tgl@sss.pgh.pa.us> | 2012-04-04 18:27:56 -0400 |
|---|---|---|
| committer | Tom Lane <tgl@sss.pgh.pa.us> | 2012-04-04 18:27:56 -0400 |
| commit | 92785dac2ee7026948962cd61c4cd84a2d052772 (patch) | |
| tree | deb7a2c120978b9f3b85410317271a91b76ad66d /src/interfaces/libpq/fe-protocol2.c | |
| parent | cb917e1544612c187c74fed1a990e26820514c8a (diff) | |
Add a "row processor" API to libpq for better handling of large results.
Traditionally libpq has collected an entire query result before passing
it back to the application. That provides a simple and transactional API,
but it's pretty inefficient for large result sets. This patch allows the
application to process each row on-the-fly instead of accumulating the
rows into the PGresult. Error recovery becomes a bit more complex, but
often that tradeoff is well worth making.
Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane
Diffstat (limited to 'src/interfaces/libpq/fe-protocol2.c')
| -rw-r--r-- | src/interfaces/libpq/fe-protocol2.c | 325 |
1 file changed, 240 insertions, 85 deletions
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index a7c38993b8b..43f9954dd1c 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -49,11 +49,19 @@ static int getNotify(PGconn *conn); PostgresPollingStatusType pqSetenvPoll(PGconn *conn) { + PostgresPollingStatusType result; PGresult *res; + PQrowProcessor savedRowProcessor; + void *savedRowProcessorParam; if (conn == NULL || conn->status == CONNECTION_BAD) return PGRES_POLLING_FAILED; + /* Ensure the standard row processor is used to collect any results */ + savedRowProcessor = conn->rowProcessor; + savedRowProcessorParam = conn->rowProcessorParam; + PQsetRowProcessor(conn, NULL, NULL); + /* Check whether there are any data for us */ switch (conn->setenv_state) { @@ -69,7 +77,10 @@ pqSetenvPoll(PGconn *conn) if (n < 0) goto error_return; if (n == 0) - return PGRES_POLLING_READING; + { + result = PGRES_POLLING_READING; + goto normal_return; + } break; } @@ -83,7 +94,8 @@ pqSetenvPoll(PGconn *conn) /* Should we raise an error if called when not active? 
*/ case SETENV_STATE_IDLE: - return PGRES_POLLING_OK; + result = PGRES_POLLING_OK; + goto normal_return; default: printfPQExpBuffer(&conn->errorMessage, @@ -180,7 +192,10 @@ pqSetenvPoll(PGconn *conn) case SETENV_STATE_CLIENT_ENCODING_WAIT: { if (PQisBusy(conn)) - return PGRES_POLLING_READING; + { + result = PGRES_POLLING_READING; + goto normal_return; + } res = PQgetResult(conn); @@ -205,7 +220,10 @@ pqSetenvPoll(PGconn *conn) case SETENV_STATE_OPTION_WAIT: { if (PQisBusy(conn)) - return PGRES_POLLING_READING; + { + result = PGRES_POLLING_READING; + goto normal_return; + } res = PQgetResult(conn); @@ -244,13 +262,17 @@ pqSetenvPoll(PGconn *conn) goto error_return; conn->setenv_state = SETENV_STATE_QUERY1_WAIT; - return PGRES_POLLING_READING; + result = PGRES_POLLING_READING; + goto normal_return; } case SETENV_STATE_QUERY1_WAIT: { if (PQisBusy(conn)) - return PGRES_POLLING_READING; + { + result = PGRES_POLLING_READING; + goto normal_return; + } res = PQgetResult(conn); @@ -327,13 +349,17 @@ pqSetenvPoll(PGconn *conn) goto error_return; conn->setenv_state = SETENV_STATE_QUERY2_WAIT; - return PGRES_POLLING_READING; + result = PGRES_POLLING_READING; + goto normal_return; } case SETENV_STATE_QUERY2_WAIT: { if (PQisBusy(conn)) - return PGRES_POLLING_READING; + { + result = PGRES_POLLING_READING; + goto normal_return; + } res = PQgetResult(conn); @@ -380,7 +406,8 @@ pqSetenvPoll(PGconn *conn) { /* Query finished, so we're done */ conn->setenv_state = SETENV_STATE_IDLE; - return PGRES_POLLING_OK; + result = PGRES_POLLING_OK; + goto normal_return; } break; } @@ -398,7 +425,12 @@ pqSetenvPoll(PGconn *conn) error_return: conn->setenv_state = SETENV_STATE_IDLE; - return PGRES_POLLING_FAILED; + result = PGRES_POLLING_FAILED; + +normal_return: + conn->rowProcessor = savedRowProcessor; + conn->rowProcessorParam = savedRowProcessorParam; + return result; } @@ -406,6 +438,9 @@ error_return: * parseInput: if appropriate, parse input data from backend * until input is exhausted or 
a stopping state is reached. * Note that this function will NOT attempt to read more data from the backend. + * + * Note: callers of parseInput must be prepared for a longjmp exit when we are + * in PGASYNC_BUSY state, since an external row processor might do that. */ void pqParseInput2(PGconn *conn) @@ -549,6 +584,8 @@ pqParseInput2(PGconn *conn) /* First 'T' in a query sequence */ if (getRowDescriptions(conn)) return; + /* getRowDescriptions() moves inStart itself */ + continue; } else { @@ -569,6 +606,8 @@ pqParseInput2(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, FALSE)) return; + /* getAnotherTuple() moves inStart itself */ + continue; } else { @@ -585,6 +624,8 @@ pqParseInput2(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, TRUE)) return; + /* getAnotherTuple() moves inStart itself */ + continue; } else { @@ -627,27 +668,32 @@ pqParseInput2(PGconn *conn) /* * parseInput subroutine to read a 'T' (row descriptions) message. * We build a PGresult structure containing the attribute data. - * Returns: 0 if completed message, EOF if not enough data yet. + * Returns: 0 if completed message, EOF if error or not enough data + * received yet. * - * Note that if we run out of data, we have to release the partially - * constructed PGresult, and rebuild it again next time. Fortunately, - * that shouldn't happen often, since 'T' messages usually fit in a packet. + * Note that if we run out of data, we have to suspend and reprocess + * the message after more data is received. Otherwise, conn->inStart + * must get advanced past the processed data. 
*/ static int getRowDescriptions(PGconn *conn) { - PGresult *result = NULL; + PGresult *result; int nfields; + const char *errmsg; int i; result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK); if (!result) - goto failure; + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } /* parseInput already read the 'T' label. */ /* the next two bytes are the number of fields */ if (pqGetInt(&(result->numAttributes), 2, conn)) - goto failure; + goto EOFexit; nfields = result->numAttributes; /* allocate space for the attribute descriptors */ @@ -656,7 +702,10 @@ getRowDescriptions(PGconn *conn) result->attDescs = (PGresAttDesc *) pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE); if (!result->attDescs) - goto failure; + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc)); } @@ -671,7 +720,7 @@ getRowDescriptions(PGconn *conn) pqGetInt(&typid, 4, conn) || pqGetInt(&typlen, 2, conn) || pqGetInt(&atttypmod, 4, conn)) - goto failure; + goto EOFexit; /* * Since pqGetInt treats 2-byte integers as unsigned, we need to @@ -682,7 +731,10 @@ getRowDescriptions(PGconn *conn) result->attDescs[i].name = pqResultStrdup(result, conn->workBuffer.data); if (!result->attDescs[i].name) - goto failure; + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } result->attDescs[i].tableid = 0; result->attDescs[i].columnid = 0; result->attDescs[i].format = 0; @@ -693,30 +745,90 @@ getRowDescriptions(PGconn *conn) /* Success! */ conn->result = result; - return 0; -failure: - if (result) + /* + * Advance inStart to show that the "T" message has been processed. We + * must do this before calling the row processor, in case it longjmps. 
+ */ + conn->inStart = conn->inCursor; + + /* Give the row processor a chance to initialize for new result set */ + errmsg = NULL; + switch ((*conn->rowProcessor) (result, NULL, &errmsg, + conn->rowProcessorParam)) + { + case 1: + /* everything is good */ + return 0; + + case -1: + /* error, report the errmsg below */ + break; + + default: + /* unrecognized return code */ + errmsg = libpq_gettext("unrecognized return value from row processor"); + break; + } + goto set_error_result; + +advance_and_error: + /* + * Discard the failed message. Unfortunately we don't know for sure + * where the end is, so just throw away everything in the input buffer. + * This is not very desirable but it's the best we can do in protocol v2. + */ + conn->inStart = conn->inEnd; + +set_error_result: + + /* + * Replace partially constructed result with an error result. First + * discard the old result to try to win back some memory. + */ + pqClearAsyncResult(conn); + + /* + * If row processor didn't provide an error message, assume "out of + * memory" was meant. The advantage of having this special case is that + * freeing the old result first greatly improves the odds that gettext() + * will succeed in providing a translation. + */ + if (!errmsg) + errmsg = libpq_gettext("out of memory for query result"); + + printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg); + + /* + * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can + * do to recover... + */ + conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); + conn->asyncStatus = PGASYNC_READY; + +EOFexit: + if (result && result != conn->result) PQclear(result); return EOF; } /* * parseInput subroutine to read a 'B' or 'D' (row data) message. - * We add another tuple to the existing PGresult structure. - * Returns: 0 if completed message, EOF if error or not enough data yet. + * We fill rowbuf with column pointers and then call the row processor. 
+ * Returns: 0 if completed message, EOF if error or not enough data + * received yet. * * Note that if we run out of data, we have to suspend and reprocess - * the message after more data is received. We keep a partially constructed - * tuple in conn->curTuple, and avoid reallocating already-allocated storage. + * the message after more data is received. Otherwise, conn->inStart + * must get advanced past the processed data. */ static int getAnotherTuple(PGconn *conn, bool binary) { PGresult *result = conn->result; int nfields = result->numAttributes; - PGresAttValue *tup; - + const char *errmsg; + PGdataValue *rowbuf; /* the backend sends us a bitmap of which attributes are null */ char std_bitmap[64]; /* used unless it doesn't fit */ char *bitmap = std_bitmap; @@ -727,28 +839,33 @@ getAnotherTuple(PGconn *conn, bool binary) int bitcnt; /* number of bits examined in current byte */ int vlen; /* length of the current field value */ - result->binary = binary; - - /* Allocate tuple space if first time for this data message */ - if (conn->curTuple == NULL) + /* Resize row buffer if needed */ + rowbuf = conn->rowBuf; + if (nfields > conn->rowBufLen) { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - - /* - * If it's binary, fix the column format indicators. We assume the - * backend will consistently send either B or D, not a mix. - */ - if (binary) + rowbuf = (PGdataValue *) realloc(rowbuf, + nfields * sizeof(PGdataValue)); + if (!rowbuf) { - for (i = 0; i < nfields; i++) - result->attDescs[i].format = 1; + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; } + conn->rowBuf = rowbuf; + conn->rowBufLen = nfields; + } + + /* Save format specifier */ + result->binary = binary; + + /* + * If it's binary, fix the column format indicators. 
We assume the + * backend will consistently send either B or D, not a mix. + */ + if (binary) + { + for (i = 0; i < nfields; i++) + result->attDescs[i].format = 1; } - tup = conn->curTuple; /* Get the null-value bitmap */ nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE; @@ -757,7 +874,10 @@ getAnotherTuple(PGconn *conn, bool binary) { bitmap = (char *) malloc(nbytes); if (!bitmap) - goto outOfMemory; + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } } if (pqGetnchar(bitmap, nbytes, conn)) @@ -770,35 +890,34 @@ getAnotherTuple(PGconn *conn, bool binary) for (i = 0; i < nfields; i++) { + /* get the value length */ if (!(bmap & 0200)) - { - /* if the field value is absent, make it a null string */ - tup[i].value = result->null_field; - tup[i].len = NULL_LEN; - } + vlen = NULL_LEN; + else if (pqGetInt(&vlen, 4, conn)) + goto EOFexit; else { - /* get the value length (the first four bytes are for length) */ - if (pqGetInt(&vlen, 4, conn)) - goto EOFexit; if (!binary) vlen = vlen - 4; if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary); - if (tup[i].value == NULL) - goto outOfMemory; - } - tup[i].len = vlen; - /* read in the value */ - if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) - goto EOFexit; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; } + rowbuf[i].len = vlen; + + /* + * rowbuf[i].value always points to the next address in the data + * buffer even if the value is NULL. This allows row processors to + * estimate data sizes more easily. + */ + rowbuf[i].value = conn->inBuffer + conn->inCursor; + + /* Skip over the data value */ + if (vlen > 0) + { + if (pqSkipnchar(vlen, conn)) + goto EOFexit; + } + /* advance the bitmap stuff */ bitcnt++; if (bitcnt == BITS_PER_BYTE) @@ -811,26 +930,63 @@ getAnotherTuple(PGconn *conn, bool binary) bmap <<= 1; } - /* Success! 
Store the completed tuple in the result */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; - /* and reset for a new message */ - conn->curTuple = NULL; - + /* Release bitmap now if we allocated it */ if (bitmap != std_bitmap) free(bitmap); - return 0; + bitmap = NULL; + + /* + * Advance inStart to show that the "D" message has been processed. We + * must do this before calling the row processor, in case it longjmps. + */ + conn->inStart = conn->inCursor; + + /* Pass the completed row values to rowProcessor */ + errmsg = NULL; + switch ((*conn->rowProcessor) (result, rowbuf, &errmsg, + conn->rowProcessorParam)) + { + case 1: + /* everything is good */ + return 0; + + case -1: + /* error, report the errmsg below */ + break; -outOfMemory: - /* Replace partially constructed result with an error result */ + default: + /* unrecognized return code */ + errmsg = libpq_gettext("unrecognized return value from row processor"); + break; + } + goto set_error_result; + +advance_and_error: + /* + * Discard the failed message. Unfortunately we don't know for sure + * where the end is, so just throw away everything in the input buffer. + * This is not very desirable but it's the best we can do in protocol v2. + */ + conn->inStart = conn->inEnd; + +set_error_result: /* - * we do NOT use pqSaveErrorResult() here, because of the likelihood that - * there's not enough memory to concatenate messages... + * Replace partially constructed result with an error result. First + * discard the old result to try to win back some memory. */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + + /* + * If row processor didn't provide an error message, assume "out of + * memory" was meant. The advantage of having this special case is that + * freeing the old result first greatly improves the odds that gettext() + * will succeed in providing a translation. 
+ */ + if (!errmsg) + errmsg = libpq_gettext("out of memory for query result"); + + printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg); /* * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can @@ -838,8 +994,6 @@ outOfMemory: */ conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); conn->asyncStatus = PGASYNC_READY; - /* Discard the failed message --- good idea? */ - conn->inStart = conn->inEnd; EOFexit: if (bitmap != NULL && bitmap != std_bitmap) @@ -1122,7 +1276,8 @@ pqGetline2(PGconn *conn, char *s, int maxlen) { int result = 1; /* return value if buffer overflows */ - if (conn->sock < 0) + if (conn->sock < 0 || + conn->asyncStatus != PGASYNC_COPY_OUT) { *s = '\0'; return EOF; |
