summaryrefslogtreecommitdiff
path: root/extmod/modlwip.c
diff options
context:
space:
mode:
Diffstat (limited to 'extmod/modlwip.c')
-rw-r--r--extmod/modlwip.c167
1 files changed, 114 insertions, 53 deletions
diff --git a/extmod/modlwip.c b/extmod/modlwip.c
index f109e0029..961803f5e 100644
--- a/extmod/modlwip.c
+++ b/extmod/modlwip.c
@@ -286,6 +286,15 @@ static const int error_lookup_table[] = {
#define MOD_NETWORK_SOCK_DGRAM (2)
#define MOD_NETWORK_SOCK_RAW (3)
+// Total queue length for buffered UDP/raw incoming packets.
+#define LWIP_INCOMING_PACKET_QUEUE_LEN (4)
+
+typedef struct _lwip_incoming_packet_t {
+ struct pbuf *pbuf;
+ ip_addr_t peer_addr;
+ uint16_t peer_port;
+} lwip_incoming_packet_t;
+
typedef struct _lwip_socket_obj_t {
mp_obj_base_t base;
@@ -294,8 +303,11 @@ typedef struct _lwip_socket_obj_t {
struct udp_pcb *udp;
struct raw_pcb *raw;
} pcb;
+
+ // Data structure that holds incoming pbuf's.
+ // Each socket type has different state that it needs to keep track of.
volatile union {
- struct pbuf *pbuf;
+ // TCP listening sockets have a queue of incoming connections, implemented as a ringbuffer.
struct {
uint8_t alloc;
uint8_t iget;
@@ -305,10 +317,23 @@ typedef struct _lwip_socket_obj_t {
struct tcp_pcb **array; // if alloc != 0
} tcp;
} connection;
+
+ // Connected TCP sockets have a single incoming pbuf that new data is appended to.
+ struct {
+ struct pbuf *pbuf;
+ } tcp;
+
+ // UDP and raw sockets have a queue of incoming pbuf's, implemented as a ringbuffer.
+ struct {
+ uint8_t iget; // ringbuffer read index
+ uint8_t iput; // ringbuffer write index
+ lwip_incoming_packet_t *array;
+ } udp_raw;
} incoming;
+
mp_obj_t callback;
- ip_addr_t peer;
- mp_uint_t peer_port;
+ ip_addr_t tcp_peer_addr;
+ mp_uint_t tcp_peer_port;
mp_uint_t timeout;
uint16_t recv_offset;
@@ -347,9 +372,21 @@ static void lwip_socket_free_incoming(lwip_socket_obj_t *socket) {
&& socket->pcb.tcp->state == LISTEN;
if (!socket_is_listener) {
- if (socket->incoming.pbuf != NULL) {
- pbuf_free(socket->incoming.pbuf);
- socket->incoming.pbuf = NULL;
+ if (socket->type == MOD_NETWORK_SOCK_STREAM) {
+ if (socket->incoming.tcp.pbuf != NULL) {
+ pbuf_free(socket->incoming.tcp.pbuf);
+ socket->incoming.tcp.pbuf = NULL;
+ }
+ } else {
+ for (size_t i = 0; i < LWIP_INCOMING_PACKET_QUEUE_LEN; ++i) {
+ lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[i];
+ if (slot->pbuf != NULL) {
+ pbuf_free(slot->pbuf);
+ slot->pbuf = NULL;
+ }
+ }
+ socket->incoming.udp_raw.iget = 0;
+ socket->incoming.udp_raw.iput = 0;
}
} else {
uint8_t alloc = socket->incoming.connection.alloc;
@@ -407,6 +444,19 @@ static inline void exec_user_callback(lwip_socket_obj_t *socket) {
}
}
+static void udp_raw_incoming(lwip_socket_obj_t *socket, struct pbuf *p, const ip_addr_t *addr, u16_t port) {
+ lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[socket->incoming.udp_raw.iput];
+ if (slot->pbuf != NULL) {
+ // No room in the inn, drop the packet.
+ pbuf_free(p);
+ } else {
+ slot->pbuf = p;
+ slot->peer_addr = *addr;
+ slot->peer_port = port;
+ socket->incoming.udp_raw.iput = (socket->incoming.udp_raw.iput + 1) % LWIP_INCOMING_PACKET_QUEUE_LEN;
+ }
+}
+
#if MICROPY_PY_LWIP_SOCK_RAW
// Callback for incoming raw packets.
#if LWIP_VERSION_MAJOR < 2
@@ -416,13 +466,7 @@ static u8_t _lwip_raw_incoming(void *arg, struct raw_pcb *pcb, struct pbuf *p, c
#endif
{
lwip_socket_obj_t *socket = (lwip_socket_obj_t *)arg;
-
- if (socket->incoming.pbuf != NULL) {
- pbuf_free(p);
- } else {
- socket->incoming.pbuf = p;
- memcpy(&socket->peer, addr, sizeof(socket->peer));
- }
+ udp_raw_incoming(socket, p, addr, 0);
return 1; // we ate the packet
}
#endif
@@ -436,15 +480,7 @@ static void _lwip_udp_incoming(void *arg, struct udp_pcb *upcb, struct pbuf *p,
#endif
{
lwip_socket_obj_t *socket = (lwip_socket_obj_t *)arg;
-
- if (socket->incoming.pbuf != NULL) {
- // That's why they call it "unreliable". No room in the inn, drop the packet.
- pbuf_free(p);
- } else {
- socket->incoming.pbuf = p;
- socket->peer_port = (mp_uint_t)port;
- memcpy(&socket->peer, addr, sizeof(socket->peer));
- }
+ udp_raw_incoming(socket, p, addr, port);
}
// Callback for general tcp errors.
@@ -562,13 +598,13 @@ static err_t _lwip_tcp_recv(void *arg, struct tcp_pcb *tcpb, struct pbuf *p, err
return ERR_OK;
}
- if (socket->incoming.pbuf == NULL) {
- socket->incoming.pbuf = p;
+ if (socket->incoming.tcp.pbuf == NULL) {
+ socket->incoming.tcp.pbuf = p;
} else {
#ifdef SOCKET_SINGLE_PBUF
return ERR_BUF;
#else
- pbuf_cat(socket->incoming.pbuf, p);
+ pbuf_cat(socket->incoming.tcp.pbuf, p);
#endif
}
@@ -639,7 +675,9 @@ static mp_uint_t lwip_raw_udp_send(lwip_socket_obj_t *socket, const byte *buf, m
// Helper function for recv/recvfrom to handle raw/UDP packets
static mp_uint_t lwip_raw_udp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_t len, ip_addr_t *ip, mp_uint_t *port, int *_errno) {
- if (socket->incoming.pbuf == NULL) {
+ lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[socket->incoming.udp_raw.iget];
+
+ if (slot->pbuf == NULL) {
if (socket->timeout == 0) {
// Non-blocking socket.
*_errno = MP_EAGAIN;
@@ -648,7 +686,7 @@ static mp_uint_t lwip_raw_udp_receive(lwip_socket_obj_t *socket, byte *buf, mp_u
// Wait for data to arrive on UDP socket.
mp_uint_t start = mp_hal_ticks_ms();
- while (socket->incoming.pbuf == NULL) {
+ while (slot->pbuf == NULL) {
if (socket->timeout != -1 && mp_hal_ticks_ms() - start > socket->timeout) {
*_errno = MP_ETIMEDOUT;
return -1;
@@ -658,17 +696,18 @@ static mp_uint_t lwip_raw_udp_receive(lwip_socket_obj_t *socket, byte *buf, mp_u
}
if (ip != NULL) {
- memcpy(ip, &socket->peer, sizeof(socket->peer));
- *port = socket->peer_port;
+ *ip = slot->peer_addr;
+ *port = slot->peer_port;
}
- struct pbuf *p = socket->incoming.pbuf;
+ struct pbuf *p = slot->pbuf;
MICROPY_PY_LWIP_ENTER
u16_t result = pbuf_copy_partial(p, buf, ((p->tot_len > len) ? len : p->tot_len), 0);
pbuf_free(p);
- socket->incoming.pbuf = NULL;
+ slot->pbuf = NULL;
+ socket->incoming.udp_raw.iget = (socket->incoming.udp_raw.iget + 1) % LWIP_INCOMING_PACKET_QUEUE_LEN;
MICROPY_PY_LWIP_EXIT
@@ -780,7 +819,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
// Check for any pending errors
STREAM_ERROR_CHECK(socket);
- if (socket->incoming.pbuf == NULL) {
+ if (socket->incoming.tcp.pbuf == NULL) {
// Non-blocking socket
if (socket->timeout == 0) {
@@ -792,7 +831,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
}
mp_uint_t start = mp_hal_ticks_ms();
- while (socket->state == STATE_CONNECTED && socket->incoming.pbuf == NULL) {
+ while (socket->state == STATE_CONNECTED && socket->incoming.tcp.pbuf == NULL) {
if (socket->timeout != -1 && mp_hal_ticks_ms() - start > socket->timeout) {
*_errno = MP_ETIMEDOUT;
return -1;
@@ -801,7 +840,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
}
if (socket->state == STATE_PEER_CLOSED) {
- if (socket->incoming.pbuf == NULL) {
+ if (socket->incoming.tcp.pbuf == NULL) {
// socket closed and no data left in buffer
return 0;
}
@@ -819,7 +858,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
assert(socket->pcb.tcp != NULL);
- struct pbuf *p = socket->incoming.pbuf;
+ struct pbuf *p = socket->incoming.tcp.pbuf;
mp_uint_t remaining = p->len - socket->recv_offset;
if (len > remaining) {
@@ -830,7 +869,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
remaining -= len;
if (remaining == 0) {
- socket->incoming.pbuf = p->next;
+ socket->incoming.tcp.pbuf = p->next;
// If we don't ref here, free() will free the entire chain,
// if we ref, it does what we need: frees 1st buf, and decrements
// next buf's refcount back to 1.
@@ -854,8 +893,18 @@ static const mp_obj_type_t lwip_socket_type;
static void lwip_socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) {
lwip_socket_obj_t *self = MP_OBJ_TO_PTR(self_in);
- mp_printf(print, "<socket state=%d timeout=%d incoming=%p off=%d>", self->state, self->timeout,
- self->incoming.pbuf, self->recv_offset);
+ mp_printf(print, "<socket state=%d timeout=%d incoming=", self->state, self->timeout);
+ if (self->type == MOD_NETWORK_SOCK_STREAM) {
+ mp_printf(print, "%p off=%d>", self->incoming.tcp.pbuf, self->recv_offset);
+ } else {
+ int num_in_queue = 0;
+ for (size_t i = 0; i < LWIP_INCOMING_PACKET_QUEUE_LEN; ++i) {
+ if (self->incoming.udp_raw.array[i].pbuf != NULL) {
+ ++num_in_queue;
+ }
+ }
+ mp_printf(print, "%d>", num_in_queue);
+ }
}
// FIXME: Only supports two arguments at present
@@ -884,16 +933,22 @@ static mp_obj_t lwip_socket_make_new(const mp_obj_type_t *type, size_t n_args, s
socket->incoming.connection.tcp.item = NULL;
break;
case MOD_NETWORK_SOCK_DGRAM:
- socket->pcb.udp = udp_new();
- socket->incoming.pbuf = NULL;
- break;
#if MICROPY_PY_LWIP_SOCK_RAW
- case MOD_NETWORK_SOCK_RAW: {
- mp_int_t proto = n_args <= 2 ? 0 : mp_obj_get_int(args[2]);
- socket->pcb.raw = raw_new(proto);
- break;
- }
+ case MOD_NETWORK_SOCK_RAW:
#endif
+ if (socket->type == MOD_NETWORK_SOCK_DGRAM) {
+ socket->pcb.udp = udp_new();
+ }
+ #if MICROPY_PY_LWIP_SOCK_RAW
+ else {
+ mp_int_t proto = n_args <= 2 ? 0 : mp_obj_get_int(args[2]);
+ socket->pcb.raw = raw_new(proto);
+ }
+ #endif
+ socket->incoming.udp_raw.iget = 0;
+ socket->incoming.udp_raw.iput = 0;
+ socket->incoming.udp_raw.array = m_new0(lwip_incoming_packet_t, LWIP_INCOMING_PACKET_QUEUE_LEN);
+ break;
default:
mp_raise_OSError(MP_EINVAL);
}
@@ -1075,7 +1130,7 @@ static mp_obj_t lwip_socket_accept(mp_obj_t self_in) {
// ...and set up the new socket for it.
socket2->domain = MOD_NETWORK_AF_INET;
socket2->type = MOD_NETWORK_SOCK_STREAM;
- socket2->incoming.pbuf = NULL;
+ socket2->incoming.tcp.pbuf = NULL;
socket2->timeout = socket->timeout;
socket2->state = STATE_CONNECTED;
socket2->recv_offset = 0;
@@ -1130,8 +1185,8 @@ static mp_obj_t lwip_socket_connect(mp_obj_t self_in, mp_obj_t addr_in) {
socket->state = STATE_NEW;
mp_raise_OSError(error_lookup_table[-err]);
}
- socket->peer_port = (mp_uint_t)port;
- memcpy(&socket->peer, &dest, sizeof(socket->peer));
+ socket->tcp_peer_addr = dest;
+ socket->tcp_peer_port = (mp_uint_t)port;
MICROPY_PY_LWIP_EXIT
// And now we wait...
@@ -1299,8 +1354,8 @@ static mp_obj_t lwip_socket_recvfrom(mp_obj_t self_in, mp_obj_t len_in) {
mp_uint_t ret = 0;
switch (socket->type) {
case MOD_NETWORK_SOCK_STREAM: {
- memcpy(&ip, &socket->peer, sizeof(socket->peer));
- port = (mp_uint_t)socket->peer_port;
+ ip = socket->tcp_peer_addr;
+ port = (mp_uint_t)socket->tcp_peer_port;
ret = lwip_tcp_receive(socket, (byte *)vstr.buf, len, &_errno);
break;
}
@@ -1537,9 +1592,15 @@ static mp_uint_t lwip_socket_ioctl(mp_obj_t self_in, mp_uint_t request, uintptr_
if (lwip_socket_incoming_array(socket)[socket->incoming.connection.iget] != NULL) {
ret |= MP_STREAM_POLL_RD;
}
+ } else if (socket->type == MOD_NETWORK_SOCK_STREAM) {
+ // For TCP sockets there is just one slot for incoming data
+ if (socket->incoming.tcp.pbuf != NULL) {
+ ret |= MP_STREAM_POLL_RD;
+ }
} else {
- // Otherwise there is just one slot for incoming data
- if (socket->incoming.pbuf != NULL) {
+ // Otherwise for UDP/raw there is a queue of incoming data
+ lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[socket->incoming.udp_raw.iget];
+ if (slot->pbuf != NULL) {
ret |= MP_STREAM_POLL_RD;
}
}