summaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r--contrib/postgres_fdw/connection.c223
1 files changed, 205 insertions, 18 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index f753c6e2324..8c64d42dda2 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
+ bool parallel_commit; /* do we commit (sub)xacts in parallel? */
bool invalidated; /* true if reconnect is pending */
bool keep_connections; /* setting value of keep_connections
* server option */
@@ -92,6 +93,9 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
static void configure_remote_session(PGconn *conn);
+static void do_sql_command_begin(PGconn *conn, const char *sql);
+static void do_sql_command_end(PGconn *conn, const char *sql,
+ bool consume_input);
static void begin_remote_xact(ConnCacheEntry *entry);
static void pgfdw_xact_callback(XactEvent event, void *arg);
static void pgfdw_subxact_callback(SubXactEvent event,
@@ -100,6 +104,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
void *arg);
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
+static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
@@ -107,6 +112,9 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql,
bool toplevel);
+static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
+static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
+ int curlevel);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
@@ -316,14 +324,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
* is changed will be closed and re-made later.
*
* By default, all the connections to any foreign servers are kept open.
+ *
+ * Also determine whether to commit (sub)transactions opened on the remote
+ * server in parallel at (sub)transaction end.
*/
entry->keep_connections = true;
+ entry->parallel_commit = false;
foreach(lc, server->options)
{
DefElem *def = (DefElem *) lfirst(lc);
if (strcmp(def->defname, "keep_connections") == 0)
entry->keep_connections = defGetBoolean(def);
+ else if (strcmp(def->defname, "parallel_commit") == 0)
+ entry->parallel_commit = defGetBoolean(def);
}
/* Now try to make the connection */
@@ -623,10 +637,30 @@ configure_remote_session(PGconn *conn)
void
do_sql_command(PGconn *conn, const char *sql)
{
- PGresult *res;
+ do_sql_command_begin(conn, sql);
+ do_sql_command_end(conn, sql, false);
+}
+static void
+do_sql_command_begin(PGconn *conn, const char *sql)
+{
if (!PQsendQuery(conn, sql))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
+}
+
+static void
+do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
+{
+ PGresult *res;
+
+ /*
+ * If requested, consume whatever data is available from the socket.
+ * (Note that if all data is available, this allows pgfdw_get_result to
+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
+ * which would be large compared to the overhead of PQconsumeInput.)
+ */
+ if (consume_input && !PQconsumeInput(conn))
+ pgfdw_report_error(ERROR, NULL, conn, false, sql);
res = pgfdw_get_result(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
@@ -888,6 +922,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
+ List *pending_entries = NIL;
/* Quick exit if no connections were touched in this transaction. */
if (!xact_got_connection)
@@ -925,6 +960,12 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* Commit all remote transactions during pre-commit */
entry->changing_xact_state = true;
+ if (entry->parallel_commit)
+ {
+ do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
+ pending_entries = lappend(pending_entries, entry);
+ continue;
+ }
do_sql_command(entry->conn, "COMMIT TRANSACTION");
entry->changing_xact_state = false;
@@ -981,23 +1022,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
}
/* Reset state to show we're out of a transaction */
- entry->xact_depth = 0;
+ pgfdw_reset_xact_state(entry, true);
+ }
- /*
- * If the connection isn't in a good idle state, it is marked as
- * invalid or keep_connections option of its server is disabled, then
- * discard it to recover. Next GetConnection will open a new
- * connection.
- */
- if (PQstatus(entry->conn) != CONNECTION_OK ||
- PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
- entry->changing_xact_state ||
- entry->invalidated ||
- !entry->keep_connections)
- {
- elog(DEBUG3, "discarding connection %p", entry->conn);
- disconnect_pg_server(entry);
- }
+ /* If there are any pending connections, finish cleaning them up */
+ if (pending_entries)
+ {
+ Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
+ event == XACT_EVENT_PRE_COMMIT);
+ pgfdw_finish_pre_commit_cleanup(pending_entries);
}
/*
@@ -1021,6 +1054,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
int curlevel;
+ List *pending_entries = NIL;
/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1063,6 +1097,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
/* Commit all remote subtransactions during pre-commit */
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
entry->changing_xact_state = true;
+ if (entry->parallel_commit)
+ {
+ do_sql_command_begin(entry->conn, sql);
+ pending_entries = lappend(pending_entries, entry);
+ continue;
+ }
do_sql_command(entry->conn, sql);
entry->changing_xact_state = false;
}
@@ -1076,7 +1116,14 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
}
/* OK, we're outta that level of subtransaction */
- entry->xact_depth--;
+ pgfdw_reset_xact_state(entry, false);
+ }
+
+ /* If there are any pending connections, finish cleaning them up */
+ if (pending_entries)
+ {
+ Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
+ pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
}
}
@@ -1170,6 +1217,40 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
}
/*
+ * Reset state to show we're out of a (sub)transaction.
+ */
+static void
+pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
+{
+ if (toplevel)
+ {
+ /* Reset state to show we're out of a transaction */
+ entry->xact_depth = 0;
+
+ /*
+ * If the connection isn't in a good idle state, it is marked as
+ * invalid or keep_connections option of its server is disabled, then
+ * discard it to recover. Next GetConnection will open a new
+ * connection.
+ */
+ if (PQstatus(entry->conn) != CONNECTION_OK ||
+ PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
+ entry->changing_xact_state ||
+ entry->invalidated ||
+ !entry->keep_connections)
+ {
+ elog(DEBUG3, "discarding connection %p", entry->conn);
+ disconnect_pg_server(entry);
+ }
+ }
+ else
+ {
+ /* Reset state to show we're out of a subtransaction */
+ entry->xact_depth--;
+ }
+}
+
+/*
* Cancel the currently-in-progress query (whose query text we do not have)
* and ignore the result. Returns true if we successfully cancel the query
* and discard any pending result, and false if not.
@@ -1457,6 +1538,112 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel)
}
/*
+ * Finish pre-commit cleanup of connections on each of which we've sent a
+ * COMMIT command to the remote server.
+ */
+static void
+pgfdw_finish_pre_commit_cleanup(List *pending_entries)
+{
+ ConnCacheEntry *entry;
+ List *pending_deallocs = NIL;
+ ListCell *lc;
+
+ Assert(pending_entries);
+
+ /*
+ * Get the result of the COMMIT command for each of the pending entries
+ */
+ foreach(lc, pending_entries)
+ {
+ entry = (ConnCacheEntry *) lfirst(lc);
+
+ Assert(entry->changing_xact_state);
+ /*
+ * We might already have received the result on the socket, so pass
+ * consume_input=true to try to consume it first
+ */
+ do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
+ entry->changing_xact_state = false;
+
+ /* Do a DEALLOCATE ALL in parallel if needed */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ /* Ignore errors (see notes in pgfdw_xact_callback) */
+ if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
+ {
+ pending_deallocs = lappend(pending_deallocs, entry);
+ continue;
+ }
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+
+ pgfdw_reset_xact_state(entry, true);
+ }
+
+ /* No further work if no pending entries */
+ if (!pending_deallocs)
+ return;
+
+ /*
+ * Get the result of the DEALLOCATE command for each of the pending
+ * entries
+ */
+ foreach(lc, pending_deallocs)
+ {
+ PGresult *res;
+
+ entry = (ConnCacheEntry *) lfirst(lc);
+
+ /* Ignore errors (see notes in pgfdw_xact_callback) */
+ while ((res = PQgetResult(entry->conn)) != NULL)
+ {
+ PQclear(res);
+ /* Stop if the connection is lost (else we'll loop infinitely) */
+ if (PQstatus(entry->conn) == CONNECTION_BAD)
+ break;
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+
+ pgfdw_reset_xact_state(entry, true);
+ }
+}
+
+/*
+ * Finish pre-subcommit cleanup of connections on each of which we've sent a
+ * RELEASE command to the remote server.
+ */
+static void
+pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
+{
+ ConnCacheEntry *entry;
+ char sql[100];
+ ListCell *lc;
+
+ Assert(pending_entries);
+
+ /*
+ * Get the result of the RELEASE command for each of the pending entries
+ */
+ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
+ foreach(lc, pending_entries)
+ {
+ entry = (ConnCacheEntry *) lfirst(lc);
+
+ Assert(entry->changing_xact_state);
+ /*
+ * We might already have received the result on the socket, so pass
+ * consume_input=true to try to consume it first
+ */
+ do_sql_command_end(entry->conn, sql, true);
+ entry->changing_xact_state = false;
+
+ pgfdw_reset_xact_state(entry, false);
+ }
+}
+
+/*
* List active foreign server connections.
*
* This function takes no input parameter and returns setof record made of