summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Paquier <michael@paquier.xyz>2024-01-16 10:13:42 +0900
committerMichael Paquier <michael@paquier.xyz>2024-01-16 10:13:42 +0900
commit4794c2d31714235700ed68edafa10d1928c9bbb2 (patch)
tree4ead26663e9ae6eb6e6d195880bc5a7dd950e4c1 /src
parent83eb244e419efd43959fdd544ff8c30e175565ef (diff)
libpq: Add PQsendPipelineSync()
This new function is equivalent to PQpipelineSync(), except that it does not flush anything to the server except if the size threshold of the output buffer is reached; the user must subsequently call PQflush() instead. Its purpose is to reduce the system call overhead of pipeline mode, by giving to applications more control over the timing of the flushes when manipulating commands in pipeline mode. Author: Anton Kirilov Reviewed-by: Jelte Fennema-Nio, Robert Haas, Álvaro Herrera, Denis Laxalde, Michael Paquier Discussion: https://postgr.es/m/CACV6eE5arHFZEA717=iKEa_OewpVFfWJOmsOdGrqqsr8CJVfWQ@mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r--src/interfaces/libpq/exports.txt1
-rw-r--r--src/interfaces/libpq/fe-exec.c54
-rw-r--r--src/interfaces/libpq/libpq-fe.h1
-rw-r--r--src/test/modules/libpq_pipeline/libpq_pipeline.c43
-rw-r--r--src/test/modules/libpq_pipeline/traces/multi_pipelines.trace11
5 files changed, 100 insertions, 10 deletions
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 28b861fd93b..088592deb16 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -192,3 +192,4 @@ PQclosePortal 189
PQsendClosePrepared 190
PQsendClosePortal 191
PQchangePassword 192
+PQsendPipelineSync 193
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index e1768d5475d..52d41658c1f 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -81,6 +81,7 @@ static int PQsendTypedCommand(PGconn *conn, char command, char type,
const char *target);
static int check_field_number(const PGresult *res, int field_num);
static void pqPipelineProcessQueue(PGconn *conn);
+static int pqPipelineSyncInternal(PGconn *conn, bool immediate_flush);
static int pqPipelineFlush(PGconn *conn);
@@ -3224,25 +3225,48 @@ pqPipelineProcessQueue(PGconn *conn)
/*
* PQpipelineSync
* Send a Sync message as part of a pipeline, and flush to server
+ */
+int
+PQpipelineSync(PGconn *conn)
+{
+ return pqPipelineSyncInternal(conn, true);
+}
+
+/*
+ * PQsendPipelineSync
+ * Send a Sync message as part of a pipeline, without flushing to server
+ */
+int
+PQsendPipelineSync(PGconn *conn)
+{
+ return pqPipelineSyncInternal(conn, false);
+}
+
+/*
+ * Workhorse function for PQpipelineSync and PQsendPipelineSync.
*
* It's legal to start submitting more commands in the pipeline immediately,
* without waiting for the results of the current pipeline. There's no need to
* end pipeline mode and start it again.
*
- * If a command in a pipeline fails, every subsequent command up to and including
- * the result to the Sync message sent by PQpipelineSync gets set to
- * PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without
- * error, a PGresult with PGRES_PIPELINE_SYNC is produced.
+ * If a command in a pipeline fails, every subsequent command up to and
+ * including the result to the Sync message sent by pqPipelineSyncInternal
+ * gets set to PGRES_PIPELINE_ABORTED state. If the whole pipeline is
+ * processed without error, a PGresult with PGRES_PIPELINE_SYNC is produced.
*
- * Queries can already have been sent before PQpipelineSync is called, but
- * PQpipelineSync needs to be called before retrieving command results.
+ * Queries can already have been sent before pqPipelineSyncInternal is called,
+ * but pqPipelineSyncInternal needs to be called before retrieving command
+ * results.
*
* The connection will remain in pipeline mode and unavailable for new
* synchronous command execution functions until all results from the pipeline
* are processed by the client.
+ *
+ * immediate_flush controls if the flush happens immediately after sending the
+ * Sync message or not.
*/
-int
-PQpipelineSync(PGconn *conn)
+static int
+pqPipelineSyncInternal(PGconn *conn, bool immediate_flush)
{
PGcmdQueueEntry *entry;
@@ -3288,9 +3312,19 @@ PQpipelineSync(PGconn *conn)
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
+ * If immediate_flush is disabled, the data is pushed if we are past the
+ * size threshold.
*/
- if (PQflush(conn) < 0)
- goto sendFailed;
+ if (immediate_flush)
+ {
+ if (pqFlush(conn) < 0)
+ goto sendFailed;
+ }
+ else
+ {
+ if (pqPipelineFlush(conn) < 0)
+ goto sendFailed;
+ }
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index f0ec660cb69..defc415fa3f 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -474,6 +474,7 @@ extern int PQenterPipelineMode(PGconn *conn);
extern int PQexitPipelineMode(PGconn *conn);
extern int PQpipelineSync(PGconn *conn);
extern int PQsendFlushRequest(PGconn *conn);
+extern int PQsendPipelineSync(PGconn *conn);
/* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 71cd04c5f23..5f43aa40de4 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -162,6 +162,7 @@ test_multi_pipelines(PGconn *conn)
if (PQenterPipelineMode(conn) != 1)
pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ /* first pipeline */
if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
dummy_params, NULL, NULL, 0) != 1)
pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
@@ -169,14 +170,27 @@ test_multi_pipelines(PGconn *conn)
if (PQpipelineSync(conn) != 1)
pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+ /* second pipeline */
if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
dummy_params, NULL, NULL, 0) != 1)
pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
+ /* Skip flushing once. */
+ if (PQsendPipelineSync(conn) != 1)
+ pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
+ /* third pipeline */
+ if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
+
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
/* OK, start processing the results */
+
+ /* first pipeline */
+
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
@@ -214,6 +228,35 @@ test_multi_pipelines(PGconn *conn)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("Unexpected result code %s from second pipeline item",
PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after first result");
+
+ if (PQexitPipelineMode(conn) != 0)
+ pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when sync result expected: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ PQclear(res);
+
+ /* third pipeline */
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from third pipeline item",
+ PQresStatus(PQresultStatus(res)));
res = PQgetResult(conn);
if (res != NULL)
diff --git a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
index 4b9ab07ca42..1ee21f61dce 100644
--- a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
+++ b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
@@ -8,6 +8,17 @@ F 19 Bind "" "" 0 1 1 '1' 1 0
F 6 Describe P ""
F 9 Execute "" 0
F 4 Sync
+F 21 Parse "" "SELECT $1" 1 NNNN
+F 19 Bind "" "" 0 1 1 '1' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+B 5 ReadyForQuery I
B 4 ParseComplete
B 4 BindComplete
B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0