diff options
| author | Robert Haas <rhaas@postgresql.org> | 2021-02-05 16:08:45 -0500 | 
|---|---|---|
| committer | Robert Haas <rhaas@postgresql.org> | 2021-02-05 16:08:45 -0500 | 
| commit | 418611c84d004f45d92bcaa3f8e100385d96cd41 (patch) | |
| tree | fc7727b5a72d57a5f1bc655e023d5ae51d6cf1ea /src | |
| parent | e955bd4b6c2bcdbd253837f6cf4c7520b98e69d4 (diff) | |
Generalize parallel slot result handling.
Instead of having a hard-coded behavior that we ignore missing
tables and report all other errors, let the caller decide what
to do by setting a callback.
Mark Dilger, reviewed and somewhat revised by me. The larger patch
series of which this is a part has also had review from Peter
Geoghegan, Andres Freund, Álvaro Herrera, Michael Paquier, and Amul
Sul, but I don't know whether any of them have reviewed this bit
specifically.
Discussion: http://postgr.es/m/12ED3DA8-25F0-4B68-937D-D907CFBF08E7@enterprisedb.com
Discussion: http://postgr.es/m/5F743835-3399-419C-8324-2D424237E999@enterprisedb.com
Discussion: http://postgr.es/m/70655DF3-33CE-4527-9A4D-DDEB582B6BA0@enterprisedb.com
Diffstat (limited to 'src')
| -rw-r--r-- | src/bin/scripts/reindexdb.c | 1 | ||||
| -rw-r--r-- | src/bin/scripts/vacuumdb.c | 1 | ||||
| -rw-r--r-- | src/fe_utils/parallel_slot.c | 91 | ||||
| -rw-r--r-- | src/include/fe_utils/parallel_slot.h | 29 | 
4 files changed, 94 insertions, 28 deletions
| diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c index 7781fb1151a..9f072ac49ae 100644 --- a/src/bin/scripts/reindexdb.c +++ b/src/bin/scripts/reindexdb.c @@ -466,6 +466,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type,  			goto finish;  		} +		ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);  		run_reindex_command(free_slot->connection, process_type, objname,  							echo, verbose, concurrently, true); diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c index ed320817bc4..9dc8aca29f3 100644 --- a/src/bin/scripts/vacuumdb.c +++ b/src/bin/scripts/vacuumdb.c @@ -713,6 +713,7 @@ vacuum_one_database(const ConnParams *cparams,  		 * Execute the vacuum.  All errors are handled in processQueryResult  		 * through ParallelSlotsGetIdle.  		 */ +		ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);  		run_vacuum_command(free_slot->connection, sql.data,  						   echo, tabname); diff --git a/src/fe_utils/parallel_slot.c b/src/fe_utils/parallel_slot.c index 3987a4702b5..b625deb2545 100644 --- a/src/fe_utils/parallel_slot.c +++ b/src/fe_utils/parallel_slot.c @@ -30,7 +30,7 @@  static void init_slot(ParallelSlot *slot, PGconn *conn);  static int	select_loop(int maxFd, fd_set *workerset); -static bool processQueryResult(PGconn *conn, PGresult *result); +static bool processQueryResult(ParallelSlot *slot, PGresult *result);  static void  init_slot(ParallelSlot *slot, PGconn *conn) @@ -38,34 +38,24 @@ init_slot(ParallelSlot *slot, PGconn *conn)  	slot->connection = conn;  	/* Initially assume connection is idle */  	slot->isFree = true; +	ParallelSlotClearHandler(slot);  }  /* - * Process (and delete) a query result.  Returns true if there's no error, - * false otherwise -- but errors about trying to work on a missing relation - * are reported and subsequently ignored. + * Process (and delete) a query result.  Returns true if there's no problem, + * false otherwise. It's up to the handler to decide what cosntitutes a + * problem.   */  static bool -processQueryResult(PGconn *conn, PGresult *result) +processQueryResult(ParallelSlot *slot, PGresult *result)  { -	/* -	 * If it's an error, report it.  Errors about a missing table are harmless -	 * so we continue processing; but die for other errors. -	 */ -	if (PQresultStatus(result) != PGRES_COMMAND_OK) -	{ -		char	   *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE); +	Assert(slot->handler != NULL); -		pg_log_error("processing of database \"%s\" failed: %s", -					 PQdb(conn), PQerrorMessage(conn)); - -		if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) -		{ -			PQclear(result); -			return false; -		} -	} +	/* On failure, the handler should return NULL after freeing the result */ +	if (!slot->handler(result, slot->connection, slot->handler_context)) +		return false; +	/* Ok, we have to free it ourself */  	PQclear(result);  	return true;  } @@ -76,15 +66,15 @@ processQueryResult(PGconn *conn, PGresult *result)   * Note that this will block if the connection is busy.   */  static bool -consumeQueryResult(PGconn *conn) +consumeQueryResult(ParallelSlot *slot)  {  	bool		ok = true;  	PGresult   *result; -	SetCancelConn(conn); -	while ((result = PQgetResult(conn)) != NULL) +	SetCancelConn(slot->connection); +	while ((result = PQgetResult(slot->connection)) != NULL)  	{ -		if (!processQueryResult(conn, result)) +		if (!processQueryResult(slot, result))  			ok = false;  	}  	ResetCancelConn(); @@ -227,14 +217,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)  				if (result != NULL)  				{ -					/* Check and discard the command result */ -					if (!processQueryResult(slots[i].connection, result)) +					/* Handle and discard the command result */ +					if (!processQueryResult(slots + i, result))  						return NULL;  				}  				else  				{  					/* This connection has become idle */  					slots[i].isFree = true; +					ParallelSlotClearHandler(slots + i);  					if (firstFree < 0)  						firstFree = i;  					break; @@ -329,8 +320,52 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)  	for (i = 0; i < numslots; i++)  	{ -		if (!consumeQueryResult((slots + i)->connection)) +		if (!consumeQueryResult(slots + i)) +			return false; +	} + +	return true; +} + +/* + * TableCommandResultHandler + * + * ParallelSlotResultHandler for results of commands (not queries) against + * tables. + * + * Requires that the result status is either PGRES_COMMAND_OK or an error about + * a missing table.  This is useful for utilities that compile a list of tables + * to process and then run commands (vacuum, reindex, or whatever) against + * those tables, as there is a race condition between the time the list is + * compiled and the time the command attempts to open the table. + * + * For missing tables, logs an error but allows processing to continue. + * + * For all other errors, logs an error and terminates further processing. + * + * res: PGresult from the query executed on the slot's connection + * conn: connection belonging to the slot + * context: unused + */ +bool +TableCommandResultHandler(PGresult *res, PGconn *conn, void *context) +{ +	/* +	 * If it's an error, report it.  Errors about a missing table are harmless +	 * so we continue processing; but die for other errors. +	 */ +	if (PQresultStatus(res) != PGRES_COMMAND_OK) +	{ +		char	   *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE); + +		pg_log_error("processing of database \"%s\" failed: %s", +					 PQdb(conn), PQerrorMessage(conn)); + +		if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) +		{ +			PQclear(res);  			return false; +		}  	}  	return true; diff --git a/src/include/fe_utils/parallel_slot.h b/src/include/fe_utils/parallel_slot.h index 99eeb3328d6..8902f8d4f48 100644 --- a/src/include/fe_utils/parallel_slot.h +++ b/src/include/fe_utils/parallel_slot.h @@ -15,12 +15,39 @@  #include "fe_utils/connect_utils.h"  #include "libpq-fe.h" +typedef bool (*ParallelSlotResultHandler) (PGresult *res, PGconn *conn, +										   void *context); +  typedef struct ParallelSlot  {  	PGconn	   *connection;		/* One connection */  	bool		isFree;			/* Is it known to be idle? */ + +	/* +	 * Prior to issuing a command or query on 'connection', a handler callback +	 * function may optionally be registered to be invoked to process the +	 * results, and context information may optionally be registered for use +	 * by the handler.  If unset, these fields should be NULL. +	 */ +	ParallelSlotResultHandler handler; +	void	   *handler_context;  } ParallelSlot; +static inline void +ParallelSlotSetHandler(ParallelSlot *slot, ParallelSlotResultHandler handler, +					   void *context) +{ +	slot->handler = handler; +	slot->handler_context = context; +} + +static inline void +ParallelSlotClearHandler(ParallelSlot *slot) +{ +	slot->handler = NULL; +	slot->handler_context = NULL; +} +  extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots);  extern ParallelSlot *ParallelSlotsSetup(const ConnParams *cparams, @@ -31,5 +58,7 @@ extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots);  extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots); +extern bool TableCommandResultHandler(PGresult *res, PGconn *conn, +									  void *context);  #endif							/* PARALLEL_SLOT_H */ | 
