summaryrefslogtreecommitdiff
path: root/fs/smb/client/smbdirect.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/smb/client/smbdirect.c')
-rw-r--r--fs/smb/client/smbdirect.c1199
1 files changed, 732 insertions, 467 deletions
diff --git a/fs/smb/client/smbdirect.c b/fs/smb/client/smbdirect.c
index e0fce5033004..316f398c70f4 100644
--- a/fs/smb/client/smbdirect.c
+++ b/fs/smb/client/smbdirect.c
@@ -13,28 +13,35 @@
#include "cifsproto.h"
#include "smb2proto.h"
+const struct smbdirect_socket_parameters *smbd_get_parameters(struct smbd_connection *conn)
+{
+ struct smbdirect_socket *sc = &conn->socket;
+
+ return &sc->parameters;
+}
+
static struct smbdirect_recv_io *get_receive_buffer(
- struct smbd_connection *info);
+ struct smbdirect_socket *sc);
static void put_receive_buffer(
- struct smbd_connection *info,
+ struct smbdirect_socket *sc,
struct smbdirect_recv_io *response);
-static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
-static void destroy_receive_buffers(struct smbd_connection *info);
+static int allocate_receive_buffers(struct smbdirect_socket *sc, int num_buf);
+static void destroy_receive_buffers(struct smbdirect_socket *sc);
static void enqueue_reassembly(
- struct smbd_connection *info,
+ struct smbdirect_socket *sc,
struct smbdirect_recv_io *response, int data_length);
static struct smbdirect_recv_io *_get_first_reassembly(
- struct smbd_connection *info);
+ struct smbdirect_socket *sc);
static int smbd_post_recv(
- struct smbd_connection *info,
+ struct smbdirect_socket *sc,
struct smbdirect_recv_io *response);
-static int smbd_post_send_empty(struct smbd_connection *info);
+static int smbd_post_send_empty(struct smbdirect_socket *sc);
-static void destroy_mr_list(struct smbd_connection *info);
-static int allocate_mr_list(struct smbd_connection *info);
+static void destroy_mr_list(struct smbdirect_socket *sc);
+static int allocate_mr_list(struct smbdirect_socket *sc);
struct smb_extract_to_rdma {
struct ib_sge *sge;
@@ -57,6 +64,9 @@ static ssize_t smb_extract_iter_to_rdma(struct iov_iter *iter, size_t len,
/* SMBD negotiation timeout in seconds */
#define SMBD_NEGOTIATE_TIMEOUT 120
+/* The timeout to wait for a keepalive message from peer in seconds */
+#define KEEPALIVE_RECV_TIMEOUT 5
+
/* SMBD minimum receive size and fragmented sized defined in [MS-SMBD] */
#define SMBD_MIN_RECEIVE_SIZE 128
#define SMBD_MIN_FRAGMENTED_SIZE 131072
@@ -155,65 +165,277 @@ do { \
#define log_rdma_mr(level, fmt, args...) \
log_rdma(level, LOG_RDMA_MR, fmt, ##args)
+static void smbd_disconnect_wake_up_all(struct smbdirect_socket *sc)
+{
+ /*
+ * Wake up all waiters in all wait queues
+ * in order to notice the broken connection.
+ */
+ wake_up_all(&sc->status_wait);
+ wake_up_all(&sc->send_io.credits.wait_queue);
+ wake_up_all(&sc->send_io.pending.dec_wait_queue);
+ wake_up_all(&sc->send_io.pending.zero_wait_queue);
+ wake_up_all(&sc->recv_io.reassembly.wait_queue);
+ wake_up_all(&sc->mr_io.ready.wait_queue);
+ wake_up_all(&sc->mr_io.cleanup.wait_queue);
+}
+
static void smbd_disconnect_rdma_work(struct work_struct *work)
{
- struct smbd_connection *info =
- container_of(work, struct smbd_connection, disconnect_work);
- struct smbdirect_socket *sc = &info->socket;
+ struct smbdirect_socket *sc =
+ container_of(work, struct smbdirect_socket, disconnect_work);
- if (sc->status == SMBDIRECT_SOCKET_CONNECTED) {
+ /*
+ * make sure this and other work is not queued again
+ * but here we don't block and avoid
+ * disable[_delayed]_work_sync()
+ */
+ disable_work(&sc->disconnect_work);
+ disable_work(&sc->recv_io.posted.refill_work);
+ disable_work(&sc->mr_io.recovery_work);
+ disable_work(&sc->idle.immediate_work);
+ disable_delayed_work(&sc->idle.timer_work);
+
+ if (sc->first_error == 0)
+ sc->first_error = -ECONNABORTED;
+
+ switch (sc->status) {
+ case SMBDIRECT_SOCKET_NEGOTIATE_NEEDED:
+ case SMBDIRECT_SOCKET_NEGOTIATE_RUNNING:
+ case SMBDIRECT_SOCKET_NEGOTIATE_FAILED:
+ case SMBDIRECT_SOCKET_CONNECTED:
+ case SMBDIRECT_SOCKET_ERROR:
sc->status = SMBDIRECT_SOCKET_DISCONNECTING;
rdma_disconnect(sc->rdma.cm_id);
+ break;
+
+ case SMBDIRECT_SOCKET_CREATED:
+ case SMBDIRECT_SOCKET_RESOLVE_ADDR_NEEDED:
+ case SMBDIRECT_SOCKET_RESOLVE_ADDR_RUNNING:
+ case SMBDIRECT_SOCKET_RESOLVE_ADDR_FAILED:
+ case SMBDIRECT_SOCKET_RESOLVE_ROUTE_NEEDED:
+ case SMBDIRECT_SOCKET_RESOLVE_ROUTE_RUNNING:
+ case SMBDIRECT_SOCKET_RESOLVE_ROUTE_FAILED:
+ case SMBDIRECT_SOCKET_RDMA_CONNECT_NEEDED:
+ case SMBDIRECT_SOCKET_RDMA_CONNECT_RUNNING:
+ case SMBDIRECT_SOCKET_RDMA_CONNECT_FAILED:
+ /*
+ * rdma_connect() never reached
+ * RDMA_CM_EVENT_ESTABLISHED
+ */
+ sc->status = SMBDIRECT_SOCKET_DISCONNECTED;
+ break;
+
+ case SMBDIRECT_SOCKET_DISCONNECTING:
+ case SMBDIRECT_SOCKET_DISCONNECTED:
+ case SMBDIRECT_SOCKET_DESTROYED:
+ break;
}
+
+ /*
+ * Wake up all waiters in all wait queues
+ * in order to notice the broken connection.
+ */
+ smbd_disconnect_wake_up_all(sc);
}
-static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
+static void smbd_disconnect_rdma_connection(struct smbdirect_socket *sc)
{
- queue_work(info->workqueue, &info->disconnect_work);
+ /*
+ * make sure other work (than disconnect_work) is
+ * not queued again but here we don't block and avoid
+ * disable[_delayed]_work_sync()
+ */
+ disable_work(&sc->recv_io.posted.refill_work);
+ disable_work(&sc->mr_io.recovery_work);
+ disable_work(&sc->idle.immediate_work);
+ disable_delayed_work(&sc->idle.timer_work);
+
+ if (sc->first_error == 0)
+ sc->first_error = -ECONNABORTED;
+
+ switch (sc->status) {
+ case SMBDIRECT_SOCKET_RESOLVE_ADDR_FAILED:
+ case SMBDIRECT_SOCKET_RESOLVE_ROUTE_FAILED:
+ case SMBDIRECT_SOCKET_RDMA_CONNECT_FAILED:
+ case SMBDIRECT_SOCKET_NEGOTIATE_FAILED:
+ case SMBDIRECT_SOCKET_ERROR:
+ case SMBDIRECT_SOCKET_DISCONNECTING:
+ case SMBDIRECT_SOCKET_DISCONNECTED:
+ case SMBDIRECT_SOCKET_DESTROYED:
+ /*
+ * Keep the current error status
+ */
+ break;
+
+ case SMBDIRECT_SOCKET_RESOLVE_ADDR_NEEDED:
+ case SMBDIRECT_SOCKET_RESOLVE_ADDR_RUNNING:
+ sc->status = SMBDIRECT_SOCKET_RESOLVE_ADDR_FAILED;
+ break;
+
+ case SMBDIRECT_SOCKET_RESOLVE_ROUTE_NEEDED:
+ case SMBDIRECT_SOCKET_RESOLVE_ROUTE_RUNNING:
+ sc->status = SMBDIRECT_SOCKET_RESOLVE_ROUTE_FAILED;
+ break;
+
+ case SMBDIRECT_SOCKET_RDMA_CONNECT_NEEDED:
+ case SMBDIRECT_SOCKET_RDMA_CONNECT_RUNNING:
+ sc->status = SMBDIRECT_SOCKET_RDMA_CONNECT_FAILED;
+ break;
+
+ case SMBDIRECT_SOCKET_NEGOTIATE_NEEDED:
+ case SMBDIRECT_SOCKET_NEGOTIATE_RUNNING:
+ sc->status = SMBDIRECT_SOCKET_NEGOTIATE_FAILED;
+ break;
+
+ case SMBDIRECT_SOCKET_CREATED:
+ case SMBDIRECT_SOCKET_CONNECTED:
+ sc->status = SMBDIRECT_SOCKET_ERROR;
+ break;
+ }
+
+ /*
+ * Wake up all waiters in all wait queues
+ * in order to notice the broken connection.
+ */
+ smbd_disconnect_wake_up_all(sc);
+
+ queue_work(sc->workqueue, &sc->disconnect_work);
}
/* Upcall from RDMA CM */
static int smbd_conn_upcall(
struct rdma_cm_id *id, struct rdma_cm_event *event)
{
- struct smbd_connection *info = id->context;
- struct smbdirect_socket *sc = &info->socket;
+ struct smbdirect_socket *sc = id->context;
+ struct smbdirect_socket_parameters *sp = &sc->parameters;
const char *event_name = rdma_event_msg(event->event);
+ u8 peer_initiator_depth;
+ u8 peer_responder_resources;
log_rdma_event(INFO, "event=%s status=%d\n",
event_name, event->status);
switch (event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_RESOLVE_ADDR_RUNNING);
+ sc->status = SMBDIRECT_SOCKET_RESOLVE_ROUTE_NEEDED;
+ wake_up(&sc->status_wait);
+ break;
+
case RDMA_CM_EVENT_ROUTE_RESOLVED:
- info->ri_rc = 0;
- complete(&info->ri_done);
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_RESOLVE_ROUTE_RUNNING);
+ sc->status = SMBDIRECT_SOCKET_RDMA_CONNECT_NEEDED;
+ wake_up(&sc->status_wait);
break;
case RDMA_CM_EVENT_ADDR_ERROR:
log_rdma_event(ERR, "connecting failed event=%s\n", event_name);
- info->ri_rc = -EHOSTUNREACH;
- complete(&info->ri_done);
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_RESOLVE_ADDR_RUNNING);
+ sc->status = SMBDIRECT_SOCKET_RESOLVE_ADDR_FAILED;
+ smbd_disconnect_rdma_work(&sc->disconnect_work);
break;
case RDMA_CM_EVENT_ROUTE_ERROR:
log_rdma_event(ERR, "connecting failed event=%s\n", event_name);
- info->ri_rc = -ENETUNREACH;
- complete(&info->ri_done);
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_RESOLVE_ROUTE_RUNNING);
+ sc->status = SMBDIRECT_SOCKET_RESOLVE_ROUTE_FAILED;
+ smbd_disconnect_rdma_work(&sc->disconnect_work);
break;
case RDMA_CM_EVENT_ESTABLISHED:
log_rdma_event(INFO, "connected event=%s\n", event_name);
- sc->status = SMBDIRECT_SOCKET_CONNECTED;
- wake_up_interruptible(&info->status_wait);
+
+ /*
+ * Here we work around an inconsistency between
+ * iWarp and other devices (at least rxe and irdma using RoCEv2)
+ */
+ if (rdma_protocol_iwarp(id->device, id->port_num)) {
+ /*
+ * iWarp devices report the peer's values
+ * with the perspective of the peer here.
+ * Tested with siw and irdma (in iwarp mode)
+ * We need to change to our perspective here,
+ * so we need to switch the values.
+ */
+ peer_initiator_depth = event->param.conn.responder_resources;
+ peer_responder_resources = event->param.conn.initiator_depth;
+ } else {
+ /*
+ * Non iWarp devices report the peer's values
+ * already changed to our perspective here.
+ * Tested with rxe and irdma (in roce mode).
+ */
+ peer_initiator_depth = event->param.conn.initiator_depth;
+ peer_responder_resources = event->param.conn.responder_resources;
+ }
+ if (rdma_protocol_iwarp(id->device, id->port_num) &&
+ event->param.conn.private_data_len == 8) {
+ /*
+ * Legacy clients with only iWarp MPA v1 support
+ * need a private blob in order to negotiate
+ * the IRD/ORD values.
+ */
+ const __be32 *ird_ord_hdr = event->param.conn.private_data;
+ u32 ird32 = be32_to_cpu(ird_ord_hdr[0]);
+ u32 ord32 = be32_to_cpu(ird_ord_hdr[1]);
+
+ /*
+ * cifs.ko sends the legacy IRD/ORD negotiation
+ * event if iWarp MPA v2 was used.
+ *
+ * Here we check that the values match and only
+ * mark the client as legacy if they don't match.
+ */
+ if ((u32)event->param.conn.initiator_depth != ird32 ||
+ (u32)event->param.conn.responder_resources != ord32) {
+ /*
+ * There are broken clients (old cifs.ko)
+ * using little endian and also
+ * struct rdma_conn_param only uses u8
+ * for initiator_depth and responder_resources,
+ * so we truncate the value to U8_MAX.
+ *
+ * smb_direct_accept_client() will then
+ * do the real negotiation in order to
+ * select the minimum between client and
+ * server.
+ */
+ ird32 = min_t(u32, ird32, U8_MAX);
+ ord32 = min_t(u32, ord32, U8_MAX);
+
+ sc->rdma.legacy_iwarp = true;
+ peer_initiator_depth = (u8)ird32;
+ peer_responder_resources = (u8)ord32;
+ }
+ }
+
+ /*
+ * negotiate the value by using the minimum
+ * between client and server if the client provided
+ * non 0 values.
+ */
+ if (peer_initiator_depth != 0)
+ sp->initiator_depth =
+ min_t(u8, sp->initiator_depth,
+ peer_initiator_depth);
+ if (peer_responder_resources != 0)
+ sp->responder_resources =
+ min_t(u8, sp->responder_resources,
+ peer_responder_resources);
+
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_RDMA_CONNECT_RUNNING);
+ sc->status = SMBDIRECT_SOCKET_NEGOTIATE_NEEDED;
+ wake_up(&sc->status_wait);
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
case RDMA_CM_EVENT_UNREACHABLE:
case RDMA_CM_EVENT_REJECTED:
log_rdma_event(ERR, "connecting failed event=%s\n", event_name);
- sc->status = SMBDIRECT_SOCKET_DISCONNECTED;
- wake_up_interruptible(&info->status_wait);
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_RDMA_CONNECT_RUNNING);
+ sc->status = SMBDIRECT_SOCKET_RDMA_CONNECT_FAILED;
+ smbd_disconnect_rdma_work(&sc->disconnect_work);
break;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
@@ -221,15 +443,10 @@ static int smbd_conn_upcall(
/* This happens when we fail the negotiation */
if (sc->status == SMBDIRECT_SOCKET_NEGOTIATE_FAILED) {
log_rdma_event(ERR, "event=%s during negotiation\n", event_name);
- sc->status = SMBDIRECT_SOCKET_DISCONNECTED;
- wake_up(&info->status_wait);
- break;
}
sc->status = SMBDIRECT_SOCKET_DISCONNECTED;
- wake_up_interruptible(&info->status_wait);
- wake_up_interruptible(&sc->recv_io.reassembly.wait_queue);
- wake_up_interruptible_all(&info->wait_send_queue);
+ smbd_disconnect_rdma_work(&sc->disconnect_work);
break;
default:
@@ -245,15 +462,15 @@ static int smbd_conn_upcall(
static void
smbd_qp_async_error_upcall(struct ib_event *event, void *context)
{
- struct smbd_connection *info = context;
+ struct smbdirect_socket *sc = context;
- log_rdma_event(ERR, "%s on device %s info %p\n",
- ib_event_msg(event->event), event->device->name, info);
+ log_rdma_event(ERR, "%s on device %s socket %p\n",
+ ib_event_msg(event->event), event->device->name, sc);
switch (event->event) {
case IB_EVENT_CQ_ERR:
case IB_EVENT_QP_FATAL:
- smbd_disconnect_rdma_connection(info);
+ smbd_disconnect_rdma_connection(sc);
break;
default:
@@ -278,11 +495,9 @@ static void send_done(struct ib_cq *cq, struct ib_wc *wc)
struct smbdirect_send_io *request =
container_of(wc->wr_cqe, struct smbdirect_send_io, cqe);
struct smbdirect_socket *sc = request->socket;
- struct smbd_connection *info =
- container_of(sc, struct smbd_connection, socket);
- log_rdma_send(INFO, "smbdirect_send_io 0x%p completed wc->status=%d\n",
- request, wc->status);
+ log_rdma_send(INFO, "smbdirect_send_io 0x%p completed wc->status=%s\n",
+ request, ib_wc_status_msg(wc->status));
for (i = 0; i < request->num_sge; i++)
ib_dma_unmap_single(sc->ib.dev,
@@ -291,17 +506,18 @@ static void send_done(struct ib_cq *cq, struct ib_wc *wc)
DMA_TO_DEVICE);
if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
- log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
- wc->status, wc->opcode);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ log_rdma_send(ERR, "wc->status=%s wc->opcode=%d\n",
+ ib_wc_status_msg(wc->status), wc->opcode);
mempool_free(request, sc->send_io.mem.pool);
- smbd_disconnect_rdma_connection(info);
+ smbd_disconnect_rdma_connection(sc);
return;
}
- if (atomic_dec_and_test(&info->send_pending))
- wake_up(&info->wait_send_pending);
+ if (atomic_dec_and_test(&sc->send_io.pending.count))
+ wake_up(&sc->send_io.pending.zero_wait_queue);
- wake_up(&info->wait_post_send);
+ wake_up(&sc->send_io.pending.dec_wait_queue);
mempool_free(request, sc->send_io.mem.pool);
}
@@ -325,8 +541,6 @@ static bool process_negotiation_response(
struct smbdirect_recv_io *response, int packet_length)
{
struct smbdirect_socket *sc = response->socket;
- struct smbd_connection *info =
- container_of(sc, struct smbd_connection, socket);
struct smbdirect_socket_parameters *sp = &sc->parameters;
struct smbdirect_negotiate_resp *packet = smbdirect_recv_io_payload(response);
@@ -341,21 +555,19 @@ static bool process_negotiation_response(
le16_to_cpu(packet->negotiated_version));
return false;
}
- info->protocol = le16_to_cpu(packet->negotiated_version);
if (packet->credits_requested == 0) {
log_rdma_event(ERR, "error: credits_requested==0\n");
return false;
}
- info->receive_credit_target = le16_to_cpu(packet->credits_requested);
+ sc->recv_io.credits.target = le16_to_cpu(packet->credits_requested);
+ sc->recv_io.credits.target = min_t(u16, sc->recv_io.credits.target, sp->recv_credit_max);
if (packet->credits_granted == 0) {
log_rdma_event(ERR, "error: credits_granted==0\n");
return false;
}
- atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
-
- atomic_set(&info->receive_credits, 0);
+ atomic_set(&sc->send_io.credits.count, le16_to_cpu(packet->credits_granted));
if (le32_to_cpu(packet->preferred_send_size) > sp->max_recv_size) {
log_rdma_event(ERR, "error: preferred_send_size=%d\n",
@@ -380,16 +592,12 @@ static bool process_negotiation_response(
}
sp->max_fragmented_send_size =
le32_to_cpu(packet->max_fragmented_size);
- info->rdma_readwrite_threshold =
- rdma_readwrite_threshold > sp->max_fragmented_send_size ?
- sp->max_fragmented_send_size :
- rdma_readwrite_threshold;
sp->max_read_write_size = min_t(u32,
le32_to_cpu(packet->max_readwrite_size),
- info->max_frmr_depth * PAGE_SIZE);
- info->max_frmr_depth = sp->max_read_write_size / PAGE_SIZE;
+ sp->max_frmr_depth * PAGE_SIZE);
+ sp->max_frmr_depth = sp->max_read_write_size / PAGE_SIZE;
sc->recv_io.expected = SMBDIRECT_EXPECT_DATA_TRANSFER;
return true;
@@ -397,52 +605,40 @@ static bool process_negotiation_response(
static void smbd_post_send_credits(struct work_struct *work)
{
- int ret = 0;
int rc;
struct smbdirect_recv_io *response;
- struct smbd_connection *info =
- container_of(work, struct smbd_connection,
- post_send_credits_work);
- struct smbdirect_socket *sc = &info->socket;
+ struct smbdirect_socket *sc =
+ container_of(work, struct smbdirect_socket, recv_io.posted.refill_work);
if (sc->status != SMBDIRECT_SOCKET_CONNECTED) {
- wake_up(&info->wait_receive_queues);
return;
}
- if (info->receive_credit_target >
- atomic_read(&info->receive_credits)) {
+ if (sc->recv_io.credits.target >
+ atomic_read(&sc->recv_io.credits.count)) {
while (true) {
- response = get_receive_buffer(info);
+ response = get_receive_buffer(sc);
if (!response)
break;
response->first_segment = false;
- rc = smbd_post_recv(info, response);
+ rc = smbd_post_recv(sc, response);
if (rc) {
log_rdma_recv(ERR,
"post_recv failed rc=%d\n", rc);
- put_receive_buffer(info, response);
+ put_receive_buffer(sc, response);
break;
}
- ret++;
+ atomic_inc(&sc->recv_io.posted.count);
}
}
- spin_lock(&info->lock_new_credits_offered);
- info->new_credits_offered += ret;
- spin_unlock(&info->lock_new_credits_offered);
-
/* Promptly send an immediate packet as defined in [MS-SMBD] 3.1.1.1 */
- info->send_immediate = true;
- if (atomic_read(&info->receive_credits) <
- info->receive_credit_target - 1) {
- if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
- info->send_immediate) {
- log_keep_alive(INFO, "send an empty message\n");
- smbd_post_send_empty(info);
- }
+ if (atomic_read(&sc->recv_io.credits.count) <
+ sc->recv_io.credits.target - 1) {
+ log_keep_alive(INFO, "schedule send of an empty message\n");
+ queue_work(sc->workqueue, &sc->idle.immediate_work);
}
}
@@ -454,19 +650,22 @@ static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
container_of(wc->wr_cqe, struct smbdirect_recv_io, cqe);
struct smbdirect_socket *sc = response->socket;
struct smbdirect_socket_parameters *sp = &sc->parameters;
- struct smbd_connection *info =
- container_of(sc, struct smbd_connection, socket);
+ u16 old_recv_credit_target;
u32 data_offset = 0;
u32 data_length = 0;
u32 remaining_data_length = 0;
+ bool negotiate_done = false;
- log_rdma_recv(INFO, "response=0x%p type=%d wc status=%d wc opcode %d byte_len=%d pkey_index=%u\n",
- response, sc->recv_io.expected, wc->status, wc->opcode,
+ log_rdma_recv(INFO,
+ "response=0x%p type=%d wc status=%s wc opcode %d byte_len=%d pkey_index=%u\n",
+ response, sc->recv_io.expected,
+ ib_wc_status_msg(wc->status), wc->opcode,
wc->byte_len, wc->pkey_index);
if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
- log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
- wc->status, wc->opcode);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ log_rdma_recv(ERR, "wc->status=%s opcode=%d\n",
+ ib_wc_status_msg(wc->status), wc->opcode);
goto error;
}
@@ -476,15 +675,31 @@ static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
response->sge.length,
DMA_FROM_DEVICE);
+ /*
+ * Reset timer to the keepalive interval in
+ * order to trigger our next keepalive message.
+ */
+ sc->idle.keepalive = SMBDIRECT_KEEPALIVE_NONE;
+ mod_delayed_work(sc->workqueue, &sc->idle.timer_work,
+ msecs_to_jiffies(sp->keepalive_interval_msec));
+
switch (sc->recv_io.expected) {
/* SMBD negotiation response */
case SMBDIRECT_EXPECT_NEGOTIATE_REP:
dump_smbdirect_negotiate_resp(smbdirect_recv_io_payload(response));
sc->recv_io.reassembly.full_packet_received = true;
- info->negotiate_done =
+ negotiate_done =
process_negotiation_response(response, wc->byte_len);
- put_receive_buffer(info, response);
- complete(&info->negotiate_completion);
+ put_receive_buffer(sc, response);
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_NEGOTIATE_RUNNING);
+ if (!negotiate_done) {
+ sc->status = SMBDIRECT_SOCKET_NEGOTIATE_FAILED;
+ smbd_disconnect_rdma_connection(sc);
+ } else {
+ sc->status = SMBDIRECT_SOCKET_CONNECTED;
+ wake_up(&sc->status_wait);
+ }
+
return;
/* SMBD data transfer packet */
@@ -517,17 +732,23 @@ static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
sc->recv_io.reassembly.full_packet_received = true;
}
- atomic_dec(&info->receive_credits);
- info->receive_credit_target =
+ atomic_dec(&sc->recv_io.posted.count);
+ atomic_dec(&sc->recv_io.credits.count);
+ old_recv_credit_target = sc->recv_io.credits.target;
+ sc->recv_io.credits.target =
le16_to_cpu(data_transfer->credits_requested);
+ sc->recv_io.credits.target =
+ min_t(u16, sc->recv_io.credits.target, sp->recv_credit_max);
+ sc->recv_io.credits.target =
+ max_t(u16, sc->recv_io.credits.target, 1);
if (le16_to_cpu(data_transfer->credits_granted)) {
atomic_add(le16_to_cpu(data_transfer->credits_granted),
- &info->send_credits);
+ &sc->send_io.credits.count);
/*
* We have new send credits granted from remote peer
* If any sender is waiting for credits, unblock it
*/
- wake_up_interruptible(&info->wait_send_queue);
+ wake_up(&sc->send_io.credits.wait_queue);
}
log_incoming(INFO, "data flags %d data_offset %d data_length %d remaining_data_length %d\n",
@@ -536,11 +757,11 @@ static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
le32_to_cpu(data_transfer->data_length),
le32_to_cpu(data_transfer->remaining_data_length));
- /* Send a KEEP_ALIVE response right away if requested */
- info->keep_alive_requested = KEEP_ALIVE_NONE;
+ /* Send an immediate response right away if requested */
if (le16_to_cpu(data_transfer->flags) &
SMBDIRECT_FLAG_RESPONSE_REQUESTED) {
- info->keep_alive_requested = KEEP_ALIVE_PENDING;
+ log_keep_alive(INFO, "schedule send of immediate response\n");
+ queue_work(sc->workqueue, &sc->idle.immediate_work);
}
/*
@@ -548,10 +769,13 @@ static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
* reassembly queue and wake up the reading thread
*/
if (data_length) {
- enqueue_reassembly(info, response, data_length);
- wake_up_interruptible(&sc->recv_io.reassembly.wait_queue);
+ if (sc->recv_io.credits.target > old_recv_credit_target)
+ queue_work(sc->workqueue, &sc->recv_io.posted.refill_work);
+
+ enqueue_reassembly(sc, response, data_length);
+ wake_up(&sc->recv_io.reassembly.wait_queue);
} else
- put_receive_buffer(info, response);
+ put_receive_buffer(sc, response);
return;
@@ -566,19 +790,20 @@ static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
log_rdma_recv(ERR, "unexpected response type=%d\n", sc->recv_io.expected);
WARN_ON_ONCE(sc->recv_io.expected != SMBDIRECT_EXPECT_DATA_TRANSFER);
error:
- put_receive_buffer(info, response);
- smbd_disconnect_rdma_connection(info);
+ put_receive_buffer(sc, response);
+ smbd_disconnect_rdma_connection(sc);
}
static struct rdma_cm_id *smbd_create_id(
- struct smbd_connection *info,
+ struct smbdirect_socket *sc,
struct sockaddr *dstaddr, int port)
{
+ struct smbdirect_socket_parameters *sp = &sc->parameters;
struct rdma_cm_id *id;
int rc;
__be16 *sport;
- id = rdma_create_id(&init_net, smbd_conn_upcall, info,
+ id = rdma_create_id(&init_net, smbd_conn_upcall, sc,
RDMA_PS_TCP, IB_QPT_RC);
if (IS_ERR(id)) {
rc = PTR_ERR(id);
@@ -593,43 +818,57 @@ static struct rdma_cm_id *smbd_create_id(
*sport = htons(port);
- init_completion(&info->ri_done);
- info->ri_rc = -ETIMEDOUT;
-
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_RESOLVE_ADDR_NEEDED);
+ sc->status = SMBDIRECT_SOCKET_RESOLVE_ADDR_RUNNING;
rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
- RDMA_RESOLVE_TIMEOUT);
+ sp->resolve_addr_timeout_msec);
if (rc) {
log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
goto out;
}
- rc = wait_for_completion_interruptible_timeout(
- &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
+ rc = wait_event_interruptible_timeout(
+ sc->status_wait,
+ sc->status != SMBDIRECT_SOCKET_RESOLVE_ADDR_RUNNING,
+ msecs_to_jiffies(sp->resolve_addr_timeout_msec));
/* e.g. if interrupted returns -ERESTARTSYS */
if (rc < 0) {
log_rdma_event(ERR, "rdma_resolve_addr timeout rc: %i\n", rc);
goto out;
}
- rc = info->ri_rc;
- if (rc) {
+ if (sc->status == SMBDIRECT_SOCKET_RESOLVE_ADDR_RUNNING) {
+ rc = -ETIMEDOUT;
+ log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
+ goto out;
+ }
+ if (sc->status != SMBDIRECT_SOCKET_RESOLVE_ROUTE_NEEDED) {
+ rc = -EHOSTUNREACH;
log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
goto out;
}
- info->ri_rc = -ETIMEDOUT;
- rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_RESOLVE_ROUTE_NEEDED);
+ sc->status = SMBDIRECT_SOCKET_RESOLVE_ROUTE_RUNNING;
+ rc = rdma_resolve_route(id, sp->resolve_route_timeout_msec);
if (rc) {
log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
goto out;
}
- rc = wait_for_completion_interruptible_timeout(
- &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
+ rc = wait_event_interruptible_timeout(
+ sc->status_wait,
+ sc->status != SMBDIRECT_SOCKET_RESOLVE_ROUTE_RUNNING,
+ msecs_to_jiffies(sp->resolve_route_timeout_msec));
/* e.g. if interrupted returns -ERESTARTSYS */
if (rc < 0) {
log_rdma_event(ERR, "rdma_resolve_addr timeout rc: %i\n", rc);
goto out;
}
- rc = info->ri_rc;
- if (rc) {
+ if (sc->status == SMBDIRECT_SOCKET_RESOLVE_ROUTE_RUNNING) {
+ rc = -ETIMEDOUT;
+ log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
+ goto out;
+ }
+ if (sc->status != SMBDIRECT_SOCKET_RDMA_CONNECT_NEEDED) {
+ rc = -ENETUNREACH;
log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
goto out;
}
@@ -656,13 +895,16 @@ static bool frwr_is_supported(struct ib_device_attr *attrs)
}
static int smbd_ia_open(
- struct smbd_connection *info,
+ struct smbdirect_socket *sc,
struct sockaddr *dstaddr, int port)
{
- struct smbdirect_socket *sc = &info->socket;
+ struct smbdirect_socket_parameters *sp = &sc->parameters;
int rc;
- sc->rdma.cm_id = smbd_create_id(info, dstaddr, port);
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_CREATED);
+ sc->status = SMBDIRECT_SOCKET_RESOLVE_ADDR_NEEDED;
+
+ sc->rdma.cm_id = smbd_create_id(sc, dstaddr, port);
if (IS_ERR(sc->rdma.cm_id)) {
rc = PTR_ERR(sc->rdma.cm_id);
goto out1;
@@ -677,19 +919,12 @@ static int smbd_ia_open(
rc = -EPROTONOSUPPORT;
goto out2;
}
- info->max_frmr_depth = min_t(int,
- smbd_max_frmr_depth,
+ sp->max_frmr_depth = min_t(u32,
+ sp->max_frmr_depth,
sc->ib.dev->attrs.max_fast_reg_page_list_len);
- info->mr_type = IB_MR_TYPE_MEM_REG;
+ sc->mr_io.type = IB_MR_TYPE_MEM_REG;
if (sc->ib.dev->attrs.kernel_cap_flags & IBK_SG_GAPS_REG)
- info->mr_type = IB_MR_TYPE_SG_GAPS;
-
- sc->ib.pd = ib_alloc_pd(sc->ib.dev, 0);
- if (IS_ERR(sc->ib.pd)) {
- rc = PTR_ERR(sc->ib.pd);
- log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
- goto out2;
- }
+ sc->mr_io.type = IB_MR_TYPE_SG_GAPS;
return 0;
@@ -707,9 +942,8 @@ out1:
* After negotiation, the transport is connected and ready for
* carrying upper layer SMB payload
*/
-static int smbd_post_send_negotiate_req(struct smbd_connection *info)
+static int smbd_post_send_negotiate_req(struct smbdirect_socket *sc)
{
- struct smbdirect_socket *sc = &info->socket;
struct smbdirect_socket_parameters *sp = &sc->parameters;
struct ib_send_wr send_wr;
int rc = -ENOMEM;
@@ -761,18 +995,18 @@ static int smbd_post_send_negotiate_req(struct smbd_connection *info)
request->sge[0].addr,
request->sge[0].length, request->sge[0].lkey);
- atomic_inc(&info->send_pending);
+ atomic_inc(&sc->send_io.pending.count);
rc = ib_post_send(sc->ib.qp, &send_wr, NULL);
if (!rc)
return 0;
/* if we reach here, post send failed */
log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
- atomic_dec(&info->send_pending);
+ atomic_dec(&sc->send_io.pending.count);
ib_dma_unmap_single(sc->ib.dev, request->sge[0].addr,
request->sge[0].length, DMA_TO_DEVICE);
- smbd_disconnect_rdma_connection(info);
+ smbd_disconnect_rdma_connection(sc);
dma_mapping_failed:
mempool_free(request, sc->send_io.mem.pool);
@@ -787,14 +1021,20 @@ dma_mapping_failed:
* buffer as possible, and extend the receive credits to remote peer
* return value: the new credtis being granted.
*/
-static int manage_credits_prior_sending(struct smbd_connection *info)
+static int manage_credits_prior_sending(struct smbdirect_socket *sc)
{
int new_credits;
- spin_lock(&info->lock_new_credits_offered);
- new_credits = info->new_credits_offered;
- info->new_credits_offered = 0;
- spin_unlock(&info->lock_new_credits_offered);
+ if (atomic_read(&sc->recv_io.credits.count) >= sc->recv_io.credits.target)
+ return 0;
+
+ new_credits = atomic_read(&sc->recv_io.posted.count);
+ if (new_credits == 0)
+ return 0;
+
+ new_credits -= atomic_read(&sc->recv_io.credits.count);
+ if (new_credits <= 0)
+ return 0;
return new_credits;
}
@@ -808,21 +1048,27 @@ static int manage_credits_prior_sending(struct smbd_connection *info)
* 1 if SMBDIRECT_FLAG_RESPONSE_REQUESTED needs to be set
* 0: otherwise
*/
-static int manage_keep_alive_before_sending(struct smbd_connection *info)
+static int manage_keep_alive_before_sending(struct smbdirect_socket *sc)
{
- if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
- info->keep_alive_requested = KEEP_ALIVE_SENT;
+ struct smbdirect_socket_parameters *sp = &sc->parameters;
+
+ if (sc->idle.keepalive == SMBDIRECT_KEEPALIVE_PENDING) {
+ sc->idle.keepalive = SMBDIRECT_KEEPALIVE_SENT;
+ /*
+ * Now use the keepalive timeout (instead of keepalive interval)
+ * in order to wait for a response
+ */
+ mod_delayed_work(sc->workqueue, &sc->idle.timer_work,
+ msecs_to_jiffies(sp->keepalive_timeout_msec));
return 1;
}
return 0;
}
/* Post the send request */
-static int smbd_post_send(struct smbd_connection *info,
+static int smbd_post_send(struct smbdirect_socket *sc,
struct smbdirect_send_io *request)
{
- struct smbdirect_socket *sc = &info->socket;
- struct smbdirect_socket_parameters *sp = &sc->parameters;
struct ib_send_wr send_wr;
int rc, i;
@@ -849,21 +1095,17 @@ static int smbd_post_send(struct smbd_connection *info,
rc = ib_post_send(sc->ib.qp, &send_wr, NULL);
if (rc) {
log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
- smbd_disconnect_rdma_connection(info);
+ smbd_disconnect_rdma_connection(sc);
rc = -EAGAIN;
- } else
- /* Reset timer for idle connection after packet is sent */
- mod_delayed_work(info->workqueue, &info->idle_timer_work,
- msecs_to_jiffies(sp->keepalive_interval_msec));
+ }
return rc;
}
-static int smbd_post_send_iter(struct smbd_connection *info,
+static int smbd_post_send_iter(struct smbdirect_socket *sc,
struct iov_iter *iter,
int *_remaining_data_length)
{
- struct smbdirect_socket *sc = &info->socket;
struct smbdirect_socket_parameters *sp = &sc->parameters;
int i, rc;
int header_length;
@@ -874,8 +1116,8 @@ static int smbd_post_send_iter(struct smbd_connection *info,
wait_credit:
/* Wait for send credits. A SMBD packet needs one credit */
- rc = wait_event_interruptible(info->wait_send_queue,
- atomic_read(&info->send_credits) > 0 ||
+ rc = wait_event_interruptible(sc->send_io.credits.wait_queue,
+ atomic_read(&sc->send_io.credits.count) > 0 ||
sc->status != SMBDIRECT_SOCKET_CONNECTED);
if (rc)
goto err_wait_credit;
@@ -885,14 +1127,14 @@ wait_credit:
rc = -EAGAIN;
goto err_wait_credit;
}
- if (unlikely(atomic_dec_return(&info->send_credits) < 0)) {
- atomic_inc(&info->send_credits);
+ if (unlikely(atomic_dec_return(&sc->send_io.credits.count) < 0)) {
+ atomic_inc(&sc->send_io.credits.count);
goto wait_credit;
}
wait_send_queue:
- wait_event(info->wait_post_send,
- atomic_read(&info->send_pending) < sp->send_credit_target ||
+ wait_event(sc->send_io.pending.dec_wait_queue,
+ atomic_read(&sc->send_io.pending.count) < sp->send_credit_target ||
sc->status != SMBDIRECT_SOCKET_CONNECTED);
if (sc->status != SMBDIRECT_SOCKET_CONNECTED) {
@@ -901,9 +1143,9 @@ wait_send_queue:
goto err_wait_send_queue;
}
- if (unlikely(atomic_inc_return(&info->send_pending) >
+ if (unlikely(atomic_inc_return(&sc->send_io.pending.count) >
sp->send_credit_target)) {
- atomic_dec(&info->send_pending);
+ atomic_dec(&sc->send_io.pending.count);
goto wait_send_queue;
}
@@ -916,10 +1158,30 @@ wait_send_queue:
request->socket = sc;
memset(request->sge, 0, sizeof(request->sge));
+ /* Map the packet to DMA */
+ header_length = sizeof(struct smbdirect_data_transfer);
+ /* If this is a packet without payload, don't send padding */
+ if (!iter)
+ header_length = offsetof(struct smbdirect_data_transfer, padding);
+
+ packet = smbdirect_send_io_payload(request);
+ request->sge[0].addr = ib_dma_map_single(sc->ib.dev,
+ (void *)packet,
+ header_length,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(sc->ib.dev, request->sge[0].addr)) {
+ rc = -EIO;
+ goto err_dma;
+ }
+
+ request->sge[0].length = header_length;
+ request->sge[0].lkey = sc->ib.pd->local_dma_lkey;
+ request->num_sge = 1;
+
/* Fill in the data payload to find out how much data we can add */
if (iter) {
struct smb_extract_to_rdma extract = {
- .nr_sge = 1,
+ .nr_sge = request->num_sge,
.max_sge = SMBDIRECT_SEND_IO_MAX_SGE,
.sge = request->sge,
.device = sc->ib.dev,
@@ -938,21 +1200,17 @@ wait_send_queue:
*_remaining_data_length -= data_length;
} else {
data_length = 0;
- request->num_sge = 1;
}
/* Fill in the packet header */
- packet = smbdirect_send_io_payload(request);
packet->credits_requested = cpu_to_le16(sp->send_credit_target);
- new_credits = manage_credits_prior_sending(info);
- atomic_add(new_credits, &info->receive_credits);
+ new_credits = manage_credits_prior_sending(sc);
+ atomic_add(new_credits, &sc->recv_io.credits.count);
packet->credits_granted = cpu_to_le16(new_credits);
- info->send_immediate = false;
-
packet->flags = 0;
- if (manage_keep_alive_before_sending(info))
+ if (manage_keep_alive_before_sending(sc))
packet->flags |= cpu_to_le16(SMBDIRECT_FLAG_RESPONSE_REQUESTED);
packet->reserved = 0;
@@ -971,26 +1229,7 @@ wait_send_queue:
le32_to_cpu(packet->data_length),
le32_to_cpu(packet->remaining_data_length));
- /* Map the packet to DMA */
- header_length = sizeof(struct smbdirect_data_transfer);
- /* If this is a packet without payload, don't send padding */
- if (!data_length)
- header_length = offsetof(struct smbdirect_data_transfer, padding);
-
- request->sge[0].addr = ib_dma_map_single(sc->ib.dev,
- (void *)packet,
- header_length,
- DMA_TO_DEVICE);
- if (ib_dma_mapping_error(sc->ib.dev, request->sge[0].addr)) {
- rc = -EIO;
- request->sge[0].addr = 0;
- goto err_dma;
- }
-
- request->sge[0].length = header_length;
- request->sge[0].lkey = sc->ib.pd->local_dma_lkey;
-
- rc = smbd_post_send(info, request);
+ rc = smbd_post_send(sc, request);
if (!rc)
return 0;
@@ -1003,19 +1242,16 @@ err_dma:
DMA_TO_DEVICE);
mempool_free(request, sc->send_io.mem.pool);
- /* roll back receive credits and credits to be offered */
- spin_lock(&info->lock_new_credits_offered);
- info->new_credits_offered += new_credits;
- spin_unlock(&info->lock_new_credits_offered);
- atomic_sub(new_credits, &info->receive_credits);
+ /* roll back the granted receive credits */
+ atomic_sub(new_credits, &sc->recv_io.credits.count);
err_alloc:
- if (atomic_dec_and_test(&info->send_pending))
- wake_up(&info->wait_send_pending);
+ if (atomic_dec_and_test(&sc->send_io.pending.count))
+ wake_up(&sc->send_io.pending.zero_wait_queue);
err_wait_send_queue:
/* roll back send credits and pending */
- atomic_inc(&info->send_credits);
+ atomic_inc(&sc->send_io.credits.count);
err_wait_credit:
return rc;
@@ -1026,15 +1262,15 @@ err_wait_credit:
* Empty message is used to extend credits to peer to for keep live
* while there is no upper layer payload to send at the time
*/
-static int smbd_post_send_empty(struct smbd_connection *info)
+static int smbd_post_send_empty(struct smbdirect_socket *sc)
{
int remaining_data_length = 0;
- info->count_send_empty++;
- return smbd_post_send_iter(info, NULL, &remaining_data_length);
+ sc->statistics.send_empty++;
+ return smbd_post_send_iter(sc, NULL, &remaining_data_length);
}
-static int smbd_post_send_full_iter(struct smbd_connection *info,
+static int smbd_post_send_full_iter(struct smbdirect_socket *sc,
struct iov_iter *iter,
int *_remaining_data_length)
{
@@ -1047,7 +1283,7 @@ static int smbd_post_send_full_iter(struct smbd_connection *info,
*/
while (iov_iter_count(iter) > 0) {
- rc = smbd_post_send_iter(info, iter, _remaining_data_length);
+ rc = smbd_post_send_iter(sc, iter, _remaining_data_length);
if (rc < 0)
break;
}
@@ -1061,9 +1297,8 @@ static int smbd_post_send_full_iter(struct smbd_connection *info,
* The interaction is controlled by send/receive credit system
*/
static int smbd_post_recv(
- struct smbd_connection *info, struct smbdirect_recv_io *response)
+ struct smbdirect_socket *sc, struct smbdirect_recv_io *response)
{
- struct smbdirect_socket *sc = &info->socket;
struct smbdirect_socket_parameters *sp = &sc->parameters;
struct ib_recv_wr recv_wr;
int rc = -EIO;
@@ -1089,7 +1324,7 @@ static int smbd_post_recv(
ib_dma_unmap_single(sc->ib.dev, response->sge.addr,
response->sge.length, DMA_FROM_DEVICE);
response->sge.length = 0;
- smbd_disconnect_rdma_connection(info);
+ smbd_disconnect_rdma_connection(sc);
log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
}
@@ -1097,33 +1332,36 @@ static int smbd_post_recv(
}
/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
-static int smbd_negotiate(struct smbd_connection *info)
+static int smbd_negotiate(struct smbdirect_socket *sc)
{
- struct smbdirect_socket *sc = &info->socket;
+ struct smbdirect_socket_parameters *sp = &sc->parameters;
int rc;
- struct smbdirect_recv_io *response = get_receive_buffer(info);
+ struct smbdirect_recv_io *response = get_receive_buffer(sc);
+
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_NEGOTIATE_NEEDED);
+ sc->status = SMBDIRECT_SOCKET_NEGOTIATE_RUNNING;
sc->recv_io.expected = SMBDIRECT_EXPECT_NEGOTIATE_REP;
- rc = smbd_post_recv(info, response);
+ rc = smbd_post_recv(sc, response);
log_rdma_event(INFO, "smbd_post_recv rc=%d iov.addr=0x%llx iov.length=%u iov.lkey=0x%x\n",
rc, response->sge.addr,
response->sge.length, response->sge.lkey);
if (rc) {
- put_receive_buffer(info, response);
+ put_receive_buffer(sc, response);
return rc;
}
- init_completion(&info->negotiate_completion);
- info->negotiate_done = false;
- rc = smbd_post_send_negotiate_req(info);
+ rc = smbd_post_send_negotiate_req(sc);
if (rc)
return rc;
- rc = wait_for_completion_interruptible_timeout(
- &info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
- log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
+ rc = wait_event_interruptible_timeout(
+ sc->status_wait,
+ sc->status != SMBDIRECT_SOCKET_NEGOTIATE_RUNNING,
+ msecs_to_jiffies(sp->negotiate_timeout_msec));
+ log_rdma_event(INFO, "wait_event_interruptible_timeout rc=%d\n", rc);
- if (info->negotiate_done)
+ if (sc->status == SMBDIRECT_SOCKET_CONNECTED)
return 0;
if (rc == 0)
@@ -1147,13 +1385,13 @@ static int smbd_negotiate(struct smbd_connection *info)
* data_length: the size of payload in this packet
*/
static void enqueue_reassembly(
- struct smbd_connection *info,
+ struct smbdirect_socket *sc,
struct smbdirect_recv_io *response,
int data_length)
{
- struct smbdirect_socket *sc = &info->socket;
+ unsigned long flags;
- spin_lock(&sc->recv_io.reassembly.lock);
+ spin_lock_irqsave(&sc->recv_io.reassembly.lock, flags);
list_add_tail(&response->list, &sc->recv_io.reassembly.list);
sc->recv_io.reassembly.queue_length++;
/*
@@ -1164,9 +1402,8 @@ static void enqueue_reassembly(
*/
virt_wmb();
sc->recv_io.reassembly.data_length += data_length;
- spin_unlock(&sc->recv_io.reassembly.lock);
- info->count_reassembly_queue++;
- info->count_enqueue_reassembly_queue++;
+ spin_unlock_irqrestore(&sc->recv_io.reassembly.lock, flags);
+ sc->statistics.enqueue_reassembly_queue++;
}
/*
@@ -1174,9 +1411,8 @@ static void enqueue_reassembly(
* Caller is responsible for locking
* return value: the first entry if any, NULL if queue is empty
*/
-static struct smbdirect_recv_io *_get_first_reassembly(struct smbd_connection *info)
+static struct smbdirect_recv_io *_get_first_reassembly(struct smbdirect_socket *sc)
{
- struct smbdirect_socket *sc = &info->socket;
struct smbdirect_recv_io *ret = NULL;
if (!list_empty(&sc->recv_io.reassembly.list)) {
@@ -1193,9 +1429,8 @@ static struct smbdirect_recv_io *_get_first_reassembly(struct smbd_connection *i
* pre-allocated in advance.
* return value: the receive buffer, NULL if none is available
*/
-static struct smbdirect_recv_io *get_receive_buffer(struct smbd_connection *info)
+static struct smbdirect_recv_io *get_receive_buffer(struct smbdirect_socket *sc)
{
- struct smbdirect_socket *sc = &info->socket;
struct smbdirect_recv_io *ret = NULL;
unsigned long flags;
@@ -1205,8 +1440,7 @@ static struct smbdirect_recv_io *get_receive_buffer(struct smbd_connection *info
&sc->recv_io.free.list,
struct smbdirect_recv_io, list);
list_del(&ret->list);
- info->count_receive_queue--;
- info->count_get_receive_buffer++;
+ sc->statistics.get_receive_buffer++;
}
spin_unlock_irqrestore(&sc->recv_io.free.lock, flags);
@@ -1220,9 +1454,8 @@ static struct smbdirect_recv_io *get_receive_buffer(struct smbd_connection *info
* receive buffer is returned.
*/
static void put_receive_buffer(
- struct smbd_connection *info, struct smbdirect_recv_io *response)
+ struct smbdirect_socket *sc, struct smbdirect_recv_io *response)
{
- struct smbdirect_socket *sc = &info->socket;
unsigned long flags;
if (likely(response->sge.length != 0)) {
@@ -1235,31 +1468,18 @@ static void put_receive_buffer(
spin_lock_irqsave(&sc->recv_io.free.lock, flags);
list_add_tail(&response->list, &sc->recv_io.free.list);
- info->count_receive_queue++;
- info->count_put_receive_buffer++;
+ sc->statistics.put_receive_buffer++;
spin_unlock_irqrestore(&sc->recv_io.free.lock, flags);
- queue_work(info->workqueue, &info->post_send_credits_work);
+ queue_work(sc->workqueue, &sc->recv_io.posted.refill_work);
}
/* Preallocate all receive buffer on transport establishment */
-static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
+static int allocate_receive_buffers(struct smbdirect_socket *sc, int num_buf)
{
- struct smbdirect_socket *sc = &info->socket;
struct smbdirect_recv_io *response;
int i;
- INIT_LIST_HEAD(&sc->recv_io.reassembly.list);
- spin_lock_init(&sc->recv_io.reassembly.lock);
- sc->recv_io.reassembly.data_length = 0;
- sc->recv_io.reassembly.queue_length = 0;
-
- INIT_LIST_HEAD(&sc->recv_io.free.list);
- spin_lock_init(&sc->recv_io.free.lock);
- info->count_receive_queue = 0;
-
- init_waitqueue_head(&info->wait_receive_queues);
-
for (i = 0; i < num_buf; i++) {
response = mempool_alloc(sc->recv_io.mem.pool, GFP_KERNEL);
if (!response)
@@ -1268,7 +1488,6 @@ static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
response->socket = sc;
response->sge.length = 0;
list_add_tail(&response->list, &sc->recv_io.free.list);
- info->count_receive_queue++;
}
return 0;
@@ -1279,45 +1498,59 @@ allocate_failed:
&sc->recv_io.free.list,
struct smbdirect_recv_io, list);
list_del(&response->list);
- info->count_receive_queue--;
mempool_free(response, sc->recv_io.mem.pool);
}
return -ENOMEM;
}
-static void destroy_receive_buffers(struct smbd_connection *info)
+static void destroy_receive_buffers(struct smbdirect_socket *sc)
{
- struct smbdirect_socket *sc = &info->socket;
struct smbdirect_recv_io *response;
- while ((response = get_receive_buffer(info)))
+ while ((response = get_receive_buffer(sc)))
mempool_free(response, sc->recv_io.mem.pool);
}
+static void send_immediate_empty_message(struct work_struct *work)
+{
+ struct smbdirect_socket *sc =
+ container_of(work, struct smbdirect_socket, idle.immediate_work);
+
+ if (sc->status != SMBDIRECT_SOCKET_CONNECTED)
+ return;
+
+ log_keep_alive(INFO, "send an empty message\n");
+ smbd_post_send_empty(sc);
+}
+
/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
static void idle_connection_timer(struct work_struct *work)
{
- struct smbd_connection *info = container_of(
- work, struct smbd_connection,
- idle_timer_work.work);
- struct smbdirect_socket *sc = &info->socket;
+ struct smbdirect_socket *sc =
+ container_of(work, struct smbdirect_socket, idle.timer_work.work);
struct smbdirect_socket_parameters *sp = &sc->parameters;
- if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
+ if (sc->idle.keepalive != SMBDIRECT_KEEPALIVE_NONE) {
log_keep_alive(ERR,
- "error status info->keep_alive_requested=%d\n",
- info->keep_alive_requested);
- smbd_disconnect_rdma_connection(info);
+ "error status sc->idle.keepalive=%d\n",
+ sc->idle.keepalive);
+ smbd_disconnect_rdma_connection(sc);
return;
}
- log_keep_alive(INFO, "about to send an empty idle message\n");
- smbd_post_send_empty(info);
+ if (sc->status != SMBDIRECT_SOCKET_CONNECTED)
+ return;
- /* Setup the next idle timeout work */
- queue_delayed_work(info->workqueue, &info->idle_timer_work,
- msecs_to_jiffies(sp->keepalive_interval_msec));
+ /*
+ * Now use the keepalive timeout (instead of keepalive interval)
+ * in order to wait for a response
+ */
+ sc->idle.keepalive = SMBDIRECT_KEEPALIVE_PENDING;
+ mod_delayed_work(sc->workqueue, &sc->idle.timer_work,
+ msecs_to_jiffies(sp->keepalive_timeout_msec));
+ log_keep_alive(INFO, "schedule send of empty idle message\n");
+ queue_work(sc->workqueue, &sc->idle.immediate_work);
}
/*
@@ -1329,7 +1562,6 @@ void smbd_destroy(struct TCP_Server_Info *server)
{
struct smbd_connection *info = server->smbd_conn;
struct smbdirect_socket *sc;
- struct smbdirect_socket_parameters *sp;
struct smbdirect_recv_io *response;
unsigned long flags;
@@ -1338,19 +1570,30 @@ void smbd_destroy(struct TCP_Server_Info *server)
return;
}
sc = &info->socket;
- sp = &sc->parameters;
+
+ log_rdma_event(INFO, "cancelling and disable disconnect_work\n");
+ disable_work_sync(&sc->disconnect_work);
log_rdma_event(INFO, "destroying rdma session\n");
- if (sc->status != SMBDIRECT_SOCKET_DISCONNECTED) {
- rdma_disconnect(sc->rdma.cm_id);
+ if (sc->status < SMBDIRECT_SOCKET_DISCONNECTING) {
+ smbd_disconnect_rdma_work(&sc->disconnect_work);
log_rdma_event(INFO, "wait for transport being disconnected\n");
wait_event_interruptible(
- info->status_wait,
+ sc->status_wait,
sc->status == SMBDIRECT_SOCKET_DISCONNECTED);
}
- log_rdma_event(INFO, "cancelling post_send_credits_work\n");
- disable_work_sync(&info->post_send_credits_work);
+ /*
+ * Wake up all waiters in all wait queues
+ * in order to notice the broken connection.
+ *
+ * Most likely this was already called via
+ * smbd_disconnect_rdma_work(), but call it again...
+ */
+ smbd_disconnect_wake_up_all(sc);
+
+ log_rdma_event(INFO, "cancelling recv_io.posted.refill_work\n");
+ disable_work_sync(&sc->recv_io.posted.refill_work);
log_rdma_event(INFO, "destroying qp\n");
ib_drain_qp(sc->ib.qp);
@@ -1358,18 +1601,20 @@ void smbd_destroy(struct TCP_Server_Info *server)
sc->ib.qp = NULL;
log_rdma_event(INFO, "cancelling idle timer\n");
- disable_delayed_work_sync(&info->idle_timer_work);
+ disable_delayed_work_sync(&sc->idle.timer_work);
+ log_rdma_event(INFO, "cancelling send immediate work\n");
+ disable_work_sync(&sc->idle.immediate_work);
/* It's not possible for upper layer to get to reassembly */
log_rdma_event(INFO, "drain the reassembly queue\n");
do {
spin_lock_irqsave(&sc->recv_io.reassembly.lock, flags);
- response = _get_first_reassembly(info);
+ response = _get_first_reassembly(sc);
if (response) {
list_del(&response->list);
spin_unlock_irqrestore(
&sc->recv_io.reassembly.lock, flags);
- put_receive_buffer(info, response);
+ put_receive_buffer(sc, response);
} else
spin_unlock_irqrestore(
&sc->recv_io.reassembly.lock, flags);
@@ -1377,9 +1622,7 @@ void smbd_destroy(struct TCP_Server_Info *server)
sc->recv_io.reassembly.data_length = 0;
log_rdma_event(INFO, "free receive buffers\n");
- wait_event(info->wait_receive_queues,
- info->count_receive_queue == sp->recv_credit_max);
- destroy_receive_buffers(info);
+ destroy_receive_buffers(sc);
/*
* For performance reasons, memory registration and deregistration
@@ -1389,13 +1632,12 @@ void smbd_destroy(struct TCP_Server_Info *server)
* path when sending data, and then release memory registrations.
*/
log_rdma_event(INFO, "freeing mr list\n");
- wake_up_interruptible_all(&info->wait_mr);
- while (atomic_read(&info->mr_used_count)) {
+ while (atomic_read(&sc->mr_io.used.count)) {
cifs_server_unlock(server);
msleep(1000);
cifs_server_lock(server);
}
- destroy_mr_list(info);
+ destroy_mr_list(sc);
ib_free_cq(sc->ib.send_cq);
ib_free_cq(sc->ib.recv_cq);
@@ -1411,7 +1653,7 @@ void smbd_destroy(struct TCP_Server_Info *server)
sc->status = SMBDIRECT_SOCKET_DESTROYED;
- destroy_workqueue(info->workqueue);
+ destroy_workqueue(sc->workqueue);
log_rdma_event(INFO, "rdma session destroyed\n");
kfree(info);
server->smbd_conn = NULL;
@@ -1453,12 +1695,9 @@ create_conn:
return -ENOENT;
}
-static void destroy_caches_and_workqueue(struct smbd_connection *info)
+static void destroy_caches(struct smbdirect_socket *sc)
{
- struct smbdirect_socket *sc = &info->socket;
-
- destroy_receive_buffers(info);
- destroy_workqueue(info->workqueue);
+ destroy_receive_buffers(sc);
mempool_destroy(sc->recv_io.mem.pool);
kmem_cache_destroy(sc->recv_io.mem.cache);
mempool_destroy(sc->send_io.mem.pool);
@@ -1466,9 +1705,8 @@ static void destroy_caches_and_workqueue(struct smbd_connection *info)
}
#define MAX_NAME_LEN 80
-static int allocate_caches_and_workqueue(struct smbd_connection *info)
+static int allocate_caches(struct smbdirect_socket *sc)
{
- struct smbdirect_socket *sc = &info->socket;
struct smbdirect_socket_parameters *sp = &sc->parameters;
char name[MAX_NAME_LEN];
int rc;
@@ -1476,7 +1714,7 @@ static int allocate_caches_and_workqueue(struct smbd_connection *info)
if (WARN_ON_ONCE(sp->max_recv_size < sizeof(struct smbdirect_data_transfer)))
return -ENOMEM;
- scnprintf(name, MAX_NAME_LEN, "smbdirect_send_io_%p", info);
+ scnprintf(name, MAX_NAME_LEN, "smbdirect_send_io_%p", sc);
sc->send_io.mem.cache =
kmem_cache_create(
name,
@@ -1492,7 +1730,7 @@ static int allocate_caches_and_workqueue(struct smbd_connection *info)
if (!sc->send_io.mem.pool)
goto out1;
- scnprintf(name, MAX_NAME_LEN, "smbdirect_recv_io_%p", info);
+ scnprintf(name, MAX_NAME_LEN, "smbdirect_recv_io_%p", sc);
struct kmem_cache_args response_args = {
.align = __alignof__(struct smbdirect_recv_io),
@@ -1513,21 +1751,14 @@ static int allocate_caches_and_workqueue(struct smbd_connection *info)
if (!sc->recv_io.mem.pool)
goto out3;
- scnprintf(name, MAX_NAME_LEN, "smbd_%p", info);
- info->workqueue = create_workqueue(name);
- if (!info->workqueue)
- goto out4;
-
- rc = allocate_receive_buffers(info, sp->recv_credit_max);
+ rc = allocate_receive_buffers(sc, sp->recv_credit_max);
if (rc) {
log_rdma_event(ERR, "failed to allocate receive buffers\n");
- goto out5;
+ goto out4;
}
return 0;
-out5:
- destroy_workqueue(info->workqueue);
out4:
mempool_destroy(sc->recv_io.mem.pool);
out3:
@@ -1551,46 +1782,63 @@ static struct smbd_connection *_smbd_get_connection(
struct ib_qp_init_attr qp_attr;
struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
struct ib_port_immutable port_immutable;
- u32 ird_ord_hdr[2];
+ __be32 ird_ord_hdr[2];
+ char wq_name[80];
+ struct workqueue_struct *workqueue;
info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
if (!info)
return NULL;
sc = &info->socket;
+ scnprintf(wq_name, ARRAY_SIZE(wq_name), "smbd_%p", sc);
+ workqueue = create_workqueue(wq_name);
+ if (!workqueue)
+ goto create_wq_failed;
+ smbdirect_socket_init(sc);
+ sc->workqueue = workqueue;
sp = &sc->parameters;
- sc->status = SMBDIRECT_SOCKET_CONNECTING;
- rc = smbd_ia_open(info, dstaddr, port);
+ INIT_WORK(&sc->disconnect_work, smbd_disconnect_rdma_work);
+
+ sp->resolve_addr_timeout_msec = RDMA_RESOLVE_TIMEOUT;
+ sp->resolve_route_timeout_msec = RDMA_RESOLVE_TIMEOUT;
+ sp->rdma_connect_timeout_msec = RDMA_RESOLVE_TIMEOUT;
+ sp->negotiate_timeout_msec = SMBD_NEGOTIATE_TIMEOUT * 1000;
+ sp->initiator_depth = 1;
+ sp->responder_resources = SMBD_CM_RESPONDER_RESOURCES;
+ sp->recv_credit_max = smbd_receive_credit_max;
+ sp->send_credit_target = smbd_send_credit_target;
+ sp->max_send_size = smbd_max_send_size;
+ sp->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
+ sp->max_recv_size = smbd_max_receive_size;
+ sp->max_frmr_depth = smbd_max_frmr_depth;
+ sp->keepalive_interval_msec = smbd_keep_alive_interval * 1000;
+ sp->keepalive_timeout_msec = KEEPALIVE_RECV_TIMEOUT * 1000;
+
+ rc = smbd_ia_open(sc, dstaddr, port);
if (rc) {
log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
goto create_id_failed;
}
- if (smbd_send_credit_target > sc->ib.dev->attrs.max_cqe ||
- smbd_send_credit_target > sc->ib.dev->attrs.max_qp_wr) {
+ if (sp->send_credit_target > sc->ib.dev->attrs.max_cqe ||
+ sp->send_credit_target > sc->ib.dev->attrs.max_qp_wr) {
log_rdma_event(ERR, "consider lowering send_credit_target = %d. Possible CQE overrun, device reporting max_cqe %d max_qp_wr %d\n",
- smbd_send_credit_target,
+ sp->send_credit_target,
sc->ib.dev->attrs.max_cqe,
sc->ib.dev->attrs.max_qp_wr);
goto config_failed;
}
- if (smbd_receive_credit_max > sc->ib.dev->attrs.max_cqe ||
- smbd_receive_credit_max > sc->ib.dev->attrs.max_qp_wr) {
+ if (sp->recv_credit_max > sc->ib.dev->attrs.max_cqe ||
+ sp->recv_credit_max > sc->ib.dev->attrs.max_qp_wr) {
log_rdma_event(ERR, "consider lowering receive_credit_max = %d. Possible CQE overrun, device reporting max_cqe %d max_qp_wr %d\n",
- smbd_receive_credit_max,
+ sp->recv_credit_max,
sc->ib.dev->attrs.max_cqe,
sc->ib.dev->attrs.max_qp_wr);
goto config_failed;
}
- sp->recv_credit_max = smbd_receive_credit_max;
- sp->send_credit_target = smbd_send_credit_target;
- sp->max_send_size = smbd_max_send_size;
- sp->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
- sp->max_recv_size = smbd_max_receive_size;
- sp->keepalive_interval_msec = smbd_keep_alive_interval * 1000;
-
if (sc->ib.dev->attrs.max_send_sge < SMBDIRECT_SEND_IO_MAX_SGE ||
sc->ib.dev->attrs.max_recv_sge < SMBDIRECT_RECV_IO_MAX_SGE) {
log_rdma_event(ERR,
@@ -1602,8 +1850,16 @@ static struct smbd_connection *_smbd_get_connection(
goto config_failed;
}
+ sc->ib.pd = ib_alloc_pd(sc->ib.dev, 0);
+ if (IS_ERR(sc->ib.pd)) {
+ rc = PTR_ERR(sc->ib.pd);
+ sc->ib.pd = NULL;
+ log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
+ goto alloc_pd_failed;
+ }
+
sc->ib.send_cq =
- ib_alloc_cq_any(sc->ib.dev, info,
+ ib_alloc_cq_any(sc->ib.dev, sc,
sp->send_credit_target, IB_POLL_SOFTIRQ);
if (IS_ERR(sc->ib.send_cq)) {
sc->ib.send_cq = NULL;
@@ -1611,7 +1867,7 @@ static struct smbd_connection *_smbd_get_connection(
}
sc->ib.recv_cq =
- ib_alloc_cq_any(sc->ib.dev, info,
+ ib_alloc_cq_any(sc->ib.dev, sc,
sp->recv_credit_max, IB_POLL_SOFTIRQ);
if (IS_ERR(sc->ib.recv_cq)) {
sc->ib.recv_cq = NULL;
@@ -1620,7 +1876,7 @@ static struct smbd_connection *_smbd_get_connection(
memset(&qp_attr, 0, sizeof(qp_attr));
qp_attr.event_handler = smbd_qp_async_error_upcall;
- qp_attr.qp_context = info;
+ qp_attr.qp_context = sc;
qp_attr.cap.max_send_wr = sp->send_credit_target;
qp_attr.cap.max_recv_wr = sp->recv_credit_max;
qp_attr.cap.max_send_sge = SMBDIRECT_SEND_IO_MAX_SGE;
@@ -1639,22 +1895,22 @@ static struct smbd_connection *_smbd_get_connection(
}
sc->ib.qp = sc->rdma.cm_id->qp;
- memset(&conn_param, 0, sizeof(conn_param));
- conn_param.initiator_depth = 0;
-
- conn_param.responder_resources =
- min(sc->ib.dev->attrs.max_qp_rd_atom,
- SMBD_CM_RESPONDER_RESOURCES);
- info->responder_resources = conn_param.responder_resources;
+ sp->responder_resources =
+ min_t(u8, sp->responder_resources,
+ sc->ib.dev->attrs.max_qp_rd_atom);
log_rdma_mr(INFO, "responder_resources=%d\n",
- info->responder_resources);
+ sp->responder_resources);
+
+ memset(&conn_param, 0, sizeof(conn_param));
+ conn_param.initiator_depth = sp->initiator_depth;
+ conn_param.responder_resources = sp->responder_resources;
/* Need to send IRD/ORD in private data for iWARP */
sc->ib.dev->ops.get_port_immutable(
sc->ib.dev, sc->rdma.cm_id->port_num, &port_immutable);
if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
- ird_ord_hdr[0] = info->responder_resources;
- ird_ord_hdr[1] = 1;
+ ird_ord_hdr[0] = cpu_to_be32(conn_param.responder_resources);
+ ird_ord_hdr[1] = cpu_to_be32(conn_param.initiator_depth);
conn_param.private_data = ird_ord_hdr;
conn_param.private_data_len = sizeof(ird_ord_hdr);
} else {
@@ -1669,8 +1925,8 @@ static struct smbd_connection *_smbd_get_connection(
log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
&addr_in->sin_addr, port);
- init_waitqueue_head(&info->status_wait);
- init_waitqueue_head(&sc->recv_io.reassembly.wait_queue);
+ WARN_ON_ONCE(sc->status != SMBDIRECT_SOCKET_RDMA_CONNECT_NEEDED);
+ sc->status = SMBDIRECT_SOCKET_RDMA_CONNECT_RUNNING;
rc = rdma_connect(sc->rdma.cm_id, &conn_param);
if (rc) {
log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
@@ -1678,45 +1934,42 @@ static struct smbd_connection *_smbd_get_connection(
}
wait_event_interruptible_timeout(
- info->status_wait,
- sc->status != SMBDIRECT_SOCKET_CONNECTING,
- msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
+ sc->status_wait,
+ sc->status != SMBDIRECT_SOCKET_RDMA_CONNECT_RUNNING,
+ msecs_to_jiffies(sp->rdma_connect_timeout_msec));
- if (sc->status != SMBDIRECT_SOCKET_CONNECTED) {
+ if (sc->status != SMBDIRECT_SOCKET_NEGOTIATE_NEEDED) {
log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
goto rdma_connect_failed;
}
log_rdma_event(INFO, "rdma_connect connected\n");
- rc = allocate_caches_and_workqueue(info);
+ rc = allocate_caches(sc);
if (rc) {
log_rdma_event(ERR, "cache allocation failed\n");
goto allocate_cache_failed;
}
- init_waitqueue_head(&info->wait_send_queue);
- INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
- queue_delayed_work(info->workqueue, &info->idle_timer_work,
- msecs_to_jiffies(sp->keepalive_interval_msec));
-
- init_waitqueue_head(&info->wait_send_pending);
- atomic_set(&info->send_pending, 0);
-
- init_waitqueue_head(&info->wait_post_send);
+ INIT_WORK(&sc->idle.immediate_work, send_immediate_empty_message);
+ INIT_DELAYED_WORK(&sc->idle.timer_work, idle_connection_timer);
+ /*
+ * start with the negotiate timeout and SMBDIRECT_KEEPALIVE_PENDING
+ * so that the timer will cause a disconnect.
+ */
+ sc->idle.keepalive = SMBDIRECT_KEEPALIVE_PENDING;
+ mod_delayed_work(sc->workqueue, &sc->idle.timer_work,
+ msecs_to_jiffies(sp->negotiate_timeout_msec));
- INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
- INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
- info->new_credits_offered = 0;
- spin_lock_init(&info->lock_new_credits_offered);
+ INIT_WORK(&sc->recv_io.posted.refill_work, smbd_post_send_credits);
- rc = smbd_negotiate(info);
+ rc = smbd_negotiate(sc);
if (rc) {
log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
goto negotiation_failed;
}
- rc = allocate_mr_list(info);
+ rc = allocate_mr_list(sc);
if (rc) {
log_rdma_mr(ERR, "memory registration allocation failed\n");
goto allocate_mr_failed;
@@ -1731,11 +1984,11 @@ allocate_mr_failed:
return NULL;
negotiation_failed:
- disable_delayed_work_sync(&info->idle_timer_work);
- destroy_caches_and_workqueue(info);
+ disable_delayed_work_sync(&sc->idle.timer_work);
+ destroy_caches(sc);
sc->status = SMBDIRECT_SOCKET_NEGOTIATE_FAILED;
rdma_disconnect(sc->rdma.cm_id);
- wait_event(info->status_wait,
+ wait_event(sc->status_wait,
sc->status == SMBDIRECT_SOCKET_DISCONNECTED);
allocate_cache_failed:
@@ -1749,11 +2002,15 @@ alloc_cq_failed:
if (sc->ib.recv_cq)
ib_free_cq(sc->ib.recv_cq);
-config_failed:
ib_dealloc_pd(sc->ib.pd);
+
+alloc_pd_failed:
+config_failed:
rdma_destroy_id(sc->rdma.cm_id);
create_id_failed:
+ destroy_workqueue(sc->workqueue);
+create_wq_failed:
kfree(info);
return NULL;
}
@@ -1762,6 +2019,7 @@ struct smbd_connection *smbd_get_connection(
struct TCP_Server_Info *server, struct sockaddr *dstaddr)
{
struct smbd_connection *ret;
+ const struct smbdirect_socket_parameters *sp;
int port = SMBD_PORT;
try_again:
@@ -1772,6 +2030,16 @@ try_again:
port = SMB_PORT;
goto try_again;
}
+ if (!ret)
+ return NULL;
+
+ sp = &ret->socket.parameters;
+
+ server->rdma_readwrite_threshold =
+ rdma_readwrite_threshold > sp->max_fragmented_send_size ?
+ sp->max_fragmented_send_size :
+ rdma_readwrite_threshold;
+
return ret;
}
@@ -1813,6 +2081,7 @@ again:
if (sc->recv_io.reassembly.data_length >= size) {
int queue_length;
int queue_removed = 0;
+ unsigned long flags;
/*
* Need to make sure reassembly_data_length is read before
@@ -1827,7 +2096,7 @@ again:
to_read = size;
offset = sc->recv_io.reassembly.first_entry_offset;
while (data_read < size) {
- response = _get_first_reassembly(info);
+ response = _get_first_reassembly(sc);
data_transfer = smbdirect_recv_io_payload(response);
data_length = le32_to_cpu(data_transfer->data_length);
remaining_data_length =
@@ -1872,16 +2141,15 @@ again:
if (queue_length)
list_del(&response->list);
else {
- spin_lock_irq(
- &sc->recv_io.reassembly.lock);
+ spin_lock_irqsave(
+ &sc->recv_io.reassembly.lock, flags);
list_del(&response->list);
- spin_unlock_irq(
- &sc->recv_io.reassembly.lock);
+ spin_unlock_irqrestore(
+ &sc->recv_io.reassembly.lock, flags);
}
queue_removed++;
- info->count_reassembly_queue--;
- info->count_dequeue_reassembly_queue++;
- put_receive_buffer(info, response);
+ sc->statistics.dequeue_reassembly_queue++;
+ put_receive_buffer(sc, response);
offset = 0;
log_read(INFO, "put_receive_buffer offset=0\n");
} else
@@ -1895,10 +2163,10 @@ again:
to_read, data_read, offset);
}
- spin_lock_irq(&sc->recv_io.reassembly.lock);
+ spin_lock_irqsave(&sc->recv_io.reassembly.lock, flags);
sc->recv_io.reassembly.data_length -= data_read;
sc->recv_io.reassembly.queue_length -= queue_removed;
- spin_unlock_irq(&sc->recv_io.reassembly.lock);
+ spin_unlock_irqrestore(&sc->recv_io.reassembly.lock, flags);
sc->recv_io.reassembly.first_entry_offset = offset;
log_read(INFO, "returning to thread data_read=%d reassembly_data_length=%d first_entry_offset=%d\n",
@@ -1983,13 +2251,13 @@ int smbd_send(struct TCP_Server_Info *server,
klen += rqst->rq_iov[i].iov_len;
iov_iter_kvec(&iter, ITER_SOURCE, rqst->rq_iov, rqst->rq_nvec, klen);
- rc = smbd_post_send_full_iter(info, &iter, &remaining_data_length);
+ rc = smbd_post_send_full_iter(sc, &iter, &remaining_data_length);
if (rc < 0)
break;
if (iov_iter_count(&rqst->rq_iter) > 0) {
/* And then the data pages if there are any */
- rc = smbd_post_send_full_iter(info, &rqst->rq_iter,
+ rc = smbd_post_send_full_iter(sc, &rqst->rq_iter,
&remaining_data_length);
if (rc < 0)
break;
@@ -2004,8 +2272,8 @@ int smbd_send(struct TCP_Server_Info *server,
* that means all the I/Os have been out and we are good to return
*/
- wait_event(info->wait_send_pending,
- atomic_read(&info->send_pending) == 0 ||
+ wait_event(sc->send_io.pending.zero_wait_queue,
+ atomic_read(&sc->send_io.pending.count) == 0 ||
sc->status != SMBDIRECT_SOCKET_CONNECTED);
if (sc->status != SMBDIRECT_SOCKET_CONNECTED && rc == 0)
@@ -2016,14 +2284,13 @@ int smbd_send(struct TCP_Server_Info *server,
static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
{
- struct smbd_mr *mr;
- struct ib_cqe *cqe;
+ struct smbdirect_mr_io *mr =
+ container_of(wc->wr_cqe, struct smbdirect_mr_io, cqe);
+ struct smbdirect_socket *sc = mr->socket;
if (wc->status) {
log_rdma_mr(ERR, "status=%d\n", wc->status);
- cqe = wc->wr_cqe;
- mr = container_of(cqe, struct smbd_mr, cqe);
- smbd_disconnect_rdma_connection(mr->conn);
+ smbd_disconnect_rdma_connection(sc);
}
}
@@ -2038,14 +2305,14 @@ static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
*/
static void smbd_mr_recovery_work(struct work_struct *work)
{
- struct smbd_connection *info =
- container_of(work, struct smbd_connection, mr_recovery_work);
- struct smbdirect_socket *sc = &info->socket;
- struct smbd_mr *smbdirect_mr;
+ struct smbdirect_socket *sc =
+ container_of(work, struct smbdirect_socket, mr_io.recovery_work);
+ struct smbdirect_socket_parameters *sp = &sc->parameters;
+ struct smbdirect_mr_io *smbdirect_mr;
int rc;
- list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
- if (smbdirect_mr->state == MR_ERROR) {
+ list_for_each_entry(smbdirect_mr, &sc->mr_io.all.list, list) {
+ if (smbdirect_mr->state == SMBDIRECT_MR_ERROR) {
/* recover this MR entry */
rc = ib_dereg_mr(smbdirect_mr->mr);
@@ -2053,25 +2320,25 @@ static void smbd_mr_recovery_work(struct work_struct *work)
log_rdma_mr(ERR,
"ib_dereg_mr failed rc=%x\n",
rc);
- smbd_disconnect_rdma_connection(info);
+ smbd_disconnect_rdma_connection(sc);
continue;
}
smbdirect_mr->mr = ib_alloc_mr(
- sc->ib.pd, info->mr_type,
- info->max_frmr_depth);
+ sc->ib.pd, sc->mr_io.type,
+ sp->max_frmr_depth);
if (IS_ERR(smbdirect_mr->mr)) {
log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
- info->mr_type,
- info->max_frmr_depth);
- smbd_disconnect_rdma_connection(info);
+ sc->mr_io.type,
+ sp->max_frmr_depth);
+ smbd_disconnect_rdma_connection(sc);
continue;
}
} else
/* This MR is being used, don't recover it */
continue;
- smbdirect_mr->state = MR_READY;
+ smbdirect_mr->state = SMBDIRECT_MR_READY;
/* smbdirect_mr->state is updated by this function
* and is read and updated by I/O issuing CPUs trying
@@ -2080,19 +2347,18 @@ static void smbd_mr_recovery_work(struct work_struct *work)
* value is updated before waking up any calls to
* get_mr() from the I/O issuing CPUs
*/
- if (atomic_inc_return(&info->mr_ready_count) == 1)
- wake_up_interruptible(&info->wait_mr);
+ if (atomic_inc_return(&sc->mr_io.ready.count) == 1)
+ wake_up(&sc->mr_io.ready.wait_queue);
}
}
-static void destroy_mr_list(struct smbd_connection *info)
+static void destroy_mr_list(struct smbdirect_socket *sc)
{
- struct smbdirect_socket *sc = &info->socket;
- struct smbd_mr *mr, *tmp;
+ struct smbdirect_mr_io *mr, *tmp;
- disable_work_sync(&info->mr_recovery_work);
- list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
- if (mr->state == MR_INVALIDATED)
+ disable_work_sync(&sc->mr_io.recovery_work);
+ list_for_each_entry_safe(mr, tmp, &sc->mr_io.all.list, list) {
+ if (mr->state == SMBDIRECT_MR_INVALIDATED)
ib_dma_unmap_sg(sc->ib.dev, mr->sgt.sgl,
mr->sgt.nents, mr->dir);
ib_dereg_mr(mr->mr);
@@ -2108,32 +2374,32 @@ static void destroy_mr_list(struct smbd_connection *info)
* Recovery is done in smbd_mr_recovery_work. The content of list entry changes
* as MRs are used and recovered for I/O, but the list links will not change
*/
-static int allocate_mr_list(struct smbd_connection *info)
+static int allocate_mr_list(struct smbdirect_socket *sc)
{
- struct smbdirect_socket *sc = &info->socket;
+ struct smbdirect_socket_parameters *sp = &sc->parameters;
int i;
- struct smbd_mr *smbdirect_mr, *tmp;
-
- INIT_LIST_HEAD(&info->mr_list);
- init_waitqueue_head(&info->wait_mr);
- spin_lock_init(&info->mr_list_lock);
- atomic_set(&info->mr_ready_count, 0);
- atomic_set(&info->mr_used_count, 0);
- init_waitqueue_head(&info->wait_for_mr_cleanup);
- INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
+ struct smbdirect_mr_io *smbdirect_mr, *tmp;
+
+ INIT_WORK(&sc->mr_io.recovery_work, smbd_mr_recovery_work);
+
+ if (sp->responder_resources == 0) {
+ log_rdma_mr(ERR, "responder_resources negotiated as 0\n");
+ return -EINVAL;
+ }
+
/* Allocate more MRs (2x) than hardware responder_resources */
- for (i = 0; i < info->responder_resources * 2; i++) {
+ for (i = 0; i < sp->responder_resources * 2; i++) {
smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
if (!smbdirect_mr)
goto cleanup_entries;
- smbdirect_mr->mr = ib_alloc_mr(sc->ib.pd, info->mr_type,
- info->max_frmr_depth);
+ smbdirect_mr->mr = ib_alloc_mr(sc->ib.pd, sc->mr_io.type,
+ sp->max_frmr_depth);
if (IS_ERR(smbdirect_mr->mr)) {
log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
- info->mr_type, info->max_frmr_depth);
+ sc->mr_io.type, sp->max_frmr_depth);
goto out;
}
- smbdirect_mr->sgt.sgl = kcalloc(info->max_frmr_depth,
+ smbdirect_mr->sgt.sgl = kcalloc(sp->max_frmr_depth,
sizeof(struct scatterlist),
GFP_KERNEL);
if (!smbdirect_mr->sgt.sgl) {
@@ -2141,18 +2407,18 @@ static int allocate_mr_list(struct smbd_connection *info)
ib_dereg_mr(smbdirect_mr->mr);
goto out;
}
- smbdirect_mr->state = MR_READY;
- smbdirect_mr->conn = info;
+ smbdirect_mr->state = SMBDIRECT_MR_READY;
+ smbdirect_mr->socket = sc;
- list_add_tail(&smbdirect_mr->list, &info->mr_list);
- atomic_inc(&info->mr_ready_count);
+ list_add_tail(&smbdirect_mr->list, &sc->mr_io.all.list);
+ atomic_inc(&sc->mr_io.ready.count);
}
return 0;
out:
kfree(smbdirect_mr);
cleanup_entries:
- list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
+ list_for_each_entry_safe(smbdirect_mr, tmp, &sc->mr_io.all.list, list) {
list_del(&smbdirect_mr->list);
ib_dereg_mr(smbdirect_mr->mr);
kfree(smbdirect_mr->sgt.sgl);
@@ -2169,14 +2435,14 @@ cleanup_entries:
* issuing I/O trying to get MR at the same time, mr_list_lock is used to
* protect this situation.
*/
-static struct smbd_mr *get_mr(struct smbd_connection *info)
+static struct smbdirect_mr_io *get_mr(struct smbdirect_socket *sc)
{
- struct smbdirect_socket *sc = &info->socket;
- struct smbd_mr *ret;
+ struct smbdirect_mr_io *ret;
+ unsigned long flags;
int rc;
again:
- rc = wait_event_interruptible(info->wait_mr,
- atomic_read(&info->mr_ready_count) ||
+ rc = wait_event_interruptible(sc->mr_io.ready.wait_queue,
+ atomic_read(&sc->mr_io.ready.count) ||
sc->status != SMBDIRECT_SOCKET_CONNECTED);
if (rc) {
log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
@@ -2188,18 +2454,18 @@ again:
return NULL;
}
- spin_lock(&info->mr_list_lock);
- list_for_each_entry(ret, &info->mr_list, list) {
- if (ret->state == MR_READY) {
- ret->state = MR_REGISTERED;
- spin_unlock(&info->mr_list_lock);
- atomic_dec(&info->mr_ready_count);
- atomic_inc(&info->mr_used_count);
+ spin_lock_irqsave(&sc->mr_io.all.lock, flags);
+ list_for_each_entry(ret, &sc->mr_io.all.list, list) {
+ if (ret->state == SMBDIRECT_MR_READY) {
+ ret->state = SMBDIRECT_MR_REGISTERED;
+ spin_unlock_irqrestore(&sc->mr_io.all.lock, flags);
+ atomic_dec(&sc->mr_io.ready.count);
+ atomic_inc(&sc->mr_io.used.count);
return ret;
}
}
- spin_unlock(&info->mr_list_lock);
+ spin_unlock_irqrestore(&sc->mr_io.all.lock, flags);
/*
* It is possible that we could fail to get MR because other processes may
* try to acquire a MR at the same time. If this is the case, retry it.
@@ -2210,8 +2476,7 @@ again:
/*
* Transcribe the pages from an iterator into an MR scatterlist.
*/
-static int smbd_iter_to_mr(struct smbd_connection *info,
- struct iov_iter *iter,
+static int smbd_iter_to_mr(struct iov_iter *iter,
struct sg_table *sgt,
unsigned int max_sg)
{
@@ -2233,25 +2498,26 @@ static int smbd_iter_to_mr(struct smbd_connection *info,
* need_invalidate: true if this MR needs to be locally invalidated after I/O
* return value: the MR registered, NULL if failed.
*/
-struct smbd_mr *smbd_register_mr(struct smbd_connection *info,
+struct smbdirect_mr_io *smbd_register_mr(struct smbd_connection *info,
struct iov_iter *iter,
bool writing, bool need_invalidate)
{
struct smbdirect_socket *sc = &info->socket;
- struct smbd_mr *smbdirect_mr;
+ struct smbdirect_socket_parameters *sp = &sc->parameters;
+ struct smbdirect_mr_io *smbdirect_mr;
int rc, num_pages;
enum dma_data_direction dir;
struct ib_reg_wr *reg_wr;
- num_pages = iov_iter_npages(iter, info->max_frmr_depth + 1);
- if (num_pages > info->max_frmr_depth) {
+ num_pages = iov_iter_npages(iter, sp->max_frmr_depth + 1);
+ if (num_pages > sp->max_frmr_depth) {
log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
- num_pages, info->max_frmr_depth);
+ num_pages, sp->max_frmr_depth);
WARN_ON_ONCE(1);
return NULL;
}
- smbdirect_mr = get_mr(info);
+ smbdirect_mr = get_mr(sc);
if (!smbdirect_mr) {
log_rdma_mr(ERR, "get_mr returning NULL\n");
return NULL;
@@ -2264,8 +2530,8 @@ struct smbd_mr *smbd_register_mr(struct smbd_connection *info,
smbdirect_mr->sgt.orig_nents = 0;
log_rdma_mr(INFO, "num_pages=0x%x count=0x%zx depth=%u\n",
- num_pages, iov_iter_count(iter), info->max_frmr_depth);
- smbd_iter_to_mr(info, iter, &smbdirect_mr->sgt, info->max_frmr_depth);
+ num_pages, iov_iter_count(iter), sp->max_frmr_depth);
+ smbd_iter_to_mr(iter, &smbdirect_mr->sgt, sp->max_frmr_depth);
rc = ib_dma_map_sg(sc->ib.dev, smbdirect_mr->sgt.sgl,
smbdirect_mr->sgt.nents, dir);
@@ -2310,32 +2576,32 @@ struct smbd_mr *smbd_register_mr(struct smbd_connection *info,
log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
rc, reg_wr->key);
- /* If all failed, attempt to recover this MR by setting it MR_ERROR*/
+ /* If all failed, attempt to recover this MR by setting it SMBDIRECT_MR_ERROR*/
map_mr_error:
ib_dma_unmap_sg(sc->ib.dev, smbdirect_mr->sgt.sgl,
smbdirect_mr->sgt.nents, smbdirect_mr->dir);
dma_map_error:
- smbdirect_mr->state = MR_ERROR;
- if (atomic_dec_and_test(&info->mr_used_count))
- wake_up(&info->wait_for_mr_cleanup);
+ smbdirect_mr->state = SMBDIRECT_MR_ERROR;
+ if (atomic_dec_and_test(&sc->mr_io.used.count))
+ wake_up(&sc->mr_io.cleanup.wait_queue);
- smbd_disconnect_rdma_connection(info);
+ smbd_disconnect_rdma_connection(sc);
return NULL;
}
static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
{
- struct smbd_mr *smbdirect_mr;
+ struct smbdirect_mr_io *smbdirect_mr;
struct ib_cqe *cqe;
cqe = wc->wr_cqe;
- smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
- smbdirect_mr->state = MR_INVALIDATED;
+ smbdirect_mr = container_of(cqe, struct smbdirect_mr_io, cqe);
+ smbdirect_mr->state = SMBDIRECT_MR_INVALIDATED;
if (wc->status != IB_WC_SUCCESS) {
log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
- smbdirect_mr->state = MR_ERROR;
+ smbdirect_mr->state = SMBDIRECT_MR_ERROR;
}
complete(&smbdirect_mr->invalidate_done);
}
@@ -2346,11 +2612,10 @@ static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
* and we have to locally invalidate the buffer to prevent data is being
* modified by remote peer after upper layer consumes it
*/
-int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
+int smbd_deregister_mr(struct smbdirect_mr_io *smbdirect_mr)
{
struct ib_send_wr *wr;
- struct smbd_connection *info = smbdirect_mr->conn;
- struct smbdirect_socket *sc = &info->socket;
+ struct smbdirect_socket *sc = smbdirect_mr->socket;
int rc = 0;
if (smbdirect_mr->need_invalidate) {
@@ -2367,36 +2632,36 @@ int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
rc = ib_post_send(sc->ib.qp, wr, NULL);
if (rc) {
log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
- smbd_disconnect_rdma_connection(info);
+ smbd_disconnect_rdma_connection(sc);
goto done;
}
wait_for_completion(&smbdirect_mr->invalidate_done);
smbdirect_mr->need_invalidate = false;
} else
/*
- * For remote invalidation, just set it to MR_INVALIDATED
+ * For remote invalidation, just set it to SMBDIRECT_MR_INVALIDATED
* and defer to mr_recovery_work to recover the MR for next use
*/
- smbdirect_mr->state = MR_INVALIDATED;
+ smbdirect_mr->state = SMBDIRECT_MR_INVALIDATED;
- if (smbdirect_mr->state == MR_INVALIDATED) {
+ if (smbdirect_mr->state == SMBDIRECT_MR_INVALIDATED) {
ib_dma_unmap_sg(
sc->ib.dev, smbdirect_mr->sgt.sgl,
smbdirect_mr->sgt.nents,
smbdirect_mr->dir);
- smbdirect_mr->state = MR_READY;
- if (atomic_inc_return(&info->mr_ready_count) == 1)
- wake_up_interruptible(&info->wait_mr);
+ smbdirect_mr->state = SMBDIRECT_MR_READY;
+ if (atomic_inc_return(&sc->mr_io.ready.count) == 1)
+ wake_up(&sc->mr_io.ready.wait_queue);
} else
/*
* Schedule the work to do MR recovery for future I/Os MR
* recovery is slow and don't want it to block current I/O
*/
- queue_work(info->workqueue, &info->mr_recovery_work);
+ queue_work(sc->workqueue, &sc->mr_io.recovery_work);
done:
- if (atomic_dec_and_test(&info->mr_used_count))
- wake_up(&info->wait_for_mr_cleanup);
+ if (atomic_dec_and_test(&sc->mr_io.used.count))
+ wake_up(&sc->mr_io.cleanup.wait_queue);
return rc;
}