diff options
Diffstat (limited to 'src/backend/replication/libpqwalreceiver/libpqwalreceiver.c')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 101 |
1 files changed, 24 insertions, 77 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f1c843e868c..6c01e7b9918 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -23,19 +23,11 @@ #include "pqexpbuffer.h" #include "access/xlog.h" #include "miscadmin.h" +#include "pgstat.h" #include "replication/walreceiver.h" +#include "storage/proc.h" #include "utils/builtins.h" -#ifdef HAVE_POLL_H -#include <poll.h> -#endif -#ifdef HAVE_SYS_POLL_H -#include <sys/poll.h> -#endif -#ifdef HAVE_SYS_SELECT_H -#include <sys/select.h> -#endif - PG_MODULE_MAGIC; void _PG_init(void); @@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ -static bool libpq_select(int timeout_ms); static PGresult *libpqrcv_PQexec(const char *query); /* @@ -367,67 +358,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli, } /* - * Wait until we can read WAL stream, or timeout. - * - * Returns true if data has become available for reading, false if timed out - * or interrupted by signal. - * - * This is based on pqSocketCheck. - */ -static bool -libpq_select(int timeout_ms) -{ - int ret; - - Assert(streamConn != NULL); - if (PQsocket(streamConn) < 0) - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("invalid socket: %s", PQerrorMessage(streamConn)))); - - /* We use poll(2) if available, otherwise select(2) */ - { -#ifdef HAVE_POLL - struct pollfd input_fd; - - input_fd.fd = PQsocket(streamConn); - input_fd.events = POLLIN | POLLERR; - input_fd.revents = 0; - - ret = poll(&input_fd, 1, timeout_ms); -#else /* !HAVE_POLL */ - - fd_set input_mask; - struct timeval timeout; - struct timeval *ptr_timeout; - - FD_ZERO(&input_mask); - FD_SET(PQsocket(streamConn), &input_mask); - - if (timeout_ms < 0) - ptr_timeout = NULL; - else - { - timeout.tv_sec = timeout_ms / 1000; - timeout.tv_usec = (timeout_ms % 1000) * 1000; - ptr_timeout = &timeout; - } - - ret = select(PQsocket(streamConn) + 1, &input_mask, - NULL, NULL, ptr_timeout); -#endif /* HAVE_POLL */ - } - - if (ret == 0 || (ret < 0 && errno == EINTR)) - return false; - if (ret < 0) - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("select() failed: %m"))); - return true; -} - -/* * Send a query and wait for the results by using the asynchronous libpq * functions and the backend version of select(). * @@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query) */ while (PQisBusy(streamConn)) { + int rc; + /* * We don't need to break down the sleep into smaller increments, - * and check for interrupts after each nap, since we can just - * elog(FATAL) within SIGTERM signal handler if the signal arrives - * in the middle of establishment of replication connection. + * since we'll get interrupted by signals and can either handle + * interrupts here or elog(FATAL) within SIGTERM signal handler if + * the signal arrives in the middle of establishment of + * replication connection. */ - if (!libpq_select(-1)) - continue; /* interrupted */ + ResetLatch(&MyProc->procLatch); + rc = WaitLatchOrSocket(&MyProc->procLatch, + WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | + WL_LATCH_SET, + PQsocket(streamConn), + 0, + WAIT_EVENT_LIBPQWALRECEIVER_READ); + if (rc & WL_POSTMASTER_DEATH) + exit(1); + + /* interrupted */ + if (rc & WL_LATCH_SET) + { + CHECK_FOR_INTERRUPTS(); + continue; + } if (PQconsumeInput(streamConn) == 0) return NULL; /* trouble */ } |