diff options
Diffstat (limited to 'src/interfaces/libpq')
-rw-r--r-- | src/interfaces/libpq/fe-exec.c | 112 | ||||
-rw-r--r-- | src/interfaces/libpq/fe-protocol3.c | 30 | ||||
-rw-r--r-- | src/interfaces/libpq/libpq-int.h | 6 |
3 files changed, 117 insertions, 31 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 919cf5741d4..7f1ab94fd1e 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1380,7 +1380,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry) * itself consume commands from the queue; if we're in any other * state, we don't have to do anything. */ - if (conn->asyncStatus == PGASYNC_IDLE) + if (conn->asyncStatus == PGASYNC_IDLE || + conn->asyncStatus == PGASYNC_PIPELINE_IDLE) pqPipelineProcessQueue(conn); break; } @@ -1436,6 +1437,7 @@ static int PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) { PGcmdQueueEntry *entry = NULL; + PGcmdQueueEntry *entry2 = NULL; if (!PQsendQueryStart(conn, newQuery)) return 0; @@ -1451,6 +1453,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) entry = pqAllocCmdQueueEntry(conn); if (entry == NULL) return 0; /* error msg already set */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + entry2 = pqAllocCmdQueueEntry(conn); + if (entry2 == NULL) + goto sendFailed; + } /* Send the query message(s) */ if (conn->pipelineStatus == PQ_PIPELINE_OFF) @@ -1520,6 +1528,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) /* OK, it's launched! */ pqAppendCmdQueueEntry(conn, entry); + + /* + * When pipeline mode is in use, we need a second entry in the command + * queue to represent Close Portal message. This allows us later to wait + * for the CloseComplete message to be received before getting in IDLE + * state. + */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + entry2->queryclass = PGQUERY_CLOSE; + entry2->query = NULL; + pqAppendCmdQueueEntry(conn, entry2); + } + return 1; sendFailed: @@ -1767,11 +1789,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery) switch (conn->asyncStatus) { case PGASYNC_IDLE: + case PGASYNC_PIPELINE_IDLE: case PGASYNC_READY: case PGASYNC_READY_MORE: case PGASYNC_BUSY: /* ok to queue */ break; + case PGASYNC_COPY_IN: case PGASYNC_COPY_OUT: case PGASYNC_COPY_BOTH: @@ -2144,16 +2168,21 @@ PQgetResult(PGconn *conn) { case PGASYNC_IDLE: res = NULL; /* query is complete */ - if (conn->pipelineStatus != PQ_PIPELINE_OFF) - { - /* - * We're about to return the NULL that terminates the round of - * results from the current query; prepare to send the results - * of the next query when we're called next. - */ - pqPipelineProcessQueue(conn); - } break; + case PGASYNC_PIPELINE_IDLE: + Assert(conn->pipelineStatus != PQ_PIPELINE_OFF); + + /* + * We're about to return the NULL that terminates the round of + * results from the current query; prepare to send the results + * of the next query, if any, when we're called next. If there's + * no next element in the command queue, this gets us in IDLE + * state. + */ + pqPipelineProcessQueue(conn); + res = NULL; /* query is complete */ + break; + case PGASYNC_READY: /* @@ -2174,7 +2203,7 @@ PQgetResult(PGconn *conn) * We're about to send the results of the current query. Set * us idle now, and ... */ - conn->asyncStatus = PGASYNC_IDLE; + conn->asyncStatus = PGASYNC_PIPELINE_IDLE; /* * ... in cases when we're sending a pipeline-sync result, @@ -2220,6 +2249,22 @@ PQgetResult(PGconn *conn) break; } + /* If the next command we expect is CLOSE, read and consume it */ + if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE && + conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_CLOSE) + { + if (res && res->resultStatus != PGRES_FATAL_ERROR) + { + conn->asyncStatus = PGASYNC_BUSY; + parseInput(conn); + conn->asyncStatus = PGASYNC_PIPELINE_IDLE; + } + else + /* we won't ever see the Close */ + pqCommandQueueAdvance(conn); + } + /* Time to fire PGEVT_RESULTCREATE events, if there are any */ if (res && res->nEvents > 0) (void) PQfireResultCreateEvents(conn, res); @@ -3014,7 +3059,10 @@ PQexitPipelineMode(PGconn *conn) if (!conn) return 0; - if (conn->pipelineStatus == PQ_PIPELINE_OFF) + if (conn->pipelineStatus == PQ_PIPELINE_OFF && + (conn->asyncStatus == PGASYNC_IDLE || + conn->asyncStatus == PGASYNC_PIPELINE_IDLE) && + conn->cmd_queue_head == NULL) return 1; switch (conn->asyncStatus) @@ -3031,9 +3079,16 @@ PQexitPipelineMode(PGconn *conn) libpq_gettext("cannot exit pipeline mode while busy\n")); return 0; - default: + case PGASYNC_IDLE: + case PGASYNC_PIPELINE_IDLE: /* OK */ break; + + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot exit pipeline mode while in COPY\n")); } /* still work to process */ @@ -3070,6 +3125,10 @@ pqCommandQueueAdvance(PGconn *conn) prevquery = conn->cmd_queue_head; conn->cmd_queue_head = conn->cmd_queue_head->next; + /* If the queue is now empty, reset the tail too */ + if (conn->cmd_queue_head == NULL) + conn->cmd_queue_tail = NULL; + /* and make it recyclable */ prevquery->next = NULL; pqRecycleCmdQueueEntry(conn, prevquery); @@ -3092,15 +3151,35 @@ pqPipelineProcessQueue(PGconn *conn) case PGASYNC_BUSY: /* client still has to process current query or results */ return; + case PGASYNC_IDLE: + /* + * If we're in IDLE mode and there's some command in the queue, + * get us into PIPELINE_IDLE mode and process normally. Otherwise + * there's nothing for us to do. + */ + if (conn->cmd_queue_head != NULL) + { + conn->asyncStatus = PGASYNC_PIPELINE_IDLE; + break; + } + return; + + case PGASYNC_PIPELINE_IDLE: + Assert(conn->pipelineStatus != PQ_PIPELINE_OFF); /* next query please */ break; } - /* Nothing to do if not in pipeline mode, or queue is empty */ - if (conn->pipelineStatus == PQ_PIPELINE_OFF || - conn->cmd_queue_head == NULL) + /* + * If there are no further commands to process in the queue, get us in + * "real idle" mode now. + */ + if (conn->cmd_queue_head == NULL) + { + conn->asyncStatus = PGASYNC_IDLE; return; + } /* * Reset the error state. This and the next couple of steps correspond to @@ -3193,6 +3272,7 @@ PQpipelineSync(PGconn *conn) case PGASYNC_READY_MORE: case PGASYNC_BUSY: case PGASYNC_IDLE: + case PGASYNC_PIPELINE_IDLE: /* OK to send sync */ break; } diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 10c76daf6ed..f267dfd33c5 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -159,18 +159,6 @@ pqParseInput3(PGconn *conn) return; /* - * We're also notionally not-IDLE when in pipeline mode the state - * says "idle" (so we have completed receiving the results of one - * query from the server and dispatched them to the application) - * but another query is queued; yield back control to caller so - * that they can initiate processing of the next query in the - * queue. - */ - if (conn->pipelineStatus != PQ_PIPELINE_OFF && - conn->cmd_queue_head != NULL) - return; - - /* * Unexpected message in IDLE state; need to recover somehow. * ERROR messages are handled using the notice processor; * ParameterStatus is handled normally; anything else is just @@ -296,8 +284,24 @@ pqParseInput3(PGconn *conn) } break; case '2': /* Bind Complete */ + /* Nothing to do for this message type */ + break; case '3': /* Close Complete */ - /* Nothing to do for these message types */ + /* + * If we get CloseComplete when waiting for it, consume + * the queue element and keep going. A result is not + * expected from this message; it is just there so that + * we know to wait for it when PQsendQuery is used in + * pipeline mode, before going in IDLE state. Failing to + * do this makes us receive CloseComplete when IDLE, which + * creates problems. + */ + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_CLOSE) + { + pqCommandQueueAdvance(conn); + } + break; case 'S': /* parameter status */ if (getParameterStatus(conn)) diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 3db6a17db4d..51ab51f9f92 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -225,7 +225,8 @@ typedef enum * query */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ - PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ + PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */ + PGASYNC_PIPELINE_IDLE, /* "Idle" between commands in pipeline mode */ } PGAsyncStatusType; /* Target server type (decoded value of target_session_attrs) */ @@ -311,7 +312,8 @@ typedef enum PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */ PGQUERY_PREPARE, /* Parse only (PQprepare) */ PGQUERY_DESCRIBE, /* Describe Statement or Portal */ - PGQUERY_SYNC /* Sync (at end of a pipeline) */ + PGQUERY_SYNC, /* Sync (at end of a pipeline) */ + PGQUERY_CLOSE } PGQueryClass; /* |