diff options
Diffstat (limited to 'net/smc/smc_rx.c')
| -rw-r--r-- | net/smc/smc_rx.c | 308 | 
1 files changed, 265 insertions, 43 deletions
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c index eff4e0d0bb31..3d77b383cccd 100644 --- a/net/smc/smc_rx.c +++ b/net/smc/smc_rx.c @@ -22,11 +22,10 @@  #include "smc_tx.h" /* smc_tx_consumer_update() */  #include "smc_rx.h" -/* callback implementation for sk.sk_data_ready() - * to wakeup rcvbuf consumers that blocked with smc_rx_wait_data(). +/* callback implementation to wakeup consumers blocked with smc_rx_wait().   * indirectly called by smc_cdc_msg_recv_action().   */ -static void smc_rx_data_ready(struct sock *sk) +static void smc_rx_wake_up(struct sock *sk)  {  	struct socket_wq *wq; @@ -44,28 +43,180 @@ static void smc_rx_data_ready(struct sock *sk)  	rcu_read_unlock();  } +/* Update consumer cursor + *   @conn   connection to update + *   @cons   consumer cursor + *   @len    number of Bytes consumed + *   Returns: + *   1 if we should end our receive, 0 otherwise + */ +static int smc_rx_update_consumer(struct smc_sock *smc, +				  union smc_host_cursor cons, size_t len) +{ +	struct smc_connection *conn = &smc->conn; +	struct sock *sk = &smc->sk; +	bool force = false; +	int diff, rc = 0; + +	smc_curs_add(conn->rmb_desc->len, &cons, len); + +	/* did we process urgent data? */ +	if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) { +		diff = smc_curs_comp(conn->rmb_desc->len, &cons, +				     &conn->urg_curs); +		if (sock_flag(sk, SOCK_URGINLINE)) { +			if (diff == 0) { +				force = true; +				rc = 1; +				conn->urg_state = SMC_URG_READ; +			} +		} else { +			if (diff == 1) { +				/* skip urgent byte */ +				force = true; +				smc_curs_add(conn->rmb_desc->len, &cons, 1); +				conn->urg_rx_skip_pend = false; +			} else if (diff < -1) +				/* we read past urgent byte */ +				conn->urg_state = SMC_URG_READ; +		} +	} + +	smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn), +		       conn); + +	/* send consumer cursor update if required */ +	/* similar to advertising new TCP rcv_wnd if required */ +	smc_tx_consumer_update(conn, force); + +	return rc; +} + +static void smc_rx_update_cons(struct smc_sock *smc, size_t len) +{ +	struct smc_connection *conn = &smc->conn; +	union smc_host_cursor cons; + +	smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn), +		       conn); +	smc_rx_update_consumer(smc, cons, len); +} + +struct smc_spd_priv { +	struct smc_sock *smc; +	size_t		 len; +}; + +static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe, +				    struct pipe_buffer *buf) +{ +	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private; +	struct smc_sock *smc = priv->smc; +	struct smc_connection *conn; +	struct sock *sk = &smc->sk; + +	if (sk->sk_state == SMC_CLOSED || +	    sk->sk_state == SMC_PEERFINCLOSEWAIT || +	    sk->sk_state == SMC_APPFINCLOSEWAIT) +		goto out; +	conn = &smc->conn; +	lock_sock(sk); +	smc_rx_update_cons(smc, priv->len); +	release_sock(sk); +	if (atomic_sub_and_test(priv->len, &conn->splice_pending)) +		smc_rx_wake_up(sk); +out: +	kfree(priv); +	put_page(buf->page); +	sock_put(sk); +} + +static int smc_rx_pipe_buf_nosteal(struct pipe_inode_info *pipe, +				   struct pipe_buffer *buf) +{ +	return 1; +} + +static const struct pipe_buf_operations smc_pipe_ops = { +	.can_merge = 0, +	.confirm = generic_pipe_buf_confirm, +	.release = smc_rx_pipe_buf_release, +	.steal = smc_rx_pipe_buf_nosteal, +	.get = generic_pipe_buf_get +}; + +static void smc_rx_spd_release(struct splice_pipe_desc *spd, +			       unsigned int i) +{ +	put_page(spd->pages[i]); +} + +static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len, +			 struct smc_sock *smc) +{ +	struct splice_pipe_desc spd; +	struct partial_page partial; +	struct smc_spd_priv *priv; +	struct page *page; +	int bytes; + +	page = virt_to_page(smc->conn.rmb_desc->cpu_addr); +	priv = kzalloc(sizeof(*priv), GFP_KERNEL); +	if (!priv) +		return -ENOMEM; +	priv->len = len; +	priv->smc = smc; +	partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr; +	partial.len = len; +	partial.private = (unsigned long)priv; + +	spd.nr_pages_max = 1; +	spd.nr_pages = 1; +	spd.pages = &page; +	spd.partial = &partial; +	spd.ops = &smc_pipe_ops; +	spd.spd_release = smc_rx_spd_release; + +	bytes = splice_to_pipe(pipe, &spd); +	if (bytes > 0) { +		sock_hold(&smc->sk); +		get_page(smc->conn.rmb_desc->pages); +		atomic_add(bytes, &smc->conn.splice_pending); +	} + +	return bytes; +} + +static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn) +{ +	return atomic_read(&conn->bytes_to_rcv) && +	       !atomic_read(&conn->splice_pending); +} +  /* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted   *   @smc    smc socket   *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout + *   @fcrit  add'l criterion to evaluate as function pointer   * Returns:   * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.   * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).   */ -static int smc_rx_wait_data(struct smc_sock *smc, long *timeo) +int smc_rx_wait(struct smc_sock *smc, long *timeo, +		int (*fcrit)(struct smc_connection *conn))  {  	DEFINE_WAIT_FUNC(wait, woken_wake_function);  	struct smc_connection *conn = &smc->conn;  	struct sock *sk = &smc->sk;  	int rc; -	if (atomic_read(&conn->bytes_to_rcv)) +	if (fcrit(conn))  		return 1;  	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);  	add_wait_queue(sk_sleep(sk), &wait);  	rc = sk_wait_event(sk, timeo,  			   sk->sk_err ||  			   sk->sk_shutdown & RCV_SHUTDOWN || -			   atomic_read(&conn->bytes_to_rcv) || +			   fcrit(conn) ||  			   smc_cdc_rxed_any_close_or_senddone(conn),  			   &wait);  	remove_wait_queue(sk_sleep(sk), &wait); @@ -73,65 +224,115 @@ static int smc_rx_wait_data(struct smc_sock *smc, long *timeo)  	return rc;  } -/* rcvbuf consumer: main API called by socket layer. - * called under sk lock. +static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len, +			   int flags) +{ +	struct smc_connection *conn = &smc->conn; +	union smc_host_cursor cons; +	struct sock *sk = &smc->sk; +	int rc = 0; + +	if (sock_flag(sk, SOCK_URGINLINE) || +	    !(conn->urg_state == SMC_URG_VALID) || +	    conn->urg_state == SMC_URG_READ) +		return -EINVAL; + +	if (conn->urg_state == SMC_URG_VALID) { +		if (!(flags & MSG_PEEK)) +			smc->conn.urg_state = SMC_URG_READ; +		msg->msg_flags |= MSG_OOB; +		if (len > 0) { +			if (!(flags & MSG_TRUNC)) +				rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1); +			len = 1; +			smc_curs_write(&cons, +				       smc_curs_read(&conn->local_tx_ctrl.cons, +						     conn), +				       conn); +			if (smc_curs_diff(conn->rmb_desc->len, &cons, +					  &conn->urg_curs) > 1) +				conn->urg_rx_skip_pend = true; +			/* Urgent Byte was already accounted for, but trigger +			 * skipping the urgent byte in non-inline case +			 */ +			if (!(flags & MSG_PEEK)) +				smc_rx_update_consumer(smc, cons, 0); +		} else { +			msg->msg_flags |= MSG_TRUNC; +		} + +		return rc ? -EFAULT : len; +	} + +	if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN) +		return 0; + +	return -EAGAIN; +} + +/* smc_rx_recvmsg - receive data from RMBE + * @msg:	copy data to receive buffer + * @pipe:	copy data to pipe if set - indicates splice() call + * + * rcvbuf consumer: main API called by socket layer. + * Called under sk lock.   */ -int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len, -		   int flags) +int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, +		   struct pipe_inode_info *pipe, size_t len, int flags)  {  	size_t copylen, read_done = 0, read_remaining = len;  	size_t chunk_len, chunk_off, chunk_len_sum;  	struct smc_connection *conn = &smc->conn; +	int (*func)(struct smc_connection *conn);  	union smc_host_cursor cons;  	int readable, chunk;  	char *rcvbuf_base;  	struct sock *sk; +	int splbytes;  	long timeo;  	int target;		/* Read at least these many bytes */  	int rc;  	if (unlikely(flags & MSG_ERRQUEUE))  		return -EINVAL; /* future work for sk.sk_family == AF_SMC */ -	if (flags & MSG_OOB) -		return -EINVAL; /* future work */  	sk = &smc->sk;  	if (sk->sk_state == SMC_LISTEN)  		return -ENOTCONN; +	if (flags & MSG_OOB) +		return smc_rx_recv_urg(smc, msg, len, flags);  	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);  	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); -	msg->msg_namelen = 0;  	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */  	rcvbuf_base = conn->rmb_desc->cpu_addr;  	do { /* while (read_remaining) */ -		if (read_done >= target) +		if (read_done >= target || (pipe && read_done))  			break;  		if (atomic_read(&conn->bytes_to_rcv))  			goto copy; +		else if (conn->urg_state == SMC_URG_VALID) +			/* we received a single urgent Byte - skip */ +			smc_rx_update_cons(smc, 0); + +		if (sk->sk_shutdown & RCV_SHUTDOWN || +		    smc_cdc_rxed_any_close_or_senddone(conn) || +		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort) +			break;  		if (read_done) {  			if (sk->sk_err ||  			    sk->sk_state == SMC_CLOSED || -			    sk->sk_shutdown & RCV_SHUTDOWN ||  			    !timeo || -			    signal_pending(current) || -			    smc_cdc_rxed_any_close_or_senddone(conn) || -			    conn->local_tx_ctrl.conn_state_flags. -			    peer_conn_abort) +			    signal_pending(current))  				break;  		} else {  			if (sk->sk_err) {  				read_done = sock_error(sk);  				break;  			} -			if (sk->sk_shutdown & RCV_SHUTDOWN || -			    smc_cdc_rxed_any_close_or_senddone(conn) || -			    conn->local_tx_ctrl.conn_state_flags. -			    peer_conn_abort) -				break;  			if (sk->sk_state == SMC_CLOSED) {  				if (!sock_flag(sk, SOCK_DONE)) {  					/* This occurs when user tries to read @@ -150,32 +351,56 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,  				return -EAGAIN;  		} -		if (!atomic_read(&conn->bytes_to_rcv)) { -			smc_rx_wait_data(smc, &timeo); +		if (!smc_rx_data_available(conn)) { +			smc_rx_wait(smc, &timeo, smc_rx_data_available);  			continue;  		}  copy:  		/* initialize variables for 1st iteration of subsequent loop */ -		/* could be just 1 byte, even after smc_rx_wait_data above */ +		/* could be just 1 byte, even after waiting on data above */  		readable = atomic_read(&conn->bytes_to_rcv); -		/* not more than what user space asked for */ -		copylen = min_t(size_t, read_remaining, readable); +		splbytes = atomic_read(&conn->splice_pending); +		if (!readable || (msg && splbytes)) { +			if (splbytes) +				func = smc_rx_data_available_and_no_splice_pend; +			else +				func = smc_rx_data_available; +			smc_rx_wait(smc, &timeo, func); +			continue; +		} +  		smc_curs_write(&cons,  			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),  			       conn); +		/* subsequent splice() calls pick up where previous left */ +		if (splbytes) +			smc_curs_add(conn->rmb_desc->len, &cons, splbytes); +		if (conn->urg_state == SMC_URG_VALID && +		    sock_flag(&smc->sk, SOCK_URGINLINE) && +		    readable > 1) +			readable--;	/* always stop at urgent Byte */ +		/* not more than what user space asked for */ +		copylen = min_t(size_t, read_remaining, readable);  		/* determine chunks where to read from rcvbuf */  		/* either unwrapped case, or 1st chunk of wrapped case */ -		chunk_len = min_t(size_t, -				  copylen, conn->rmbe_size - cons.count); +		chunk_len = min_t(size_t, copylen, conn->rmb_desc->len - +				  cons.count);  		chunk_len_sum = chunk_len;  		chunk_off = cons.count;  		smc_rmb_sync_sg_for_cpu(conn);  		for (chunk = 0; chunk < 2; chunk++) {  			if (!(flags & MSG_TRUNC)) { -				rc = memcpy_to_msg(msg, rcvbuf_base + chunk_off, -						   chunk_len); -				if (rc) { +				if (msg) { +					rc = memcpy_to_msg(msg, rcvbuf_base + +							   chunk_off, +							   chunk_len); +				} else { +					rc = smc_rx_splice(pipe, rcvbuf_base + +							chunk_off, chunk_len, +							smc); +				} +				if (rc < 0) {  					if (!read_done)  						read_done = -EFAULT;  					smc_rmb_sync_sg_for_device(conn); @@ -196,18 +421,13 @@ copy:  		/* update cursors */  		if (!(flags & MSG_PEEK)) { -			smc_curs_add(conn->rmbe_size, &cons, copylen);  			/* increased in recv tasklet smc_cdc_msg_rcv() */  			smp_mb__before_atomic();  			atomic_sub(copylen, &conn->bytes_to_rcv); -			/* guarantee 0 <= bytes_to_rcv <= rmbe_size */ +			/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */  			smp_mb__after_atomic(); -			smc_curs_write(&conn->local_tx_ctrl.cons, -				       smc_curs_read(&cons, conn), -				       conn); -			/* send consumer cursor update if required */ -			/* similar to advertising new TCP rcv_wnd if required */ -			smc_tx_consumer_update(conn); +			if (msg && smc_rx_update_consumer(smc, cons, copylen)) +				goto out;  		}  	} while (read_remaining);  out: @@ -217,5 +437,7 @@ out:  /* Initialize receive properties on connection establishment. NB: not __init! */  void smc_rx_init(struct smc_sock *smc)  { -	smc->sk.sk_data_ready = smc_rx_data_ready; +	smc->sk.sk_data_ready = smc_rx_wake_up; +	atomic_set(&smc->conn.splice_pending, 0); +	smc->conn.urg_state = SMC_URG_READ;  }  | 
