diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/interfaces/libpq/exports.txt | 1 | ||||
-rw-r--r-- | src/interfaces/libpq/fe-exec.c | 54 | ||||
-rw-r--r-- | src/interfaces/libpq/libpq-fe.h | 1 | ||||
-rw-r--r-- | src/test/modules/libpq_pipeline/libpq_pipeline.c | 43 | ||||
-rw-r--r-- | src/test/modules/libpq_pipeline/traces/multi_pipelines.trace | 11 |
5 files changed, 100 insertions, 10 deletions
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 28b861fd93b..088592deb16 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -192,3 +192,4 @@ PQclosePortal 189 PQsendClosePrepared 190 PQsendClosePortal 191 PQchangePassword 192 +PQsendPipelineSync 193 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index e1768d5475d..52d41658c1f 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -81,6 +81,7 @@ static int PQsendTypedCommand(PGconn *conn, char command, char type, const char *target); static int check_field_number(const PGresult *res, int field_num); static void pqPipelineProcessQueue(PGconn *conn); +static int pqPipelineSyncInternal(PGconn *conn, bool immediate_flush); static int pqPipelineFlush(PGconn *conn); @@ -3224,25 +3225,48 @@ pqPipelineProcessQueue(PGconn *conn) /* * PQpipelineSync * Send a Sync message as part of a pipeline, and flush to server + */ +int +PQpipelineSync(PGconn *conn) +{ + return pqPipelineSyncInternal(conn, true); +} + +/* + * PQsendPipelineSync + * Send a Sync message as part of a pipeline, without flushing to server + */ +int +PQsendPipelineSync(PGconn *conn) +{ + return pqPipelineSyncInternal(conn, false); +} + +/* + * Workhorse function for PQpipelineSync and PQsendPipelineSync. * * It's legal to start submitting more commands in the pipeline immediately, * without waiting for the results of the current pipeline. There's no need to * end pipeline mode and start it again. * - * If a command in a pipeline fails, every subsequent command up to and including - * the result to the Sync message sent by PQpipelineSync gets set to - * PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without - * error, a PGresult with PGRES_PIPELINE_SYNC is produced. + * If a command in a pipeline fails, every subsequent command up to and + * including the result to the Sync message sent by pqPipelineSyncInternal + * gets set to PGRES_PIPELINE_ABORTED state. If the whole pipeline is + * processed without error, a PGresult with PGRES_PIPELINE_SYNC is produced. * - * Queries can already have been sent before PQpipelineSync is called, but - * PQpipelineSync needs to be called before retrieving command results. + * Queries can already have been sent before pqPipelineSyncInternal is called, + * but pqPipelineSyncInternal needs to be called before retrieving command + * results. * * The connection will remain in pipeline mode and unavailable for new * synchronous command execution functions until all results from the pipeline * are processed by the client. + * + * immediate_flush controls if the flush happens immediately after sending the + * Sync message or not. */ -int -PQpipelineSync(PGconn *conn) +static int +pqPipelineSyncInternal(PGconn *conn, bool immediate_flush) { PGcmdQueueEntry *entry; @@ -3288,9 +3312,19 @@ PQpipelineSync(PGconn *conn) /* * Give the data a push. In nonblock mode, don't complain if we're unable * to send it all; PQgetResult() will do any additional flushing needed. + * If immediate_flush is disabled, the data is pushed if we are past the + * size threshold. */ - if (PQflush(conn) < 0) - goto sendFailed; + if (immediate_flush) + { + if (pqFlush(conn) < 0) + goto sendFailed; + } + else + { + if (pqPipelineFlush(conn) < 0) + goto sendFailed; + } /* OK, it's launched! */ pqAppendCmdQueueEntry(conn, entry); diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index f0ec660cb69..defc415fa3f 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -474,6 +474,7 @@ extern int PQenterPipelineMode(PGconn *conn); extern int PQexitPipelineMode(PGconn *conn); extern int PQpipelineSync(PGconn *conn); extern int PQsendFlushRequest(PGconn *conn); +extern int PQsendPipelineSync(PGconn *conn); /* LISTEN/NOTIFY support */ extern PGnotify *PQnotifies(PGconn *conn); diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index 71cd04c5f23..5f43aa40de4 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -162,6 +162,7 @@ test_multi_pipelines(PGconn *conn) if (PQenterPipelineMode(conn) != 1) pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn)); + /* first pipeline */ if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, dummy_params, NULL, NULL, 0) != 1) pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn)); @@ -169,14 +170,27 @@ test_multi_pipelines(PGconn *conn) if (PQpipelineSync(conn) != 1) pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn)); + /* second pipeline */ if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, dummy_params, NULL, NULL, 0) != 1) pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn)); + /* Skip flushing once. */ + if (PQsendPipelineSync(conn) != 1) + pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn)); + + /* third pipeline */ + if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn)); + if (PQpipelineSync(conn) != 1) pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); /* OK, start processing the results */ + + /* first pipeline */ + res = PQgetResult(conn); if (res == NULL) pg_fatal("PQgetResult returned null when there's a pipeline item: %s", @@ -214,6 +228,35 @@ test_multi_pipelines(PGconn *conn) if (PQresultStatus(res) != PGRES_TUPLES_OK) pg_fatal("Unexpected result code %s from second pipeline item", PQresStatus(PQresultStatus(res))); + PQclear(res); + res = NULL; + + if (PQgetResult(conn) != NULL) + pg_fatal("PQgetResult returned something extra after first result"); + + if (PQexitPipelineMode(conn) != 0) + pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly"); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when sync result expected: %s", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("Unexpected result code %s instead of sync result, error: %s", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + PQclear(res); + + /* third pipeline */ + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline item: %s", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Unexpected result code %s from third pipeline item", + PQresStatus(PQresultStatus(res))); res = PQgetResult(conn); if (res != NULL) diff --git a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace index 4b9ab07ca42..1ee21f61dce 100644 --- a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace +++ b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace @@ -8,6 +8,17 @@ F 19 Bind "" "" 0 1 1 '1' 1 0 F 6 Describe P "" F 9 Execute "" 0 F 4 Sync +F 21 Parse "" "SELECT $1" 1 NNNN +F 19 Bind "" "" 0 1 1 '1' 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 4 Sync +B 4 ParseComplete +B 4 BindComplete +B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0 +B 11 DataRow 1 1 '1' +B 13 CommandComplete "SELECT 1" +B 5 ReadyForQuery I B 4 ParseComplete B 4 BindComplete B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0 |