diff options
author | Noah Misch <noah@leadboat.com> | 2024-01-08 11:39:56 -0800 |
---|---|---|
committer | Noah Misch <noah@leadboat.com> | 2025-04-03 09:34:01 -0700 |
commit | 63f6ecb6b023143becd4d8a580c23c0a3259dc84 (patch) | |
tree | 3ba0335293540bf45ca91dec7aa3961b7de73765 /src | |
parent | 9e129a2243e2c1e5b7107cec06262aad26533f0c (diff) |
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation and
allowing DROP DATABASE (of a database not involved in the query). Apart
from explicit dblink_cancel_query() calls, dblink still doesn't cancel
the remote side. The replacement for the blocking calls consists of
new, general-purpose query execution wrappers in the libpqsrv facility.
Out-of-tree extensions should adopt these.
The original commit d3c5f37dd543498cc7c678815d3921823beec9e9 did not
back-patch. Back-patch now to v16-v13, bringing coverage to all supported
versions. This back-patch omits the orignal's refactoring in postgres_fdw.
Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 9 | ||||
-rw-r--r-- | src/include/libpq/libpq-be-fe-helpers.h | 127 |
2 files changed, 130 insertions, 6 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index a468420dcaf..60f906dce60 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -639,12 +639,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * Send a query and wait for the results by using the asynchronous libpq * functions and socket readiness events. * - * We must not use the regular blocking libpq functions like PQexec() - * since they are uninterruptible by signals on some platforms, such as - * Windows. - * - * The function is modeled on PQexec() in libpq, but only implements - * those parts that are in use in the walreceiver api. + * The function is modeled on libpqsrv_exec(), with the behavior difference + * being that it calls ProcessWalRcvInterrupts(). As an optimization, it + * skips try/catch, since all errors terminate the process. * * May return NULL, rather than an error result, on failure. */ diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h index 41e3bb4376a..a4b3e805b9d 100644 --- a/src/include/libpq/libpq-be-fe-helpers.h +++ b/src/include/libpq/libpq-be-fe-helpers.h @@ -49,6 +49,8 @@ static inline void libpqsrv_connect_prepare(void); static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info); +static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info); +static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info); /* @@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info) PG_END_TRY(); } +/* + * PQexec() wrapper that processes interrupts. + * + * Unless PQsetnonblocking(conn, 1) is in effect, this can't process + * interrupts while pushing the query text to the server. Consider that + * setting if query strings can be long relative to TCP buffer size. + * + * This has the preconditions of PQsendQuery(), not those of PQexec(). Most + * notably, PQexec() would silently discard any prior query results. + */ +static inline PGresult * +libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info) +{ + if (!PQsendQuery(conn, query)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * PQexecParams() wrapper that processes interrupts. + * + * See notes at libpqsrv_exec(). + */ +static inline PGresult * +libpqsrv_exec_params(PGconn *conn, + const char *command, + int nParams, + const Oid *paramTypes, + const char *const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat, + uint32 wait_event_info) +{ + if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues, + paramLengths, paramFormats, resultFormat)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * Like PQexec(), loop over PQgetResult() until it returns NULL or another + * terminal state. Return the last non-NULL result or the terminal state. + */ +static inline PGresult * +libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info) +{ + PGresult *volatile lastResult = NULL; + + /* In what follows, do not leak any PGresults on an error. */ + PG_TRY(); + { + for (;;) + { + /* Wait for, and collect, the next PGresult. */ + PGresult *result; + + result = libpqsrv_get_result(conn, wait_event_info); + if (result == NULL) + break; /* query is complete, or failure */ + + /* + * Emulate PQexec()'s behavior of returning the last result when + * there are many. + */ + PQclear(lastResult); + lastResult = result; + + if (PQresultStatus(lastResult) == PGRES_COPY_IN || + PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQresultStatus(lastResult) == PGRES_COPY_BOTH || + PQstatus(conn) == CONNECTION_BAD) + break; + } + } + PG_CATCH(); + { + PQclear(lastResult); + PG_RE_THROW(); + } + PG_END_TRY(); + + return lastResult; +} + +/* + * Perform the equivalent of PQgetResult(), but watch for interrupts. + */ +static inline PGresult * +libpqsrv_get_result(PGconn *conn, uint32 wait_event_info) +{ + /* + * Collect data until PQgetResult is ready to get the result without + * blocking. + */ + while (PQisBusy(conn)) + { + int rc; + + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | + WL_SOCKET_READABLE, + PQsocket(conn), + 0, + wait_event_info); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* Consume whatever data is available from the socket */ + if (PQconsumeInput(conn) == 0) + { + /* trouble; expect PQgetResult() to return NULL */ + break; + } + } + + /* Now we can collect and return the next PGresult */ + return PQgetResult(conn); +} + #endif /* LIBPQ_BE_FE_HELPERS_H */ |