summaryrefslogtreecommitdiff
path: root/src/interfaces/libpq
diff options
context:
space:
mode:
Diffstat (limited to 'src/interfaces/libpq')
-rw-r--r--src/interfaces/libpq/fe-exec.c112
-rw-r--r--src/interfaces/libpq/fe-protocol3.c30
-rw-r--r--src/interfaces/libpq/libpq-int.h6
3 files changed, 117 insertions, 31 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 919cf5741d4..7f1ab94fd1e 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1380,7 +1380,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
* itself consume commands from the queue; if we're in any other
* state, we don't have to do anything.
*/
- if (conn->asyncStatus == PGASYNC_IDLE)
+ if (conn->asyncStatus == PGASYNC_IDLE ||
+ conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
pqPipelineProcessQueue(conn);
break;
}
@@ -1436,6 +1437,7 @@ static int
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
{
PGcmdQueueEntry *entry = NULL;
+ PGcmdQueueEntry *entry2 = NULL;
if (!PQsendQueryStart(conn, newQuery))
return 0;
@@ -1451,6 +1453,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ entry2 = pqAllocCmdQueueEntry(conn);
+ if (entry2 == NULL)
+ goto sendFailed;
+ }
/* Send the query message(s) */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1520,6 +1528,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
+
+ /*
+ * When pipeline mode is in use, we need a second entry in the command
+ * queue to represent Close Portal message. This allows us later to wait
+ * for the CloseComplete message to be received before getting in IDLE
+ * state.
+ */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ entry2->queryclass = PGQUERY_CLOSE;
+ entry2->query = NULL;
+ pqAppendCmdQueueEntry(conn, entry2);
+ }
+
return 1;
sendFailed:
@@ -1767,11 +1789,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
switch (conn->asyncStatus)
{
case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
/* ok to queue */
break;
+
case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
@@ -2144,16 +2168,21 @@ PQgetResult(PGconn *conn)
{
case PGASYNC_IDLE:
res = NULL; /* query is complete */
- if (conn->pipelineStatus != PQ_PIPELINE_OFF)
- {
- /*
- * We're about to return the NULL that terminates the round of
- * results from the current query; prepare to send the results
- * of the next query when we're called next.
- */
- pqPipelineProcessQueue(conn);
- }
break;
+ case PGASYNC_PIPELINE_IDLE:
+ Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+ /*
+ * We're about to return the NULL that terminates the round of
+ * results from the current query; prepare to send the results
+ * of the next query, if any, when we're called next. If there's
+ * no next element in the command queue, this gets us in IDLE
+ * state.
+ */
+ pqPipelineProcessQueue(conn);
+ res = NULL; /* query is complete */
+ break;
+
case PGASYNC_READY:
/*
@@ -2174,7 +2203,7 @@ PQgetResult(PGconn *conn)
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
- conn->asyncStatus = PGASYNC_IDLE;
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
/*
* ... in cases when we're sending a pipeline-sync result,
@@ -2220,6 +2249,22 @@ PQgetResult(PGconn *conn)
break;
}
+ /* If the next command we expect is CLOSE, read and consume it */
+ if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
+ conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+ {
+ if (res && res->resultStatus != PGRES_FATAL_ERROR)
+ {
+ conn->asyncStatus = PGASYNC_BUSY;
+ parseInput(conn);
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+ }
+ else
+ /* we won't ever see the Close */
+ pqCommandQueueAdvance(conn);
+ }
+
/* Time to fire PGEVT_RESULTCREATE events, if there are any */
if (res && res->nEvents > 0)
(void) PQfireResultCreateEvents(conn, res);
@@ -3014,7 +3059,10 @@ PQexitPipelineMode(PGconn *conn)
if (!conn)
return 0;
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+ (conn->asyncStatus == PGASYNC_IDLE ||
+ conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
+ conn->cmd_queue_head == NULL)
return 1;
switch (conn->asyncStatus)
@@ -3031,9 +3079,16 @@ PQexitPipelineMode(PGconn *conn)
libpq_gettext("cannot exit pipeline mode while busy\n"));
return 0;
- default:
+ case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
/* OK */
break;
+
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot exit pipeline mode while in COPY\n"));
}
/* still work to process */
@@ -3070,6 +3125,10 @@ pqCommandQueueAdvance(PGconn *conn)
prevquery = conn->cmd_queue_head;
conn->cmd_queue_head = conn->cmd_queue_head->next;
+ /* If the queue is now empty, reset the tail too */
+ if (conn->cmd_queue_head == NULL)
+ conn->cmd_queue_tail = NULL;
+
/* and make it recyclable */
prevquery->next = NULL;
pqRecycleCmdQueueEntry(conn, prevquery);
@@ -3092,15 +3151,35 @@ pqPipelineProcessQueue(PGconn *conn)
case PGASYNC_BUSY:
/* client still has to process current query or results */
return;
+
case PGASYNC_IDLE:
+ /*
+ * If we're in IDLE mode and there's some command in the queue,
+ * get us into PIPELINE_IDLE mode and process normally. Otherwise
+ * there's nothing for us to do.
+ */
+ if (conn->cmd_queue_head != NULL)
+ {
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+ break;
+ }
+ return;
+
+ case PGASYNC_PIPELINE_IDLE:
+ Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
/* next query please */
break;
}
- /* Nothing to do if not in pipeline mode, or queue is empty */
- if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
- conn->cmd_queue_head == NULL)
+ /*
+ * If there are no further commands to process in the queue, get us in
+ * "real idle" mode now.
+ */
+ if (conn->cmd_queue_head == NULL)
+ {
+ conn->asyncStatus = PGASYNC_IDLE;
return;
+ }
/*
* Reset the error state. This and the next couple of steps correspond to
@@ -3193,6 +3272,7 @@ PQpipelineSync(PGconn *conn)
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
/* OK to send sync */
break;
}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 10c76daf6ed..f267dfd33c5 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -159,18 +159,6 @@ pqParseInput3(PGconn *conn)
return;
/*
- * We're also notionally not-IDLE when in pipeline mode the state
- * says "idle" (so we have completed receiving the results of one
- * query from the server and dispatched them to the application)
- * but another query is queued; yield back control to caller so
- * that they can initiate processing of the next query in the
- * queue.
- */
- if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
- conn->cmd_queue_head != NULL)
- return;
-
- /*
* Unexpected message in IDLE state; need to recover somehow.
* ERROR messages are handled using the notice processor;
* ParameterStatus is handled normally; anything else is just
@@ -296,8 +284,24 @@ pqParseInput3(PGconn *conn)
}
break;
case '2': /* Bind Complete */
+ /* Nothing to do for this message type */
+ break;
case '3': /* Close Complete */
- /* Nothing to do for these message types */
+ /*
+ * If we get CloseComplete when waiting for it, consume
+ * the queue element and keep going. A result is not
+ * expected from this message; it is just there so that
+ * we know to wait for it when PQsendQuery is used in
+ * pipeline mode, before going in IDLE state. Failing to
+ * do this makes us receive CloseComplete when IDLE, which
+ * creates problems.
+ */
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+ {
+ pqCommandQueueAdvance(conn);
+ }
+
break;
case 'S': /* parameter status */
if (getParameterStatus(conn))
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 3db6a17db4d..51ab51f9f92 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -225,7 +225,8 @@ typedef enum
* query */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
- PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
+ PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
+ PGASYNC_PIPELINE_IDLE, /* "Idle" between commands in pipeline mode */
} PGAsyncStatusType;
/* Target server type (decoded value of target_session_attrs) */
@@ -311,7 +312,8 @@ typedef enum
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
PGQUERY_PREPARE, /* Parse only (PQprepare) */
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
- PGQUERY_SYNC /* Sync (at end of a pipeline) */
+ PGQUERY_SYNC, /* Sync (at end of a pipeline) */
+ PGQUERY_CLOSE
} PGQueryClass;
/*