summaryrefslogtreecommitdiff
path: root/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/libpqwalreceiver/libpqwalreceiver.c')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c101
1 files changed, 24 insertions, 77 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f1c843e868c..6c01e7b9918 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -23,19 +23,11 @@
#include "pqexpbuffer.h"
#include "access/xlog.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "replication/walreceiver.h"
+#include "storage/proc.h"
#include "utils/builtins.h"
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#endif
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
PG_MODULE_MAGIC;
void _PG_init(void);
@@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
-static bool libpq_select(int timeout_ms);
static PGresult *libpqrcv_PQexec(const char *query);
/*
@@ -367,67 +358,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
}
/*
- * Wait until we can read WAL stream, or timeout.
- *
- * Returns true if data has become available for reading, false if timed out
- * or interrupted by signal.
- *
- * This is based on pqSocketCheck.
- */
-static bool
-libpq_select(int timeout_ms)
-{
- int ret;
-
- Assert(streamConn != NULL);
- if (PQsocket(streamConn) < 0)
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
-
- /* We use poll(2) if available, otherwise select(2) */
- {
-#ifdef HAVE_POLL
- struct pollfd input_fd;
-
- input_fd.fd = PQsocket(streamConn);
- input_fd.events = POLLIN | POLLERR;
- input_fd.revents = 0;
-
- ret = poll(&input_fd, 1, timeout_ms);
-#else /* !HAVE_POLL */
-
- fd_set input_mask;
- struct timeval timeout;
- struct timeval *ptr_timeout;
-
- FD_ZERO(&input_mask);
- FD_SET(PQsocket(streamConn), &input_mask);
-
- if (timeout_ms < 0)
- ptr_timeout = NULL;
- else
- {
- timeout.tv_sec = timeout_ms / 1000;
- timeout.tv_usec = (timeout_ms % 1000) * 1000;
- ptr_timeout = &timeout;
- }
-
- ret = select(PQsocket(streamConn) + 1, &input_mask,
- NULL, NULL, ptr_timeout);
-#endif /* HAVE_POLL */
- }
-
- if (ret == 0 || (ret < 0 && errno == EINTR))
- return false;
- if (ret < 0)
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("select() failed: %m")));
- return true;
-}
-
-/*
* Send a query and wait for the results by using the asynchronous libpq
* functions and the backend version of select().
*
@@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
*/
while (PQisBusy(streamConn))
{
+ int rc;
+
/*
* We don't need to break down the sleep into smaller increments,
- * and check for interrupts after each nap, since we can just
- * elog(FATAL) within SIGTERM signal handler if the signal arrives
- * in the middle of establishment of replication connection.
+ * since we'll get interrupted by signals and can either handle
+ * interrupts here or elog(FATAL) within SIGTERM signal handler if
+ * the signal arrives in the middle of establishment of
+ * replication connection.
*/
- if (!libpq_select(-1))
- continue; /* interrupted */
+ ResetLatch(&MyProc->procLatch);
+ rc = WaitLatchOrSocket(&MyProc->procLatch,
+ WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+ WL_LATCH_SET,
+ PQsocket(streamConn),
+ 0,
+ WAIT_EVENT_LIBPQWALRECEIVER_READ);
+ if (rc & WL_POSTMASTER_DEATH)
+ exit(1);
+
+ /* interrupted */
+ if (rc & WL_LATCH_SET)
+ {
+ CHECK_FOR_INTERRUPTS();
+ continue;
+ }
if (PQconsumeInput(streamConn) == 0)
return NULL; /* trouble */
}