diff options
| -rw-r--r-- | include/rxrpc/call.h | 218 | ||||
| -rw-r--r-- | include/rxrpc/connection.h | 83 | ||||
| -rw-r--r-- | include/rxrpc/krxiod.h | 27 | ||||
| -rw-r--r-- | include/rxrpc/krxsecd.h | 22 | ||||
| -rw-r--r-- | include/rxrpc/krxtimod.h | 45 | ||||
| -rw-r--r-- | include/rxrpc/message.h | 72 | ||||
| -rw-r--r-- | include/rxrpc/packet.h | 128 | ||||
| -rw-r--r-- | include/rxrpc/peer.h | 80 | ||||
| -rw-r--r-- | include/rxrpc/rxrpc.h | 29 | ||||
| -rw-r--r-- | include/rxrpc/transport.h | 115 | ||||
| -rw-r--r-- | include/rxrpc/types.h | 39 | ||||
| -rw-r--r-- | net/Makefile | 1 | ||||
| -rw-r--r-- | net/rxrpc/Makefile | 33 | ||||
| -rw-r--r-- | net/rxrpc/call.c | 2122 | ||||
| -rw-r--r-- | net/rxrpc/connection.c | 687 | ||||
| -rw-r--r-- | net/rxrpc/internal.h | 107 | ||||
| -rw-r--r-- | net/rxrpc/krxiod.c | 262 | ||||
| -rw-r--r-- | net/rxrpc/krxsecd.c | 278 | ||||
| -rw-r--r-- | net/rxrpc/krxtimod.c | 210 | ||||
| -rw-r--r-- | net/rxrpc/main.c | 127 | ||||
| -rw-r--r-- | net/rxrpc/peer.c | 380 | ||||
| -rw-r--r-- | net/rxrpc/proc.c | 612 | ||||
| -rw-r--r-- | net/rxrpc/rxrpc_syms.c | 51 | ||||
| -rw-r--r-- | net/rxrpc/sysctl.c | 73 | ||||
| -rw-r--r-- | net/rxrpc/transport.c | 824 |
25 files changed, 6625 insertions, 0 deletions
diff --git a/include/rxrpc/call.h b/include/rxrpc/call.h new file mode 100644 index 000000000000..0ae39ad9c612 --- /dev/null +++ b/include/rxrpc/call.h @@ -0,0 +1,218 @@ +/* call.h: Rx call record + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _LINUX_RXRPC_CALL_H +#define _LINUX_RXRPC_CALL_H + +#include <rxrpc/types.h> +#include <rxrpc/rxrpc.h> +#include <rxrpc/packet.h> +#include <linux/timer.h> + +#define RXRPC_CALL_ACK_WINDOW_SIZE 16 + +extern unsigned rxrpc_call_rcv_timeout; /* receive activity timeout (secs) */ +extern unsigned rxrpc_call_acks_timeout; /* pending ACK (retransmit) timeout (secs) */ +extern unsigned rxrpc_call_dfr_ack_timeout; /* deferred ACK timeout (secs) */ +extern unsigned short rxrpc_call_max_resend; /* maximum consecutive resend count */ + +/* application call state + * - only state 0 and ffff are reserved, the state is set to 1 after an opid is received + */ +enum rxrpc_app_cstate { + RXRPC_CSTATE_COMPLETE = 0, /* operation complete */ + RXRPC_CSTATE_ERROR, /* operation ICMP error or aborted */ + RXRPC_CSTATE_SRVR_RCV_OPID, /* [SERVER] receiving operation ID */ + RXRPC_CSTATE_SRVR_RCV_ARGS, /* [SERVER] receiving operation data */ + RXRPC_CSTATE_SRVR_GOT_ARGS, /* [SERVER] completely received operation data */ + RXRPC_CSTATE_SRVR_SND_REPLY, /* [SERVER] sending operation reply */ + RXRPC_CSTATE_SRVR_RCV_FINAL_ACK, /* [SERVER] receiving final ACK */ + RXRPC_CSTATE_CLNT_SND_ARGS, /* [CLIENT] sending operation args */ + RXRPC_CSTATE_CLNT_RCV_REPLY, /* [CLIENT] receiving operation reply */ + RXRPC_CSTATE_CLNT_GOT_REPLY, /* [CLIENT] completely received operation reply */ +} __attribute__((packed)); + +extern const char *rxrpc_call_states[]; + +enum rxrpc_app_estate { + RXRPC_ESTATE_NO_ERROR = 0, /* no error */ + RXRPC_ESTATE_LOCAL_ABORT, /* aborted locally by application layer */ + RXRPC_ESTATE_PEER_ABORT, /* aborted remotely by peer */ + RXRPC_ESTATE_LOCAL_ERROR, /* local ICMP network error */ + RXRPC_ESTATE_REMOTE_ERROR, /* remote ICMP network error */ +} __attribute__((packed)); + +extern const char *rxrpc_call_error_states[]; + +/*****************************************************************************/ +/* + * Rx call record and application scratch buffer + * - the call record occupies the bottom of a complete page + * - the application scratch buffer occupies the rest + */ +struct rxrpc_call +{ + atomic_t usage; + struct rxrpc_connection *conn; /* connection upon which active */ + spinlock_t lock; /* access lock */ + struct module *owner; /* owner module */ + wait_queue_head_t waitq; /* wait queue for events to happen */ + struct list_head link; /* general internal list link */ + struct list_head call_link; /* master call list link */ + u32 chan_ix; /* connection channel index (net order) */ + u32 call_id; /* call ID on connection (net order) */ + unsigned long cjif; /* jiffies at call creation */ + unsigned long flags; /* control flags */ +#define RXRPC_CALL_ACKS_TIMO 0x00000001 /* ACKS timeout reached */ +#define RXRPC_CALL_ACKR_TIMO 0x00000002 /* ACKR timeout reached */ +#define RXRPC_CALL_RCV_TIMO 0x00000004 /* RCV timeout reached */ +#define RXRPC_CALL_RCV_PKT 0x00000008 /* received packet */ + + /* transmission */ + rxrpc_seq_t snd_seq_count; /* outgoing packet sequence number counter */ + struct rxrpc_message *snd_nextmsg; /* next message being constructed for sending */ + struct rxrpc_message *snd_ping; /* last ping message sent */ + unsigned short snd_resend_cnt; /* count of resends since last ACK */ + + /* transmission ACK tracking */ + struct list_head acks_pendq; /* messages pending ACK (ordered by seq) */ + unsigned acks_pend_cnt; /* number of un-ACK'd packets */ + rxrpc_seq_t acks_dftv_seq; /* highest definitively ACK'd msg seq */ + struct timer_list acks_timeout; /* timeout on expected ACK */ + + /* reception */ + struct list_head rcv_receiveq; /* messages pending reception (ordered by seq) */ + struct list_head rcv_krxiodq_lk; /* krxiod queue for new inbound packets */ + struct timer_list rcv_timeout; /* call receive activity timeout */ + + /* reception ACK'ing */ + rxrpc_seq_t ackr_win_bot; /* bottom of ACK window */ + rxrpc_seq_t ackr_win_top; /* top of ACK window */ + rxrpc_seq_t ackr_high_seq; /* highest seqno yet received */ + rxrpc_seq_t ackr_prev_seq; /* previous seqno received */ + unsigned ackr_pend_cnt; /* number of pending ACKs */ + struct timer_list ackr_dfr_timo; /* timeout on deferred ACK */ + char ackr_dfr_perm; /* request for deferred ACKs permitted */ + rxrpc_seq_t ackr_dfr_seq; /* seqno for deferred ACK */ + struct rxrpc_ackpacket ackr; /* pending normal ACK packet */ + u8 ackr_array[RXRPC_CALL_ACK_WINDOW_SIZE]; /* ACK records */ + + /* presentation layer */ + char app_last_rcv; /* T if received last packet from remote end */ + enum rxrpc_app_cstate app_call_state; /* call state */ + enum rxrpc_app_estate app_err_state; /* abort/error state */ + struct list_head app_readyq; /* ordered ready received packet queue */ + struct list_head app_unreadyq; /* ordered post-hole recv'd packet queue */ + rxrpc_seq_t app_ready_seq; /* last seq number dropped into readyq */ + size_t app_ready_qty; /* amount of data ready in readyq */ + unsigned app_opcode; /* operation ID */ + unsigned app_abort_code; /* abort code (when aborted) */ + int app_errno; /* error number (when ICMP error received) */ + + /* statisics */ + unsigned pkt_rcv_count; /* count of received packets on this call */ + unsigned pkt_snd_count; /* count of sent packets on this call */ + unsigned app_read_count; /* number of reads issued */ + + /* bits for the application to use */ + rxrpc_call_attn_func_t app_attn_func; /* callback when attention required */ + rxrpc_call_error_func_t app_error_func; /* callback when abort sent (cleanup and put) */ + rxrpc_call_aemap_func_t app_aemap_func; /* callback to map abort code to/from errno */ + void *app_user; /* application data */ + struct list_head app_link; /* application list linkage */ + struct list_head app_attn_link; /* application attention list linkage */ + size_t app_mark; /* trigger callback when app_ready_qty>=app_mark */ + char app_async_read; /* T if in async-read mode */ + u8 *app_read_buf; /* application async read buffer (app_mark size) */ + u8 *app_scr_alloc; /* application scratch allocation pointer */ + void *app_scr_ptr; /* application pointer into scratch buffer */ + +#define RXRPC_APP_MARK_EOF 0xFFFFFFFFU /* mark at end of input */ + + /* application scratch buffer */ + u8 app_scratch[0] __attribute__((aligned(sizeof(long)))); +}; + +#define RXRPC_CALL_SCRATCH_SIZE (PAGE_SIZE - sizeof(struct rxrpc_call)) + +#define rxrpc_call_reset_scratch(CALL) \ +do { (CALL)->app_scr_alloc = (CALL)->app_scratch; } while(0) + +#define rxrpc_call_alloc_scratch(CALL,SIZE) \ +({ \ + void *ptr; \ + ptr = (CALL)->app_scr_alloc; \ + (CALL)->app_scr_alloc += (SIZE); \ + if ((SIZE)>RXRPC_CALL_SCRATCH_SIZE || \ + (size_t)((CALL)->app_scr_alloc - (u8*)(CALL)) > RXRPC_CALL_SCRATCH_SIZE) { \ + printk("rxrpc_call_alloc_scratch(%p,%u)\n",(CALL),(SIZE)); \ + BUG(); \ + } \ + ptr; \ +}) + +#define rxrpc_call_alloc_scratch_s(CALL,TYPE) \ +({ \ + size_t size = sizeof(TYPE); \ + TYPE *ptr; \ + ptr = (TYPE*)(CALL)->app_scr_alloc; \ + (CALL)->app_scr_alloc += size; \ + if (size>RXRPC_CALL_SCRATCH_SIZE || \ + (size_t)((CALL)->app_scr_alloc - (u8*)(CALL)) > RXRPC_CALL_SCRATCH_SIZE) { \ + printk("rxrpc_call_alloc_scratch(%p,%u)\n",(CALL),size); \ + BUG(); \ + } \ + ptr; \ +}) + +#define rxrpc_call_is_ack_pending(CALL) ((CALL)->ackr.reason != 0) + +extern int rxrpc_create_call(struct rxrpc_connection *conn, + rxrpc_call_attn_func_t attn, + rxrpc_call_error_func_t error, + rxrpc_call_aemap_func_t aemap, + struct rxrpc_call **_call); + +extern int rxrpc_incoming_call(struct rxrpc_connection *conn, + struct rxrpc_message *msg, + struct rxrpc_call **_call); + +static inline void rxrpc_get_call(struct rxrpc_call *call) +{ + if (atomic_read(&call->usage)<=0) + BUG(); + atomic_inc(&call->usage); + /*printk("rxrpc_get_call(%p{u=%d})\n",(C),atomic_read(&(C)->usage));*/ +} + +extern void rxrpc_put_call(struct rxrpc_call *call); + +extern void rxrpc_call_do_stuff(struct rxrpc_call *call); + +extern int rxrpc_call_abort(struct rxrpc_call *call, int error); + +#define RXRPC_CALL_READ_BLOCK 0x0001 /* block if not enough data and not yet EOF */ +#define RXRPC_CALL_READ_ALL 0x0002 /* error if insufficient data received */ +extern int rxrpc_call_read_data(struct rxrpc_call *call, void *buffer, size_t size, int flags); + +extern int rxrpc_call_write_data(struct rxrpc_call *call, + size_t sioc, + struct iovec siov[], + u8 rxhdr_flags, + int alloc_flags, + int dup_data, + size_t *size_sent); + +extern int rxrpc_call_flush(struct rxrpc_call *call); + +extern void rxrpc_call_handle_error(struct rxrpc_call *conn, int local, int errno); + +#endif /* _LINUX_RXRPC_CALL_H */ diff --git a/include/rxrpc/connection.h b/include/rxrpc/connection.h new file mode 100644 index 000000000000..fc10fed01b21 --- /dev/null +++ b/include/rxrpc/connection.h @@ -0,0 +1,83 @@ +/* connection.h: Rx connection record + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _LINUX_RXRPC_CONNECTION_H +#define _LINUX_RXRPC_CONNECTION_H + +#include <rxrpc/types.h> +#include <rxrpc/krxtimod.h> + +struct sk_buff; + +/*****************************************************************************/ +/* + * Rx connection + * - connections are matched by (rmt_port,rmt_addr,service_id,conn_id,clientflag) + * - connections only retain a refcount on the peer when they are active + * - connections with refcount==0 are inactive and reside in the peer's graveyard + */ +struct rxrpc_connection +{ + atomic_t usage; + struct rxrpc_transport *trans; /* transport endpoint */ + struct rxrpc_peer *peer; /* peer from/to which connected */ + struct rxrpc_service *service; /* responsible service (inbound conns) */ + struct rxrpc_timer timeout; /* decaching timer */ + struct list_head link; /* link in peer's list */ + struct list_head proc_link; /* link in proc list */ + struct list_head err_link; /* link in ICMP error processing list */ + struct sockaddr_in addr; /* remote address */ + struct rxrpc_call *channels[4]; /* channels (active calls) */ + wait_queue_head_t chanwait; /* wait for channel to become available */ + spinlock_t lock; /* access lock */ + struct timeval atime; /* last access time */ + size_t mtu_size; /* MTU size for outbound messages */ + unsigned call_counter; /* call ID counter */ + rxrpc_serial_t serial_counter; /* packet serial number counter */ + + /* the following should all be in net order */ + u32 in_epoch; /* peer's epoch */ + u32 out_epoch; /* my epoch */ + u32 conn_id; /* connection ID, appropriately shifted */ + u16 service_id; /* service ID */ + u8 security_ix; /* security ID */ + u8 in_clientflag; /* RXRPC_CLIENT_INITIATED if we are server */ + u8 out_clientflag; /* RXRPC_CLIENT_INITIATED if we are client */ +}; + +extern int rxrpc_create_connection(struct rxrpc_transport *trans, + u16 port, + u32 addr, + unsigned short service_id, + void *security, + struct rxrpc_connection **_conn); + +extern int rxrpc_connection_lookup(struct rxrpc_peer *peer, + struct rxrpc_message *msg, + struct rxrpc_connection **_conn); + +static inline void rxrpc_get_connection(struct rxrpc_connection *conn) +{ + if (atomic_read(&conn->usage)<0) + BUG(); + atomic_inc(&conn->usage); + //printk("rxrpc_get_conn(%p{u=%d})\n",conn,atomic_read(&conn->usage)); +} + +extern void rxrpc_put_connection(struct rxrpc_connection *conn); + +extern int rxrpc_conn_receive_call_packet(struct rxrpc_connection *conn, + struct rxrpc_call *call, + struct rxrpc_message *msg); + +extern void rxrpc_conn_handle_error(struct rxrpc_connection *conn, int local, int errno); + +#endif /* _LINUX_RXRPC_CONNECTION_H */ diff --git a/include/rxrpc/krxiod.h b/include/rxrpc/krxiod.h new file mode 100644 index 000000000000..c0e0e82e4df2 --- /dev/null +++ b/include/rxrpc/krxiod.h @@ -0,0 +1,27 @@ +/* krxiod.h: Rx RPC I/O kernel thread interface + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _LINUX_RXRPC_KRXIOD_H +#define _LINUX_RXRPC_KRXIOD_H + +#include <rxrpc/types.h> + +extern int rxrpc_krxiod_init(void); +extern void rxrpc_krxiod_kill(void); +extern void rxrpc_krxiod_queue_transport(struct rxrpc_transport *trans); +extern void rxrpc_krxiod_dequeue_transport(struct rxrpc_transport *trans); +extern void rxrpc_krxiod_queue_peer(struct rxrpc_peer *peer); +extern void rxrpc_krxiod_dequeue_peer(struct rxrpc_peer *peer); +extern void rxrpc_krxiod_clear_peers(struct rxrpc_transport *trans); +extern void rxrpc_krxiod_queue_call(struct rxrpc_call *call); +extern void rxrpc_krxiod_dequeue_call(struct rxrpc_call *call); + +#endif /* _LINUX_RXRPC_KRXIOD_H */ diff --git a/include/rxrpc/krxsecd.h b/include/rxrpc/krxsecd.h new file mode 100644 index 000000000000..55ce43a25b38 --- /dev/null +++ b/include/rxrpc/krxsecd.h @@ -0,0 +1,22 @@ +/* krxsecd.h: Rx RPC security kernel thread interface + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _LINUX_RXRPC_KRXSECD_H +#define _LINUX_RXRPC_KRXSECD_H + +#include <rxrpc/types.h> + +extern int rxrpc_krxsecd_init(void); +extern void rxrpc_krxsecd_kill(void); +extern void rxrpc_krxsecd_clear_transport(struct rxrpc_transport *trans); +extern void rxrpc_krxsecd_queue_incoming_call(struct rxrpc_message *msg); + +#endif /* _LINUX_RXRPC_KRXSECD_H */ diff --git a/include/rxrpc/krxtimod.h b/include/rxrpc/krxtimod.h new file mode 100644 index 000000000000..b3d298b612f2 --- /dev/null +++ b/include/rxrpc/krxtimod.h @@ -0,0 +1,45 @@ +/* krxtimod.h: RxRPC timeout daemon + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _LINUX_RXRPC_KRXTIMOD_H +#define _LINUX_RXRPC_KRXTIMOD_H + +#include <rxrpc/types.h> + +struct rxrpc_timer_ops { + /* called when the front of the timer queue has timed out */ + void (*timed_out)(struct rxrpc_timer *timer); +}; + +/*****************************************************************************/ +/* + * RXRPC timer/timeout record + */ +struct rxrpc_timer +{ + struct list_head link; /* link in timer queue */ + unsigned long timo_jif; /* timeout time */ + const struct rxrpc_timer_ops *ops; /* timeout expiry function */ +}; + +static inline void rxrpc_timer_init(rxrpc_timer_t *timer, const struct rxrpc_timer_ops *ops) +{ + INIT_LIST_HEAD(&timer->link); + timer->ops = ops; +} + +extern int rxrpc_krxtimod_start(void); +extern void rxrpc_krxtimod_kill(void); + +extern void rxrpc_krxtimod_add_timer(rxrpc_timer_t *timer, unsigned long timeout); +extern int rxrpc_krxtimod_del_timer(rxrpc_timer_t *timer); + +#endif /* _LINUX_RXRPC_KRXTIMOD_H */ diff --git a/include/rxrpc/message.h b/include/rxrpc/message.h new file mode 100644 index 000000000000..2e43c03c6857 --- /dev/null +++ b/include/rxrpc/message.h @@ -0,0 +1,72 @@ +/* message.h: Rx message caching + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _H_3AD3363A_3A9C_11D6_83D8_0002B3163499 +#define _H_3AD3363A_3A9C_11D6_83D8_0002B3163499 + +#include <rxrpc/packet.h> + +/*****************************************************************************/ +/* + * Rx message record + */ +struct rxrpc_message +{ + atomic_t usage; + struct list_head link; /* list link */ + struct timeval stamp; /* time received or last sent */ + rxrpc_seq_t seq; /* message sequence number */ + + int state; /* the state the message is currently in */ +#define RXRPC_MSG_PREPARED 0 +#define RXRPC_MSG_SENT 1 +#define RXRPC_MSG_ACKED 2 /* provisionally ACK'd */ +#define RXRPC_MSG_DONE 3 /* definitively ACK'd (msg->seq<ack.firstPacket) */ +#define RXRPC_MSG_RECEIVED 4 +#define RXRPC_MSG_ERROR -1 + char rttdone; /* used for RTT */ + + struct rxrpc_transport *trans; /* transport received through */ + struct rxrpc_connection *conn; /* connection received over */ + struct sk_buff *pkt; /* received packet */ + off_t offset; /* offset into pkt of next byte of data */ + + struct rxrpc_header hdr; /* message header */ + + int dcount; /* data part count */ + size_t dsize; /* data size */ +#define RXRPC_MSG_MAX_IOCS 8 + struct iovec data[RXRPC_MSG_MAX_IOCS]; /* message data */ + unsigned long dfree; /* bit mask indicating kfree(data[x]) if T */ +}; + +#define rxrpc_get_message(M) do { atomic_inc(&(M)->usage); } while(0) + +extern void __rxrpc_put_message(struct rxrpc_message *msg); +static inline void rxrpc_put_message(struct rxrpc_message *msg) +{ + if (atomic_read(&msg->usage)<=0) + BUG(); + if (atomic_dec_and_test(&msg->usage)) + __rxrpc_put_message(msg); +} + +extern int rxrpc_conn_newmsg(struct rxrpc_connection *conn, + struct rxrpc_call *call, + u8 type, + int count, + struct iovec diov[], + int alloc_flags, + struct rxrpc_message **_msg); + +extern int rxrpc_conn_sendmsg(struct rxrpc_connection *conn, struct rxrpc_message *msg); + +#endif /* _H_3AD3363A_3A9C_11D6_83D8_0002B3163499 */ diff --git a/include/rxrpc/packet.h b/include/rxrpc/packet.h new file mode 100644 index 000000000000..78999077f5b8 --- /dev/null +++ b/include/rxrpc/packet.h @@ -0,0 +1,128 @@ +/* packet.h: Rx packet layout and definitions + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _LINUX_RXRPC_PACKET_H +#define _LINUX_RXRPC_PACKET_H + +#include <rxrpc/types.h> + +#define RXRPC_IPUDP_SIZE 28 +extern size_t RXRPC_MAX_PACKET_SIZE; +#define RXRPC_MAX_PACKET_DATA_SIZE (RXRPC_MAX_PACKET_SIZE - sizeof(struct rxrpc_header)) +#define RXRPC_LOCAL_PACKET_SIZE RXRPC_MAX_PACKET_SIZE +#define RXRPC_REMOTE_PACKET_SIZE (576 - RXRPC_IPUDP_SIZE) + +/*****************************************************************************/ +/* + * on-the-wire Rx packet header + * - all multibyte fields should be in network byte order + */ +struct rxrpc_header +{ + u32 epoch; /* client boot timestamp */ + + u32 cid; /* connection and channel ID */ +#define RXRPC_MAXCALLS 4 /* max active calls per conn */ +#define RXRPC_CHANNELMASK (RXRPC_MAXCALLS-1) /* mask for channel ID */ +#define RXRPC_CIDMASK (~RXRPC_CHANNELMASK) /* mask for connection ID */ +#define RXRPC_CIDSHIFT 2 /* shift for connection ID */ + + u32 callNumber; /* call ID (0 for connection-level packets) */ +#define RXRPC_PROCESS_MAXCALLS (1<<2) /* maximum number of active calls per conn (power of 2) */ + + u32 seq; /* sequence number of pkt in call stream */ + u32 serial; /* serial number of pkt sent to network */ + + u8 type; /* packet type */ +#define RXRPC_PACKET_TYPE_DATA 1 /* data */ +#define RXRPC_PACKET_TYPE_ACK 2 /* ACK */ +#define RXRPC_PACKET_TYPE_BUSY 3 /* call reject */ +#define RXRPC_PACKET_TYPE_ABORT 4 /* call/connection abort */ +#define RXRPC_PACKET_TYPE_ACKALL 5 /* ACK all outstanding packets on call */ +#define RXRPC_PACKET_TYPE_CHALLENGE 6 /* connection security challenge (SRVR->CLNT) */ +#define RXRPC_PACKET_TYPE_RESPONSE 7 /* connection secutity response (CLNT->SRVR) */ +#define RXRPC_PACKET_TYPE_DEBUG 8 /* debug info request */ +#define RXRPC_N_PACKET_TYPES 9 /* number of packet types (incl type 0) */ + + u8 flags; /* packet flags */ +#define RXRPC_CLIENT_INITIATED 0x01 /* signifies a packet generated by a client */ +#define RXRPC_REQUEST_ACK 0x02 /* request an unconditional ACK of this packet */ +#define RXRPC_LAST_PACKET 0x04 /* the last packet from this side for this call */ +#define RXRPC_MORE_PACKETS 0x08 /* more packets to come */ +#define RXRPC_JUMBO_PACKET 0x20 /* [DATA] this is a jumbo packet */ +#define RXRPC_SLOW_START_OK 0x20 /* [ACK] slow start supported */ + + u8 userStatus; /* app-layer defined status */ + u8 securityIndex; /* security protocol ID */ + u16 _rsvd; /* reserved (used by kerberos security as cksum) */ + u16 serviceId; /* service ID */ + +} __attribute__((packed)); + +#define __rxrpc_header_off(X) offsetof(struct rxrpc_header,X) + +extern const char *rxrpc_pkts[]; + +/*****************************************************************************/ +/* + * jumbo packet secondary header + * - can be mapped to read header by: + * - new_serial = serial + 1 + * - new_seq = seq + 1 + * - new_flags = j_flags + * - new__rsvd = j__rsvd + * - duplicating all other fields + */ +struct rxrpc_jumbo_header +{ + u8 flags; /* packet flags (as per rxrpc_header) */ + u8 pad; + u16 _rsvd; /* reserved (used by kerberos security as cksum) */ +}; + +#define RXRPC_JUMBO_DATALEN 1412 /* non-terminal jumbo packet data length */ + +/*****************************************************************************/ +/* + * on-the-wire Rx ACK packet data payload + * - all multibyte fields should be in network byte order + */ +struct rxrpc_ackpacket +{ + u16 bufferSpace; /* number of packet buffers available */ + u16 maxSkew; /* diff between serno being ACK'd and highest serial no received */ + u32 firstPacket; /* sequence no of first ACK'd packet in attached list */ + u32 previousPacket; /* sequence no of previous packet received */ + u32 serial; /* serial no of packet that prompted this ACK */ + + u8 reason; /* reason for ACK */ +#define RXRPC_ACK_REQUESTED 1 /* ACK was requested on packet */ +#define RXRPC_ACK_DUPLICATE 2 /* duplicate packet received */ +#define RXRPC_ACK_OUT_OF_SEQUENCE 3 /* out of sequence packet received */ +#define RXRPC_ACK_EXCEEDS_WINDOW 4 /* packet received beyond end of ACK window */ +#define RXRPC_ACK_NOSPACE 5 /* packet discarded due to lack of buffer space */ +#define RXRPC_ACK_PING 6 /* keep alive ACK */ +#define RXRPC_ACK_PING_RESPONSE 7 /* response to RXRPC_ACK_PING */ +#define RXRPC_ACK_DELAY 8 /* nothing happened since received packet */ +#define RXRPC_ACK_IDLE 9 /* ACK due to fully received ACK window */ + + u8 nAcks; /* number of ACKs */ +#define RXRPC_MAXACKS 255 + + u8 acks[0]; /* list of ACK/NAKs */ +#define RXRPC_ACK_TYPE_NACK 0 +#define RXRPC_ACK_TYPE_ACK 1 + +} __attribute__((packed)); + +extern const char *rxrpc_acks[]; + +#endif /* _LINUX_RXRPC_PACKET_H */ diff --git a/include/rxrpc/peer.h b/include/rxrpc/peer.h new file mode 100644 index 000000000000..9f09bc95a40f --- /dev/null +++ b/include/rxrpc/peer.h @@ -0,0 +1,80 @@ +/* peer.h: Rx RPC per-transport peer record + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _LINUX_RXRPC_PEER_H +#define _LINUX_RXRPC_PEER_H + +#include <linux/wait.h> +#include <rxrpc/types.h> +#include <rxrpc/krxtimod.h> + +struct rxrpc_peer_ops +{ + /* peer record being added */ + int (*adding)(struct rxrpc_peer *peer); + + /* peer record being discarded from graveyard */ + void (*discarding)(struct rxrpc_peer *peer); + + /* change of epoch detected on connection */ + void (*change_of_epoch)(struct rxrpc_connection *conn); +}; + +/*****************************************************************************/ +/* + * Rx RPC per-transport peer record + * - peers only retain a refcount on the transport when they are active + * - peers with refcount==0 are inactive and reside in the transport's graveyard + */ +struct rxrpc_peer +{ + atomic_t usage; + struct rxrpc_peer_ops *ops; /* operations on this peer */ + struct rxrpc_transport *trans; /* owner transport */ + struct rxrpc_timer timeout; /* timeout for grave destruction */ + struct list_head link; /* link in transport's peer list */ + struct list_head proc_link; /* link in /proc list */ + rwlock_t conn_lock; /* lock for connections */ + struct list_head conn_active; /* active connections to/from this peer */ + struct list_head conn_graveyard; /* graveyard for inactive connections */ + spinlock_t conn_gylock; /* lock for conn_graveyard */ + wait_queue_head_t conn_gy_waitq; /* wait queue hit when graveyard is empty */ + atomic_t conn_count; /* number of attached connections */ + struct in_addr addr; /* remote address */ + size_t if_mtu; /* interface MTU for this peer */ + spinlock_t lock; /* access lock */ + + void *user; /* application layer data */ + + /* calculated RTT cache */ +#define RXRPC_RTT_CACHE_SIZE 32 + suseconds_t rtt; /* current RTT estimate (in uS) */ + unsigned short rtt_point; /* next entry at which to insert */ + unsigned short rtt_usage; /* amount of cache actually used */ + suseconds_t rtt_cache[RXRPC_RTT_CACHE_SIZE]; /* calculated RTT cache */ +}; + + +extern int rxrpc_peer_lookup(struct rxrpc_transport *trans, + u32 addr, + struct rxrpc_peer **_peer); + +static inline void rxrpc_get_peer(struct rxrpc_peer *peer) +{ + if (atomic_read(&peer->usage)<0) + BUG(); + atomic_inc(&peer->usage); + //printk("rxrpc_get_peer(%p{u=%d})\n",peer,atomic_read(&peer->usage)); +} + +extern void rxrpc_put_peer(struct rxrpc_peer *peer); + +#endif /* _LINUX_RXRPC_PEER_H */ diff --git a/include/rxrpc/rxrpc.h b/include/rxrpc/rxrpc.h new file mode 100644 index 000000000000..454d59933675 --- /dev/null +++ b/include/rxrpc/rxrpc.h @@ -0,0 +1,29 @@ +/* rx.h: Rx RPC interface + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _LINUX_RXRPC_RXRPC_H +#define _LINUX_RXRPC_RXRPC_H + +#ifdef __KERNEL__ + +extern u32 rxrpc_epoch; + +extern int rxrpc_ktrace; +extern int rxrpc_kdebug; +extern int rxrpc_kproto; +extern int rxrpc_knet; + +extern int rxrpc_sysctl_init(void); +extern void rxrpc_sysctl_cleanup(void); + +#endif /* __KERNEL__ */ + +#endif /* _LINUX_RXRPC_RXRPC_H */ diff --git a/include/rxrpc/transport.h b/include/rxrpc/transport.h new file mode 100644 index 000000000000..b9c225533158 --- /dev/null +++ b/include/rxrpc/transport.h @@ -0,0 +1,115 @@ +/* transport.h: Rx transport management + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _LINUX_RXRPC_TRANSPORT_H +#define _LINUX_RXRPC_TRANSPORT_H + +#include <rxrpc/types.h> +#include <rxrpc/krxiod.h> +#include <rxrpc/rxrpc.h> +#include <linux/skbuff.h> +#include <linux/rwsem.h> + +typedef int (*rxrpc_newcall_fnx_t)(struct rxrpc_call *call); + +extern wait_queue_head_t rxrpc_krxiod_wq; + +/*****************************************************************************/ +/* + * Rx operation specification + * - tables of these must be sorted by op ID so that they can be binary-chop searched + */ +struct rxrpc_operation +{ + unsigned id; /* operation ID */ + size_t asize; /* minimum size of argument block */ + const char *name; /* name of operation */ + void *user; /* initial user data */ +}; + +/*****************************************************************************/ +/* + * Rx transport service record + */ +struct rxrpc_service +{ + struct list_head link; /* link in services list on transport */ + struct module *owner; /* owner module */ + rxrpc_newcall_fnx_t new_call; /* new call handler function */ + const char *name; /* name of service */ + unsigned short service_id; /* Rx service ID */ + rxrpc_call_attn_func_t attn_func; /* call requires attention callback */ + rxrpc_call_error_func_t error_func; /* call error callback */ + rxrpc_call_aemap_func_t aemap_func; /* abort -> errno mapping callback */ + + const struct rxrpc_operation *ops_begin; /* beginning of operations table */ + const struct rxrpc_operation *ops_end; /* end of operations table */ +}; + +/*****************************************************************************/ +/* + * Rx transport endpoint record + */ +struct rxrpc_transport +{ + atomic_t usage; + struct socket *socket; /* my UDP socket */ + struct list_head services; /* services listening on this socket */ + struct list_head link; /* link in transport list */ + struct list_head proc_link; /* link in transport proc list */ + struct list_head krxiodq_link; /* krxiod attention queue link */ + spinlock_t lock; /* access lock */ + struct list_head peer_active; /* active peers connected to over this socket */ + struct list_head peer_graveyard; /* inactive peer list */ + spinlock_t peer_gylock; /* peer graveyard lock */ + wait_queue_head_t peer_gy_waitq; /* wait queue hit when peer graveyard is empty */ + rwlock_t peer_lock; /* peer list access lock */ + atomic_t peer_count; /* number of peers */ + struct rxrpc_peer_ops *peer_ops; /* default peer operations */ + unsigned short port; /* port upon which listening */ + volatile char error_rcvd; /* T if received ICMP error outstanding */ +}; + +extern struct list_head rxrpc_transports; + +extern int rxrpc_create_transport(unsigned short port, + struct rxrpc_transport **_trans); + +static inline void rxrpc_get_transport(struct rxrpc_transport *trans) +{ + if (atomic_read(&trans->usage)<=0) + BUG(); + atomic_inc(&trans->usage); + //printk("rxrpc_get_transport(%p{u=%d})\n",trans,atomic_read(&trans->usage)); +} + +extern void rxrpc_put_transport(struct rxrpc_transport *trans); + +extern int rxrpc_add_service(struct rxrpc_transport *trans, + struct rxrpc_service *srv); + +extern void rxrpc_del_service(struct rxrpc_transport *trans, + struct rxrpc_service *srv); + +#if 0 +extern int rxrpc_trans_add_connection(struct rxrpc_transport *trans, + struct rxrpc_connection *conn); +#endif + +extern void rxrpc_trans_receive_packet(struct rxrpc_transport *trans); + +extern int rxrpc_trans_immediate_abort(struct rxrpc_transport *trans, + struct rxrpc_message *msg, + int error); + +extern void rxrpc_clear_transport(struct rxrpc_transport *trans); + +#endif /* _LINUX_RXRPC_TRANSPORT_H */ diff --git a/include/rxrpc/types.h b/include/rxrpc/types.h new file mode 100644 index 000000000000..40700bc61a6f --- /dev/null +++ b/include/rxrpc/types.h @@ -0,0 +1,39 @@ +/* types.h: Rx types + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#ifndef _LINUX_RXRPC_TYPES_H +#define _LINUX_RXRPC_TYPES_H + +#include <linux/types.h> +#include <linux/list.h> +#include <linux/socket.h> +#include <linux/in.h> +#include <linux/spinlock.h> +#include <asm/atomic.h> + +typedef unsigned rxrpc_seq_t; /* Rx message sequence number */ +typedef unsigned rxrpc_serial_t; /* Rx message serial number */ + +struct rxrpc_call; +struct rxrpc_connection; +struct rxrpc_header; +struct rxrpc_message; +struct rxrpc_operation; +struct rxrpc_peer; +struct rxrpc_service; +typedef struct rxrpc_timer rxrpc_timer_t; +struct rxrpc_transport; + +typedef void (*rxrpc_call_attn_func_t)(struct rxrpc_call *call); +typedef void (*rxrpc_call_error_func_t)(struct rxrpc_call *call); +typedef void (*rxrpc_call_aemap_func_t)(struct rxrpc_call *call); + +#endif /* _LINUX_RXRPC_TYPES_H */ diff --git a/net/Makefile b/net/Makefile index 4d09f62752fa..f7e9d2b90fd0 100644 --- a/net/Makefile +++ b/net/Makefile @@ -29,6 +29,7 @@ obj-$(CONFIG_AX25) += ax25/ obj-$(CONFIG_IRDA) += irda/ obj-$(CONFIG_BT) += bluetooth/ obj-$(CONFIG_SUNRPC) += sunrpc/ +obj-$(CONFIG_RXRPC) += rxrpc/ obj-$(CONFIG_ATM) += atm/ obj-$(CONFIG_DECNET) += decnet/ obj-$(CONFIG_ECONET) += econet/ diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile new file mode 100644 index 000000000000..63e3027738c4 --- /dev/null +++ b/net/rxrpc/Makefile @@ -0,0 +1,33 @@ +# +# Makefile for Linux kernel Rx RPC +# + +export-objs := rxrpc_syms.o + +rxrpc-objs := \ + call.o \ + connection.o \ + krxiod.o \ + krxsecd.o \ + krxtimod.o \ + main.o \ + peer.o \ + rxrpc_syms.o \ + transport.o + +#ifeq ($(CONFIG_PROC_FS),y) +rxrpc-objs += proc.o +#endif +#ifeq ($(CONFIG_SYSCTL),y) +rxrpc-objs += sysctl.o +#endif + +obj-m := rxrpc.o + +# superfluous for 2.5, but needed for 2.4.. +ifeq "$(VERSION).$(PATCHLEVEL)" "2.4" +rxrpc.o: $(rxrpc-objs) + $(LD) -r -o $@ $(rxrpc-objs) +endif + +include $(TOPDIR)/Rules.make diff --git a/net/rxrpc/call.c b/net/rxrpc/call.c new file mode 100644 index 000000000000..475fd925e5fa --- /dev/null +++ b/net/rxrpc/call.c @@ -0,0 +1,2122 @@ +/* call.c: Rx call routines + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/sched.h> +#include <linux/slab.h> +#include <linux/module.h> +#include <rxrpc/rxrpc.h> +#include <rxrpc/transport.h> +#include <rxrpc/peer.h> +#include <rxrpc/connection.h> +#include <rxrpc/call.h> +#include <rxrpc/message.h> +#include "internal.h" + +__RXACCT_DECL(atomic_t rxrpc_call_count); +__RXACCT_DECL(atomic_t rxrpc_message_count); + +LIST_HEAD(rxrpc_calls); +DECLARE_RWSEM(rxrpc_calls_sem); + +unsigned rxrpc_call_rcv_timeout = 30; +unsigned rxrpc_call_acks_timeout = 30; +unsigned rxrpc_call_dfr_ack_timeout = 5; +unsigned short rxrpc_call_max_resend = 10; + +const char *rxrpc_call_states[] = { + "COMPLETE", + "ERROR", + "SRVR_RCV_OPID", + "SRVR_RCV_ARGS", + "SRVR_GOT_ARGS", + "SRVR_SND_REPLY", + "SRVR_RCV_FINAL_ACK", + "CLNT_SND_ARGS", + "CLNT_RCV_REPLY", + "CLNT_GOT_REPLY" +}; + +const char *rxrpc_call_error_states[] = { + "NO_ERROR", + "LOCAL_ABORT", + "PEER_ABORT", + "LOCAL_ERROR", + "REMOTE_ERROR" +}; + +const char *rxrpc_pkts[] = { + "?00", "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug", + "?09", "?10", "?11", "?12", "?13", "?14", "?15" +}; + +const char *rxrpc_acks[] = { + "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL", "-?-" +}; + +static const char _acktype[] = "NA-"; + +static void rxrpc_call_receive_packet(struct rxrpc_call *call); +static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, struct rxrpc_message *msg); +static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, struct rxrpc_message *msg); +static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, rxrpc_seq_t higest); +static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest); +static int __rxrpc_call_read_data(struct rxrpc_call *call); + +static int rxrpc_call_record_ACK(struct rxrpc_call *call, + struct rxrpc_message *msg, + rxrpc_seq_t seq, + size_t count); +#define _state(call) \ + _debug("[[[ state %s ]]]",rxrpc_call_states[call->app_call_state]); + +static void rxrpc_call_default_attn_func(struct rxrpc_call *call) +{ + wake_up(&call->waitq); +} + +static void rxrpc_call_default_error_func(struct rxrpc_call *call) +{ + wake_up(&call->waitq); +} + +static void rxrpc_call_default_aemap_func(struct rxrpc_call *call) +{ + switch (call->app_err_state) { + case RXRPC_ESTATE_LOCAL_ABORT: + call->app_abort_code = -call->app_errno; + case RXRPC_ESTATE_PEER_ABORT: + call->app_errno = -ECONNABORTED; + default: + break; + } +} + +static void __rxrpc_call_acks_timeout(unsigned long _call) +{ + struct rxrpc_call *call = (struct rxrpc_call *) _call; + + _debug("ACKS TIMEOUT %05lu",jiffies - call->cjif); + + call->flags |= RXRPC_CALL_ACKS_TIMO; + rxrpc_krxiod_queue_call(call); +} + +static void __rxrpc_call_rcv_timeout(unsigned long _call) +{ + struct rxrpc_call *call = (struct rxrpc_call *) _call; + + _debug("RCV TIMEOUT %05lu",jiffies - call->cjif); + + call->flags |= RXRPC_CALL_RCV_TIMO; + rxrpc_krxiod_queue_call(call); +} + +static void __rxrpc_call_ackr_timeout(unsigned long _call) +{ + struct rxrpc_call *call = (struct rxrpc_call *) _call; + + _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif); + + call->flags |= RXRPC_CALL_ACKR_TIMO; + rxrpc_krxiod_queue_call(call); +} + +/*****************************************************************************/ +/* + * create a new call record + */ +static inline int __rxrpc_create_call(struct rxrpc_connection *conn, + struct rxrpc_call **_call) +{ + struct rxrpc_call *call; + + _enter("%p",conn); + + /* allocate and initialise a call record */ + call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL); + if (!call) { + _leave(" ENOMEM"); + return -ENOMEM; + } + + atomic_set(&call->usage,1); + + init_waitqueue_head(&call->waitq); + spin_lock_init(&call->lock); + INIT_LIST_HEAD(&call->link); + INIT_LIST_HEAD(&call->acks_pendq); + INIT_LIST_HEAD(&call->rcv_receiveq); + INIT_LIST_HEAD(&call->rcv_krxiodq_lk); + INIT_LIST_HEAD(&call->app_readyq); + INIT_LIST_HEAD(&call->app_unreadyq); + INIT_LIST_HEAD(&call->app_link); + INIT_LIST_HEAD(&call->app_attn_link); + + init_timer(&call->acks_timeout); + call->acks_timeout.data = (unsigned long) call; + call->acks_timeout.function = __rxrpc_call_acks_timeout; + + init_timer(&call->rcv_timeout); + call->rcv_timeout.data = (unsigned long) call; + call->rcv_timeout.function = __rxrpc_call_rcv_timeout; + + init_timer(&call->ackr_dfr_timo); + call->ackr_dfr_timo.data = (unsigned long) call; + call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout; + + call->conn = conn; + call->ackr_win_bot = 1; + call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1; + call->ackr_prev_seq = 0; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_attn_func = rxrpc_call_default_attn_func; + call->app_error_func = rxrpc_call_default_error_func; + call->app_aemap_func = rxrpc_call_default_aemap_func; + call->app_scr_alloc = call->app_scratch; + + call->cjif = jiffies; + + _leave(" = 0 (%p)",call); + + *_call = call; + + return 0; +} /* end __rxrpc_create_call() */ + +/*****************************************************************************/ +/* + * create a new call record for outgoing calls + */ +int rxrpc_create_call(struct rxrpc_connection *conn, + rxrpc_call_attn_func_t attn, + rxrpc_call_error_func_t error, + rxrpc_call_aemap_func_t aemap, + struct rxrpc_call **_call) +{ + DECLARE_WAITQUEUE(myself,current); + + struct rxrpc_call *call; + int ret, cix, loop; + + _enter("%p",conn); + + /* allocate and initialise a call record */ + ret = __rxrpc_create_call(conn,&call); + if (ret<0) { + _leave(" = %d",ret); + return ret; + } + + call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS; + if (attn) call->app_attn_func = attn; + if (error) call->app_error_func = error; + if (aemap) call->app_aemap_func = aemap; + + _state(call); + + spin_lock(&conn->lock); + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&conn->chanwait,&myself); + + try_again: + /* try to find an unused channel */ + for (cix=0; cix<4; cix++) + if (!conn->channels[cix]) + goto obtained_chan; + + /* no free channels - wait for one to become available */ + ret = -EINTR; + if (signal_pending(current)) + goto error_unwait; + + spin_unlock(&conn->lock); + + schedule(); + set_current_state(TASK_INTERRUPTIBLE); + + spin_lock(&conn->lock); + goto try_again; + + /* got a channel - now attach to the connection */ + obtained_chan: + remove_wait_queue(&conn->chanwait,&myself); + set_current_state(TASK_RUNNING); + + /* concoct a unique call number */ + next_callid: + call->call_id = htonl(++conn->call_counter); + for (loop=0; loop<4; loop++) + if (conn->channels[loop] && conn->channels[loop]->call_id==call->call_id) + goto next_callid; + + rxrpc_get_connection(conn); + conn->channels[cix] = call; /* assign _after_ done callid check loop */ + do_gettimeofday(&conn->atime); + call->chan_ix = htonl(cix); + + spin_unlock(&conn->lock); + + down_write(&rxrpc_calls_sem); + list_add_tail(&call->call_link,&rxrpc_calls); + up_write(&rxrpc_calls_sem); + + __RXACCT(atomic_inc(&rxrpc_call_count)); + *_call = call; + + _leave(" = 0 (call=%p cix=%u)",call,cix); + return 0; + + error_unwait: + remove_wait_queue(&conn->chanwait,&myself); + set_current_state(TASK_RUNNING); + spin_unlock(&conn->lock); + + free_page((unsigned long)call); + _leave(" = %d",ret); + return ret; + +} /* end rxrpc_create_call() */ + +/*****************************************************************************/ +/* + * create a new call record for incoming calls + */ +int rxrpc_incoming_call(struct rxrpc_connection *conn, + struct rxrpc_message *msg, + struct rxrpc_call **_call) +{ + struct rxrpc_call *call; + unsigned cix; + int ret; + + cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK; + + _enter("%p,%u,%u",conn,ntohl(msg->hdr.callNumber),cix); + + /* allocate and initialise a call record */ + ret = __rxrpc_create_call(conn,&call); + if (ret<0) { + _leave(" = %d",ret); + return ret; + } + + call->pkt_rcv_count = 1; + call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID; + call->app_mark = sizeof(u32); + + _state(call); + + /* attach to the connection */ + ret = -EBUSY; + call->chan_ix = htonl(cix); + call->call_id = msg->hdr.callNumber; + + spin_lock(&conn->lock); + + if (!conn->channels[cix]) { + conn->channels[cix] = call; + rxrpc_get_connection(conn); + ret = 0; + } + + spin_unlock(&conn->lock); + + if (ret<0) free_page((unsigned long)call); + + _leave(" = %p",call); + + if (ret==0) { + down_write(&rxrpc_calls_sem); + list_add_tail(&call->call_link,&rxrpc_calls); + up_write(&rxrpc_calls_sem); + __RXACCT(atomic_inc(&rxrpc_call_count)); + *_call = call; + } + + return ret; +} /* end rxrpc_incoming_call() */ + +/*****************************************************************************/ +/* + * free a call record + */ +void rxrpc_put_call(struct rxrpc_call *call) +{ + struct rxrpc_connection *conn = call->conn; + struct rxrpc_message *msg; + + _enter("%p{u=%d}",call,atomic_read(&call->usage)); + + /* sanity check */ + if (atomic_read(&call->usage)<=0) + BUG(); + + /* to prevent a race, the decrement and the de-list must be effectively atomic */ + spin_lock(&conn->lock); + if (likely(!atomic_dec_and_test(&call->usage))) { + spin_unlock(&conn->lock); + _leave(""); + return; + } + + conn->channels[ntohl(call->chan_ix)] = NULL; + + spin_unlock(&conn->lock); + + wake_up(&conn->chanwait); + + rxrpc_put_connection(conn); + + /* clear the timers and dequeue from krxiod */ + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + + rxrpc_krxiod_dequeue_call(call); + + /* clean up the contents of the struct */ + if (call->snd_nextmsg) + rxrpc_put_message(call->snd_nextmsg); + + if (call->snd_ping) + rxrpc_put_message(call->snd_ping); + + while (!list_empty(&call->acks_pendq)) { + msg = list_entry(call->acks_pendq.next,struct rxrpc_message,link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + while (!list_empty(&call->rcv_receiveq)) { + msg = list_entry(call->rcv_receiveq.next,struct rxrpc_message,link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + while (!list_empty(&call->app_readyq)) { + msg = list_entry(call->app_readyq.next,struct rxrpc_message,link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + while (!list_empty(&call->app_unreadyq)) { + msg = list_entry(call->app_unreadyq.next,struct rxrpc_message,link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + if (call->owner) __MOD_DEC_USE_COUNT(call->owner); + + down_write(&rxrpc_calls_sem); + list_del(&call->call_link); + up_write(&rxrpc_calls_sem); + + __RXACCT(atomic_dec(&rxrpc_call_count)); + free_page((unsigned long)call); + + _leave(" [destroyed]"); +} /* end rxrpc_put_call() */ + +/*****************************************************************************/ +/* + * actually generate a normal ACK + */ +static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call, rxrpc_seq_t seq) +{ + struct rxrpc_message *msg; + struct iovec diov[3]; + unsigned aux[4]; + int delta, ret; + + /* ACKs default to DELAY */ + if (!call->ackr.reason) + call->ackr.reason = RXRPC_ACK_DELAY; + + _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", + jiffies - call->cjif, + ntohs(call->ackr.maxSkew), + ntohl(call->ackr.firstPacket), + ntohl(call->ackr.previousPacket), + ntohl(call->ackr.serial), + rxrpc_acks[call->ackr.reason], + call->ackr.nAcks); + + aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */ + aux[1] = htonl(1444); /* max MTU */ + aux[2] = htonl(16); /* rwind */ + aux[3] = htonl(4); /* max packets */ + + diov[0].iov_len = sizeof(struct rxrpc_ackpacket); + diov[0].iov_base = &call->ackr; + diov[1].iov_len = (call->ackr_pend_cnt+3); + diov[1].iov_base = call->ackr_array; + diov[2].iov_len = sizeof(aux); + diov[2].iov_base = &aux; + + /* build and send the message */ + ret = rxrpc_conn_newmsg(call->conn,call,RXRPC_PACKET_TYPE_ACK,3,diov,GFP_KERNEL,&msg); + if (ret<0) + goto out; + + msg->seq = seq; + msg->hdr.seq = htonl(seq); + msg->hdr.flags |= RXRPC_SLOW_START_OK; + + ret = rxrpc_conn_sendmsg(call->conn,msg); + rxrpc_put_message(msg); + if (ret<0) + goto out; + call->pkt_snd_count++; + + /* count how many actual ACKs there were at the front */ + for (delta=0; delta<call->ackr_pend_cnt; delta++) + if (call->ackr_array[delta]!=RXRPC_ACK_TYPE_ACK) + break; + + call->ackr_pend_cnt -= delta; /* all ACK'd to this point */ + + /* crank the ACK window around */ + if (delta==0) { + /* un-ACK'd window */ + } + else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) { + /* partially ACK'd window + * - shuffle down to avoid losing out-of-sequence packets + */ + call->ackr_win_bot += delta; + call->ackr_win_top += delta; + + memmove(&call->ackr_array[0], + &call->ackr_array[delta], + call->ackr_pend_cnt); + + memset(&call->ackr_array[call->ackr_pend_cnt], + RXRPC_ACK_TYPE_NACK, + sizeof(call->ackr_array) - call->ackr_pend_cnt); + } + else { + /* fully ACK'd window + * - just clear the whole thing + */ + memset(&call->ackr_array,RXRPC_ACK_TYPE_NACK,sizeof(call->ackr_array)); + } + + /* clear this ACK */ + memset(&call->ackr,0,sizeof(call->ackr)); + + out: + if (!call->app_call_state) printk("___ STATE 0 ___\n"); + return ret; +} /* end __rxrpc_call_gen_normal_ACK() */ + +/*****************************************************************************/ +/* + * note the reception of a packet in the call's ACK records and generate an appropriate ACK packet + * if necessary + * - returns 0 if packet should be processed, 1 if packet should be ignored and -ve on an error + */ +static int rxrpc_call_generate_ACK(struct rxrpc_call *call, + struct rxrpc_header *hdr, + struct rxrpc_ackpacket *ack) +{ + struct rxrpc_message *msg; + rxrpc_seq_t seq; + unsigned offset; + int ret = 0, err; + u8 special_ACK, do_ACK, force; + + _enter("%p,%p { seq=%d tp=%d fl=%02x }",call,hdr,ntohl(hdr->seq),hdr->type,hdr->flags); + + seq = ntohl(hdr->seq); + offset = seq - call->ackr_win_bot; + do_ACK = RXRPC_ACK_DELAY; + special_ACK = 0; + force = (seq==1); + + if (call->ackr_high_seq < seq) + call->ackr_high_seq = seq; + + /* deal with generation of obvious special ACKs first */ + if (ack && ack->reason==RXRPC_ACK_PING) { + special_ACK = RXRPC_ACK_PING_RESPONSE; + ret = 1; + goto gen_ACK; + } + + if (seq < call->ackr_win_bot) { + special_ACK = RXRPC_ACK_DUPLICATE; + ret = 1; + goto gen_ACK; + } + + if (seq >= call->ackr_win_top) { + special_ACK = RXRPC_ACK_EXCEEDS_WINDOW; + ret = 1; + goto gen_ACK; + } + + if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) { + special_ACK = RXRPC_ACK_DUPLICATE; + ret = 1; + goto gen_ACK; + } + + /* okay... it's a normal data packet inside the ACK window */ + call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK; + + if (offset<call->ackr_pend_cnt) { + } + else if (offset>call->ackr_pend_cnt) { + do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE; + call->ackr_pend_cnt = offset; + goto gen_ACK; + } + + if (hdr->flags & RXRPC_REQUEST_ACK) { + do_ACK = RXRPC_ACK_REQUESTED; + } + + /* generate an ACK on the final packet of a reply just received */ + if (hdr->flags & RXRPC_LAST_PACKET) { + if (call->conn->out_clientflag) + force = 1; + } + else if (!(hdr->flags & RXRPC_MORE_PACKETS)) { + do_ACK = RXRPC_ACK_REQUESTED; + } + + /* re-ACK packets previously received out-of-order */ + for (offset++; offset<RXRPC_CALL_ACK_WINDOW_SIZE; offset++) + if (call->ackr_array[offset]!=RXRPC_ACK_TYPE_ACK) + break; + + call->ackr_pend_cnt = offset; + + /* generate an ACK if we fill up the window */ + if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE) + force = 1; + + gen_ACK: + _debug("%05lu ACKs pend=%u norm=%s special=%s%s", + jiffies - call->cjif, + call->ackr_pend_cnt,rxrpc_acks[do_ACK],rxrpc_acks[special_ACK], + force ? " immediate" : + do_ACK==RXRPC_ACK_REQUESTED ? " merge-req" : + hdr->flags & RXRPC_LAST_PACKET ? " finalise" : + " defer" + ); + + /* send any pending normal ACKs if need be */ + if (call->ackr_pend_cnt>0) { + /* fill out the appropriate form */ + call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); + call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq,65535U)); + call->ackr.firstPacket = htonl(call->ackr_win_bot); + call->ackr.previousPacket = call->ackr_prev_seq; + call->ackr.serial = hdr->serial; + call->ackr.nAcks = call->ackr_pend_cnt; + + if (do_ACK==RXRPC_ACK_REQUESTED) + call->ackr.reason = do_ACK; + + /* generate the ACK immediately if necessary */ + if (special_ACK || force) { + err = __rxrpc_call_gen_normal_ACK(call,do_ACK==RXRPC_ACK_DELAY ? 0 : seq); + if (err<0) { + ret = err; + goto out; + } + } + } + + if (call->ackr.reason==RXRPC_ACK_REQUESTED) + call->ackr_dfr_seq = seq; + + /* start the ACK timer if not running if there are any pending deferred ACKs */ + if (call->ackr_pend_cnt>0 && + call->ackr.reason!=RXRPC_ACK_REQUESTED && + !timer_pending(&call->ackr_dfr_timo) + ) { + unsigned long timo; + + timo = rxrpc_call_dfr_ack_timeout + jiffies; + + _debug("START ACKR TIMER for cj=%lu",timo-call->cjif); + + spin_lock(&call->lock); + mod_timer(&call->ackr_dfr_timo,timo); + spin_unlock(&call->lock); + } + else if ((call->ackr_pend_cnt==0 || call->ackr.reason==RXRPC_ACK_REQUESTED) && + timer_pending(&call->ackr_dfr_timo) + ) { + /* stop timer if no pending ACKs */ + _debug("CLEAR ACKR TIMER"); + del_timer_sync(&call->ackr_dfr_timo); + } + + /* send a special ACK if one is required */ + if (special_ACK) { + struct rxrpc_ackpacket ack; + struct iovec diov[2]; + u8 acks[1] = { RXRPC_ACK_TYPE_ACK }; + + /* fill out the appropriate form */ + ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); + ack.maxSkew = htons(min(call->ackr_high_seq - seq,65535U)); + ack.firstPacket = htonl(call->ackr_win_bot); + ack.previousPacket = call->ackr_prev_seq; + ack.serial = hdr->serial; + ack.reason = special_ACK; + ack.nAcks = 0; + //ack.nAcks = special_ACK==RXRPC_ACK_OUT_OF_SEQUENCE ? 0 : hdr->seq ? 1 : 0; + + _proto("Rx Sending s-ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", + ntohs(ack.maxSkew),ntohl(ack.firstPacket),ntohl(ack.previousPacket), + ntohl(ack.serial),rxrpc_acks[ack.reason],ack.nAcks); + + diov[0].iov_len = sizeof(struct rxrpc_ackpacket); + diov[0].iov_base = &ack; + diov[1].iov_len = sizeof(acks); + diov[1].iov_base = acks; + + /* build and send the message */ + err = rxrpc_conn_newmsg(call->conn,call,RXRPC_PACKET_TYPE_ACK, + hdr->seq ? 2 : 1,diov, + GFP_KERNEL, + &msg); + if (err<0) { + ret = err; + goto out; + } + + msg->seq = seq; + msg->hdr.seq = htonl(seq); + msg->hdr.flags |= RXRPC_SLOW_START_OK; + + err = rxrpc_conn_sendmsg(call->conn,msg); + rxrpc_put_message(msg); + if (err<0) { + ret = err; + goto out; + } + call->pkt_snd_count++; + } + + out: + if (hdr->seq) + call->ackr_prev_seq = hdr->seq; + + _leave(" = %d",ret); + return ret; +} /* end rxrpc_call_generate_ACK() */ + +/*****************************************************************************/ +/* + * handle work to be done on a call + * - includes packet reception and timeout processing + */ +void rxrpc_call_do_stuff(struct rxrpc_call *call) +{ + _enter("%p{flags=%lx}",call,call->flags); + + /* handle packet reception */ + if (call->flags & RXRPC_CALL_RCV_PKT) { + _debug("- receive packet"); + call->flags &= ~RXRPC_CALL_RCV_PKT; + rxrpc_call_receive_packet(call); + } + + /* handle overdue ACKs */ + if (call->flags & RXRPC_CALL_ACKS_TIMO) { + _debug("- overdue ACK timeout"); + call->flags &= ~RXRPC_CALL_ACKS_TIMO; + rxrpc_call_resend(call,call->snd_seq_count); + } + + /* handle lack of reception */ + if (call->flags & RXRPC_CALL_RCV_TIMO) { + _debug("- reception timeout"); + call->flags &= ~RXRPC_CALL_RCV_TIMO; + rxrpc_call_abort(call,-EIO); + } + + /* handle deferred ACKs */ + if (call->flags & RXRPC_CALL_ACKR_TIMO || + (call->ackr.nAcks>0 && call->ackr.reason==RXRPC_ACK_REQUESTED) + ) { + _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u", + jiffies - call->cjif, + rxrpc_acks[call->ackr.reason], + call->ackr.nAcks); + + call->flags &= ~RXRPC_CALL_ACKR_TIMO; + + if (call->ackr.nAcks>0 && call->app_call_state!=RXRPC_CSTATE_ERROR) { + /* generate ACK */ + __rxrpc_call_gen_normal_ACK(call,call->ackr_dfr_seq); + call->ackr_dfr_seq = 0; + } + } + + _leave(""); + +} /* end rxrpc_call_do_timeout() */ + +/*****************************************************************************/ +/* + * send an abort message at call or connection level + * - must be called with call->lock held + * - the supplied error code is sent as the packet data + */ +static int __rxrpc_call_abort(struct rxrpc_call *call, int errno) +{ + struct rxrpc_connection *conn = call->conn; + struct rxrpc_message *msg; + struct iovec diov[1]; + int ret; + u32 _error; + + _enter("%p{%08x},%p{%d},%d",conn,ntohl(conn->conn_id),call,ntohl(call->call_id),errno); + + /* if this call is already aborted, then just wake up any waiters */ + if (call->app_call_state==RXRPC_CSTATE_ERROR) { + spin_unlock(&call->lock); + call->app_error_func(call); + _leave(" = 0"); + return 0; + } + + rxrpc_get_call(call); + + /* change the state _with_ the lock still held */ + call->app_call_state = RXRPC_CSTATE_ERROR; + call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT; + call->app_errno = errno; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + call->app_async_read = 0; + + _state(call); + + /* ask the app to translate the error code */ + call->app_aemap_func(call); + + spin_unlock(&call->lock); + + /* flush any outstanding ACKs */ + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + + if (rxrpc_call_is_ack_pending(call)) + __rxrpc_call_gen_normal_ACK(call,0); + + /* send the abort packet only if we actually traded some other packets */ + ret = 0; + if (call->pkt_snd_count || call->pkt_rcv_count) { + /* actually send the abort */ + _proto("Rx Sending Call ABORT { data=%d }",call->app_abort_code); + + _error = htonl(call->app_abort_code); + + diov[0].iov_len = sizeof(_error); + diov[0].iov_base = &_error; + + ret = rxrpc_conn_newmsg(conn,call,RXRPC_PACKET_TYPE_ABORT,1,diov,GFP_KERNEL,&msg); + if (ret==0) { + ret = rxrpc_conn_sendmsg(conn,msg); + rxrpc_put_message(msg); + } + } + + /* tell the app layer to let go */ + call->app_error_func(call); + + rxrpc_put_call(call); + + _leave(" = %d",ret); + + return ret; +} /* end __rxrpc_call_abort() */ + +/*****************************************************************************/ +/* + * send an abort message at call or connection level + * - the supplied error code is sent as the packet data + */ +int rxrpc_call_abort(struct rxrpc_call *call, int error) +{ + spin_lock(&call->lock); + + return __rxrpc_call_abort(call,error); + +} /* end rxrpc_call_abort() */ + +/*****************************************************************************/ +/* + * process packets waiting for this call + */ +static void rxrpc_call_receive_packet(struct rxrpc_call *call) +{ + struct rxrpc_message *msg; + struct list_head *_p; + u32 data32; + + _enter("%p",call); + + rxrpc_get_call(call); /* must not go away too soon if aborted by app-layer */ + + while (!list_empty(&call->rcv_receiveq)) { + /* try to get next packet */ + _p = NULL; + spin_lock(&call->lock); + if (!list_empty(&call->rcv_receiveq)) { + _p = call->rcv_receiveq.next; + list_del_init(_p); + } + spin_unlock(&call->lock); + + if (!_p) break; + + msg = list_entry(_p,struct rxrpc_message,link); + + _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)", + jiffies - call->cjif, + rxrpc_pkts[msg->hdr.type], + ntohl(msg->hdr.serial), + msg->seq, + msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-', + msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-', + msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-', + msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-', + msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S' + ); + + switch (msg->hdr.type) { + /* deal with data packets */ + case RXRPC_PACKET_TYPE_DATA: + /* ACK the packet if necessary */ + switch (rxrpc_call_generate_ACK(call,&msg->hdr,NULL)) { + case 0: /* useful packet */ + rxrpc_call_receive_data_packet(call,msg); + break; + case 1: /* duplicate or out-of-window packet */ + break; + default: + rxrpc_put_message(msg); + goto out; + } + break; + + /* deal with ACK packets */ + case RXRPC_PACKET_TYPE_ACK: + rxrpc_call_receive_ack_packet(call,msg); + break; + + /* deal with abort packets */ + case RXRPC_PACKET_TYPE_ABORT: + data32 = 0; + if (skb_copy_bits(msg->pkt,msg->offset,&data32,sizeof(data32))<0) { + printk("Rx Received short ABORT packet\n"); + } + else { + data32 = ntohl(data32); + } + + _proto("Rx Received Call ABORT { data=%d }",data32); + + spin_lock(&call->lock); + call->app_call_state = RXRPC_CSTATE_ERROR; + call->app_err_state = RXRPC_ESTATE_PEER_ABORT; + call->app_abort_code = data32; + call->app_errno = -ECONNABORTED; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + call->app_async_read = 0; + + /* ask the app to translate the error code */ + call->app_aemap_func(call); + _state(call); + spin_unlock(&call->lock); + call->app_error_func(call); + break; + + default: + /* deal with other packet types */ + _proto("Rx Unsupported packet type %u (#%u)",msg->hdr.type,msg->seq); + break; + } + + rxrpc_put_message(msg); + } + + out: + rxrpc_put_call(call); + _leave(""); +} /* end rxrpc_call_receive_packet() */ + +/*****************************************************************************/ +/* + * process next data packet + * - as the next data packet arrives: + * - it is queued on app_readyq _if_ it is the next one expected (app_ready_seq+1) + * - it is queued on app_unreadyq _if_ it is not the next one expected + * - if a packet placed on app_readyq completely fills a hole leading up to the first packet + * on app_unreadyq, then packets now in sequence are tranferred to app_readyq + * - the application layer can only see packets on app_readyq (app_ready_qty bytes) + * - the application layer is prodded every time a new packet arrives + */ +static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, struct rxrpc_message *msg) +{ + const struct rxrpc_operation *optbl, *op; + struct rxrpc_message *pmsg; + struct list_head *_p; + int ret, lo, hi, rmtimo; + u32 opid; + + _enter("%p{%u},%p{%u}",call,ntohl(call->call_id),msg,msg->seq); + + rxrpc_get_message(msg); + + /* add to the unready queue if we'd have to create a hole in the ready queue otherwise */ + if (msg->seq != call->app_ready_seq+1) { + _debug("Call add packet %d to unreadyq",msg->seq); + + /* insert in seq order */ + list_for_each(_p,&call->app_unreadyq) { + pmsg = list_entry(_p,struct rxrpc_message,link); + if (pmsg->seq>msg->seq) + break; + } + + list_add_tail(&msg->link,_p); + + _leave(" [unreadyq]"); + return; + } + + /* next in sequence - simply append into the call's ready queue */ + _debug("Call add packet %d to readyq (+%d => %d bytes)", + msg->seq,msg->dsize,call->app_ready_qty); + + spin_lock(&call->lock); + call->app_ready_seq = msg->seq; + call->app_ready_qty += msg->dsize; + list_add_tail(&msg->link,&call->app_readyq); + + /* move unready packets to the readyq if we got rid of a hole */ + while (!list_empty(&call->app_unreadyq)) { + pmsg = list_entry(call->app_unreadyq.next,struct rxrpc_message,link); + + if (pmsg->seq != call->app_ready_seq+1) + break; + + /* next in sequence - just move list-to-list */ + _debug("Call transfer packet %d to readyq (+%d => %d bytes)", + pmsg->seq,pmsg->dsize,call->app_ready_qty); + + call->app_ready_seq = pmsg->seq; + call->app_ready_qty += pmsg->dsize; + list_del_init(&pmsg->link); + list_add_tail(&pmsg->link,&call->app_readyq); + } + + /* see if we've got the last packet yet */ + if (!list_empty(&call->app_readyq)) { + pmsg = list_entry(call->app_readyq.prev,struct rxrpc_message,link); + if (pmsg->hdr.flags & RXRPC_LAST_PACKET) { + call->app_last_rcv = 1; + _debug("Last packet on readyq"); + } + } + + switch (call->app_call_state) { + /* do nothing if call already aborted */ + case RXRPC_CSTATE_ERROR: + spin_unlock(&call->lock); + _leave(" [error]"); + return; + + /* extract the operation ID from an incoming call if that's not yet been done */ + case RXRPC_CSTATE_SRVR_RCV_OPID: + spin_unlock(&call->lock); + + /* handle as yet insufficient data for the operation ID */ + if (call->app_ready_qty<4) { + if (call->app_last_rcv) + rxrpc_call_abort(call,-EINVAL); /* trouble - last packet seen */ + + _leave(""); + return; + } + + /* pull the operation ID out of the buffer */ + ret = rxrpc_call_read_data(call,&opid,sizeof(opid),0); + if (ret<0) { + printk("Unexpected error from read-data: %d\n",ret); + if (call->app_call_state!=RXRPC_CSTATE_ERROR) + rxrpc_call_abort(call,ret); + _leave(""); + return; + } + call->app_opcode = ntohl(opid); + + /* locate the operation in the available ops table */ + optbl = call->conn->service->ops_begin; + lo = 0; + hi = call->conn->service->ops_end - optbl; + + while (lo<hi) { + int mid = (hi+lo) / 2; + op = &optbl[mid]; + if (call->app_opcode==op->id) + goto found_op; + if (call->app_opcode>op->id) + lo = mid+1; + else + hi = mid; + } + + /* search failed */ + kproto("Rx Client requested operation %d from %s service", + call->app_opcode,call->conn->service->name); + rxrpc_call_abort(call,-EINVAL); + _leave(" [inval]"); + return; + + found_op: + _proto("Rx Client requested operation %s from %s service", + op->name,call->conn->service->name); + + /* we're now waiting for the argument block (unless the call was aborted) */ + spin_lock(&call->lock); + if (call->app_call_state==RXRPC_CSTATE_SRVR_RCV_OPID || + call->app_call_state==RXRPC_CSTATE_SRVR_SND_REPLY) { + if (!call->app_last_rcv) + call->app_call_state = RXRPC_CSTATE_SRVR_RCV_ARGS; + else if (call->app_ready_qty>0) + call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS; + else + call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; + call->app_mark = op->asize; + call->app_user = op->user; + } + spin_unlock(&call->lock); + + _state(call); + break; + + case RXRPC_CSTATE_SRVR_RCV_ARGS: + /* change state if just received last packet of arg block */ + if (call->app_last_rcv) + call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS; + spin_unlock(&call->lock); + + _state(call); + break; + + case RXRPC_CSTATE_CLNT_RCV_REPLY: + /* change state if just received last packet of reply block */ + rmtimo = 0; + if (call->app_last_rcv) { + call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY; + rmtimo = 1; + } + spin_unlock(&call->lock); + + if (rmtimo) { + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + } + + _state(call); + break; + + default: + /* deal with data reception in an unexpected state */ + printk("Unexpected state [[[ %u ]]]\n",call->app_call_state); + __rxrpc_call_abort(call,-EBADMSG); + _leave(""); + return; + } + + if (call->app_call_state==RXRPC_CSTATE_CLNT_RCV_REPLY && call->app_last_rcv) + BUG(); + + /* otherwise just invoke the data function whenever we can satisfy its desire for more + * data + */ + _proto("Rx Received Op Data: st=%u qty=%u mk=%u%s", + call->app_call_state,call->app_ready_qty,call->app_mark, + call->app_last_rcv ? " last-rcvd" : ""); + + spin_lock(&call->lock); + + ret = __rxrpc_call_read_data(call); + switch (ret) { + case 0: + spin_unlock(&call->lock); + call->app_attn_func(call); + break; + case -EAGAIN: + spin_unlock(&call->lock); + break; + case -ECONNABORTED: + spin_unlock(&call->lock); + break; + default: + __rxrpc_call_abort(call,ret); + break; + } + + _state(call); + + _leave(""); + +} /* end rxrpc_call_receive_data_packet() */ + +/*****************************************************************************/ +/* + * received an ACK packet + */ +static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, struct rxrpc_message *msg) +{ + struct rxrpc_ackpacket ack; + rxrpc_serial_t serial; + rxrpc_seq_t seq; + int ret; + + _enter("%p{%u},%p{%u}",call,ntohl(call->call_id),msg,msg->seq); + + /* extract the basic ACK record */ + if (skb_copy_bits(msg->pkt,msg->offset,&ack,sizeof(ack))<0) { + printk("Rx Received short ACK packet\n"); + return; + } + msg->offset += sizeof(ack); + + serial = ack.serial; + seq = ntohl(ack.firstPacket); + + _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }", + ntohl(msg->hdr.serial), + ntohs(ack.bufferSpace), + ntohs(ack.maxSkew), + seq, + ntohl(ack.previousPacket), + ntohl(serial), + rxrpc_acks[ack.reason], + call->ackr.nAcks + ); + + /* check the other side isn't ACK'ing a sequence number I haven't sent yet */ + if (ack.nAcks>0 && (seq > call->snd_seq_count || seq+ack.nAcks-1 > call->snd_seq_count)) { + printk("Received ACK (#%u-#%u) for unsent packet\n",seq,seq+ack.nAcks-1); + rxrpc_call_abort(call,-EINVAL); + _leave(""); + return; + } + + /* deal with RTT calculation */ + if (serial) { + struct rxrpc_message *rttmsg; + + /* find the prompting packet */ + spin_lock(&call->lock); + if (call->snd_ping && call->snd_ping->hdr.serial==serial) { + /* it was a ping packet */ + rttmsg = call->snd_ping; + call->snd_ping = NULL; + spin_unlock(&call->lock); + + if (rttmsg) { + rttmsg->rttdone = 1; + rxrpc_peer_calculate_rtt(call->conn->peer,rttmsg,msg); + rxrpc_put_message(rttmsg); + } + } + else { + struct list_head *_p; + + /* it ought to be a data packet - look in the pending ACK list */ + list_for_each(_p,&call->acks_pendq) { + rttmsg = list_entry(_p,struct rxrpc_message,link); + if (rttmsg->hdr.serial==serial) { + if (rttmsg->rttdone) + break; /* never do RTT twice without resending */ + + rttmsg->rttdone = 1; + rxrpc_peer_calculate_rtt(call->conn->peer,rttmsg,msg); + break; + } + } + spin_unlock(&call->lock); + } + } + + switch (ack.reason) { + /* deal with negative/positive acknowledgement of data packets */ + case RXRPC_ACK_REQUESTED: + case RXRPC_ACK_DELAY: + case RXRPC_ACK_IDLE: + rxrpc_call_definitively_ACK(call,seq-1); + + case RXRPC_ACK_DUPLICATE: + case RXRPC_ACK_OUT_OF_SEQUENCE: + case RXRPC_ACK_EXCEEDS_WINDOW: + call->snd_resend_cnt = 0; + ret = rxrpc_call_record_ACK(call,msg,seq,ack.nAcks); + if (ret<0) + rxrpc_call_abort(call,ret); + break; + + /* respond to ping packets immediately */ + case RXRPC_ACK_PING: + rxrpc_call_generate_ACK(call,&msg->hdr,&ack); + break; + + /* only record RTT on ping response packets */ + case RXRPC_ACK_PING_RESPONSE: + if (call->snd_ping) { + struct rxrpc_message *rttmsg; + + /* only do RTT stuff if the response matches the retained ping */ + rttmsg = NULL; + spin_lock(&call->lock); + if (call->snd_ping && call->snd_ping->hdr.serial==ack.serial) { + rttmsg = call->snd_ping; + call->snd_ping = NULL; + } + spin_unlock(&call->lock); + + if (rttmsg) { + rttmsg->rttdone = 1; + rxrpc_peer_calculate_rtt(call->conn->peer,rttmsg,msg); + rxrpc_put_message(rttmsg); + } + } + break; + + default: + printk("Unsupported ACK reason %u\n",ack.reason); + break; + } + + _leave(""); +} /* end rxrpc_call_receive_ack_packet() */ + +/*****************************************************************************/ +/* + * record definitive ACKs for all messages up to and including the one with the 'highest' seq + */ +static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, rxrpc_seq_t highest) +{ + struct rxrpc_message *msg; + int now_complete; + + _enter("%p{ads=%u},%u",call,call->acks_dftv_seq,highest); + + while (call->acks_dftv_seq<highest) { + call->acks_dftv_seq++; + + _proto("Definitive ACK on packet #%u",call->acks_dftv_seq); + + /* discard those at front of queue until message with highest ACK is found */ + spin_lock(&call->lock); + msg = NULL; + if (!list_empty(&call->acks_pendq)) { + msg = list_entry(call->acks_pendq.next,struct rxrpc_message,link); + list_del_init(&msg->link); /* dequeue */ + if (msg->state==RXRPC_MSG_SENT) + call->acks_pend_cnt--; + } + spin_unlock(&call->lock); + + /* insanity check */ + if (!msg) + panic("%s(): acks_pendq unexpectedly empty\n",__FUNCTION__); + + if (msg->seq!=call->acks_dftv_seq) + panic("%s(): Packet #%u expected at front of acks_pendq (#%u found)\n", + __FUNCTION__,call->acks_dftv_seq,msg->seq); + + /* discard the message */ + msg->state = RXRPC_MSG_DONE; + rxrpc_put_message(msg); + } + + /* if all sent packets are definitively ACK'd then prod any sleepers just in case */ + now_complete = 0; + spin_lock(&call->lock); + if (call->acks_dftv_seq==call->snd_seq_count) { + if (call->app_call_state!=RXRPC_CSTATE_COMPLETE) { + call->app_call_state = RXRPC_CSTATE_COMPLETE; + _state(call); + now_complete = 1; + } + } + spin_unlock(&call->lock); + + if (now_complete) { + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + call->app_attn_func(call); + } + + _leave(""); +} /* end rxrpc_call_definitively_ACK() */ + +/*****************************************************************************/ +/* + * record the specified amount of ACKs/NAKs + */ +static int rxrpc_call_record_ACK(struct rxrpc_call *call, + struct rxrpc_message *msg, + rxrpc_seq_t seq, + size_t count) +{ + struct rxrpc_message *dmsg; + struct list_head *_p; + rxrpc_seq_t highest; + unsigned ix; + size_t chunk; + char resend, now_complete; + u8 acks[16]; + + _enter("%p{apc=%u ads=%u},%p,%u,%u", + call,call->acks_pend_cnt,call->acks_dftv_seq,msg,seq,count); + + /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order ACKs) */ + if (seq<=call->acks_dftv_seq) { + unsigned delta = call->acks_dftv_seq - seq; + + if (count<=delta) { + _leave(" = 0 [all definitively ACK'd]"); + return 0; + } + + seq += delta; + count -= delta; + msg->offset += delta; + } + + highest = seq + count - 1; + resend = 0; + while (count>0) { + /* extract up to 16 ACK slots at a time */ + chunk = min(count,sizeof(acks)); + count -= chunk; + + memset(acks,2,sizeof(acks)); + + if (skb_copy_bits(msg->pkt,msg->offset,&acks,chunk)<0) { + printk("Rx Received short ACK packet\n"); + _leave(" = -EINVAL"); + return -EINVAL; + } + msg->offset += chunk; + + /* check that the ACK set is valid */ + for (ix=0; ix<chunk; ix++) { + switch (acks[ix]) { + case RXRPC_ACK_TYPE_ACK: + break; + case RXRPC_ACK_TYPE_NACK: + resend = 1; + break; + default: + printk("Rx Received unsupported ACK state %u\n",acks[ix]); + _leave(" = -EINVAL"); + return -EINVAL; + } + } + + _proto("Rx ACK of packets #%u-#%u [%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)", + seq,seq+chunk-1, + _acktype[acks[0x0]], + _acktype[acks[0x1]], + _acktype[acks[0x2]], + _acktype[acks[0x3]], + _acktype[acks[0x4]], + _acktype[acks[0x5]], + _acktype[acks[0x6]], + _acktype[acks[0x7]], + _acktype[acks[0x8]], + _acktype[acks[0x9]], + _acktype[acks[0xA]], + _acktype[acks[0xB]], + _acktype[acks[0xC]], + _acktype[acks[0xD]], + _acktype[acks[0xE]], + _acktype[acks[0xF]], + call->acks_pend_cnt + ); + + /* mark the packets in the ACK queue as being provisionally ACK'd */ + ix = 0; + spin_lock(&call->lock); + + /* find the first packet ACK'd/NAK'd here */ + list_for_each(_p,&call->acks_pendq) { + dmsg = list_entry(_p,struct rxrpc_message,link); + if (dmsg->seq==seq) + goto found_first; + _debug("- %u: skipping #%u",ix,dmsg->seq); + } + goto bad_queue; + + found_first: + do { + _debug("- %u: processing #%u (%c) apc=%u", + ix,dmsg->seq,_acktype[acks[ix]],call->acks_pend_cnt); + + if (acks[ix]==RXRPC_ACK_TYPE_ACK) { + if (dmsg->state==RXRPC_MSG_SENT) call->acks_pend_cnt--; + dmsg->state = RXRPC_MSG_ACKED; + } + else { + if (dmsg->state==RXRPC_MSG_ACKED) call->acks_pend_cnt++; + dmsg->state = RXRPC_MSG_SENT; + } + ix++; + seq++; + + _p = dmsg->link.next; + dmsg = list_entry(_p,struct rxrpc_message,link); + } while(ix<chunk && _p!=&call->acks_pendq && dmsg->seq==seq); + + if (ix<chunk) + goto bad_queue; + + spin_unlock(&call->lock); + } + + if (resend) + rxrpc_call_resend(call,highest); + + /* if all packets are provisionally ACK'd, then wake up anyone who's waiting for that */ + now_complete = 0; + spin_lock(&call->lock); + if (call->acks_pend_cnt==0) { + if (call->app_call_state==RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) { + call->app_call_state = RXRPC_CSTATE_COMPLETE; + _state(call); + } + now_complete = 1; + } + spin_unlock(&call->lock); + + if (now_complete) { + _debug("- wake up waiters"); + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + call->app_attn_func(call); + } + + _leave(" = 0 (apc=%u)",call->acks_pend_cnt); + return 0; + + bad_queue: + panic("%s(): acks_pendq in bad state (packet #%u absent)\n",__FUNCTION__,seq); + +} /* end rxrpc_call_record_ACK() */ + +/*****************************************************************************/ +/* + * transfer data from the ready packet queue to the asynchronous read buffer + * - since this func is the only one going to look at packets queued on app_readyq, we don't need + * a lock to modify or access them, only to modify the queue pointers + * - called with call->lock held + * - the buffer must be in kernel space + * - returns: + * 0 if buffer filled + * -EAGAIN if buffer not filled and more data to come + * -EBADMSG if last packet received and insufficient data left + * -ECONNABORTED if the call has in an error state + */ +static int __rxrpc_call_read_data(struct rxrpc_call *call) +{ + struct rxrpc_message *msg; + size_t qty; + int ret; + + _enter("%p{as=%d buf=%p qty=%u/%u}", + call,call->app_async_read,call->app_read_buf,call->app_ready_qty,call->app_mark); + + /* check the state */ + switch (call->app_call_state) { + case RXRPC_CSTATE_SRVR_RCV_ARGS: + case RXRPC_CSTATE_CLNT_RCV_REPLY: + if (call->app_last_rcv) { + printk("%s(%p,%p,%d): Inconsistent call state (%s, last pkt)", + __FUNCTION__,call,call->app_read_buf,call->app_mark, + rxrpc_call_states[call->app_call_state]); + BUG(); + } + break; + + case RXRPC_CSTATE_SRVR_RCV_OPID: + case RXRPC_CSTATE_SRVR_GOT_ARGS: + case RXRPC_CSTATE_CLNT_GOT_REPLY: + break; + + case RXRPC_CSTATE_SRVR_SND_REPLY: + if (!call->app_last_rcv) { + printk("%s(%p,%p,%d): Inconsistent call state (%s, not last pkt)", + __FUNCTION__,call,call->app_read_buf,call->app_mark, + rxrpc_call_states[call->app_call_state]); + BUG(); + } + _debug("Trying to read data from call in SND_REPLY state"); + break; + + case RXRPC_CSTATE_ERROR: + _leave(" = -ECONNABORTED"); + return -ECONNABORTED; + + default: + printk("reading in unexpected state [[[ %u ]]]\n",call->app_call_state); + BUG(); + } + + /* handle the case of not having an async buffer */ + if (!call->app_async_read) { + if (call->app_mark==RXRPC_APP_MARK_EOF) { + ret = call->app_last_rcv ? 0 : -EAGAIN; + } + else { + if (call->app_mark >= call->app_ready_qty) { + call->app_mark = RXRPC_APP_MARK_EOF; + ret = 0; + } + else { + ret = call->app_last_rcv ? -EBADMSG : -EAGAIN; + } + } + + _leave(" = %d [no buf]",ret); + return 0; + } + + while (!list_empty(&call->app_readyq) && call->app_mark>0) { + msg = list_entry(call->app_readyq.next,struct rxrpc_message,link); + + /* drag as much data as we need out of this packet */ + qty = min(call->app_mark,msg->dsize); + + _debug("reading %u from skb=%p off=%lu",qty,msg->pkt,msg->offset); + + if (call->app_read_buf) + if (skb_copy_bits(msg->pkt,msg->offset,call->app_read_buf,qty)<0) + panic("%s: Failed to copy data from packet: (%p,%p,%d)", + __FUNCTION__,call,call->app_read_buf,qty); + + /* if that packet is now empty, discard it */ + call->app_ready_qty -= qty; + msg->dsize -= qty; + + if (msg->dsize==0) { + list_del_init(&msg->link); + rxrpc_put_message(msg); + } + else { + msg->offset += qty; + } + + call->app_mark -= qty; + if (call->app_read_buf) call->app_read_buf += qty; + } + + if (call->app_mark==0) { + call->app_async_read = 0; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + + /* adjust the state if used up all packets */ + if (list_empty(&call->app_readyq) && call->app_last_rcv) { + switch (call->app_call_state) { + case RXRPC_CSTATE_SRVR_RCV_OPID: + call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; + call->app_mark = RXRPC_APP_MARK_EOF; + _state(call); + del_timer_sync(&call->rcv_timeout); + break; + case RXRPC_CSTATE_SRVR_GOT_ARGS: + call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; + _state(call); + del_timer_sync(&call->rcv_timeout); + break; + default: + call->app_call_state = RXRPC_CSTATE_COMPLETE; + _state(call); + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->ackr_dfr_timo); + del_timer_sync(&call->rcv_timeout); + break; + } + } + + _leave(" = 0"); + return 0; + } + + if (call->app_last_rcv) { + _debug("Insufficient data (%u/%u)",call->app_ready_qty,call->app_mark); + call->app_async_read = 0; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + + _leave(" = -EBADMSG"); + return -EBADMSG; + } + + _leave(" = -EAGAIN"); + return -EAGAIN; +} /* end __rxrpc_call_read_data() */ + +/*****************************************************************************/ +/* + * attempt to read the specified amount of data from the call's ready queue into the buffer + * provided + * - since this func is the only one going to look at packets queued on app_readyq, we don't need + * a lock to modify or access them, only to modify the queue pointers + * - if the buffer pointer is NULL, then data is merely drained, not copied + * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is enough data or an + * error will be generated + * - note that the caller must have added the calling task to the call's wait queue beforehand + * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this function doesn't read + * all available data + */ +int rxrpc_call_read_data(struct rxrpc_call *call, void *buffer, size_t size, int flags) +{ + int ret; + + _enter("%p{arq=%u},%p,%d,%x",call,call->app_ready_qty,buffer,size,flags); + + spin_lock(&call->lock); + + if (unlikely(!!call->app_read_buf)) { + spin_unlock(&call->lock); + _leave(" = -EBUSY"); + return -EBUSY; + } + + call->app_mark = size; + call->app_read_buf = buffer; + call->app_async_read = 1; + call->app_read_count++; + + /* read as much data as possible */ + ret = __rxrpc_call_read_data(call); + switch (ret) { + case 0: + if (flags&RXRPC_CALL_READ_ALL && (!call->app_last_rcv || call->app_ready_qty>0)) { + _leave(" = -EBADMSG"); + __rxrpc_call_abort(call,-EBADMSG); + return -EBADMSG; + } + + spin_unlock(&call->lock); + call->app_attn_func(call); + _leave(" = 0"); + return ret; + + case -ECONNABORTED: + spin_unlock(&call->lock); + _leave(" = %d [aborted]",ret); + return ret; + + default: + __rxrpc_call_abort(call,ret); + _leave(" = %d",ret); + return ret; + + case -EAGAIN: + spin_unlock(&call->lock); + + if (!(flags&RXRPC_CALL_READ_BLOCK)) { + _leave(" = -EAGAIN"); + return -EAGAIN; + } + + /* wait for the data to arrive */ + _debug("blocking for data arrival"); + + for (;;) { + set_current_state(TASK_INTERRUPTIBLE); + if (!call->app_async_read || signal_pending(current)) + break; + schedule(); + } + set_current_state(TASK_RUNNING); + + if (signal_pending(current)) { + _leave(" = -EINTR"); + return -EINTR; + } + + if (call->app_call_state==RXRPC_CSTATE_ERROR) { + _leave(" = -ECONNABORTED"); + return -ECONNABORTED; + } + + _leave(" = 0"); + return 0; + } + +} /* end rxrpc_call_read_data() */ + +/*****************************************************************************/ +/* + * write data to a call + * - the data may not be sent immediately if it doesn't fill a buffer + * - if we can't queue all the data for buffering now, siov[] will have been adjusted to take + * account of what has been sent + */ +int rxrpc_call_write_data(struct rxrpc_call *call, + size_t sioc, + struct iovec siov[], + u8 rxhdr_flags, + int alloc_flags, + int dup_data, + size_t *size_sent) +{ + struct rxrpc_message *msg; + struct iovec *sptr; + size_t space, size, chunk, tmp; + char *buf; + int ret; + + _enter("%p,%u,%p,%02x,%x,%d,%p",call,sioc,siov,rxhdr_flags,alloc_flags,dup_data,size_sent); + + *size_sent = 0; + size = 0; + ret = -EINVAL; + + /* can't send more if we've sent last packet from this end */ + switch (call->app_call_state) { + case RXRPC_CSTATE_SRVR_SND_REPLY: + case RXRPC_CSTATE_CLNT_SND_ARGS: + break; + case RXRPC_CSTATE_ERROR: + ret = call->app_errno; + default: + goto out; + } + + /* calculate how much data we've been given */ + sptr = siov; + for (; sioc>0; sptr++, sioc--) { + if (!sptr->iov_len) continue; + + if (!sptr->iov_base) + goto out; + + size += sptr->iov_len; + } + + _debug("- size=%u mtu=%u",size,call->conn->mtu_size); + + do { + /* make sure there's a message under construction */ + if (!call->snd_nextmsg) { + /* no - allocate a message with no data yet attached */ + ret = rxrpc_conn_newmsg(call->conn,call,RXRPC_PACKET_TYPE_DATA, + 0,NULL,alloc_flags,&call->snd_nextmsg); + if (ret<0) + goto out; + _debug("- allocated new message [ds=%u]",call->snd_nextmsg->dsize); + } + + msg = call->snd_nextmsg; + msg->hdr.flags |= rxhdr_flags; + + /* deal with zero-length terminal packet */ + if (size==0) { + if (rxhdr_flags & RXRPC_LAST_PACKET) { + ret = rxrpc_call_flush(call); + if (ret<0) + goto out; + } + break; + } + + /* work out how much space current packet has available */ + space = call->conn->mtu_size - msg->dsize; + chunk = min(space,size); + + _debug("- [before] space=%u chunk=%u",space,chunk); + + while (!siov->iov_len) + siov++; + + /* if we are going to have to duplicate the data then coalesce it too */ + if (dup_data) { + /* don't allocate more that 1 page at a time */ + if (chunk>PAGE_SIZE) + chunk = PAGE_SIZE; + + /* allocate a data buffer and attach to the message */ + buf = kmalloc(chunk,alloc_flags); + if (unlikely(!buf)) { + if (msg->dsize==sizeof(struct rxrpc_header)) { + /* discard an empty msg and wind back the seq counter */ + rxrpc_put_message(msg); + call->snd_nextmsg = NULL; + call->snd_seq_count--; + } + + ret = -ENOMEM; + goto out; + } + + tmp = msg->dcount++; + set_bit(tmp,&msg->dfree); + msg->data[tmp].iov_base = buf; + msg->data[tmp].iov_len = chunk; + msg->dsize += chunk; + *size_sent += chunk; + size -= chunk; + + /* load the buffer with data */ + while (chunk>0) { + tmp = min(chunk,siov->iov_len); + memcpy(buf,siov->iov_base,tmp); + buf += tmp; + siov->iov_base += tmp; + siov->iov_len -= tmp; + if (!siov->iov_len) + siov++; + chunk -= tmp; + } + } + else { + /* we want to attach the supplied buffers directly */ + while (chunk>0 && msg->dcount<RXRPC_MSG_MAX_IOCS) { + tmp = msg->dcount++; + msg->data[tmp].iov_base = siov->iov_base; + msg->data[tmp].iov_len = siov->iov_len; + msg->dsize += siov->iov_len; + *size_sent += siov->iov_len; + size -= siov->iov_len; + chunk -= siov->iov_len; + siov++; + } + } + + _debug("- [loaded] chunk=%u size=%u",chunk,size); + + /* dispatch the message when full, final or requesting ACK */ + if (msg->dsize>=call->conn->mtu_size || rxhdr_flags) { + ret = rxrpc_call_flush(call); + if (ret<0) + goto out; + } + + } while(size>0); + + ret = 0; + out: + _leave(" = %d (%d queued, %d rem)",ret,*size_sent,size); + return ret; + +} /* end rxrpc_call_write_data() */ + +/*****************************************************************************/ +/* + * flush outstanding packets to the network + */ +int rxrpc_call_flush(struct rxrpc_call *call) +{ + struct rxrpc_message *msg; + int ret = 0; + + _enter("%p",call); + + rxrpc_get_call(call); + + /* if there's a packet under construction, then dispatch it now */ + if (call->snd_nextmsg) { + msg = call->snd_nextmsg; + call->snd_nextmsg = NULL; + + if (msg->hdr.flags & RXRPC_LAST_PACKET) { + msg->hdr.flags &= ~RXRPC_MORE_PACKETS; + msg->hdr.flags |= RXRPC_REQUEST_ACK; + } + else { + msg->hdr.flags |= RXRPC_MORE_PACKETS; + } + + _proto("Sending DATA message { ds=%u dc=%u df=%02lu }", + msg->dsize,msg->dcount,msg->dfree); + + /* queue and adjust call state */ + spin_lock(&call->lock); + list_add_tail(&msg->link,&call->acks_pendq); + + /* decide what to do depending on current state and if this is the last packet */ + ret = -EINVAL; + switch (call->app_call_state) { + case RXRPC_CSTATE_SRVR_SND_REPLY: + if (msg->hdr.flags & RXRPC_LAST_PACKET) { + call->app_call_state = RXRPC_CSTATE_SRVR_RCV_FINAL_ACK; + _state(call); + } + break; + + case RXRPC_CSTATE_CLNT_SND_ARGS: + if (msg->hdr.flags & RXRPC_LAST_PACKET) { + call->app_call_state = RXRPC_CSTATE_CLNT_RCV_REPLY; + _state(call); + } + break; + + case RXRPC_CSTATE_ERROR: + ret = call->app_errno; + default: + spin_unlock(&call->lock); + goto out; + } + + call->acks_pend_cnt++; + + mod_timer(&call->acks_timeout,jiffies + rxrpc_call_acks_timeout); + + spin_unlock(&call->lock); + + ret = rxrpc_conn_sendmsg(call->conn,msg); + if (ret==0) + call->pkt_snd_count++; + } + + out: + rxrpc_put_call(call); + + _leave(" = %d",ret); + return ret; + +} /* end rxrpc_call_flush() */ + +/*****************************************************************************/ +/* + * resend NAK'd or unacknowledged packets up to the highest one specified + */ +static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest) +{ + struct rxrpc_message *msg; + struct list_head *_p; + rxrpc_seq_t seq = 0; + + _enter("%p,%u",call,highest); + + _proto("Rx Resend required"); + + /* handle too many resends */ + if (call->snd_resend_cnt>=rxrpc_call_max_resend) { + _debug("Aborting due to too many resends (rcv=%d)",call->pkt_rcv_count); + rxrpc_call_abort(call,call->pkt_rcv_count>0?-EIO:-ETIMEDOUT); + _leave(""); + return; + } + + spin_lock(&call->lock); + call->snd_resend_cnt++; + for (;;) { + /* determine which the next packet we might need to ACK is */ + if (seq<=call->acks_dftv_seq) + seq = call->acks_dftv_seq; + seq++; + + if (seq>highest) + break; + + /* look for the packet in the pending-ACK queue */ + list_for_each(_p,&call->acks_pendq) { + msg = list_entry(_p,struct rxrpc_message,link); + if (msg->seq==seq) + goto found_msg; + } + + panic("%s(%p,%d): Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n", + __FUNCTION__,call,highest,call->acks_dftv_seq,call->snd_seq_count,seq); + + found_msg: + if (msg->state!=RXRPC_MSG_SENT) + continue; /* only un-ACK'd packets */ + + rxrpc_get_message(msg); + spin_unlock(&call->lock); + + /* send each message again (and ignore any errors we might incur) */ + _proto("Resending DATA message { ds=%u dc=%u df=%02lu }", + msg->dsize,msg->dcount,msg->dfree); + + if (rxrpc_conn_sendmsg(call->conn,msg)==0) + call->pkt_snd_count++; + + rxrpc_put_message(msg); + + spin_lock(&call->lock); + } + + /* reset the timeout */ + mod_timer(&call->acks_timeout,jiffies + rxrpc_call_acks_timeout); + + spin_unlock(&call->lock); + + _leave(""); +} /* end rxrpc_call_resend() */ + +/*****************************************************************************/ +/* + * handle an ICMP error being applied to a call + */ +void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno) +{ + _enter("%p{%u},%d",call,ntohl(call->call_id),errno); + + /* if this call is already aborted, then just wake up any waiters */ + if (call->app_call_state==RXRPC_CSTATE_ERROR) { + call->app_error_func(call); + } + else { + /* tell the app layer what happened */ + spin_lock(&call->lock); + call->app_call_state = RXRPC_CSTATE_ERROR; + _state(call); + if (local) + call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR; + else + call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR; + call->app_errno = errno; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + call->app_async_read = 0; + + /* map the error */ + call->app_aemap_func(call); + + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + + spin_unlock(&call->lock); + + call->app_error_func(call); + } + + _leave(""); +} /* end rxrpc_call_handle_error() */ diff --git a/net/rxrpc/connection.c b/net/rxrpc/connection.c new file mode 100644 index 000000000000..e54dd472b5e4 --- /dev/null +++ b/net/rxrpc/connection.c @@ -0,0 +1,687 @@ +/* connection.c: Rx connection routines + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/sched.h> +#include <linux/slab.h> +#include <linux/module.h> +#include <rxrpc/rxrpc.h> +#include <rxrpc/transport.h> +#include <rxrpc/peer.h> +#include <rxrpc/connection.h> +#include <rxrpc/call.h> +#include <rxrpc/message.h> +#include <linux/udp.h> +#include <linux/ip.h> +#include <net/sock.h> +#include <asm/uaccess.h> +#include "internal.h" + +__RXACCT_DECL(atomic_t rxrpc_connection_count); + +LIST_HEAD(rxrpc_conns); +DECLARE_RWSEM(rxrpc_conns_sem); + +static void __rxrpc_conn_timeout(rxrpc_timer_t *timer) +{ + struct rxrpc_connection *conn = list_entry(timer,struct rxrpc_connection,timeout); + + _debug("Rx CONN TIMEOUT [%p{u=%d}]",conn,atomic_read(&conn->usage)); + + rxrpc_conn_do_timeout(conn); +} + +static const struct rxrpc_timer_ops rxrpc_conn_timer_ops = { + timed_out: __rxrpc_conn_timeout, +}; + +/*****************************************************************************/ +/* + * create a new connection record + */ +static inline int __rxrpc_create_connection(struct rxrpc_peer *peer, + struct rxrpc_connection **_conn) +{ + struct rxrpc_connection *conn; + + _enter("%p",peer); + + /* allocate and initialise a connection record */ + conn = kmalloc(sizeof(struct rxrpc_connection),GFP_KERNEL); + if (!conn) { + _leave(" = -ENOMEM"); + return -ENOMEM; + } + + memset(conn,0,sizeof(struct rxrpc_connection)); + atomic_set(&conn->usage,1); + + INIT_LIST_HEAD(&conn->link); + init_waitqueue_head(&conn->chanwait); + spin_lock_init(&conn->lock); + rxrpc_timer_init(&conn->timeout,&rxrpc_conn_timer_ops); + + do_gettimeofday(&conn->atime); + conn->mtu_size = 1024; + conn->peer = peer; + conn->trans = peer->trans; + + __RXACCT(atomic_inc(&rxrpc_connection_count)); + *_conn = conn; + _leave(" = 0 (%p)",conn); + + return 0; +} /* end __rxrpc_create_connection() */ + +/*****************************************************************************/ +/* + * create a new connection record for outgoing connections + */ +int rxrpc_create_connection(struct rxrpc_transport *trans, + u16 port, + u32 addr, + unsigned short service_id, + void *security, + struct rxrpc_connection **_conn) +{ + struct rxrpc_connection *conn; + struct rxrpc_peer *peer; + int ret; + + _enter("%p{%hu},%u,%hu",trans,trans->port,ntohs(port),service_id); + + /* get a peer record */ + ret = rxrpc_peer_lookup(trans,addr,&peer); + if (ret<0) { + _leave(" = %d",ret); + return ret; + } + + /* allocate and initialise a connection record */ + ret = __rxrpc_create_connection(peer,&conn); + if (ret<0) { + rxrpc_put_peer(peer); + _leave(" = %d",ret); + return ret; + } + + /* fill in the specific bits */ + conn->addr.sin_family = AF_INET; + conn->addr.sin_port = port; + conn->addr.sin_addr.s_addr = addr; + + conn->in_epoch = rxrpc_epoch; + conn->out_epoch = rxrpc_epoch; + conn->in_clientflag = 0; + conn->out_clientflag = RXRPC_CLIENT_INITIATED; + conn->conn_id = htonl((unsigned) conn & RXRPC_CIDMASK); + conn->service_id = htons(service_id); + + /* attach to peer */ + conn->peer = peer; + + write_lock(&peer->conn_lock); + list_add_tail(&conn->link,&peer->conn_active); + atomic_inc(&peer->conn_count); + write_unlock(&peer->conn_lock); + + down_write(&rxrpc_conns_sem); + list_add_tail(&conn->proc_link,&rxrpc_conns); + up_write(&rxrpc_conns_sem); + + *_conn = conn; + _leave(" = 0 (%p)",conn); + + return 0; +} /* end rxrpc_create_connection() */ + +/*****************************************************************************/ +/* + * lookup the connection for an incoming packet + * - create a new connection record for unrecorded incoming connections + */ +int rxrpc_connection_lookup(struct rxrpc_peer *peer, + struct rxrpc_message *msg, + struct rxrpc_connection **_conn) +{ + struct rxrpc_connection *conn, *candidate = NULL; + struct list_head *_p; + int ret, fresh = 0; + u32 x_epoch, x_connid; + u16 x_port, x_secix, x_servid; + u8 x_clflag; + + _enter("%p{{%hu}},%u,%hu", + peer,peer->trans->port,ntohs(msg->pkt->h.uh->source),ntohs(msg->hdr.serviceId)); + + x_port = msg->pkt->h.uh->source; + x_epoch = msg->hdr.epoch; + x_clflag = msg->hdr.flags & RXRPC_CLIENT_INITIATED; + x_connid = htonl(ntohl(msg->hdr.cid) & RXRPC_CIDMASK); + x_servid = msg->hdr.serviceId; + x_secix = msg->hdr.securityIndex; + + /* [common case] search the transport's active list first */ + read_lock(&peer->conn_lock); + list_for_each(_p,&peer->conn_active) { + conn = list_entry(_p,struct rxrpc_connection,link); + if (conn->addr.sin_port == x_port && + conn->in_epoch == x_epoch && + conn->conn_id == x_connid && + conn->security_ix == x_secix && + conn->service_id == x_servid && + conn->in_clientflag == x_clflag) + goto found_active; + } + read_unlock(&peer->conn_lock); + + /* [uncommon case] not active + * - create a candidate for a new record if an inbound connection + * - only examine the graveyard for an outbound connection + */ + if (x_clflag) { + ret = __rxrpc_create_connection(peer,&candidate); + if (ret<0) { + _leave(" = %d",ret); + return ret; + } + + /* fill in the specifics */ + candidate->addr.sin_family = AF_INET; + candidate->addr.sin_port = x_port; + candidate->addr.sin_addr.s_addr = msg->pkt->nh.iph->saddr; + candidate->in_epoch = x_epoch; + candidate->out_epoch = x_epoch; + candidate->in_clientflag = RXRPC_CLIENT_INITIATED; + candidate->out_clientflag = 0; + candidate->conn_id = x_connid; + candidate->service_id = x_servid; + candidate->security_ix = x_secix; + } + + /* search the active list again, just in case it appeared whilst we were busy */ + write_lock(&peer->conn_lock); + list_for_each(_p,&peer->conn_active) { + conn = list_entry(_p,struct rxrpc_connection,link); + if (conn->addr.sin_port == x_port && + conn->in_epoch == x_epoch && + conn->conn_id == x_connid && + conn->security_ix == x_secix && + conn->service_id == x_servid && + conn->in_clientflag == x_clflag) + goto found_active_second_chance; + } + + /* search the transport's graveyard list */ + spin_lock(&peer->conn_gylock); + list_for_each(_p,&peer->conn_graveyard) { + conn = list_entry(_p,struct rxrpc_connection,link); + if (conn->addr.sin_port == x_port && + conn->in_epoch == x_epoch && + conn->conn_id == x_connid && + conn->security_ix == x_secix && + conn->service_id == x_servid && + conn->in_clientflag == x_clflag) + goto found_in_graveyard; + } + spin_unlock(&peer->conn_gylock); + + /* outbound connections aren't created here */ + if (!x_clflag) { + write_unlock(&peer->conn_lock); + _leave(" = -ENOENT"); + return -ENOENT; + } + + /* we can now add the new candidate to the list */ + rxrpc_get_peer(peer); + conn = candidate; + candidate = NULL; + atomic_inc(&peer->conn_count); + fresh = 1; + + make_active: + list_add_tail(&conn->link,&peer->conn_active); + + success_uwfree: + write_unlock(&peer->conn_lock); + + if (candidate) { + __RXACCT(atomic_dec(&rxrpc_connection_count)); + kfree(candidate); + } + + if (fresh) { + down_write(&rxrpc_conns_sem); + list_add_tail(&conn->proc_link,&rxrpc_conns); + up_write(&rxrpc_conns_sem); + } + + success: + *_conn = conn; + _leave(" = 0 (%p)",conn); + return 0; + + /* handle the connection being found in the active list straight off */ + found_active: + rxrpc_get_connection(conn); + read_unlock(&peer->conn_lock); + goto success; + + /* handle resurrecting a connection from the graveyard */ + found_in_graveyard: + rxrpc_get_peer(peer); + rxrpc_get_connection(conn); + rxrpc_krxtimod_del_timer(&conn->timeout); + list_del_init(&conn->link); + spin_unlock(&peer->conn_gylock); + goto make_active; + + /* handle finding the connection on the second time through the active list */ + found_active_second_chance: + rxrpc_get_connection(conn); + goto success_uwfree; + +} /* end rxrpc_connection_lookup() */ + +/*****************************************************************************/ +/* + * finish using a connection record + * - it will be transferred to the peer's connection graveyard when refcount reaches 0 + */ +void rxrpc_put_connection(struct rxrpc_connection *conn) +{ + struct rxrpc_peer *peer = conn->peer; + + _enter("%p{u=%d p=%hu}",conn,atomic_read(&conn->usage),ntohs(conn->addr.sin_port)); + + /* sanity check */ + if (atomic_read(&conn->usage)<=0) + BUG(); + + spin_lock(&peer->conn_gylock); + if (likely(!atomic_dec_and_test(&conn->usage))) { + spin_unlock(&peer->conn_gylock); + _leave(""); + return; + } + + /* move to graveyard queue */ + list_del(&conn->link); + list_add_tail(&conn->link,&peer->conn_graveyard); + + /* discard in 100 secs */ + rxrpc_krxtimod_add_timer(&conn->timeout,20*HZ); + + spin_unlock(&peer->conn_gylock); + + rxrpc_put_peer(conn->peer); + + _leave(" [killed]"); +} /* end rxrpc_put_connection() */ + +/*****************************************************************************/ +/* + * free a connection record + */ +void rxrpc_conn_do_timeout(struct rxrpc_connection *conn) +{ + struct rxrpc_peer *peer; + + _enter("%p{u=%d p=%hu}",conn,atomic_read(&conn->usage),ntohs(conn->addr.sin_port)); + + peer = conn->peer; + + if (atomic_read(&conn->usage)<0) + BUG(); + + /* remove from graveyard if still dead */ + spin_lock(&peer->conn_gylock); + if (atomic_read(&conn->usage)==0) { + list_del_init(&conn->link); + } + else { + conn = NULL; + } + spin_unlock(&peer->conn_gylock); + + if (!conn) { + _leave(""); + return; /* resurrected */ + } + + _debug("--- Destroying Connection %p ---",conn); + + down_write(&rxrpc_conns_sem); + list_del(&conn->proc_link); + up_write(&rxrpc_conns_sem); + + __RXACCT(atomic_dec(&rxrpc_connection_count)); + kfree(conn); + + /* if the graveyard is now empty, wake up anyone waiting for that */ + if (atomic_dec_and_test(&peer->conn_count)) + wake_up(&peer->conn_gy_waitq); + + _leave(" [destroyed]"); +} /* end rxrpc_conn_do_timeout() */ + +/*****************************************************************************/ +/* + * clear all connection records from a peer endpoint + */ +void rxrpc_conn_clearall(struct rxrpc_peer *peer) +{ + DECLARE_WAITQUEUE(myself,current); + + struct rxrpc_connection *conn; + int err; + + _enter("%p",peer); + + /* there shouldn't be any active conns remaining */ + if (!list_empty(&peer->conn_active)) + BUG(); + + /* manually timeout all conns in the graveyard */ + spin_lock(&peer->conn_gylock); + while (!list_empty(&peer->conn_graveyard)) { + conn = list_entry(peer->conn_graveyard.next,struct rxrpc_connection,link); + err = rxrpc_krxtimod_del_timer(&conn->timeout); + spin_unlock(&peer->conn_gylock); + + if (err==0) + rxrpc_conn_do_timeout(conn); + + spin_lock(&peer->conn_gylock); + } + spin_unlock(&peer->conn_gylock); + + /* wait for the the conn graveyard to be completely cleared */ + set_current_state(TASK_UNINTERRUPTIBLE); + add_wait_queue(&peer->conn_gy_waitq,&myself); + + while (atomic_read(&peer->conn_count)!=0) { + schedule(); + set_current_state(TASK_UNINTERRUPTIBLE); + } + + remove_wait_queue(&peer->conn_gy_waitq,&myself); + set_current_state(TASK_RUNNING); + + _leave(""); + +} /* end rxrpc_conn_clearall() */ + +/*****************************************************************************/ +/* + * allocate and prepare a message for sending out through the transport endpoint + */ +int rxrpc_conn_newmsg(struct rxrpc_connection *conn, + struct rxrpc_call *call, + u8 type, + int dcount, + struct iovec diov[], + int alloc_flags, + struct rxrpc_message **_msg) +{ + struct rxrpc_message *msg; + int loop; + + _enter("%p{%d},%p,%u",conn,ntohs(conn->addr.sin_port),call,type); + + if (dcount>3) { + _leave(" = -EINVAL"); + return -EINVAL; + } + + msg = kmalloc(sizeof(struct rxrpc_message),alloc_flags); + if (!msg) { + _leave(" = -ENOMEM"); + return -ENOMEM; + } + + memset(msg,0,sizeof(*msg)); + atomic_set(&msg->usage,1); + + INIT_LIST_HEAD(&msg->link); + + msg->state = RXRPC_MSG_PREPARED; + + msg->hdr.epoch = conn->out_epoch; + msg->hdr.cid = conn->conn_id | (call ? call->chan_ix : 0); + msg->hdr.callNumber = call ? call->call_id : 0; + msg->hdr.type = type; + msg->hdr.flags = conn->out_clientflag; + msg->hdr.securityIndex = conn->security_ix; + msg->hdr.serviceId = conn->service_id; + + /* generate sequence numbers for data packets */ + if (call) { + switch (type) { + case RXRPC_PACKET_TYPE_DATA: + msg->seq = ++call->snd_seq_count; + msg->hdr.seq = htonl(msg->seq); + break; + case RXRPC_PACKET_TYPE_ACK: + /* ACK sequence numbers are complicated. The following may be wrong: + * - jumbo packet ACKs should have a seq number + * - normal ACKs should not + */ + default: + break; + } + } + + msg->dcount = dcount + 1; + msg->dsize = sizeof(msg->hdr); + msg->data[0].iov_len = sizeof(msg->hdr); + msg->data[0].iov_base = &msg->hdr; + + for (loop=0; loop<dcount; loop++) { + msg->dsize += diov[loop].iov_len; + msg->data[loop+1].iov_len = diov[loop].iov_len; + msg->data[loop+1].iov_base = diov[loop].iov_base; + } + + __RXACCT(atomic_inc(&rxrpc_message_count)); + *_msg = msg; + _leave(" = 0 (%p) #%d",msg,atomic_read(&rxrpc_message_count)); + return 0; +} /* end rxrpc_conn_newmsg() */ + +/*****************************************************************************/ +/* + * free a message + */ +void __rxrpc_put_message(struct rxrpc_message *msg) +{ + int loop; + + _enter("%p #%d",msg,atomic_read(&rxrpc_message_count)); + + if (msg->pkt) kfree_skb(msg->pkt); + if (msg->conn) rxrpc_put_connection(msg->conn); + + for (loop=0; loop<8; loop++) + if (test_bit(loop,&msg->dfree)) + kfree(msg->data[loop].iov_base); + + __RXACCT(atomic_dec(&rxrpc_message_count)); + kfree(msg); + + _leave(""); +} /* end __rxrpc_put_message() */ + +/*****************************************************************************/ +/* + * send a message out through the transport endpoint + */ +int rxrpc_conn_sendmsg(struct rxrpc_connection *conn, struct rxrpc_message *msg) +{ + struct msghdr msghdr; + mm_segment_t oldfs; + int ret; + + _enter("%p{%d}",conn,ntohs(conn->addr.sin_port)); + + /* fill in some fields in the header */ + spin_lock(&conn->lock); + msg->hdr.serial = htonl(++conn->serial_counter); + msg->rttdone = 0; + spin_unlock(&conn->lock); + + /* set up the message to be transmitted */ + msghdr.msg_name = &conn->addr; + msghdr.msg_namelen = sizeof(conn->addr); + msghdr.msg_iov = msg->data; + msghdr.msg_iovlen = msg->dcount; + msghdr.msg_control = NULL; + msghdr.msg_controllen = 0; + msghdr.msg_flags = MSG_CONFIRM|MSG_DONTWAIT; + + _net("Sending message type %d of %d bytes to %08x:%d", + msg->hdr.type, + msg->dsize, + htonl(conn->addr.sin_addr.s_addr), + htons(conn->addr.sin_port)); + + /* send the message */ + oldfs = get_fs(); + set_fs(KERNEL_DS); + ret = sock_sendmsg(conn->trans->socket,&msghdr,msg->dsize); + set_fs(oldfs); + + if (ret<0) { + msg->state = RXRPC_MSG_ERROR; + } + else { + msg->state = RXRPC_MSG_SENT; + ret = 0; + + spin_lock(&conn->lock); + do_gettimeofday(&conn->atime); + msg->stamp = conn->atime; + spin_unlock(&conn->lock); + } + + _leave(" = %d",ret); + + return ret; +} /* end rxrpc_conn_sendmsg() */ + +/*****************************************************************************/ +/* + * deal with a subsequent call packet + */ +int rxrpc_conn_receive_call_packet(struct rxrpc_connection *conn, + struct rxrpc_call *call, + struct rxrpc_message *msg) +{ + struct rxrpc_message *pmsg; + struct list_head *_p; + unsigned cix, seq; + int ret = 0; + + _enter("%p,%p,%p",conn,call,msg); + + if (!call) { + cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK; + + spin_lock(&conn->lock); + call = conn->channels[cix]; + + if (!call || call->call_id != msg->hdr.callNumber) { + spin_unlock(&conn->lock); + rxrpc_trans_immediate_abort(conn->trans,msg,-ENOENT); + goto out; + } + else { + rxrpc_get_call(call); + spin_unlock(&conn->lock); + } + } + else { + rxrpc_get_call(call); + } + + _proto("Received packet %%%u [%u] on call %hu:%u:%u", + htonl(msg->hdr.serial), + htonl(msg->hdr.seq), + htons(msg->hdr.serviceId), + htonl(conn->conn_id), + htonl(call->call_id)); + + call->pkt_rcv_count++; + + if (msg->pkt->dst && msg->pkt->dst->dev) + conn->peer->if_mtu = msg->pkt->dst->dev->mtu - msg->pkt->dst->dev->hard_header_len; + + /* queue on the call in seq order */ + rxrpc_get_message(msg); + seq = msg->seq; + + spin_lock(&call->lock); + list_for_each(_p,&call->rcv_receiveq) { + pmsg = list_entry(_p,struct rxrpc_message,link); + if (pmsg->seq>seq) + break; + } + list_add_tail(&msg->link,_p); + + /* reset the activity timeout */ + call->flags |= RXRPC_CALL_RCV_PKT; + mod_timer(&call->rcv_timeout,jiffies + rxrpc_call_rcv_timeout * HZ); + + spin_unlock(&call->lock); + + rxrpc_krxiod_queue_call(call); + + rxrpc_put_call(call); + out: + _leave(" = %d",ret); + + return ret; +} /* end rxrpc_conn_receive_call_packet() */ + +/*****************************************************************************/ +/* + * handle an ICMP error being applied to a connection + */ +void rxrpc_conn_handle_error(struct rxrpc_connection *conn, int local, int errno) +{ + struct rxrpc_call *calls[4]; + int loop; + + _enter("%p{%d},%d",conn,ntohs(conn->addr.sin_port),errno); + + /* get a ref to all my calls in one go */ + memset(calls,0,sizeof(calls)); + spin_lock(&conn->lock); + + for (loop=3; loop>=0; loop--) { + if (conn->channels[loop]) { + calls[loop] = conn->channels[loop]; + rxrpc_get_call(calls[loop]); + } + } + + spin_unlock(&conn->lock); + + /* now kick them all */ + for (loop=3; loop>=0; loop--) { + if (calls[loop]) { + rxrpc_call_handle_error(calls[loop],local,errno); + rxrpc_put_call(calls[loop]); + } + } + + _leave(""); +} /* end rxrpc_conn_handle_error() */ diff --git a/net/rxrpc/internal.h b/net/rxrpc/internal.h new file mode 100644 index 000000000000..afd712a439f9 --- /dev/null +++ b/net/rxrpc/internal.h @@ -0,0 +1,107 @@ +/* internal.h: internal Rx RPC stuff + * + * Copyright (c) 2002 David Howells (dhowells@redhat.com). + */ + +#ifndef RXRPC_INTERNAL_H +#define RXRPC_INTERNAL_H + +#include <linux/compiler.h> +#include <linux/kernel.h> + +/* + * debug accounting + */ +#if 1 +#define __RXACCT_DECL(X) X +#define __RXACCT(X) do { X; } while(0) +#else +#define __RXACCT_DECL(X) +#define __RXACCT(X) do { } while(0) +#endif + +__RXACCT_DECL(extern atomic_t rxrpc_transport_count); +__RXACCT_DECL(extern atomic_t rxrpc_peer_count); +__RXACCT_DECL(extern atomic_t rxrpc_connection_count); +__RXACCT_DECL(extern atomic_t rxrpc_call_count); +__RXACCT_DECL(extern atomic_t rxrpc_message_count); + +/* + * debug tracing + */ +#define kenter(FMT,...) printk("==> %s("FMT")\n",__FUNCTION__,##__VA_ARGS__) +#define kleave(FMT,...) printk("<== %s()"FMT"\n",__FUNCTION__,##__VA_ARGS__) +#define kdebug(FMT,...) printk(" "FMT"\n",##__VA_ARGS__) +#define kproto(FMT,...) printk("### "FMT"\n",##__VA_ARGS__) +#define knet(FMT,...) printk(" "FMT"\n",##__VA_ARGS__) + +#if 0 +#define _enter(FMT,...) kenter(FMT,##__VA_ARGS__) +#define _leave(FMT,...) kleave(FMT,##__VA_ARGS__) +#define _debug(FMT,...) kdebug(FMT,##__VA_ARGS__) +#define _proto(FMT,...) kproto(FMT,##__VA_ARGS__) +#define _net(FMT,...) knet(FMT,##__VA_ARGS__) +#else +#define _enter(FMT,...) do { if (rxrpc_ktrace) kenter(FMT,##__VA_ARGS__); } while(0) +#define _leave(FMT,...) do { if (rxrpc_ktrace) kleave(FMT,##__VA_ARGS__); } while(0) +#define _debug(FMT,...) do { if (rxrpc_kdebug) kdebug(FMT,##__VA_ARGS__); } while(0) +#define _proto(FMT,...) do { if (rxrpc_kproto) kproto(FMT,##__VA_ARGS__); } while(0) +#define _net(FMT,...) do { if (rxrpc_knet) knet (FMT,##__VA_ARGS__); } while(0) +#endif + +static inline void rxrpc_discard_my_signals(void) +{ + while (signal_pending(current)) { + siginfo_t sinfo; + + spin_lock_irq(¤t->sig->siglock); + dequeue_signal(¤t->blocked,&sinfo); + spin_unlock_irq(¤t->sig->siglock); + } +} + +/* + * call.c + */ +extern struct list_head rxrpc_calls; +extern struct rw_semaphore rxrpc_calls_sem; + +/* + * connection.c + */ +extern struct list_head rxrpc_conns; +extern struct rw_semaphore rxrpc_conns_sem; + +extern void rxrpc_conn_do_timeout(struct rxrpc_connection *conn); +extern void rxrpc_conn_clearall(struct rxrpc_peer *peer); + +/* + * peer.c + */ +extern struct list_head rxrpc_peers; +extern struct rw_semaphore rxrpc_peers_sem; + +extern void rxrpc_peer_calculate_rtt(struct rxrpc_peer *peer, + struct rxrpc_message *msg, + struct rxrpc_message *resp); + +extern void rxrpc_peer_clearall(struct rxrpc_transport *trans); + +extern void rxrpc_peer_do_timeout(struct rxrpc_peer *peer); + + +/* + * proc.c + */ +#ifdef CONFIG_PROC_FS +extern int rxrpc_proc_init(void); +extern void rxrpc_proc_cleanup(void); +#endif + +/* + * transport.c + */ +extern struct list_head rxrpc_proc_transports; +extern struct rw_semaphore rxrpc_proc_transports_sem; + +#endif /* RXRPC_INTERNAL_H */ diff --git a/net/rxrpc/krxiod.c b/net/rxrpc/krxiod.c new file mode 100644 index 000000000000..4bbc3f6b3418 --- /dev/null +++ b/net/rxrpc/krxiod.c @@ -0,0 +1,262 @@ +/* krxiod.c: Rx I/O daemon + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/version.h> +#include <linux/sched.h> +#include <linux/completion.h> +#include <linux/spinlock.h> +#include <linux/init.h> +#include <rxrpc/krxiod.h> +#include <rxrpc/transport.h> +#include <rxrpc/peer.h> +#include <rxrpc/call.h> +#include "internal.h" + +static DECLARE_WAIT_QUEUE_HEAD(rxrpc_krxiod_sleepq); +static DECLARE_COMPLETION(rxrpc_krxiod_dead); + +static atomic_t rxrpc_krxiod_qcount = ATOMIC_INIT(0); + +static LIST_HEAD(rxrpc_krxiod_transportq); +static spinlock_t rxrpc_krxiod_transportq_lock = SPIN_LOCK_UNLOCKED; + +static LIST_HEAD(rxrpc_krxiod_callq); +static spinlock_t rxrpc_krxiod_callq_lock = SPIN_LOCK_UNLOCKED; + +static volatile int rxrpc_krxiod_die; + +/*****************************************************************************/ +/* + * Rx I/O daemon + */ +static int rxrpc_krxiod(void *arg) +{ + DECLARE_WAITQUEUE(krxiod,current); + + printk("Started krxiod %d\n",current->pid); + strcpy(current->comm,"krxiod"); + + daemonize(); + + /* only certain signals are of interest */ + spin_lock_irq(¤t->sig->siglock); + siginitsetinv(¤t->blocked,0); +#if LINUX_VERSION_CODE > KERNEL_VERSION(2,5,3) + recalc_sigpending(); +#else + recalc_sigpending(current); +#endif + spin_unlock_irq(¤t->sig->siglock); + + /* loop around waiting for work to do */ + do { + /* wait for work or to be told to exit */ + _debug("### Begin Wait"); + if (!atomic_read(&rxrpc_krxiod_qcount)) { + set_current_state(TASK_INTERRUPTIBLE); + + add_wait_queue(&rxrpc_krxiod_sleepq,&krxiod); + + for (;;) { + set_current_state(TASK_INTERRUPTIBLE); + if (atomic_read(&rxrpc_krxiod_qcount) || + rxrpc_krxiod_die || + signal_pending(current)) + break; + + schedule(); + } + + remove_wait_queue(&rxrpc_krxiod_sleepq,&krxiod); + set_current_state(TASK_RUNNING); + } + _debug("### End Wait"); + + /* do work if been given some to do */ + _debug("### Begin Work"); + + /* see if there's a transport in need of attention */ + if (!list_empty(&rxrpc_krxiod_transportq)) { + struct rxrpc_transport *trans = NULL; + + spin_lock_irq(&rxrpc_krxiod_transportq_lock); + + if (!list_empty(&rxrpc_krxiod_transportq)) { + trans = list_entry(rxrpc_krxiod_transportq.next, + struct rxrpc_transport,krxiodq_link); + list_del_init(&trans->krxiodq_link); + atomic_dec(&rxrpc_krxiod_qcount); + + /* make sure it hasn't gone away and doesn't go away */ + if (atomic_read(&trans->usage)>0) + rxrpc_get_transport(trans); + else + trans = NULL; + } + + spin_unlock_irq(&rxrpc_krxiod_transportq_lock); + + if (trans) { + rxrpc_trans_receive_packet(trans); + rxrpc_put_transport(trans); + } + } + + /* see if there's a call in need of attention */ + if (!list_empty(&rxrpc_krxiod_callq)) { + struct rxrpc_call *call = NULL; + + spin_lock_irq(&rxrpc_krxiod_callq_lock); + + if (!list_empty(&rxrpc_krxiod_callq)) { + call = list_entry(rxrpc_krxiod_callq.next, + struct rxrpc_call,rcv_krxiodq_lk); + list_del_init(&call->rcv_krxiodq_lk); + atomic_dec(&rxrpc_krxiod_qcount); + + /* make sure it hasn't gone away and doesn't go away */ + if (atomic_read(&call->usage)>0) { + _debug("@@@ KRXIOD Begin Attend Call %p",call); + rxrpc_get_call(call); + } + else { + call = NULL; + } + } + + spin_unlock_irq(&rxrpc_krxiod_callq_lock); + + if (call) { + rxrpc_call_do_stuff(call); + rxrpc_put_call(call); + _debug("@@@ KRXIOD End Attend Call %p",call); + } + } + + _debug("### End Work"); + + /* discard pending signals */ + rxrpc_discard_my_signals(); + + } while (!rxrpc_krxiod_die); + + /* and that's all */ + complete_and_exit(&rxrpc_krxiod_dead,0); + +} /* end rxrpc_krxiod() */ + +/*****************************************************************************/ +/* + * start up a krxiod daemon + */ +int __init rxrpc_krxiod_init(void) +{ + return kernel_thread(rxrpc_krxiod,NULL,0); + +} /* end rxrpc_krxiod_init() */ + +/*****************************************************************************/ +/* + * kill the krxiod daemon and wait for it to complete + */ +void rxrpc_krxiod_kill(void) +{ + rxrpc_krxiod_die = 1; + wake_up_all(&rxrpc_krxiod_sleepq); + wait_for_completion(&rxrpc_krxiod_dead); + +} /* end rxrpc_krxiod_kill() */ + +/*****************************************************************************/ +/* + * queue a transport for attention by krxiod + */ +void rxrpc_krxiod_queue_transport(struct rxrpc_transport *trans) +{ + unsigned long flags; + + _enter(""); + + if (list_empty(&trans->krxiodq_link)) { + spin_lock_irqsave(&rxrpc_krxiod_transportq_lock,flags); + + if (list_empty(&trans->krxiodq_link)) { + if (atomic_read(&trans->usage)>0) { + list_add_tail(&trans->krxiodq_link,&rxrpc_krxiod_transportq); + atomic_inc(&rxrpc_krxiod_qcount); + } + } + + spin_unlock_irqrestore(&rxrpc_krxiod_transportq_lock,flags); + wake_up_all(&rxrpc_krxiod_sleepq); + } + + _leave(""); + +} /* end rxrpc_krxiod_queue_transport() */ + +/*****************************************************************************/ +/* + * dequeue a transport from krxiod's attention queue + */ +void rxrpc_krxiod_dequeue_transport(struct rxrpc_transport *trans) +{ + unsigned long flags; + + _enter(""); + + spin_lock_irqsave(&rxrpc_krxiod_transportq_lock,flags); + if (!list_empty(&trans->krxiodq_link)) { + list_del_init(&trans->krxiodq_link); + atomic_dec(&rxrpc_krxiod_qcount); + } + spin_unlock_irqrestore(&rxrpc_krxiod_transportq_lock,flags); + + _leave(""); + +} /* end rxrpc_krxiod_dequeue_transport() */ + +/*****************************************************************************/ +/* + * queue a call for attention by krxiod + */ +void rxrpc_krxiod_queue_call(struct rxrpc_call *call) +{ + unsigned long flags; + + if (list_empty(&call->rcv_krxiodq_lk)) { + spin_lock_irqsave(&rxrpc_krxiod_callq_lock,flags); + if (atomic_read(&call->usage)>0) { + list_add_tail(&call->rcv_krxiodq_lk,&rxrpc_krxiod_callq); + atomic_inc(&rxrpc_krxiod_qcount); + } + spin_unlock_irqrestore(&rxrpc_krxiod_callq_lock,flags); + } + wake_up_all(&rxrpc_krxiod_sleepq); + +} /* end rxrpc_krxiod_queue_call() */ + +/*****************************************************************************/ +/* + * dequeue a call from krxiod's attention queue + */ +void rxrpc_krxiod_dequeue_call(struct rxrpc_call *call) +{ + unsigned long flags; + + spin_lock_irqsave(&rxrpc_krxiod_callq_lock,flags); + if (!list_empty(&call->rcv_krxiodq_lk)) { + list_del_init(&call->rcv_krxiodq_lk); + atomic_dec(&rxrpc_krxiod_qcount); + } + spin_unlock_irqrestore(&rxrpc_krxiod_callq_lock,flags); + +} /* end rxrpc_krxiod_dequeue_call() */ diff --git a/net/rxrpc/krxsecd.c b/net/rxrpc/krxsecd.c new file mode 100644 index 000000000000..5b57c3f8d776 --- /dev/null +++ b/net/rxrpc/krxsecd.c @@ -0,0 +1,278 @@ +/* krxsecd.c: Rx security daemon + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + * + * This daemon deals with: + * - consulting the application as to whether inbound peers and calls should be authorised + * - generating security challenges for inbound connections + * - responding to security challenges on outbound connections + */ + +#include <linux/version.h> +#include <linux/module.h> +#include <linux/sched.h> +#include <linux/completion.h> +#include <linux/spinlock.h> +#include <linux/init.h> +#include <rxrpc/krxsecd.h> +#include <rxrpc/transport.h> +#include <rxrpc/connection.h> +#include <rxrpc/message.h> +#include <rxrpc/peer.h> +#include <rxrpc/call.h> +#include <linux/udp.h> +#include <linux/ip.h> +#include <net/sock.h> +#include "internal.h" + +static DECLARE_WAIT_QUEUE_HEAD(rxrpc_krxsecd_sleepq); +static DECLARE_COMPLETION(rxrpc_krxsecd_dead); +static volatile int rxrpc_krxsecd_die; + +static atomic_t rxrpc_krxsecd_qcount; + +/* queue of unprocessed inbound messages with seqno #1 and RXRPC_CLIENT_INITIATED flag set */ +static LIST_HEAD(rxrpc_krxsecd_initmsgq); +static spinlock_t rxrpc_krxsecd_initmsgq_lock = SPIN_LOCK_UNLOCKED; + +static void rxrpc_krxsecd_process_incoming_call(struct rxrpc_message *msg); + +/*****************************************************************************/ +/* + * Rx security daemon + */ +static int rxrpc_krxsecd(void *arg) +{ + DECLARE_WAITQUEUE(krxsecd,current); + + int die; + + printk("Started krxsecd %d\n",current->pid); + strcpy(current->comm,"krxsecd"); + + daemonize(); + + /* only certain signals are of interest */ + spin_lock_irq(¤t->sig->siglock); + siginitsetinv(¤t->blocked,0); +#if LINUX_VERSION_CODE > KERNEL_VERSION(2,5,3) + recalc_sigpending(); +#else + recalc_sigpending(current); +#endif + spin_unlock_irq(¤t->sig->siglock); + + /* loop around waiting for work to do */ + do { + /* wait for work or to be told to exit */ + _debug("### Begin Wait"); + if (!atomic_read(&rxrpc_krxsecd_qcount)) { + set_current_state(TASK_INTERRUPTIBLE); + + add_wait_queue(&rxrpc_krxsecd_sleepq,&krxsecd); + + for (;;) { + set_current_state(TASK_INTERRUPTIBLE); + if (atomic_read(&rxrpc_krxsecd_qcount) || + rxrpc_krxsecd_die || + signal_pending(current)) + break; + + schedule(); + } + + remove_wait_queue(&rxrpc_krxsecd_sleepq,&krxsecd); + set_current_state(TASK_RUNNING); + } + die = rxrpc_krxsecd_die; + _debug("### End Wait"); + + /* see if there're incoming calls in need of authenticating */ + _debug("### Begin Inbound Calls"); + + if (!list_empty(&rxrpc_krxsecd_initmsgq)) { + struct rxrpc_message *msg = NULL; + + spin_lock(&rxrpc_krxsecd_initmsgq_lock); + + if (!list_empty(&rxrpc_krxsecd_initmsgq)) { + msg = list_entry(rxrpc_krxsecd_initmsgq.next, + struct rxrpc_message,link); + list_del_init(&msg->link); + atomic_dec(&rxrpc_krxsecd_qcount); + } + + spin_unlock(&rxrpc_krxsecd_initmsgq_lock); + + if (msg) { + rxrpc_krxsecd_process_incoming_call(msg); + rxrpc_put_message(msg); + } + } + + _debug("### End Inbound Calls"); + + /* discard pending signals */ + rxrpc_discard_my_signals(); + + } while (!die); + + /* and that's all */ + complete_and_exit(&rxrpc_krxsecd_dead,0); + +} /* end rxrpc_krxsecd() */ + +/*****************************************************************************/ +/* + * start up a krxsecd daemon + */ +int __init rxrpc_krxsecd_init(void) +{ + return kernel_thread(rxrpc_krxsecd,NULL,0); + +} /* end rxrpc_krxsecd_init() */ + +/*****************************************************************************/ +/* + * kill the krxsecd daemon and wait for it to complete + */ +void rxrpc_krxsecd_kill(void) +{ + rxrpc_krxsecd_die = 1; + wake_up_all(&rxrpc_krxsecd_sleepq); + wait_for_completion(&rxrpc_krxsecd_dead); + +} /* end rxrpc_krxsecd_kill() */ + +/*****************************************************************************/ +/* + * clear all pending incoming calls for the specified transport + */ +void rxrpc_krxsecd_clear_transport(struct rxrpc_transport *trans) +{ + LIST_HEAD(tmp); + + struct rxrpc_message *msg; + struct list_head *_p, *_n; + + _enter("%p",trans); + + /* move all the messages for this transport onto a temp list */ + spin_lock(&rxrpc_krxsecd_initmsgq_lock); + + list_for_each_safe(_p,_n,&rxrpc_krxsecd_initmsgq) { + msg = list_entry(_p,struct rxrpc_message,link); + if (msg->trans==trans) { + list_del(&msg->link); + list_add_tail(&msg->link,&tmp); + atomic_dec(&rxrpc_krxsecd_qcount); + } + } + + spin_unlock(&rxrpc_krxsecd_initmsgq_lock); + + /* zap all messages on the temp list */ + while (!list_empty(&tmp)) { + msg = list_entry(tmp.next,struct rxrpc_message,link); + list_del_init(&msg->link); + rxrpc_put_message(msg); + } + + _leave(""); +} /* end rxrpc_krxsecd_clear_transport() */ + +/*****************************************************************************/ +/* + * queue a message on the incoming calls list + */ +void rxrpc_krxsecd_queue_incoming_call(struct rxrpc_message *msg) +{ + _enter("%p",msg); + + /* queue for processing by krxsecd */ + spin_lock(&rxrpc_krxsecd_initmsgq_lock); + + if (!rxrpc_krxsecd_die) { + rxrpc_get_message(msg); + list_add_tail(&msg->link,&rxrpc_krxsecd_initmsgq); + atomic_inc(&rxrpc_krxsecd_qcount); + } + + spin_unlock(&rxrpc_krxsecd_initmsgq_lock); + + wake_up(&rxrpc_krxsecd_sleepq); + + _leave(""); +} /* end rxrpc_krxsecd_queue_incoming_call() */ + +/*****************************************************************************/ +/* + * process the initial message of an incoming call + */ +void rxrpc_krxsecd_process_incoming_call(struct rxrpc_message *msg) +{ + struct rxrpc_transport *trans = msg->trans; + struct rxrpc_service *srv; + struct rxrpc_call *call; + struct list_head *_p; + unsigned short sid; + int ret; + + _enter("%p{tr=%p}",msg,trans); + + ret = rxrpc_incoming_call(msg->conn,msg,&call); + if (ret<0) + goto out; + + /* find the matching service on the transport */ + sid = ntohs(msg->hdr.serviceId); + srv = NULL; + + spin_lock(&trans->lock); + list_for_each(_p,&trans->services) { + srv = list_entry(_p,struct rxrpc_service,link); + if (srv->service_id==sid && try_inc_mod_count(srv->owner)) { + /* found a match (made sure it won't vanish) */ + _debug("found service '%s'",srv->name); + call->owner = srv->owner; + break; + } + } + spin_unlock(&trans->lock); + + /* report the new connection + * - the func must inc the call's usage count to keep it + */ + ret = -ENOENT; + if (_p!=&trans->services) { + /* attempt to accept the call */ + call->conn->service = srv; + call->app_attn_func = srv->attn_func; + call->app_error_func = srv->error_func; + call->app_aemap_func = srv->aemap_func; + + ret = srv->new_call(call); + + /* send an abort if an error occurred */ + if (ret<0) { + rxrpc_call_abort(call,ret); + } + else { + /* formally receive and ACK the new packet */ + ret = rxrpc_conn_receive_call_packet(call->conn,call,msg); + } + } + + rxrpc_put_call(call); + out: + if (ret<0) + rxrpc_trans_immediate_abort(trans,msg,ret); + + _leave(" (%d)",ret); +} /* end rxrpc_krxsecd_process_incoming_call() */ diff --git a/net/rxrpc/krxtimod.c b/net/rxrpc/krxtimod.c new file mode 100644 index 000000000000..8dc986f79a6f --- /dev/null +++ b/net/rxrpc/krxtimod.c @@ -0,0 +1,210 @@ +/* krxtimod.c: RXRPC timeout daemon + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/version.h> +#include <linux/module.h> +#include <linux/init.h> +#include <linux/sched.h> +#include <linux/completion.h> +#include <rxrpc/rxrpc.h> +#include <rxrpc/krxtimod.h> +#include <asm/errno.h> +#include "internal.h" + +static DECLARE_COMPLETION(krxtimod_alive); +static DECLARE_COMPLETION(krxtimod_dead); +static DECLARE_WAIT_QUEUE_HEAD(krxtimod_sleepq); +static int krxtimod_die; + +static LIST_HEAD(krxtimod_list); +static spinlock_t krxtimod_lock = SPIN_LOCK_UNLOCKED; + +static int krxtimod(void *arg); + +/*****************************************************************************/ +/* + * start the timeout daemon + */ +int rxrpc_krxtimod_start(void) +{ + int ret; + + ret = kernel_thread(krxtimod,NULL,0); + if (ret<0) + return ret; + + wait_for_completion(&krxtimod_alive); + + return ret; +} /* end rxrpc_krxtimod_start() */ + +/*****************************************************************************/ +/* + * stop the timeout daemon + */ +void rxrpc_krxtimod_kill(void) +{ + /* get rid of my daemon */ + krxtimod_die = 1; + wake_up(&krxtimod_sleepq); + wait_for_completion(&krxtimod_dead); + +} /* end rxrpc_krxtimod_kill() */ + +/*****************************************************************************/ +/* + * timeout processing daemon + */ +static int krxtimod(void *arg) +{ + DECLARE_WAITQUEUE(myself,current); + + rxrpc_timer_t *timer; + + printk("Started krxtimod %d\n",current->pid); + strcpy(current->comm,"krxtimod"); + + daemonize(); + + complete(&krxtimod_alive); + + /* only certain signals are of interest */ + spin_lock_irq(¤t->sig->siglock); + siginitsetinv(¤t->blocked,0); +#if LINUX_VERSION_CODE > KERNEL_VERSION(2,5,3) + recalc_sigpending(); +#else + recalc_sigpending(current); +#endif + spin_unlock_irq(¤t->sig->siglock); + + /* loop around looking for things to attend to */ + loop: + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&krxtimod_sleepq,&myself); + + for (;;) { + unsigned long jif; + signed long timeout; + + /* deal with the server being asked to die */ + if (krxtimod_die) { + remove_wait_queue(&krxtimod_sleepq,&myself); + _leave(""); + complete_and_exit(&krxtimod_dead,0); + } + + /* discard pending signals */ + rxrpc_discard_my_signals(); + + /* work out the time to elapse before the next event */ + spin_lock(&krxtimod_lock); + if (list_empty(&krxtimod_list)) { + timeout = MAX_SCHEDULE_TIMEOUT; + } + else { + timer = list_entry(krxtimod_list.next,rxrpc_timer_t,link); + timeout = timer->timo_jif; + jif = jiffies; + + if (time_before_eq(timeout,jif)) + goto immediate; + + else { + timeout = (long)timeout - (long)jiffies; + } + } + spin_unlock(&krxtimod_lock); + + schedule_timeout(timeout); + + set_current_state(TASK_INTERRUPTIBLE); + } + + /* the thing on the front of the queue needs processing + * - we come here with the lock held and timer pointing to the expired entry + */ + immediate: + remove_wait_queue(&krxtimod_sleepq,&myself); + set_current_state(TASK_RUNNING); + + _debug("@@@ Begin Timeout of %p",timer); + + /* dequeue the timer */ + list_del_init(&timer->link); + spin_unlock(&krxtimod_lock); + + /* call the timeout function */ + timer->ops->timed_out(timer); + + _debug("@@@ End Timeout"); + goto loop; + +} /* end krxtimod() */ + +/*****************************************************************************/ +/* + * (re-)queue a timer + */ +void rxrpc_krxtimod_add_timer(rxrpc_timer_t *timer, unsigned long timeout) +{ + struct list_head *_p; + rxrpc_timer_t *ptimer; + + _enter("%p,%lu",timer,timeout); + + spin_lock(&krxtimod_lock); + + list_del(&timer->link); + + /* the timer was deferred or reset - put it back in the queue at the right place */ + timer->timo_jif = jiffies + timeout; + + list_for_each(_p,&krxtimod_list) { + ptimer = list_entry(_p,rxrpc_timer_t,link); + if (time_before(timer->timo_jif,ptimer->timo_jif)) + break; + } + + list_add_tail(&timer->link,_p); /* insert before stopping point */ + + spin_unlock(&krxtimod_lock); + + wake_up(&krxtimod_sleepq); + + _leave(""); +} /* end rxrpc_krxtimod_queue_vlocation() */ + +/*****************************************************************************/ +/* + * dequeue a timer + * - returns 0 if the timer was deleted or -ENOENT if it wasn't queued + */ +int rxrpc_krxtimod_del_timer(rxrpc_timer_t *timer) +{ + int ret = 0; + + _enter("%p",timer); + + spin_lock(&krxtimod_lock); + + if (list_empty(&timer->link)) + ret = -ENOENT; + else + list_del_init(&timer->link); + + spin_unlock(&krxtimod_lock); + + wake_up(&krxtimod_sleepq); + + _leave(" = %d",ret); + return ret; +} /* end rxrpc_krxtimod_del_timer() */ diff --git a/net/rxrpc/main.c b/net/rxrpc/main.c new file mode 100644 index 000000000000..04bb3faa42d1 --- /dev/null +++ b/net/rxrpc/main.c @@ -0,0 +1,127 @@ +/* main.c: Rx RPC interface + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/module.h> +#include <linux/init.h> +#include <linux/sched.h> +#include <rxrpc/rxrpc.h> +#include <rxrpc/krxiod.h> +#include <rxrpc/krxsecd.h> +#include <rxrpc/krxtimod.h> +#include <rxrpc/transport.h> +#include <rxrpc/connection.h> +#include <rxrpc/call.h> +#include <rxrpc/message.h> +#include "internal.h" + +static int rxrpc_initialise(void); +static void rxrpc_cleanup(void); + +module_init(rxrpc_initialise); +module_exit(rxrpc_cleanup); + +MODULE_DESCRIPTION("Rx RPC implementation"); +MODULE_AUTHOR("Red Hat, Inc."); +MODULE_LICENSE("GPL"); + +u32 rxrpc_epoch; + +/*****************************************************************************/ +/* + * initialise the Rx module + */ +static int rxrpc_initialise(void) +{ + int ret; + + /* my epoch value */ + rxrpc_epoch = htonl(xtime.tv_sec); + + /* register the /proc interface */ +#ifdef CONFIG_PROC_FS + ret = rxrpc_proc_init(); + if (ret<0) + return ret; +#endif + + /* register the sysctl files */ +#ifdef CONFIG_SYSCTL + ret = rxrpc_sysctl_init(); + if (ret<0) + goto error_proc; +#endif + + /* start the krxtimod daemon */ + ret = rxrpc_krxtimod_start(); + if (ret<0) + goto error_sysctl; + + /* start the krxiod daemon */ + ret = rxrpc_krxiod_init(); + if (ret<0) + goto error_krxtimod; + + /* start the krxsecd daemon */ + ret = rxrpc_krxsecd_init(); + if (ret<0) + goto error_krxiod; + + kdebug("\n\n"); + + return 0; + + error_krxiod: + rxrpc_krxiod_kill(); + error_krxtimod: + rxrpc_krxtimod_kill(); + error_sysctl: +#ifdef CONFIG_SYSCTL + rxrpc_sysctl_cleanup(); +#endif + error_proc: +#ifdef CONFIG_PROC_FS + rxrpc_proc_cleanup(); +#endif + return ret; +} /* end rxrpc_initialise() */ + +/*****************************************************************************/ +/* + * clean up the Rx module + */ +static void rxrpc_cleanup(void) +{ + kenter(""); + + __RXACCT(printk("Outstanding Messages : %d\n",atomic_read(&rxrpc_message_count))); + __RXACCT(printk("Outstanding Calls : %d\n",atomic_read(&rxrpc_call_count))); + __RXACCT(printk("Outstanding Connections: %d\n",atomic_read(&rxrpc_connection_count))); + __RXACCT(printk("Outstanding Peers : %d\n",atomic_read(&rxrpc_peer_count))); + __RXACCT(printk("Outstanding Transports : %d\n",atomic_read(&rxrpc_transport_count))); + + rxrpc_krxsecd_kill(); + rxrpc_krxiod_kill(); + rxrpc_krxtimod_kill(); +#ifdef CONFIG_SYSCTL + rxrpc_sysctl_cleanup(); +#endif +#ifdef CONFIG_PROC_FS + rxrpc_proc_cleanup(); +#endif + + __RXACCT(printk("Outstanding Messages : %d\n",atomic_read(&rxrpc_message_count))); + __RXACCT(printk("Outstanding Calls : %d\n",atomic_read(&rxrpc_call_count))); + __RXACCT(printk("Outstanding Connections: %d\n",atomic_read(&rxrpc_connection_count))); + __RXACCT(printk("Outstanding Peers : %d\n",atomic_read(&rxrpc_peer_count))); + __RXACCT(printk("Outstanding Transports : %d\n",atomic_read(&rxrpc_transport_count))); + + kleave(); +} /* end rxrpc_cleanup() */ diff --git a/net/rxrpc/peer.c b/net/rxrpc/peer.c new file mode 100644 index 000000000000..cdd90014b6af --- /dev/null +++ b/net/rxrpc/peer.c @@ -0,0 +1,380 @@ +/* peer.c: Rx RPC peer management + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/sched.h> +#include <linux/slab.h> +#include <linux/module.h> +#include <rxrpc/rxrpc.h> +#include <rxrpc/transport.h> +#include <rxrpc/peer.h> +#include <rxrpc/connection.h> +#include <rxrpc/call.h> +#include <rxrpc/message.h> +#include <linux/udp.h> +#include <linux/ip.h> +#include <net/sock.h> +#include <asm/uaccess.h> +#include <asm/div64.h> +#include "internal.h" + +__RXACCT_DECL(atomic_t rxrpc_peer_count); +LIST_HEAD(rxrpc_peers); +DECLARE_RWSEM(rxrpc_peers_sem); + +static void __rxrpc_peer_timeout(rxrpc_timer_t *timer) +{ + struct rxrpc_peer *peer = list_entry(timer,struct rxrpc_peer,timeout); + + _debug("Rx PEER TIMEOUT [%p{u=%d}]",peer,atomic_read(&peer->usage)); + + rxrpc_peer_do_timeout(peer); +} + +static const struct rxrpc_timer_ops rxrpc_peer_timer_ops = { + .timed_out = __rxrpc_peer_timeout, +}; + +/*****************************************************************************/ +/* + * create a peer record + */ +static int __rxrpc_create_peer(struct rxrpc_transport *trans, u32 addr, struct rxrpc_peer **_peer) +{ + struct rxrpc_peer *peer; + + _enter("%p,%08x",trans,ntohl(addr)); + + /* allocate and initialise a peer record */ + peer = kmalloc(sizeof(struct rxrpc_peer),GFP_KERNEL); + if (!peer) { + _leave(" = -ENOMEM"); + return -ENOMEM; + } + + memset(peer,0,sizeof(struct rxrpc_peer)); + atomic_set(&peer->usage,1); + + INIT_LIST_HEAD(&peer->link); + INIT_LIST_HEAD(&peer->proc_link); + INIT_LIST_HEAD(&peer->conn_active); + INIT_LIST_HEAD(&peer->conn_graveyard); + spin_lock_init(&peer->conn_gylock); + init_waitqueue_head(&peer->conn_gy_waitq); + rwlock_init(&peer->conn_lock); + atomic_set(&peer->conn_count,0); + spin_lock_init(&peer->lock); + rxrpc_timer_init(&peer->timeout,&rxrpc_peer_timer_ops); + + peer->addr.s_addr = addr; + + peer->trans = trans; + peer->ops = trans->peer_ops; + + __RXACCT(atomic_inc(&rxrpc_peer_count)); + *_peer = peer; + _leave(" = 0 (%p)",peer); + + return 0; +} /* end __rxrpc_create_peer() */ + +/*****************************************************************************/ +/* + * find a peer record on the specified transport + * - returns (if successful) with peer record usage incremented + * - resurrects it from the graveyard if found there + */ +int rxrpc_peer_lookup(struct rxrpc_transport *trans, u32 addr, struct rxrpc_peer **_peer) +{ + struct rxrpc_peer *peer, *candidate = NULL; + struct list_head *_p; + int ret; + + _enter("%p{%hu},%08x",trans,trans->port,ntohl(addr)); + + /* [common case] search the transport's active list first */ + read_lock(&trans->peer_lock); + list_for_each(_p,&trans->peer_active) { + peer = list_entry(_p,struct rxrpc_peer,link); + if (peer->addr.s_addr==addr) + goto found_active; + } + read_unlock(&trans->peer_lock); + + /* [uncommon case] not active - create a candidate for a new record */ + ret = __rxrpc_create_peer(trans,addr,&candidate); + if (ret<0) { + _leave(" = %d",ret); + return ret; + } + + /* search the active list again, just in case it appeared whilst we were busy */ + write_lock(&trans->peer_lock); + list_for_each(_p,&trans->peer_active) { + peer = list_entry(_p,struct rxrpc_peer,link); + if (peer->addr.s_addr==addr) + goto found_active_second_chance; + } + + /* search the transport's graveyard list */ + spin_lock(&trans->peer_gylock); + list_for_each(_p,&trans->peer_graveyard) { + peer = list_entry(_p,struct rxrpc_peer,link); + if (peer->addr.s_addr==addr) + goto found_in_graveyard; + } + spin_unlock(&trans->peer_gylock); + + /* we can now add the new candidate to the list + * - tell the application layer that this peer has been added + */ + rxrpc_get_transport(trans); + peer = candidate; + candidate = NULL; + + if (peer->ops && peer->ops->adding) { + ret = peer->ops->adding(peer); + if (ret<0) { + write_unlock(&trans->peer_lock); + __RXACCT(atomic_dec(&rxrpc_peer_count)); + kfree(peer); + rxrpc_put_transport(trans); + _leave(" = %d",ret); + return ret; + } + } + + atomic_inc(&trans->peer_count); + + make_active: + list_add_tail(&peer->link,&trans->peer_active); + + success_uwfree: + write_unlock(&trans->peer_lock); + + if (candidate) { + __RXACCT(atomic_dec(&rxrpc_peer_count)); + kfree(candidate); + } + + if (list_empty(&peer->proc_link)) { + down_write(&rxrpc_peers_sem); + list_add_tail(&peer->proc_link,&rxrpc_peers); + up_write(&rxrpc_peers_sem); + } + + success: + *_peer = peer; + + _leave(" = 0 (%p{u=%d cc=%d})", + peer,atomic_read(&peer->usage),atomic_read(&peer->conn_count)); + return 0; + + /* handle the peer being found in the active list straight off */ + found_active: + rxrpc_get_peer(peer); + read_unlock(&trans->peer_lock); + goto success; + + /* handle resurrecting a peer from the graveyard */ + found_in_graveyard: + rxrpc_get_peer(peer); + rxrpc_get_transport(peer->trans); + rxrpc_krxtimod_del_timer(&peer->timeout); + list_del_init(&peer->link); + spin_unlock(&trans->peer_gylock); + goto make_active; + + /* handle finding the peer on the second time through the active list */ + found_active_second_chance: + rxrpc_get_peer(peer); + goto success_uwfree; + +} /* end rxrpc_peer_lookup() */ + +/*****************************************************************************/ +/* + * finish with a peer record + * - it gets sent to the graveyard from where it can be resurrected or timed out + */ +void rxrpc_put_peer(struct rxrpc_peer *peer) +{ + struct rxrpc_transport *trans = peer->trans; + + _enter("%p{cc=%d a=%08x}",peer,atomic_read(&peer->conn_count),ntohl(peer->addr.s_addr)); + + /* sanity check */ + if (atomic_read(&peer->usage)<=0) + BUG(); + + write_lock(&trans->peer_lock); + spin_lock(&trans->peer_gylock); + if (likely(!atomic_dec_and_test(&peer->usage))) { + spin_unlock(&trans->peer_gylock); + write_unlock(&trans->peer_lock); + _leave(""); + return; + } + + /* move to graveyard queue */ + list_del(&peer->link); + write_unlock(&trans->peer_lock); + + list_add_tail(&peer->link,&trans->peer_graveyard); + + if (!list_empty(&peer->conn_active)) BUG(); + + /* discard in 600 secs */ + rxrpc_krxtimod_add_timer(&peer->timeout,100*HZ); + + spin_unlock(&trans->peer_gylock); + + rxrpc_put_transport(trans); + + _leave(" [killed]"); +} /* end rxrpc_put_peer() */ + +/*****************************************************************************/ +/* + * handle a peer timing out in the graveyard + * - called from krxtimod + */ +void rxrpc_peer_do_timeout(struct rxrpc_peer *peer) +{ + struct rxrpc_transport *trans = peer->trans; + + _enter("%p{u=%d cc=%d a=%08x}", + peer,atomic_read(&peer->usage),atomic_read(&peer->conn_count), + ntohl(peer->addr.s_addr)); + + if (atomic_read(&peer->usage)<0) + BUG(); + + /* remove from graveyard if still dead */ + spin_lock(&trans->peer_gylock); + if (atomic_read(&peer->usage)==0) + list_del_init(&peer->link); + else + peer = NULL; + spin_unlock(&trans->peer_gylock); + + if (!peer) { + _leave(""); + return; /* resurrected */ + } + + /* clear all connections on this peer */ + rxrpc_conn_clearall(peer); + + if (!list_empty(&peer->conn_active)) BUG(); + if (!list_empty(&peer->conn_graveyard)) BUG(); + + /* inform the application layer */ + if (peer->ops && peer->ops->discarding) + peer->ops->discarding(peer); + + if (!list_empty(&peer->proc_link)) { + down_write(&rxrpc_peers_sem); + list_del(&peer->proc_link); + up_write(&rxrpc_peers_sem); + } + + __RXACCT(atomic_dec(&rxrpc_peer_count)); + kfree(peer); + + /* if the graveyard is now empty, wake up anyone waiting for that */ + if (atomic_dec_and_test(&trans->peer_count)) + wake_up(&trans->peer_gy_waitq); + + _leave(" [destroyed]"); +} /* end rxrpc_peer_do_timeout() */ + +/*****************************************************************************/ +/* + * clear all peer records from a transport endpoint + */ +void rxrpc_peer_clearall(struct rxrpc_transport *trans) +{ + DECLARE_WAITQUEUE(myself,current); + + struct rxrpc_peer *peer; + int err; + + _enter("%p",trans); + + /* there shouldn't be any active peers remaining */ + if (!list_empty(&trans->peer_active)) + BUG(); + + /* manually timeout all peers in the graveyard */ + spin_lock(&trans->peer_gylock); + while (!list_empty(&trans->peer_graveyard)) { + peer = list_entry(trans->peer_graveyard.next,struct rxrpc_peer,link); + _debug("Clearing peer %p\n",peer); + err = rxrpc_krxtimod_del_timer(&peer->timeout); + spin_unlock(&trans->peer_gylock); + + if (err==0) + rxrpc_peer_do_timeout(peer); + + spin_lock(&trans->peer_gylock); + } + spin_unlock(&trans->peer_gylock); + + /* wait for the the peer graveyard to be completely cleared */ + set_current_state(TASK_UNINTERRUPTIBLE); + add_wait_queue(&trans->peer_gy_waitq,&myself); + + while (atomic_read(&trans->peer_count)!=0) { + schedule(); + set_current_state(TASK_UNINTERRUPTIBLE); + } + + remove_wait_queue(&trans->peer_gy_waitq,&myself); + set_current_state(TASK_RUNNING); + + _leave(""); + +} /* end rxrpc_peer_clearall() */ + +/*****************************************************************************/ +/* + * calculate and cache the Round-Trip-Time for a message and its response + */ +void rxrpc_peer_calculate_rtt(struct rxrpc_peer *peer, + struct rxrpc_message *msg, + struct rxrpc_message *resp) +{ + unsigned long long rtt; + int loop; + + _enter("%p,%p,%p",peer,msg,resp); + + /* calculate the latest RTT */ + rtt = resp->stamp.tv_sec - msg->stamp.tv_sec; + rtt *= 1000000UL; + rtt += resp->stamp.tv_usec - msg->stamp.tv_usec; + + /* add to cache */ + peer->rtt_cache[peer->rtt_point] = rtt; + peer->rtt_point++; + peer->rtt_point %= RXRPC_RTT_CACHE_SIZE; + + if (peer->rtt_usage<RXRPC_RTT_CACHE_SIZE) peer->rtt_usage++; + + /* recalculate RTT */ + for (loop=peer->rtt_usage-1; loop>=0; loop--) + rtt += peer->rtt_cache[loop]; + + peer->rtt = do_div(rtt,peer->rtt_usage); + + _leave(" RTT=%lu.%lums",peer->rtt/1000,peer->rtt%1000); + +} /* end rxrpc_peer_calculate_rtt() */ diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c new file mode 100644 index 000000000000..aeb9d9e8ddae --- /dev/null +++ b/net/rxrpc/proc.c @@ -0,0 +1,612 @@ +/* proc.c: /proc interface for RxRPC + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/sched.h> +#include <linux/slab.h> +#include <linux/module.h> +#include <linux/proc_fs.h> +#include <linux/seq_file.h> +#include <rxrpc/rxrpc.h> +#include <rxrpc/transport.h> +#include <rxrpc/peer.h> +#include <rxrpc/connection.h> +#include <rxrpc/call.h> +#include <rxrpc/message.h> +#include "internal.h" + +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) +static inline struct proc_dir_entry *PDE(const struct inode *inode) +{ + return (struct proc_dir_entry *)inode->u.generic_ip; +} +#endif + +static struct proc_dir_entry *proc_rxrpc; + +static int rxrpc_proc_transports_open(struct inode *inode, struct file *file); +static void *rxrpc_proc_transports_start(struct seq_file *p, loff_t *pos); +static void *rxrpc_proc_transports_next(struct seq_file *p, void *v, loff_t *pos); +static void rxrpc_proc_transports_stop(struct seq_file *p, void *v); +static int rxrpc_proc_transports_show(struct seq_file *m, void *v); + +static struct seq_operations rxrpc_proc_transports_ops = { + .start = rxrpc_proc_transports_start, + .next = rxrpc_proc_transports_next, + .stop = rxrpc_proc_transports_stop, + .show = rxrpc_proc_transports_show, +}; + +static struct file_operations rxrpc_proc_transports_fops = { + .open = rxrpc_proc_transports_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, +}; + +static int rxrpc_proc_peers_open(struct inode *inode, struct file *file); +static void *rxrpc_proc_peers_start(struct seq_file *p, loff_t *pos); +static void *rxrpc_proc_peers_next(struct seq_file *p, void *v, loff_t *pos); +static void rxrpc_proc_peers_stop(struct seq_file *p, void *v); +static int rxrpc_proc_peers_show(struct seq_file *m, void *v); + +static struct seq_operations rxrpc_proc_peers_ops = { + .start = rxrpc_proc_peers_start, + .next = rxrpc_proc_peers_next, + .stop = rxrpc_proc_peers_stop, + .show = rxrpc_proc_peers_show, +}; + +static struct file_operations rxrpc_proc_peers_fops = { + .open = rxrpc_proc_peers_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, +}; + +static int rxrpc_proc_conns_open(struct inode *inode, struct file *file); +static void *rxrpc_proc_conns_start(struct seq_file *p, loff_t *pos); +static void *rxrpc_proc_conns_next(struct seq_file *p, void *v, loff_t *pos); +static void rxrpc_proc_conns_stop(struct seq_file *p, void *v); +static int rxrpc_proc_conns_show(struct seq_file *m, void *v); + +static struct seq_operations rxrpc_proc_conns_ops = { + .start = rxrpc_proc_conns_start, + .next = rxrpc_proc_conns_next, + .stop = rxrpc_proc_conns_stop, + .show = rxrpc_proc_conns_show, +}; + +static struct file_operations rxrpc_proc_conns_fops = { + .open = rxrpc_proc_conns_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, +}; + +static int rxrpc_proc_calls_open(struct inode *inode, struct file *file); +static void *rxrpc_proc_calls_start(struct seq_file *p, loff_t *pos); +static void *rxrpc_proc_calls_next(struct seq_file *p, void *v, loff_t *pos); +static void rxrpc_proc_calls_stop(struct seq_file *p, void *v); +static int rxrpc_proc_calls_show(struct seq_file *m, void *v); + +static struct seq_operations rxrpc_proc_calls_ops = { + .start = rxrpc_proc_calls_start, + .next = rxrpc_proc_calls_next, + .stop = rxrpc_proc_calls_stop, + .show = rxrpc_proc_calls_show, +}; + +static struct file_operations rxrpc_proc_calls_fops = { + .open = rxrpc_proc_calls_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, +}; + +static const char *rxrpc_call_states7[] = { + "complet", + "error ", + "rcv_op ", + "rcv_arg", + "got_arg", + "snd_rpl", + "fin_ack", + "snd_arg", + "rcv_rpl", + "got_rpl" +}; + +static const char *rxrpc_call_error_states7[] = { + "no_err ", + "loc_abt", + "rmt_abt", + "loc_err", + "rmt_err" +}; + +/*****************************************************************************/ +/* + * initialise the /proc/net/rxrpc/ directory + */ +int rxrpc_proc_init(void) +{ + struct proc_dir_entry *p; + + proc_rxrpc = proc_mkdir("rxrpc",proc_net); + if (!proc_rxrpc) + goto error; + proc_rxrpc->owner = THIS_MODULE; + + p = create_proc_entry("calls",0,proc_rxrpc); + if (!p) + goto error_proc; + p->proc_fops = &rxrpc_proc_calls_fops; + p->owner = THIS_MODULE; + + p = create_proc_entry("connections",0,proc_rxrpc); + if (!p) + goto error_calls; + p->proc_fops = &rxrpc_proc_conns_fops; + p->owner = THIS_MODULE; + + p = create_proc_entry("peers",0,proc_rxrpc); + if (!p) + goto error_calls; + p->proc_fops = &rxrpc_proc_peers_fops; + p->owner = THIS_MODULE; + + p = create_proc_entry("transports",0,proc_rxrpc); + if (!p) + goto error_conns; + p->proc_fops = &rxrpc_proc_transports_fops; + p->owner = THIS_MODULE; + + return 0; + + error_conns: + remove_proc_entry("conns",proc_rxrpc); + error_calls: + remove_proc_entry("calls",proc_rxrpc); + error_proc: + remove_proc_entry("rxrpc",proc_net); + error: + return -ENOMEM; +} /* end rxrpc_proc_init() */ + +/*****************************************************************************/ +/* + * clean up the /proc/net/rxrpc/ directory + */ +void rxrpc_proc_cleanup(void) +{ + remove_proc_entry("transports",proc_rxrpc); + remove_proc_entry("peers",proc_rxrpc); + remove_proc_entry("connections",proc_rxrpc); + remove_proc_entry("calls",proc_rxrpc); + + remove_proc_entry("rxrpc",proc_net); + +} /* end rxrpc_proc_cleanup() */ + +/*****************************************************************************/ +/* + * open "/proc/net/rxrpc/transports" which provides a summary of extant transports + */ +static int rxrpc_proc_transports_open(struct inode *inode, struct file *file) +{ + struct seq_file *m; + int ret; + + ret = seq_open(file,&rxrpc_proc_transports_ops); + if (ret<0) + return ret; + + m = file->private_data; + m->private = PDE(inode)->data; + + return 0; +} /* end rxrpc_proc_transports_open() */ + +/*****************************************************************************/ +/* + * set up the iterator to start reading from the transports list and return the first item + */ +static void *rxrpc_proc_transports_start(struct seq_file *m, loff_t *_pos) +{ + struct list_head *_p; + loff_t pos = *_pos; + + /* lock the list against modification */ + down_read(&rxrpc_proc_transports_sem); + + /* allow for the header line */ + if (!pos) + return (void *)1; + pos--; + + /* find the n'th element in the list */ + list_for_each(_p,&rxrpc_proc_transports) + if (!pos--) + break; + + return _p!=&rxrpc_proc_transports ? _p : NULL; +} /* end rxrpc_proc_transports_start() */ + +/*****************************************************************************/ +/* + * move to next call in transports list + */ +static void *rxrpc_proc_transports_next(struct seq_file *p, void *v, loff_t *pos) +{ + struct list_head *_p; + + (*pos)++; + + _p = v; + _p = v==(void*)1 ? rxrpc_proc_transports.next : _p->next; + + return _p!=&rxrpc_proc_transports ? _p : NULL; +} /* end rxrpc_proc_transports_next() */ + +/*****************************************************************************/ +/* + * clean up after reading from the transports list + */ +static void rxrpc_proc_transports_stop(struct seq_file *p, void *v) +{ + up_read(&rxrpc_proc_transports_sem); + +} /* end rxrpc_proc_transports_stop() */ + +/*****************************************************************************/ +/* + * display a header line followed by a load of call lines + */ +static int rxrpc_proc_transports_show(struct seq_file *m, void *v) +{ + struct rxrpc_transport *trans = list_entry(v,struct rxrpc_transport,proc_link); + + /* display header on line 1 */ + if (v == (void *)1) { + seq_puts(m, "LOCAL USE\n"); + return 0; + } + + /* display one transport per line on subsequent lines */ + seq_printf(m,"%5hu %3d\n", + trans->port, + atomic_read(&trans->usage) + ); + + return 0; +} /* end rxrpc_proc_transports_show() */ + +/*****************************************************************************/ +/* + * open "/proc/net/rxrpc/peers" which provides a summary of extant peers + */ +static int rxrpc_proc_peers_open(struct inode *inode, struct file *file) +{ + struct seq_file *m; + int ret; + + ret = seq_open(file,&rxrpc_proc_peers_ops); + if (ret<0) + return ret; + + m = file->private_data; + m->private = PDE(inode)->data; + + return 0; +} /* end rxrpc_proc_peers_open() */ + +/*****************************************************************************/ +/* + * set up the iterator to start reading from the peers list and return the first item + */ +static void *rxrpc_proc_peers_start(struct seq_file *m, loff_t *_pos) +{ + struct list_head *_p; + loff_t pos = *_pos; + + /* lock the list against modification */ + down_read(&rxrpc_peers_sem); + + /* allow for the header line */ + if (!pos) + return (void *)1; + pos--; + + /* find the n'th element in the list */ + list_for_each(_p,&rxrpc_peers) + if (!pos--) + break; + + return _p!=&rxrpc_peers ? _p : NULL; +} /* end rxrpc_proc_peers_start() */ + +/*****************************************************************************/ +/* + * move to next conn in peers list + */ +static void *rxrpc_proc_peers_next(struct seq_file *p, void *v, loff_t *pos) +{ + struct list_head *_p; + + (*pos)++; + + _p = v; + _p = v==(void*)1 ? rxrpc_peers.next : _p->next; + + return _p!=&rxrpc_peers ? _p : NULL; +} /* end rxrpc_proc_peers_next() */ + +/*****************************************************************************/ +/* + * clean up after reading from the peers list + */ +static void rxrpc_proc_peers_stop(struct seq_file *p, void *v) +{ + up_read(&rxrpc_peers_sem); + +} /* end rxrpc_proc_peers_stop() */ + +/*****************************************************************************/ +/* + * display a header line followed by a load of conn lines + */ +static int rxrpc_proc_peers_show(struct seq_file *m, void *v) +{ + struct rxrpc_peer *peer = list_entry(v,struct rxrpc_peer,proc_link); + signed long timeout; + + /* display header on line 1 */ + if (v == (void *)1) { + seq_puts(m,"LOCAL REMOTE USAGE CONNS TIMEOUT MTU RTT(uS)\n"); + return 0; + } + + /* display one peer per line on subsequent lines */ + timeout = 0; + if (!list_empty(&peer->timeout.link)) + timeout = (signed long)peer->timeout.timo_jif - (signed long)jiffies; + + seq_printf(m,"%5hu %08x %5d %5d %8ld %5u %7lu\n", + peer->trans->port, + ntohl(peer->addr.s_addr), + atomic_read(&peer->usage), + atomic_read(&peer->conn_count), + timeout, + peer->if_mtu, + peer->rtt + ); + + return 0; +} /* end rxrpc_proc_peers_show() */ + +/*****************************************************************************/ +/* + * open "/proc/net/rxrpc/connections" which provides a summary of extant connections + */ +static int rxrpc_proc_conns_open(struct inode *inode, struct file *file) +{ + struct seq_file *m; + int ret; + + ret = seq_open(file,&rxrpc_proc_conns_ops); + if (ret<0) + return ret; + + m = file->private_data; + m->private = PDE(inode)->data; + + return 0; +} /* end rxrpc_proc_conns_open() */ + +/*****************************************************************************/ +/* + * set up the iterator to start reading from the conns list and return the first item + */ +static void *rxrpc_proc_conns_start(struct seq_file *m, loff_t *_pos) +{ + struct list_head *_p; + loff_t pos = *_pos; + + /* lock the list against modification */ + down_read(&rxrpc_conns_sem); + + /* allow for the header line */ + if (!pos) + return (void *)1; + pos--; + + /* find the n'th element in the list */ + list_for_each(_p,&rxrpc_conns) + if (!pos--) + break; + + return _p!=&rxrpc_conns ? _p : NULL; +} /* end rxrpc_proc_conns_start() */ + +/*****************************************************************************/ +/* + * move to next conn in conns list + */ +static void *rxrpc_proc_conns_next(struct seq_file *p, void *v, loff_t *pos) +{ + struct list_head *_p; + + (*pos)++; + + _p = v; + _p = v==(void*)1 ? rxrpc_conns.next : _p->next; + + return _p!=&rxrpc_conns ? _p : NULL; +} /* end rxrpc_proc_conns_next() */ + +/*****************************************************************************/ +/* + * clean up after reading from the conns list + */ +static void rxrpc_proc_conns_stop(struct seq_file *p, void *v) +{ + up_read(&rxrpc_conns_sem); + +} /* end rxrpc_proc_conns_stop() */ + +/*****************************************************************************/ +/* + * display a header line followed by a load of conn lines + */ +static int rxrpc_proc_conns_show(struct seq_file *m, void *v) +{ + struct rxrpc_connection *conn = list_entry(v,struct rxrpc_connection,proc_link); + signed long timeout; + + /* display header on line 1 */ + if (v == (void *)1) { + seq_puts(m, + "LOCAL REMOTE RPORT SRVC CONN END SERIALNO CALLNO MTU TIMEOUT" + "\n"); + return 0; + } + + /* display one conn per line on subsequent lines */ + timeout = 0; + if (!list_empty(&conn->timeout.link)) + timeout = (signed long)conn->timeout.timo_jif - (signed long)jiffies; + + seq_printf(m,"%5hu %08x %5hu %04hx %08x %-3.3s %08x %08x %5u %8ld\n", + conn->trans->port, + ntohl(conn->addr.sin_addr.s_addr), + ntohs(conn->addr.sin_port), + ntohs(conn->service_id), + ntohl(conn->conn_id), + conn->out_clientflag ? "CLT" : "SRV", + conn->serial_counter, + conn->call_counter, + conn->mtu_size, + timeout + ); + + return 0; +} /* end rxrpc_proc_conns_show() */ + +/*****************************************************************************/ +/* + * open "/proc/net/rxrpc/calls" which provides a summary of extant calls + */ +static int rxrpc_proc_calls_open(struct inode *inode, struct file *file) +{ + struct seq_file *m; + int ret; + + ret = seq_open(file,&rxrpc_proc_calls_ops); + if (ret<0) + return ret; + + m = file->private_data; + m->private = PDE(inode)->data; + + return 0; +} /* end rxrpc_proc_calls_open() */ + +/*****************************************************************************/ +/* + * set up the iterator to start reading from the calls list and return the first item + */ +static void *rxrpc_proc_calls_start(struct seq_file *m, loff_t *_pos) +{ + struct list_head *_p; + loff_t pos = *_pos; + + /* lock the list against modification */ + down_read(&rxrpc_calls_sem); + + /* allow for the header line */ + if (!pos) + return (void *)1; + pos--; + + /* find the n'th element in the list */ + list_for_each(_p,&rxrpc_calls) + if (!pos--) + break; + + return _p!=&rxrpc_calls ? _p : NULL; +} /* end rxrpc_proc_calls_start() */ + +/*****************************************************************************/ +/* + * move to next call in calls list + */ +static void *rxrpc_proc_calls_next(struct seq_file *p, void *v, loff_t *pos) +{ + struct list_head *_p; + + (*pos)++; + + _p = v; + _p = v==(void*)1 ? rxrpc_calls.next : _p->next; + + return _p!=&rxrpc_calls ? _p : NULL; +} /* end rxrpc_proc_calls_next() */ + +/*****************************************************************************/ +/* + * clean up after reading from the calls list + */ +static void rxrpc_proc_calls_stop(struct seq_file *p, void *v) +{ + up_read(&rxrpc_calls_sem); + +} /* end rxrpc_proc_calls_stop() */ + +/*****************************************************************************/ +/* + * display a header line followed by a load of call lines + */ +static int rxrpc_proc_calls_show(struct seq_file *m, void *v) +{ + struct rxrpc_call *call = list_entry(v,struct rxrpc_call,call_link); + + /* display header on line 1 */ + if (v == (void *)1) { + seq_puts(m, + "LOCAL REMOT SRVC CONN CALL DIR USE " + " L STATE OPCODE ABORT ERRNO\n" + ); + return 0; + } + + /* display one call per line on subsequent lines */ + seq_printf(m, + "%5hu %5hu %04hx %08x %08x %s %3u%c" + " %c %-7.7s %6d %08x %5d\n", + call->conn->trans->port, + ntohs(call->conn->addr.sin_port), + ntohs(call->conn->service_id), + ntohl(call->conn->conn_id), + ntohl(call->call_id), + call->conn->service ? "SVC" : "CLT", + atomic_read(&call->usage), + waitqueue_active(&call->waitq) ? 'w' : ' ', + call->app_last_rcv ? 'Y' : '-', + (call->app_call_state!=RXRPC_CSTATE_ERROR ? + rxrpc_call_states7[call->app_call_state] : + rxrpc_call_error_states7[call->app_err_state]), + call->app_opcode, + call->app_abort_code, + call->app_errno + ); + + return 0; +} /* end rxrpc_proc_calls_show() */ diff --git a/net/rxrpc/rxrpc_syms.c b/net/rxrpc/rxrpc_syms.c new file mode 100644 index 000000000000..3b33b7e5cbd7 --- /dev/null +++ b/net/rxrpc/rxrpc_syms.c @@ -0,0 +1,51 @@ +/* rxrpc_syms.c: exported Rx RPC layer interface symbols + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/config.h> +#include <linux/module.h> + +#include <rxrpc/transport.h> +#include <rxrpc/connection.h> +#include <rxrpc/call.h> +#include <rxrpc/krxiod.h> + +/* call.c */ +EXPORT_SYMBOL(rxrpc_call_rcv_timeout); +EXPORT_SYMBOL(rxrpc_call_acks_timeout); +EXPORT_SYMBOL(rxrpc_call_dfr_ack_timeout); +EXPORT_SYMBOL(rxrpc_call_max_resend); +EXPORT_SYMBOL(rxrpc_call_states); +EXPORT_SYMBOL(rxrpc_call_error_states); + +EXPORT_SYMBOL(rxrpc_create_call); +EXPORT_SYMBOL(rxrpc_incoming_call); +EXPORT_SYMBOL(rxrpc_put_call); +EXPORT_SYMBOL(rxrpc_call_abort); +EXPORT_SYMBOL(rxrpc_call_read_data); +EXPORT_SYMBOL(rxrpc_call_write_data); +EXPORT_SYMBOL(rxrpc_call_flush); + +/* connection.c */ +EXPORT_SYMBOL(rxrpc_create_connection); +EXPORT_SYMBOL(rxrpc_put_connection); + +/* sysctl.c */ +EXPORT_SYMBOL(rxrpc_ktrace); +EXPORT_SYMBOL(rxrpc_kdebug); +EXPORT_SYMBOL(rxrpc_kproto); +EXPORT_SYMBOL(rxrpc_knet); + +/* transport.c */ +EXPORT_SYMBOL(rxrpc_create_transport); +EXPORT_SYMBOL(rxrpc_clear_transport); +EXPORT_SYMBOL(rxrpc_put_transport); +EXPORT_SYMBOL(rxrpc_add_service); +EXPORT_SYMBOL(rxrpc_del_service); diff --git a/net/rxrpc/sysctl.c b/net/rxrpc/sysctl.c new file mode 100644 index 000000000000..a2d0d73b59f9 --- /dev/null +++ b/net/rxrpc/sysctl.c @@ -0,0 +1,73 @@ +/* sysctl.c: Rx RPC control + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/config.h> +#include <linux/sched.h> +#include <linux/slab.h> +#include <linux/module.h> +#include <linux/sysctl.h> +#include <linux/config.h> +#include <rxrpc/types.h> +#include <rxrpc/rxrpc.h> +#include <asm/errno.h> +#include "internal.h" + +int rxrpc_ktrace; +int rxrpc_kdebug; +int rxrpc_kproto; +int rxrpc_knet; + +#ifdef CONFIG_SYSCTL +static struct ctl_table_header *rxrpc_sysctl = NULL; + +static ctl_table rxrpc_sysctl_table[] = { + { 1, "kdebug", &rxrpc_kdebug, sizeof(int), 0644, NULL, &proc_dointvec }, + { 2, "ktrace", &rxrpc_ktrace, sizeof(int), 0644, NULL, &proc_dointvec }, + { 3, "kproto", &rxrpc_kproto, sizeof(int), 0644, NULL, &proc_dointvec }, + { 4, "knet", &rxrpc_knet, sizeof(int), 0644, NULL, &proc_dointvec }, + { 0 } +}; + +static ctl_table rxrpc_dir_sysctl_table[] = { + { 1, "rxrpc", NULL, 0, 0555, rxrpc_sysctl_table }, + { 0 } +}; +#endif /* CONFIG_SYSCTL */ + +/*****************************************************************************/ +/* + * initialise the sysctl stuff for Rx RPC + */ +int rxrpc_sysctl_init(void) +{ +#ifdef CONFIG_SYSCTL + rxrpc_sysctl = register_sysctl_table(rxrpc_dir_sysctl_table,0); + if (!rxrpc_sysctl) + return -ENOMEM; +#endif /* CONFIG_SYSCTL */ + + return 0; +} /* end rxrpc_sysctl_init() */ + +/*****************************************************************************/ +/* + * clean up the sysctl stuff for Rx RPC + */ +void rxrpc_sysctl_cleanup(void) +{ +#ifdef CONFIG_SYSCTL + if (rxrpc_sysctl) { + unregister_sysctl_table(rxrpc_sysctl); + rxrpc_sysctl = NULL; + } +#endif /* CONFIG_SYSCTL */ + +} /* end rxrpc_sysctl_cleanup() */ diff --git a/net/rxrpc/transport.c b/net/rxrpc/transport.c new file mode 100644 index 000000000000..f1dd614c7251 --- /dev/null +++ b/net/rxrpc/transport.c @@ -0,0 +1,824 @@ +/* transport.c: Rx Transport routines + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/sched.h> +#include <linux/slab.h> +#include <linux/module.h> +#include <rxrpc/transport.h> +#include <rxrpc/peer.h> +#include <rxrpc/connection.h> +#include <rxrpc/call.h> +#include <rxrpc/message.h> +#include <rxrpc/krxiod.h> +#include <rxrpc/krxsecd.h> +#include <linux/udp.h> +#include <linux/in.h> +#include <linux/in6.h> +#include <linux/icmp.h> +#include <net/sock.h> +#include <net/ip.h> +#if defined(CONFIG_IPV6) || defined (CONFIG_IPV6_MODULE) +#include <linux/ipv6.h> /* this should _really_ be in errqueue.h.. */ +#endif +#include <linux/errqueue.h> +#include <asm/uaccess.h> +#include <asm/checksum.h> +#include "internal.h" + +struct errormsg { + struct cmsghdr cmsg; /* control message header */ + struct sock_extended_err ee; /* extended error information */ + struct sockaddr_in icmp_src; /* ICMP packet source address */ +}; + +static spinlock_t rxrpc_transports_lock = SPIN_LOCK_UNLOCKED; +static struct list_head rxrpc_transports = LIST_HEAD_INIT(rxrpc_transports); + +__RXACCT_DECL(atomic_t rxrpc_transport_count); +LIST_HEAD(rxrpc_proc_transports); +DECLARE_RWSEM(rxrpc_proc_transports_sem); + +static void rxrpc_data_ready(struct sock *sk, int count); +static void rxrpc_error_report(struct sock *sk); +static int rxrpc_trans_receive_new_call(struct rxrpc_transport *trans, + struct list_head *msgq); +static void rxrpc_trans_receive_error_report(struct rxrpc_transport *trans); + +/*****************************************************************************/ +/* + * create a new transport endpoint using the specified UDP port + */ +int rxrpc_create_transport(unsigned short port, struct rxrpc_transport **_trans) +{ + struct rxrpc_transport *trans; + struct sockaddr_in sin; + mm_segment_t oldfs; + struct sock *sock; + int ret, opt; + + _enter("%hu",port); + + trans = kmalloc(sizeof(struct rxrpc_transport),GFP_KERNEL); + if (!trans) + return -ENOMEM; + + memset(trans,0,sizeof(struct rxrpc_transport)); + atomic_set(&trans->usage,1); + INIT_LIST_HEAD(&trans->services); + INIT_LIST_HEAD(&trans->link); + INIT_LIST_HEAD(&trans->krxiodq_link); + spin_lock_init(&trans->lock); + INIT_LIST_HEAD(&trans->peer_active); + INIT_LIST_HEAD(&trans->peer_graveyard); + spin_lock_init(&trans->peer_gylock); + init_waitqueue_head(&trans->peer_gy_waitq); + rwlock_init(&trans->peer_lock); + atomic_set(&trans->peer_count,0); + trans->port = port; + + /* create a UDP socket to be my actual transport endpoint */ + ret = sock_create(PF_INET,SOCK_DGRAM,IPPROTO_UDP,&trans->socket); + if (ret<0) + goto error; + + /* use the specified port */ + if (port) { + memset(&sin,0,sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + ret = trans->socket->ops->bind(trans->socket,(struct sockaddr *)&sin,sizeof(sin)); + if (ret<0) + goto error; + } + + opt = 1; + oldfs = get_fs(); + set_fs(KERNEL_DS); + ret = trans->socket->ops->setsockopt(trans->socket,SOL_IP,IP_RECVERR, + (char*)&opt,sizeof(opt)); + set_fs(oldfs); + + spin_lock(&rxrpc_transports_lock); + list_add(&trans->link,&rxrpc_transports); + spin_unlock(&rxrpc_transports_lock); + + /* set the socket up */ + sock = trans->socket->sk; + sock->user_data = trans; + sock->data_ready = rxrpc_data_ready; + sock->error_report = rxrpc_error_report; + + down_write(&rxrpc_proc_transports_sem); + list_add_tail(&trans->proc_link,&rxrpc_proc_transports); + up_write(&rxrpc_proc_transports_sem); + + __RXACCT(atomic_inc(&rxrpc_transport_count)); + + *_trans = trans; + _leave(" = 0 (%p)",trans); + return 0; + + error: + rxrpc_put_transport(trans); + + _leave(" = %d",ret); + + return ret; + +} /* end rxrpc_create_transport() */ + +/*****************************************************************************/ +/* + * clear the connections on a transport endpoint + */ +void rxrpc_clear_transport(struct rxrpc_transport *trans) +{ + //struct rxrpc_connection *conn; + +} /* end rxrpc_clear_transport() */ + +/*****************************************************************************/ +/* + * destroy a transport endpoint + */ +void rxrpc_put_transport(struct rxrpc_transport *trans) +{ + _enter("%p{u=%d p=%hu}",trans,atomic_read(&trans->usage),trans->port); + + if (atomic_read(&trans->usage)<=0) + BUG(); + + /* to prevent a race, the decrement and the dequeue must be effectively atomic */ + spin_lock(&rxrpc_transports_lock); + if (likely(!atomic_dec_and_test(&trans->usage))) { + spin_unlock(&rxrpc_transports_lock); + _leave(""); + return; + } + + list_del(&trans->link); + spin_unlock(&rxrpc_transports_lock); + + /* finish cleaning up the transport */ + if (trans->socket) + trans->socket->ops->shutdown(trans->socket,2); + + rxrpc_krxsecd_clear_transport(trans); + rxrpc_krxiod_dequeue_transport(trans); + + /* discard all peer information */ + rxrpc_peer_clearall(trans); + + down_write(&rxrpc_proc_transports_sem); + list_del(&trans->proc_link); + up_write(&rxrpc_proc_transports_sem); + __RXACCT(atomic_dec(&rxrpc_transport_count)); + + /* close the socket */ + if (trans->socket) { + trans->socket->sk->user_data = NULL; + sock_release(trans->socket); + trans->socket = NULL; + } + + kfree(trans); + + _leave(""); + +} /* end rxrpc_put_transport() */ + +/*****************************************************************************/ +/* + * add a service to a transport to be listened upon + */ +int rxrpc_add_service(struct rxrpc_transport *trans, struct rxrpc_service *newsrv) +{ + struct rxrpc_service *srv; + struct list_head *_p; + int ret = -EEXIST; + + _enter("%p{%hu},%p{%hu}",trans,trans->port,newsrv,newsrv->service_id); + + /* verify that the service ID is not already present */ + spin_lock(&trans->lock); + + list_for_each(_p,&trans->services) { + srv = list_entry(_p,struct rxrpc_service,link); + if (srv->service_id==newsrv->service_id) + goto out; + } + + /* okay - add the transport to the list */ + list_add_tail(&newsrv->link,&trans->services); + rxrpc_get_transport(trans); + ret = 0; + + out: + spin_unlock(&trans->lock); + + _leave("= %d",ret); + return ret; + +} /* end rxrpc_add_service() */ + +/*****************************************************************************/ +/* + * remove a service from a transport + */ +void rxrpc_del_service(struct rxrpc_transport *trans, struct rxrpc_service *srv) +{ + _enter("%p{%hu},%p{%hu}",trans,trans->port,srv,srv->service_id); + + spin_lock(&trans->lock); + list_del(&srv->link); + spin_unlock(&trans->lock); + + rxrpc_put_transport(trans); + + _leave(""); + +} /* end rxrpc_del_service() */ + +/*****************************************************************************/ +/* + * INET callback when data has been received on the socket. + */ +static void rxrpc_data_ready(struct sock *sk, int count) +{ + struct rxrpc_transport *trans; + + _enter("%p{t=%p},%d",sk,sk->user_data,count); + + /* queue the transport for attention by krxiod */ + trans = (struct rxrpc_transport *) sk->user_data; + if (trans) + rxrpc_krxiod_queue_transport(trans); + + /* wake up anyone waiting on the socket */ + if (sk->sleep && waitqueue_active(sk->sleep)) + wake_up_interruptible(sk->sleep); + + _leave(""); + +} /* end rxrpc_data_ready() */ + +/*****************************************************************************/ +/* + * INET callback when an ICMP error packet is received + * - sk->err is error (EHOSTUNREACH, EPROTO or EMSGSIZE) + */ +static void rxrpc_error_report(struct sock *sk) +{ + struct rxrpc_transport *trans; + + _enter("%p{t=%p}",sk,sk->user_data); + + /* queue the transport for attention by krxiod */ + trans = (struct rxrpc_transport *) sk->user_data; + if (trans) { + trans->error_rcvd = 1; + rxrpc_krxiod_queue_transport(trans); + } + + /* wake up anyone waiting on the socket */ + if (sk->sleep && waitqueue_active(sk->sleep)) + wake_up_interruptible(sk->sleep); + + _leave(""); + +} /* end rxrpc_error_report() */ + +/*****************************************************************************/ +/* + * split a message up, allocating message records and filling them in from the contents of a + * socket buffer + */ +static int rxrpc_incoming_msg(struct rxrpc_transport *trans, + struct sk_buff *pkt, + struct list_head *msgq) +{ + struct rxrpc_message *msg; + int ret; + + _enter(""); + + msg = kmalloc(sizeof(struct rxrpc_message),GFP_KERNEL); + if (!msg) { + _leave(" = -ENOMEM"); + return -ENOMEM; + } + + memset(msg,0,sizeof(*msg)); + atomic_set(&msg->usage,1); + list_add_tail(&msg->link,msgq); + + /* dig out the Rx routing parameters */ + if (skb_copy_bits(pkt,sizeof(struct udphdr),&msg->hdr,sizeof(msg->hdr))<0) { + ret = -EBADMSG; + goto error; + } + + msg->trans = trans; + msg->state = RXRPC_MSG_RECEIVED; + msg->stamp = pkt->stamp; + msg->seq = ntohl(msg->hdr.seq); + + /* attach the packet */ + skb_get(pkt); + msg->pkt = pkt; + + msg->offset = sizeof(struct udphdr) + sizeof(struct rxrpc_header); + msg->dsize = msg->pkt->len - msg->offset; + + _net("Rx Received packet from %s (%08x;%08x,%1x,%d,%s,%02x,%d,%d)", + msg->hdr.flags & RXRPC_CLIENT_INITIATED ? "client" : "server", + ntohl(msg->hdr.epoch), + (ntohl(msg->hdr.cid) & RXRPC_CIDMASK) >> RXRPC_CIDSHIFT, + ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK, + ntohl(msg->hdr.callNumber), + rxrpc_pkts[msg->hdr.type], + msg->hdr.flags, + ntohs(msg->hdr.serviceId), + msg->hdr.securityIndex); + + __RXACCT(atomic_inc(&rxrpc_message_count)); + + /* split off jumbo packets */ + while (msg->hdr.type==RXRPC_PACKET_TYPE_DATA && msg->hdr.flags & RXRPC_JUMBO_PACKET) { + struct rxrpc_jumbo_header jumbo; + struct rxrpc_message *jumbomsg = msg; + + _debug("split jumbo packet"); + + /* quick sanity check */ + ret = -EBADMSG; + if (msg->dsize < RXRPC_JUMBO_DATALEN+sizeof(struct rxrpc_jumbo_header)) + goto error; + if (msg->hdr.flags & RXRPC_LAST_PACKET) + goto error; + + /* dig out the secondary header */ + if (skb_copy_bits(pkt,msg->offset+RXRPC_JUMBO_DATALEN,&jumbo,sizeof(jumbo))<0) + goto error; + + /* allocate a new message record */ + ret = -ENOMEM; + msg = kmalloc(sizeof(struct rxrpc_message),GFP_KERNEL); + if (!msg) + goto error; + + memcpy(msg,jumbomsg,sizeof(*msg)); + list_add_tail(&msg->link,msgq); + + /* adjust the jumbo packet */ + jumbomsg->dsize = RXRPC_JUMBO_DATALEN; + + /* attach the packet here too */ + skb_get(pkt); + + /* adjust the parameters */ + msg->seq++; + msg->hdr.seq = htonl(msg->seq); + msg->hdr.serial = htonl(ntohl(msg->hdr.serial) + 1); + msg->offset += RXRPC_JUMBO_DATALEN + sizeof(struct rxrpc_jumbo_header); + msg->dsize -= RXRPC_JUMBO_DATALEN + sizeof(struct rxrpc_jumbo_header); + msg->hdr.flags = jumbo.flags; + msg->hdr._rsvd = jumbo._rsvd; + + _net("Rx Split jumbo packet from %s (%08x;%08x,%1x,%d,%s,%02x,%d,%d)", + msg->hdr.flags & RXRPC_CLIENT_INITIATED ? "client" : "server", + ntohl(msg->hdr.epoch), + (ntohl(msg->hdr.cid) & RXRPC_CIDMASK) >> RXRPC_CIDSHIFT, + ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK, + ntohl(msg->hdr.callNumber), + rxrpc_pkts[msg->hdr.type], + msg->hdr.flags, + ntohs(msg->hdr.serviceId), + msg->hdr.securityIndex); + + __RXACCT(atomic_inc(&rxrpc_message_count)); + } + + _leave(" = 0 #%d",atomic_read(&rxrpc_message_count)); + return 0; + + error: + while (!list_empty(msgq)) { + msg = list_entry(msgq->next,struct rxrpc_message,link); + list_del_init(&msg->link); + + rxrpc_put_message(msg); + } + + _leave(" = %d",ret); + return ret; +} /* end rxrpc_incoming_msg() */ + +/*****************************************************************************/ +/* + * accept a new call + * - called from krxiod in process context + */ +void rxrpc_trans_receive_packet(struct rxrpc_transport *trans) +{ + struct rxrpc_message *msg; + struct rxrpc_peer *peer; + struct sk_buff *pkt; + int ret; + u32 addr; + u16 port; + + LIST_HEAD(msgq); + + _enter("%p{%d}",trans,trans->port); + + for (;;) { + /* deal with outstanting errors first */ + if (trans->error_rcvd) + rxrpc_trans_receive_error_report(trans); + + /* attempt to receive a packet */ + pkt = skb_recv_datagram(trans->socket->sk,0,1,&ret); + if (!pkt) { + if (ret==-EAGAIN) { + _leave(" EAGAIN"); + return; + } + + /* an icmp error may have occurred */ + rxrpc_krxiod_queue_transport(trans); + _leave(" error %d\n",ret); + return; + } + + /* we'll probably need to checksum it (didn't call sock_recvmsg) */ + if (pkt->ip_summed != CHECKSUM_UNNECESSARY) { + if ((unsigned short)csum_fold(skb_checksum(pkt,0,pkt->len,pkt->csum))) { + kfree_skb(pkt); + rxrpc_krxiod_queue_transport(trans); + _leave(" CSUM failed"); + return; + } + } + + addr = pkt->nh.iph->saddr; + port = pkt->h.uh->source; + + _net("Rx Received UDP packet from %08x:%04hu",ntohl(addr),ntohs(port)); + + /* unmarshall the Rx parameters and split jumbo packets */ + ret = rxrpc_incoming_msg(trans,pkt,&msgq); + if (ret<0) { + kfree_skb(pkt); + rxrpc_krxiod_queue_transport(trans); + _leave(" bad packet"); + return; + } + + if (list_empty(&msgq)) BUG(); + + msg = list_entry(msgq.next,struct rxrpc_message,link); + + /* locate the record for the peer from which it originated */ + ret = rxrpc_peer_lookup(trans,addr,&peer); + if (ret<0) { + kdebug("Rx No connections from that peer"); + rxrpc_trans_immediate_abort(trans,msg,-EINVAL); + goto finished_msg; + } + + /* try and find a matching connection */ + ret = rxrpc_connection_lookup(peer,msg,&msg->conn); + if (ret<0) { + kdebug("Rx Unknown Connection"); + rxrpc_trans_immediate_abort(trans,msg,-EINVAL); + rxrpc_put_peer(peer); + goto finished_msg; + } + rxrpc_put_peer(peer); + + /* deal with the first packet of a new call */ + if (msg->hdr.flags & RXRPC_CLIENT_INITIATED && + msg->hdr.type==RXRPC_PACKET_TYPE_DATA && + ntohl(msg->hdr.seq)==1 + ) { + _debug("Rx New server call"); + rxrpc_trans_receive_new_call(trans,&msgq); + goto finished_msg; + } + + /* deal with subsequent packet(s) of call */ + _debug("Rx Call packet"); + while (!list_empty(&msgq)) { + msg = list_entry(msgq.next,struct rxrpc_message,link); + list_del_init(&msg->link); + + ret = rxrpc_conn_receive_call_packet(msg->conn,NULL,msg); + if (ret<0) { + rxrpc_trans_immediate_abort(trans,msg,ret); + rxrpc_put_message(msg); + goto finished_msg; + } + + rxrpc_put_message(msg); + } + + goto finished_msg; + + /* dispose of the packets */ + finished_msg: + while (!list_empty(&msgq)) { + msg = list_entry(msgq.next,struct rxrpc_message,link); + list_del_init(&msg->link); + + rxrpc_put_message(msg); + } + kfree_skb(pkt); + } + + _leave(""); + +} /* end rxrpc_trans_receive_packet() */ + +/*****************************************************************************/ +/* + * accept a new call from a client trying to connect to one of my services + * - called in process context + */ +static int rxrpc_trans_receive_new_call(struct rxrpc_transport *trans, + struct list_head *msgq) +{ + struct rxrpc_message *msg; + + _enter(""); + + /* only bother with the first packet */ + msg = list_entry(msgq->next,struct rxrpc_message,link); + list_del_init(&msg->link); + rxrpc_krxsecd_queue_incoming_call(msg); + rxrpc_put_message(msg); + + _leave(" = 0"); + + return 0; +} /* end rxrpc_trans_receive_new_call() */ + +/*****************************************************************************/ +/* + * perform an immediate abort without connection or call structures + */ +int rxrpc_trans_immediate_abort(struct rxrpc_transport *trans, + struct rxrpc_message *msg, + int error) +{ + struct rxrpc_header ahdr; + struct sockaddr_in sin; + struct msghdr msghdr; + struct iovec iov[2]; + mm_segment_t oldfs; + int len, ret; + u32 _error; + + _enter("%p,%p,%d",trans,msg,error); + + /* don't abort an abort packet */ + if (msg->hdr.type==RXRPC_PACKET_TYPE_ABORT) { + _leave(" = 0"); + return 0; + } + + _error = htonl(-error); + + /* set up the message to be transmitted */ + memcpy(&ahdr,&msg->hdr,sizeof(ahdr)); + ahdr.epoch = msg->hdr.epoch; + ahdr.serial = htonl(1); + ahdr.seq = 0; + ahdr.type = RXRPC_PACKET_TYPE_ABORT; + ahdr.flags = RXRPC_LAST_PACKET | (~msg->hdr.flags & RXRPC_CLIENT_INITIATED); + + iov[0].iov_len = sizeof(ahdr); + iov[0].iov_base = &ahdr; + iov[1].iov_len = sizeof(_error); + iov[1].iov_base = &_error; + + len = sizeof(ahdr) + sizeof(_error); + + memset(&sin,0,sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = msg->pkt->h.uh->source; + sin.sin_addr.s_addr = msg->pkt->nh.iph->saddr; + + msghdr.msg_name = &sin; + msghdr.msg_namelen = sizeof(sin); + msghdr.msg_iov = iov; + msghdr.msg_iovlen = 2; + msghdr.msg_control = NULL; + msghdr.msg_controllen = 0; + msghdr.msg_flags = MSG_DONTWAIT; + + _net("Sending message type %d of %d bytes to %08x:%d", + ahdr.type, + len, + htonl(sin.sin_addr.s_addr), + htons(sin.sin_port)); + + /* send the message */ + oldfs = get_fs(); + set_fs(KERNEL_DS); + ret = sock_sendmsg(trans->socket,&msghdr,len); + set_fs(oldfs); + + _leave(" = %d",ret); + return ret; +} /* end rxrpc_trans_immediate_abort() */ + +/*****************************************************************************/ +/* + * receive an ICMP error report and percolate it to all connections heading to the affected + * host or port + */ +static void rxrpc_trans_receive_error_report(struct rxrpc_transport *trans) +{ + struct rxrpc_connection *conn; + struct sockaddr_in sin; + struct rxrpc_peer *peer; + struct list_head connq, *_p; + struct errormsg emsg; + struct msghdr msg; + mm_segment_t oldfs; + int local, err; + u16 port; + + _enter("%p",trans); + + for (;;) { + trans->error_rcvd = 0; + + /* try and receive an error message */ + msg.msg_name = &sin; + msg.msg_namelen = sizeof(sin); + msg.msg_iov = NULL; + msg.msg_iovlen = 0; + msg.msg_control = &emsg; + msg.msg_controllen = sizeof(emsg); + msg.msg_flags = 0; + + oldfs = get_fs(); + set_fs(KERNEL_DS); + err = sock_recvmsg(trans->socket,&msg,0,MSG_ERRQUEUE|MSG_DONTWAIT|MSG_TRUNC); + set_fs(oldfs); + + if (err==-EAGAIN) { + _leave(""); + return; + } + + if (err<0) { + printk("%s: unable to recv an error report: %d\n",__FUNCTION__,err); + _leave(""); + return; + } + + msg.msg_controllen = (char*)msg.msg_control - (char*)&emsg; + + if (msg.msg_controllen<sizeof(emsg.cmsg) || msg.msg_namelen<sizeof(sin)) { + printk("%s: short control message (nlen=%u clen=%u fl=%x)\n", + __FUNCTION__,msg.msg_namelen,msg.msg_controllen,msg.msg_flags); + continue; + } + + _net("Rx Received control message { len=%u level=%u type=%u }", + emsg.cmsg.cmsg_len,emsg.cmsg.cmsg_level,emsg.cmsg.cmsg_type); + + if (sin.sin_family!=AF_INET) { + printk("Rx Ignoring error report with non-INET address (fam=%u)", + sin.sin_family); + continue; + } + + _net("Rx Received message pertaining to host addr=%x port=%hu", + ntohl(sin.sin_addr.s_addr),ntohs(sin.sin_port)); + + if (emsg.cmsg.cmsg_level!=SOL_IP || emsg.cmsg.cmsg_type!=IP_RECVERR) { + printk("Rx Ignoring unknown error report { level=%u type=%u }", + emsg.cmsg.cmsg_level,emsg.cmsg.cmsg_type); + continue; + } + + if (msg.msg_controllen<sizeof(emsg.cmsg)+sizeof(emsg.ee)) { + printk("%s: short error message (%u)\n",__FUNCTION__,msg.msg_controllen); + _leave(""); + return; + } + + port = sin.sin_port; + + switch (emsg.ee.ee_origin) { + case SO_EE_ORIGIN_ICMP: + local = 0; + switch (emsg.ee.ee_type) { + case ICMP_DEST_UNREACH: + switch (emsg.ee.ee_code) { + case ICMP_NET_UNREACH: + _net("Rx Received ICMP Network Unreachable"); + port = 0; + err = -ENETUNREACH; + break; + case ICMP_HOST_UNREACH: + _net("Rx Received ICMP Host Unreachable"); + port = 0; + err = -EHOSTUNREACH; + break; + case ICMP_PORT_UNREACH: + _net("Rx Received ICMP Port Unreachable"); + err = -ECONNREFUSED; + break; + case ICMP_NET_UNKNOWN: + _net("Rx Received ICMP Unknown Network"); + port = 0; + err = -ENETUNREACH; + break; + case ICMP_HOST_UNKNOWN: + _net("Rx Received ICMP Unknown Host"); + port = 0; + err = -EHOSTUNREACH; + break; + default: + _net("Rx Received ICMP DestUnreach { code=%u }", + emsg.ee.ee_code); + err = emsg.ee.ee_errno; + break; + } + break; + + case ICMP_TIME_EXCEEDED: + _net("Rx Received ICMP TTL Exceeded"); + err = emsg.ee.ee_errno; + break; + + default: + _proto("Rx Received ICMP error { type=%u code=%u }", + emsg.ee.ee_type,emsg.ee.ee_code); + err = emsg.ee.ee_errno; + break; + } + break; + + case SO_EE_ORIGIN_LOCAL: + _proto("Rx Received local error { error=%d }",emsg.ee.ee_errno); + local = 1; + err = emsg.ee.ee_errno; + break; + + case SO_EE_ORIGIN_NONE: + case SO_EE_ORIGIN_ICMP6: + default: + _proto("Rx Received error report { orig=%u }",emsg.ee.ee_origin); + local = 0; + err = emsg.ee.ee_errno; + break; + } + + /* find all the connections between this transport and the affected destination */ + INIT_LIST_HEAD(&connq); + + if (rxrpc_peer_lookup(trans,sin.sin_addr.s_addr,&peer)==0) { + read_lock(&peer->conn_lock); + list_for_each(_p,&peer->conn_active) { + conn = list_entry(_p,struct rxrpc_connection,link); + if (port && conn->addr.sin_port!=port) + continue; + if (!list_empty(&conn->err_link)) + continue; + + rxrpc_get_connection(conn); + list_add_tail(&conn->err_link,&connq); + } + read_unlock(&peer->conn_lock); + + /* service all those connections */ + while (!list_empty(&connq)) { + conn = list_entry(connq.next,struct rxrpc_connection,err_link); + list_del(&conn->err_link); + + rxrpc_conn_handle_error(conn,local,err); + + rxrpc_put_connection(conn); + } + + rxrpc_put_peer(peer); + } + } + + _leave(""); + return; +} /* end rxrpc_trans_receive_error_report() */ |
