summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/interfaces/libpq/exports.txt1
-rw-r--r--src/interfaces/libpq/fe-exec.c54
-rw-r--r--src/interfaces/libpq/libpq-fe.h1
-rw-r--r--src/test/modules/libpq_pipeline/libpq_pipeline.c43
-rw-r--r--src/test/modules/libpq_pipeline/traces/multi_pipelines.trace11
5 files changed, 100 insertions, 10 deletions
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 28b861fd93b..088592deb16 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -192,3 +192,4 @@ PQclosePortal 189
PQsendClosePrepared 190
PQsendClosePortal 191
PQchangePassword 192
+PQsendPipelineSync 193
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index e1768d5475d..52d41658c1f 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -81,6 +81,7 @@ static int PQsendTypedCommand(PGconn *conn, char command, char type,
const char *target);
static int check_field_number(const PGresult *res, int field_num);
static void pqPipelineProcessQueue(PGconn *conn);
+static int pqPipelineSyncInternal(PGconn *conn, bool immediate_flush);
static int pqPipelineFlush(PGconn *conn);
@@ -3224,25 +3225,48 @@ pqPipelineProcessQueue(PGconn *conn)
/*
* PQpipelineSync
* Send a Sync message as part of a pipeline, and flush to server
+ */
+int
+PQpipelineSync(PGconn *conn)
+{
+ return pqPipelineSyncInternal(conn, true);
+}
+
+/*
+ * PQsendPipelineSync
+ * Send a Sync message as part of a pipeline, without flushing to server
+ */
+int
+PQsendPipelineSync(PGconn *conn)
+{
+ return pqPipelineSyncInternal(conn, false);
+}
+
+/*
+ * Workhorse function for PQpipelineSync and PQsendPipelineSync.
*
* It's legal to start submitting more commands in the pipeline immediately,
* without waiting for the results of the current pipeline. There's no need to
* end pipeline mode and start it again.
*
- * If a command in a pipeline fails, every subsequent command up to and including
- * the result to the Sync message sent by PQpipelineSync gets set to
- * PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without
- * error, a PGresult with PGRES_PIPELINE_SYNC is produced.
+ * If a command in a pipeline fails, every subsequent command up to and
+ * including the result to the Sync message sent by pqPipelineSyncInternal
+ * gets set to PGRES_PIPELINE_ABORTED state. If the whole pipeline is
+ * processed without error, a PGresult with PGRES_PIPELINE_SYNC is produced.
*
- * Queries can already have been sent before PQpipelineSync is called, but
- * PQpipelineSync needs to be called before retrieving command results.
+ * Queries can already have been sent before pqPipelineSyncInternal is called,
+ * but pqPipelineSyncInternal needs to be called before retrieving command
+ * results.
*
* The connection will remain in pipeline mode and unavailable for new
* synchronous command execution functions until all results from the pipeline
* are processed by the client.
+ *
+ * immediate_flush controls if the flush happens immediately after sending the
+ * Sync message or not.
*/
-int
-PQpipelineSync(PGconn *conn)
+static int
+pqPipelineSyncInternal(PGconn *conn, bool immediate_flush)
{
PGcmdQueueEntry *entry;
@@ -3288,9 +3312,19 @@ PQpipelineSync(PGconn *conn)
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
+ * If immediate_flush is disabled, the data is pushed if we are past the
+ * size threshold.
*/
- if (PQflush(conn) < 0)
- goto sendFailed;
+ if (immediate_flush)
+ {
+ if (pqFlush(conn) < 0)
+ goto sendFailed;
+ }
+ else
+ {
+ if (pqPipelineFlush(conn) < 0)
+ goto sendFailed;
+ }
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index f0ec660cb69..defc415fa3f 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -474,6 +474,7 @@ extern int PQenterPipelineMode(PGconn *conn);
extern int PQexitPipelineMode(PGconn *conn);
extern int PQpipelineSync(PGconn *conn);
extern int PQsendFlushRequest(PGconn *conn);
+extern int PQsendPipelineSync(PGconn *conn);
/* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 71cd04c5f23..5f43aa40de4 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -162,6 +162,7 @@ test_multi_pipelines(PGconn *conn)
if (PQenterPipelineMode(conn) != 1)
pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ /* first pipeline */
if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
dummy_params, NULL, NULL, 0) != 1)
pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
@@ -169,14 +170,27 @@ test_multi_pipelines(PGconn *conn)
if (PQpipelineSync(conn) != 1)
pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+ /* second pipeline */
if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
dummy_params, NULL, NULL, 0) != 1)
pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
+ /* Skip flushing once. */
+ if (PQsendPipelineSync(conn) != 1)
+ pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
+ /* third pipeline */
+ if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
+
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
/* OK, start processing the results */
+
+ /* first pipeline */
+
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
@@ -214,6 +228,35 @@ test_multi_pipelines(PGconn *conn)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("Unexpected result code %s from second pipeline item",
PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after first result");
+
+ if (PQexitPipelineMode(conn) != 0)
+ pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when sync result expected: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ PQclear(res);
+
+ /* third pipeline */
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from third pipeline item",
+ PQresStatus(PQresultStatus(res)));
res = PQgetResult(conn);
if (res != NULL)
diff --git a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
index 4b9ab07ca42..1ee21f61dce 100644
--- a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
+++ b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
@@ -8,6 +8,17 @@ F 19 Bind "" "" 0 1 1 '1' 1 0
F 6 Describe P ""
F 9 Execute "" 0
F 4 Sync
+F 21 Parse "" "SELECT $1" 1 NNNN
+F 19 Bind "" "" 0 1 1 '1' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+B 5 ReadyForQuery I
B 4 ParseComplete
B 4 BindComplete
B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0