diff options
Diffstat (limited to 'extmod/modlwip.c')
| -rw-r--r-- | extmod/modlwip.c | 167 |
1 files changed, 114 insertions, 53 deletions
diff --git a/extmod/modlwip.c b/extmod/modlwip.c index f109e0029..961803f5e 100644 --- a/extmod/modlwip.c +++ b/extmod/modlwip.c @@ -286,6 +286,15 @@ static const int error_lookup_table[] = { #define MOD_NETWORK_SOCK_DGRAM (2) #define MOD_NETWORK_SOCK_RAW (3) +// Total queue length for buffered UDP/raw incoming packets. +#define LWIP_INCOMING_PACKET_QUEUE_LEN (4) + +typedef struct _lwip_incoming_packet_t { + struct pbuf *pbuf; + ip_addr_t peer_addr; + uint16_t peer_port; +} lwip_incoming_packet_t; + typedef struct _lwip_socket_obj_t { mp_obj_base_t base; @@ -294,8 +303,11 @@ typedef struct _lwip_socket_obj_t { struct udp_pcb *udp; struct raw_pcb *raw; } pcb; + + // Data structure that holds incoming pbuf's. + // Each socket type has different state that it needs to keep track of. volatile union { - struct pbuf *pbuf; + // TCP listening sockets have a queue of incoming connections, implemented as a ringbuffer. struct { uint8_t alloc; uint8_t iget; @@ -305,10 +317,23 @@ typedef struct _lwip_socket_obj_t { struct tcp_pcb **array; // if alloc != 0 } tcp; } connection; + + // Connected TCP sockets have a single incoming pbuf that new data is appended to. + struct { + struct pbuf *pbuf; + } tcp; + + // UDP and raw sockets have a queue of incoming pbuf's, implemented as a ringbuffer. + struct { + uint8_t iget; // ringbuffer read index + uint8_t iput; // ringbuffer write index + lwip_incoming_packet_t *array; + } udp_raw; } incoming; + mp_obj_t callback; - ip_addr_t peer; - mp_uint_t peer_port; + ip_addr_t tcp_peer_addr; + mp_uint_t tcp_peer_port; mp_uint_t timeout; uint16_t recv_offset; @@ -347,9 +372,21 @@ static void lwip_socket_free_incoming(lwip_socket_obj_t *socket) { && socket->pcb.tcp->state == LISTEN; if (!socket_is_listener) { - if (socket->incoming.pbuf != NULL) { - pbuf_free(socket->incoming.pbuf); - socket->incoming.pbuf = NULL; + if (socket->type == MOD_NETWORK_SOCK_STREAM) { + if (socket->incoming.tcp.pbuf != NULL) { + pbuf_free(socket->incoming.tcp.pbuf); + socket->incoming.tcp.pbuf = NULL; + } + } else { + for (size_t i = 0; i < LWIP_INCOMING_PACKET_QUEUE_LEN; ++i) { + lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[i]; + if (slot->pbuf != NULL) { + pbuf_free(slot->pbuf); + slot->pbuf = NULL; + } + } + socket->incoming.udp_raw.iget = 0; + socket->incoming.udp_raw.iput = 0; } } else { uint8_t alloc = socket->incoming.connection.alloc; @@ -407,6 +444,19 @@ static inline void exec_user_callback(lwip_socket_obj_t *socket) { } } +static void udp_raw_incoming(lwip_socket_obj_t *socket, struct pbuf *p, const ip_addr_t *addr, u16_t port) { + lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[socket->incoming.udp_raw.iput]; + if (slot->pbuf != NULL) { + // No room in the inn, drop the packet. + pbuf_free(p); + } else { + slot->pbuf = p; + slot->peer_addr = *addr; + slot->peer_port = port; + socket->incoming.udp_raw.iput = (socket->incoming.udp_raw.iput + 1) % LWIP_INCOMING_PACKET_QUEUE_LEN; + } +} + #if MICROPY_PY_LWIP_SOCK_RAW // Callback for incoming raw packets. #if LWIP_VERSION_MAJOR < 2 @@ -416,13 +466,7 @@ static u8_t _lwip_raw_incoming(void *arg, struct raw_pcb *pcb, struct pbuf *p, c #endif { lwip_socket_obj_t *socket = (lwip_socket_obj_t *)arg; - - if (socket->incoming.pbuf != NULL) { - pbuf_free(p); - } else { - socket->incoming.pbuf = p; - memcpy(&socket->peer, addr, sizeof(socket->peer)); - } + udp_raw_incoming(socket, p, addr, 0); return 1; // we ate the packet } #endif @@ -436,15 +480,7 @@ static void _lwip_udp_incoming(void *arg, struct udp_pcb *upcb, struct pbuf *p, #endif { lwip_socket_obj_t *socket = (lwip_socket_obj_t *)arg; - - if (socket->incoming.pbuf != NULL) { - // That's why they call it "unreliable". No room in the inn, drop the packet. - pbuf_free(p); - } else { - socket->incoming.pbuf = p; - socket->peer_port = (mp_uint_t)port; - memcpy(&socket->peer, addr, sizeof(socket->peer)); - } + udp_raw_incoming(socket, p, addr, port); } // Callback for general tcp errors. @@ -562,13 +598,13 @@ static err_t _lwip_tcp_recv(void *arg, struct tcp_pcb *tcpb, struct pbuf *p, err return ERR_OK; } - if (socket->incoming.pbuf == NULL) { - socket->incoming.pbuf = p; + if (socket->incoming.tcp.pbuf == NULL) { + socket->incoming.tcp.pbuf = p; } else { #ifdef SOCKET_SINGLE_PBUF return ERR_BUF; #else - pbuf_cat(socket->incoming.pbuf, p); + pbuf_cat(socket->incoming.tcp.pbuf, p); #endif } @@ -639,7 +675,9 @@ static mp_uint_t lwip_raw_udp_send(lwip_socket_obj_t *socket, const byte *buf, m // Helper function for recv/recvfrom to handle raw/UDP packets static mp_uint_t lwip_raw_udp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_t len, ip_addr_t *ip, mp_uint_t *port, int *_errno) { - if (socket->incoming.pbuf == NULL) { + lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[socket->incoming.udp_raw.iget]; + + if (slot->pbuf == NULL) { if (socket->timeout == 0) { // Non-blocking socket. *_errno = MP_EAGAIN; @@ -648,7 +686,7 @@ static mp_uint_t lwip_raw_udp_receive(lwip_socket_obj_t *socket, byte *buf, mp_u // Wait for data to arrive on UDP socket. mp_uint_t start = mp_hal_ticks_ms(); - while (socket->incoming.pbuf == NULL) { + while (slot->pbuf == NULL) { if (socket->timeout != -1 && mp_hal_ticks_ms() - start > socket->timeout) { *_errno = MP_ETIMEDOUT; return -1; @@ -658,17 +696,18 @@ static mp_uint_t lwip_raw_udp_receive(lwip_socket_obj_t *socket, byte *buf, mp_u } if (ip != NULL) { - memcpy(ip, &socket->peer, sizeof(socket->peer)); - *port = socket->peer_port; + *ip = slot->peer_addr; + *port = slot->peer_port; } - struct pbuf *p = socket->incoming.pbuf; + struct pbuf *p = slot->pbuf; MICROPY_PY_LWIP_ENTER u16_t result = pbuf_copy_partial(p, buf, ((p->tot_len > len) ? len : p->tot_len), 0); pbuf_free(p); - socket->incoming.pbuf = NULL; + slot->pbuf = NULL; + socket->incoming.udp_raw.iget = (socket->incoming.udp_raw.iget + 1) % LWIP_INCOMING_PACKET_QUEUE_LEN; MICROPY_PY_LWIP_EXIT @@ -780,7 +819,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_ // Check for any pending errors STREAM_ERROR_CHECK(socket); - if (socket->incoming.pbuf == NULL) { + if (socket->incoming.tcp.pbuf == NULL) { // Non-blocking socket if (socket->timeout == 0) { @@ -792,7 +831,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_ } mp_uint_t start = mp_hal_ticks_ms(); - while (socket->state == STATE_CONNECTED && socket->incoming.pbuf == NULL) { + while (socket->state == STATE_CONNECTED && socket->incoming.tcp.pbuf == NULL) { if (socket->timeout != -1 && mp_hal_ticks_ms() - start > socket->timeout) { *_errno = MP_ETIMEDOUT; return -1; @@ -801,7 +840,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_ } if (socket->state == STATE_PEER_CLOSED) { - if (socket->incoming.pbuf == NULL) { + if (socket->incoming.tcp.pbuf == NULL) { // socket closed and no data left in buffer return 0; } @@ -819,7 +858,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_ assert(socket->pcb.tcp != NULL); - struct pbuf *p = socket->incoming.pbuf; + struct pbuf *p = socket->incoming.tcp.pbuf; mp_uint_t remaining = p->len - socket->recv_offset; if (len > remaining) { @@ -830,7 +869,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_ remaining -= len; if (remaining == 0) { - socket->incoming.pbuf = p->next; + socket->incoming.tcp.pbuf = p->next; // If we don't ref here, free() will free the entire chain, // if we ref, it does what we need: frees 1st buf, and decrements // next buf's refcount back to 1. @@ -854,8 +893,18 @@ static const mp_obj_type_t lwip_socket_type; static void lwip_socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) { lwip_socket_obj_t *self = MP_OBJ_TO_PTR(self_in); - mp_printf(print, "<socket state=%d timeout=%d incoming=%p off=%d>", self->state, self->timeout, - self->incoming.pbuf, self->recv_offset); + mp_printf(print, "<socket state=%d timeout=%d incoming=", self->state, self->timeout); + if (self->type == MOD_NETWORK_SOCK_STREAM) { + mp_printf(print, "%p off=%d>", self->incoming.tcp.pbuf, self->recv_offset); + } else { + int num_in_queue = 0; + for (size_t i = 0; i < LWIP_INCOMING_PACKET_QUEUE_LEN; ++i) { + if (self->incoming.udp_raw.array[i].pbuf != NULL) { + ++num_in_queue; + } + } + mp_printf(print, "%d>", num_in_queue); + } } // FIXME: Only supports two arguments at present @@ -884,16 +933,22 @@ static mp_obj_t lwip_socket_make_new(const mp_obj_type_t *type, size_t n_args, s socket->incoming.connection.tcp.item = NULL; break; case MOD_NETWORK_SOCK_DGRAM: - socket->pcb.udp = udp_new(); - socket->incoming.pbuf = NULL; - break; #if MICROPY_PY_LWIP_SOCK_RAW - case MOD_NETWORK_SOCK_RAW: { - mp_int_t proto = n_args <= 2 ? 0 : mp_obj_get_int(args[2]); - socket->pcb.raw = raw_new(proto); - break; - } + case MOD_NETWORK_SOCK_RAW: #endif + if (socket->type == MOD_NETWORK_SOCK_DGRAM) { + socket->pcb.udp = udp_new(); + } + #if MICROPY_PY_LWIP_SOCK_RAW + else { + mp_int_t proto = n_args <= 2 ? 0 : mp_obj_get_int(args[2]); + socket->pcb.raw = raw_new(proto); + } + #endif + socket->incoming.udp_raw.iget = 0; + socket->incoming.udp_raw.iput = 0; + socket->incoming.udp_raw.array = m_new0(lwip_incoming_packet_t, LWIP_INCOMING_PACKET_QUEUE_LEN); + break; default: mp_raise_OSError(MP_EINVAL); } @@ -1075,7 +1130,7 @@ static mp_obj_t lwip_socket_accept(mp_obj_t self_in) { // ...and set up the new socket for it. socket2->domain = MOD_NETWORK_AF_INET; socket2->type = MOD_NETWORK_SOCK_STREAM; - socket2->incoming.pbuf = NULL; + socket2->incoming.tcp.pbuf = NULL; socket2->timeout = socket->timeout; socket2->state = STATE_CONNECTED; socket2->recv_offset = 0; @@ -1130,8 +1185,8 @@ static mp_obj_t lwip_socket_connect(mp_obj_t self_in, mp_obj_t addr_in) { socket->state = STATE_NEW; mp_raise_OSError(error_lookup_table[-err]); } - socket->peer_port = (mp_uint_t)port; - memcpy(&socket->peer, &dest, sizeof(socket->peer)); + socket->tcp_peer_addr = dest; + socket->tcp_peer_port = (mp_uint_t)port; MICROPY_PY_LWIP_EXIT // And now we wait... @@ -1299,8 +1354,8 @@ static mp_obj_t lwip_socket_recvfrom(mp_obj_t self_in, mp_obj_t len_in) { mp_uint_t ret = 0; switch (socket->type) { case MOD_NETWORK_SOCK_STREAM: { - memcpy(&ip, &socket->peer, sizeof(socket->peer)); - port = (mp_uint_t)socket->peer_port; + ip = socket->tcp_peer_addr; + port = (mp_uint_t)socket->tcp_peer_port; ret = lwip_tcp_receive(socket, (byte *)vstr.buf, len, &_errno); break; } @@ -1537,9 +1592,15 @@ static mp_uint_t lwip_socket_ioctl(mp_obj_t self_in, mp_uint_t request, uintptr_ if (lwip_socket_incoming_array(socket)[socket->incoming.connection.iget] != NULL) { ret |= MP_STREAM_POLL_RD; } + } else if (socket->type == MOD_NETWORK_SOCK_STREAM) { + // For TCP sockets there is just one slot for incoming data + if (socket->incoming.tcp.pbuf != NULL) { + ret |= MP_STREAM_POLL_RD; + } } else { - // Otherwise there is just one slot for incoming data - if (socket->incoming.pbuf != NULL) { + // Otherwise for UDP/raw there is a queue of incoming data + lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[socket->incoming.udp_raw.iget]; + if (slot->pbuf != NULL) { ret |= MP_STREAM_POLL_RD; } } |
