Diffstat (limited to 'net/rds/send.c')
 net/rds/send.c | 139 ++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 95 insertions(+), 44 deletions(-)
diff --git a/net/rds/send.c b/net/rds/send.c
index 0b3d0ef2f008..6e96f108473e 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -120,6 +120,57 @@ static void release_in_xmit(struct rds_conn_path *cp)
}
/*
+ * Helper for multipath fan-out: ensure lane 0 transmits its queued
+ * messages before the other lanes send, to prevent out-of-order delivery.
+ *
+ * Returns true if lane 0 has not yet caught up, false otherwise.
+ */
+static bool rds_mprds_cp0_catchup(struct rds_connection *conn)
+{
+ struct rds_conn_path *cp0 = conn->c_path;
+ struct rds_message *rm0;
+ unsigned long flags;
+ bool ret = false;
+
+ spin_lock_irqsave(&cp0->cp_lock, flags);
+
+ /* the oldest / first message in the retransmit queue
+ * has to be at or beyond c_cp0_mprds_catchup_tx_seq
+ */
+ if (!list_empty(&cp0->cp_retrans)) {
+ rm0 = list_entry(cp0->cp_retrans.next, struct rds_message,
+ m_conn_item);
+ if (be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
+ conn->c_cp0_mprds_catchup_tx_seq) {
+ /* the retransmit queue of cp_index#0 has not
+ * quite caught up yet
+ */
+ ret = true;
+ goto unlock;
+ }
+ }
+
+ /* the oldest / first message of the send queue
+ * has to be at or beyond c_cp0_mprds_catchup_tx_seq
+ */
+ rm0 = cp0->cp_xmit_rm;
+ if (!rm0 && !list_empty(&cp0->cp_send_queue))
+ rm0 = list_entry(cp0->cp_send_queue.next, struct rds_message,
+ m_conn_item);
+ if (rm0 && be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
+ conn->c_cp0_mprds_catchup_tx_seq) {
+ /* the send queue of cp_index#0 has not quite
+ * caught up yet
+ */
+ ret = true;
+ }
+
+unlock:
+ spin_unlock_irqrestore(&cp0->cp_lock, flags);
+ return ret;
+}
+
+/*
* We're making the conscious trade-off here to only send one message
* down the connection at a time.
* Pro:
@@ -248,6 +299,14 @@ restart:
if (batch_count >= send_batch_count)
goto over_batch;
+ /* make sure cp_index#0 has caught up during fan-out
+ * in order to avoid lane races
+ */
+ if (cp->cp_index > 0 && rds_mprds_cp0_catchup(conn)) {
+ rds_stats_inc(s_mprds_catchup_tx0_retries);
+ goto over_batch;
+ }
+
spin_lock_irqsave(&cp->cp_lock, flags);
if (!list_empty(&cp->cp_send_queue)) {
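For reference, the condition the new helper evaluates can be restated as a standalone sketch: lane 0 has caught up once nothing older than conn->c_cp0_mprds_catchup_tx_seq remains on its retransmit queue, its in-flight message, or its send queue. The names oldest_pending_seq() and cp0_caught_up() below are illustrative only and do not appear in this patch; locking is omitted for brevity.

static u64 oldest_pending_seq(struct rds_conn_path *cp)
{
	u64 oldest = U64_MAX;
	struct rds_message *rm;

	/* the retransmit queue holds the oldest traffic */
	if (!list_empty(&cp->cp_retrans)) {
		rm = list_first_entry(&cp->cp_retrans, struct rds_message,
				      m_conn_item);
		oldest = be64_to_cpu(rm->m_inc.i_hdr.h_sequence);
	}

	/* then the message being transmitted, or the head of the send queue */
	rm = cp->cp_xmit_rm;
	if (!rm && !list_empty(&cp->cp_send_queue))
		rm = list_first_entry(&cp->cp_send_queue, struct rds_message,
				      m_conn_item);
	if (rm)
		oldest = min(oldest, be64_to_cpu(rm->m_inc.i_hdr.h_sequence));

	return oldest;
}

static bool cp0_caught_up(struct rds_connection *conn)
{
	/* caller holds c_path[0].cp_lock, as in the helper above */
	return oldest_pending_seq(&conn->c_path[0]) >=
	       conn->c_cp0_mprds_catchup_tx_seq;
}

rds_mprds_cp0_catchup() returns the negation of cp0_caught_up(), which is why a lane with cp_index > 0 that sees it return true jumps to over_batch and lets its send work retry later.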
@@ -458,7 +517,8 @@ over_batch:
if (rds_destroy_pending(cp->cp_conn))
ret = -ENETUNREACH;
else
- queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+ queue_delayed_work(cp->cp_wq,
+ &cp->cp_send_w, 1);
rcu_read_unlock();
} else if (raced) {
rds_stats_inc(s_send_lock_queue_raced);
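This hunk, like the rds_sendmsg() and rds_send_probe() hunks below, moves the deferred send work from the global rds_wq onto a per-path workqueue, cp->cp_wq. The patch only uses the field; a minimal sketch of how such a workqueue might be set up at path-init time follows. The function name and the alloc_ordered_workqueue() parameters are assumptions for illustration, not taken from this series.

/* hypothetical: give each connection path its own ordered workqueue so
 * that one path's deferred work cannot back up behind another's
 */
static int rds_conn_path_alloc_wq(struct rds_conn_path *cp)
{
	cp->cp_wq = alloc_ordered_workqueue("krds/%d", WQ_MEM_RECLAIM,
					    cp->cp_index);
	return cp->cp_wq ? 0 : -ENOMEM;
}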
@@ -1041,39 +1101,6 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
return ret;
}
-static int rds_send_mprds_hash(struct rds_sock *rs,
- struct rds_connection *conn, int nonblock)
-{
- int hash;
-
- if (conn->c_npaths == 0)
- hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
- else
- hash = RDS_MPATH_HASH(rs, conn->c_npaths);
- if (conn->c_npaths == 0 && hash != 0) {
- rds_send_ping(conn, 0);
-
- /* The underlying connection is not up yet. Need to wait
- * until it is up to be sure that the non-zero c_path can be
- * used. But if we are interrupted, we have to use the zero
- * c_path in case the connection ends up being non-MP capable.
- */
- if (conn->c_npaths == 0) {
- /* Cannot wait for the connection be made, so just use
- * the base c_path.
- */
- if (nonblock)
- return 0;
- if (wait_event_interruptible(conn->c_hs_waitq,
- conn->c_npaths != 0))
- hash = 0;
- }
- if (conn->c_npaths == 1)
- hash = 0;
- }
- return hash;
-}
-
static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
{
struct rds_rdma_args *args;
@@ -1303,10 +1330,32 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
rs->rs_conn = conn;
}
- if (conn->c_trans->t_mp_capable)
- cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
- else
+ if (conn->c_trans->t_mp_capable) {
+ /* Use c_path[0] until we learn that
+ * the peer supports more (c_npaths > 1)
+ */
+ cpath = &conn->c_path[RDS_MPATH_HASH(rs, conn->c_npaths ? : 1)];
+ } else {
cpath = &conn->c_path[0];
+ }
+
+ /* If we're multipath capable and path 0 is down, queue a reconnect
+ * and send a ping. This initiates the multipath handshake through
+ * rds_send_probe(), which sends RDS_EXTHDR_NPATHS to the peer,
+ * starting multipath capability negotiation.
+ */
+ if (conn->c_trans->t_mp_capable &&
+ !rds_conn_path_up(&conn->c_path[0])) {
+ /* test_and_set_bit() ensures that only one reconnect
+ * request is queued, and rds_send_ping() ensures that
+ * only one ping is outstanding.
+ */
+ if (!test_and_set_bit(RDS_RECONNECT_PENDING,
+ &conn->c_path[0].cp_flags))
+ queue_delayed_work(conn->c_path[0].cp_wq,
+ &conn->c_path[0].cp_conn_w, 0);
+ rds_send_ping(conn, 0);
+ }
rm->m_conn_path = cpath;
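The selection above keeps everything on c_path[0] while c_npaths is still 0 or 1: upstream's RDS_MPATH_HASH() reduces a per-socket hash of the bound port modulo the path count, so a modulus of 1 always yields index 0. A hypothetical restatement of the new selection logic, with the handshake kick folded in (pick_cpath() is an illustrative name, not part of the patch):

static struct rds_conn_path *pick_cpath(struct rds_sock *rs,
					struct rds_connection *conn)
{
	int idx = 0;

	if (conn->c_trans->t_mp_capable) {
		/* hash % 1 == 0, so lanes other than 0 are only used once
		 * the peer has advertised c_npaths > 1
		 */
		idx = RDS_MPATH_HASH(rs, conn->c_npaths ? : 1);

		/* path 0 down: queue one reconnect (RDS_RECONNECT_PENDING
		 * dedups it) and one ping to start the NPATHS handshake
		 */
		if (!rds_conn_path_up(&conn->c_path[0])) {
			if (!test_and_set_bit(RDS_RECONNECT_PENDING,
					      &conn->c_path[0].cp_flags))
				queue_delayed_work(conn->c_path[0].cp_wq,
						   &conn->c_path[0].cp_conn_w,
						   0);
			rds_send_ping(conn, 0);
		}
	}

	return &conn->c_path[idx];
}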
@@ -1380,7 +1429,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
if (rds_destroy_pending(cpath->cp_conn))
ret = -ENETUNREACH;
else
- queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+ queue_delayed_work(cpath->cp_wq, &cpath->cp_send_w, 1);
rcu_read_unlock();
}
if (ret)
@@ -1456,24 +1505,26 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
cp->cp_conn->c_trans->t_mp_capable) {
__be16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
__be32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
+ u8 dummy = 0;
rds_message_add_extension(&rm->m_inc.i_hdr,
- RDS_EXTHDR_NPATHS, &npaths,
- sizeof(npaths));
+ RDS_EXTHDR_NPATHS, &npaths);
rds_message_add_extension(&rm->m_inc.i_hdr,
RDS_EXTHDR_GEN_NUM,
- &my_gen_num,
- sizeof(u32));
+ &my_gen_num);
+ rds_message_add_extension(&rm->m_inc.i_hdr,
+ RDS_EXTHDR_SPORT_IDX,
+ &dummy);
}
spin_unlock_irqrestore(&cp->cp_lock, flags);
rds_stats_inc(s_send_queued);
rds_stats_inc(s_send_pong);
- /* schedule the send work on rds_wq */
+ /* schedule the send work on cp_wq */
rcu_read_lock();
if (!rds_destroy_pending(cp->cp_conn))
- queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+ queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 1);
rcu_read_unlock();
rds_message_put(rm);
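The rds_message_add_extension() calls above no longer pass an explicit length, and a new one-byte RDS_EXTHDR_SPORT_IDX extension is added. That presumably relies on an earlier patch in this series letting the helper take the payload size from the per-type rds_exthdr_size[] table that net/rds/message.c already maintains; if so, the table would gain an entry along these lines (the existing rows mirror the upstream table; the SPORT_IDX row and its one-byte size are an assumption for illustration):

static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = {
	[RDS_EXTHDR_NONE]	= 0,
	[RDS_EXTHDR_VERSION]	= sizeof(struct rds_ext_header_version),
	[RDS_EXTHDR_RDMA]	= sizeof(struct rds_ext_header_rdma),
	[RDS_EXTHDR_RDMA_DEST]	= sizeof(struct rds_ext_header_rdma_dest),
	[RDS_EXTHDR_NPATHS]	= sizeof(u16),
	[RDS_EXTHDR_GEN_NUM]	= sizeof(u32),
	[RDS_EXTHDR_SPORT_IDX]	= sizeof(u8),	/* assumed: new in this series */
};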