diff options
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/multi_espnow/10_simple_data.py | 57 | ||||
| -rw-r--r-- | tests/multi_espnow/10_simple_data.py.exp | 6 | ||||
| -rw-r--r-- | tests/multi_espnow/20_send_echo.py | 93 | ||||
| -rw-r--r-- | tests/multi_espnow/20_send_echo.py.exp | 21 | ||||
| -rw-r--r-- | tests/multi_espnow/30_lmk_echo.py | 130 | ||||
| -rw-r--r-- | tests/multi_espnow/30_lmk_echo.py.exp | 8 | ||||
| -rw-r--r-- | tests/multi_espnow/40_recv_test.py | 113 | ||||
| -rw-r--r-- | tests/multi_espnow/40_recv_test.py.exp | 14 | ||||
| -rw-r--r-- | tests/multi_espnow/50_esp32_rssi_test.py | 114 | ||||
| -rw-r--r-- | tests/multi_espnow/50_esp32_rssi_test.py.exp | 10 | ||||
| -rw-r--r-- | tests/multi_espnow/60_irq_test.py | 117 | ||||
| -rw-r--r-- | tests/multi_espnow/60_irq_test.py.exp | 8 | ||||
| -rw-r--r-- | tests/multi_espnow/80_uasyncio_client.py | 110 | ||||
| -rw-r--r-- | tests/multi_espnow/80_uasyncio_client.py.exp | 18 | ||||
| -rw-r--r-- | tests/multi_espnow/81_uasyncio_server.py | 96 | ||||
| -rw-r--r-- | tests/multi_espnow/81_uasyncio_server.py.exp | 11 | ||||
| -rw-r--r-- | tests/multi_espnow/90_memory_test.py | 108 | ||||
| -rw-r--r-- | tests/multi_espnow/90_memory_test.py.exp | 25 |
18 files changed, 1059 insertions, 0 deletions
diff --git a/tests/multi_espnow/10_simple_data.py b/tests/multi_espnow/10_simple_data.py new file mode 100644 index 000000000..1d218fe98 --- /dev/null +++ b/tests/multi_espnow/10_simple_data.py @@ -0,0 +1,57 @@ +# Simple test of a ESPnow server and client transferring data. +# This test works with ESP32 or ESP8266 as server or client. + +try: + import network + import espnow +except ImportError: + print("SKIP") + raise SystemExit + +# Set read timeout to 5 seconds +timeout_ms = 5000 +default_pmk = b"MicroPyth0nRules" + + +def init(sta_active=True, ap_active=False): + wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]] + e = espnow.ESPNow() + e.active(True) + e.set_pmk(default_pmk) + wlans[0].active(sta_active) + wlans[1].active(ap_active) + wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP + e.set_pmk(default_pmk) + return e + + +# Server +def instance0(): + e = init(True, False) + multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)]) + multitest.next() + peer, msg1 = e.recv(timeout_ms) + if msg1 is None: + print("e.recv({timeout_ms}): Timeout waiting for message.") + e.active(False) + return + print(bytes(msg1)) + msg2 = b"server to client" + e.add_peer(peer) + e.send(peer, msg2) + print(bytes(msg2)) + e.active(False) + + +# Client +def instance1(): + e = init(True, False) + multitest.next() + peer = PEERS[0] + e.add_peer(peer) + msg1 = b"client to server" + e.send(peer, msg1) + print(bytes(msg1)) + peer2, msg2 = e.recv(timeout_ms) + print(bytes(msg2)) + e.active(False) diff --git a/tests/multi_espnow/10_simple_data.py.exp b/tests/multi_espnow/10_simple_data.py.exp new file mode 100644 index 000000000..71a247f04 --- /dev/null +++ b/tests/multi_espnow/10_simple_data.py.exp @@ -0,0 +1,6 @@ +--- instance0 --- +b'client to server' +b'server to client' +--- instance1 --- +b'client to server' +b'server to client' diff --git a/tests/multi_espnow/20_send_echo.py b/tests/multi_espnow/20_send_echo.py new file mode 100644 index 000000000..4a1d1624d --- /dev/null +++ b/tests/multi_espnow/20_send_echo.py @@ -0,0 +1,93 @@ +# Test of a ESPnow echo server and client transferring data. +# This test works with ESP32 or ESP8266 as server or client. + +try: + import network + import random + import espnow +except ImportError: + print("SKIP") + raise SystemExit + +# Set read timeout to 5 seconds +timeout_ms = 5000 +default_pmk = b"MicroPyth0nRules" +sync = True + + +def echo_server(e): + peers = [] + while True: + peer, msg = e.recv(timeout_ms) + if peer is None: + return + if peer not in peers: + peers.append(peer) + e.add_peer(peer) + + # Echo the MAC and message back to the sender + if not e.send(peer, msg, sync): + print("ERROR: send() failed to", peer) + return + + if msg == b"!done": + return + + +def echo_test(e, peer, msg, sync): + print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="") + try: + if not e.send(peer, msg, sync): + print("ERROR: Send failed.") + return + except OSError as exc: + # Don't print exc as it is differs for esp32 and esp8266 + print("ERROR: OSError:") + return + + p2, msg2 = e.recv(timeout_ms) + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + +def echo_client(e, peer, msglens): + for sync in [True, False]: + for msglen in msglens: + msg = bytearray(msglen) + if msglen > 0: + msg[0] = b"_"[0] # Random message must not start with '!' + for i in range(1, msglen): + msg[i] = random.getrandbits(8) + echo_test(e, peer, msg, sync) + + +def init(sta_active=True, ap_active=False): + wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]] + e = espnow.ESPNow() + e.active(True) + e.set_pmk(default_pmk) + wlans[0].active(sta_active) + wlans[1].active(ap_active) + wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP + return e + + +# Server +def instance0(): + e = init(True, False) + multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)]) + multitest.next() + print("Server Start") + echo_server(e) + print("Server Done") + e.active(False) + + +# Client +def instance1(): + e = init(True, False) + multitest.next() + peer = PEERS[0] + e.add_peer(peer) + echo_client(e, peer, [1, 2, 8, 100, 249, 250, 251, 0]) + echo_test(e, peer, b"!done", True) + e.active(False) diff --git a/tests/multi_espnow/20_send_echo.py.exp b/tests/multi_espnow/20_send_echo.py.exp new file mode 100644 index 000000000..e43900bcf --- /dev/null +++ b/tests/multi_espnow/20_send_echo.py.exp @@ -0,0 +1,21 @@ +--- instance0 --- +Server Start +Server Done +--- instance1 --- +TEST: send/recv(msglen=1,sync=True): OK +TEST: send/recv(msglen=2,sync=True): OK +TEST: send/recv(msglen=8,sync=True): OK +TEST: send/recv(msglen=100,sync=True): OK +TEST: send/recv(msglen=249,sync=True): OK +TEST: send/recv(msglen=250,sync=True): OK +TEST: send/recv(msglen=251,sync=True): ERROR: OSError: +TEST: send/recv(msglen=0,sync=True): ERROR: OSError: +TEST: send/recv(msglen=1,sync=False): OK +TEST: send/recv(msglen=2,sync=False): OK +TEST: send/recv(msglen=8,sync=False): OK +TEST: send/recv(msglen=100,sync=False): OK +TEST: send/recv(msglen=249,sync=False): OK +TEST: send/recv(msglen=250,sync=False): OK +TEST: send/recv(msglen=251,sync=False): ERROR: OSError: +TEST: send/recv(msglen=0,sync=False): ERROR: OSError: +TEST: send/recv(msglen=5,sync=True): OK diff --git a/tests/multi_espnow/30_lmk_echo.py b/tests/multi_espnow/30_lmk_echo.py new file mode 100644 index 000000000..ac8908049 --- /dev/null +++ b/tests/multi_espnow/30_lmk_echo.py @@ -0,0 +1,130 @@ +# Test of a ESPnow echo server and client transferring encrypted data. +# This test works with ESP32 or ESP8266 as server or client. + +# First instance (echo server): +# Set the shared PMK +# Set the PEERS global to our mac addresses +# Run the echo server +# First exchange an unencrypted message from the client (so we +# can get its MAC address) and echo the message back (unenecrypted). +# Then set the peer LMK so all further communications are encrypted. + +# Second instance (echo client): +# Set the shared PMK +# Send an unencrypted message to the server and wait for echo response. +# Set the LMK for the peer communications so all further comms are encrypted. +# Send random messages and compare with response from server. + +try: + import network + import random + import time + import espnow +except ImportError: + print("SKIP") + raise SystemExit + + +# Set read timeout to 5 seconds +timeout_ms = 5000 +default_pmk = b"MicroPyth0nRules" +default_lmk = b"0123456789abcdef" +sync = True + + +def echo_server(e): + peers = [] + while True: + # Wait for messages from the client + peer, msg = e.recv(timeout_ms) + if peer is None: + return + if peer not in peers: + # If this is first message, add the peer unencrypted + e.add_peer(peer) + + # Echo the message back to the sender + if not e.send(peer, msg, sync): + print("ERROR: send() failed to", peer) + return + + if peer not in peers: + # If this is first message, add the peer encrypted + peers.append(peer) + e.del_peer(peer) + e.add_peer(peer, default_lmk) + + if msg == b"!done": + return + + +# Send a message from the client and compare with response from server. +def echo_test(e, peer, msg, sync): + print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="") + try: + if not e.send(peer, msg, sync): + print("ERROR: Send failed.") + return + except OSError as exc: + # Don't print exc as it is differs for esp32 and esp8266 + print("ERROR: OSError:") + return + + p2, msg2 = e.recv(timeout_ms) + if p2 is None: + print("ERROR: No response from server.") + raise SystemExit + + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + +# Send some random messages to server and check the responses +def echo_client(e, peer, msglens): + for sync in [True, False]: + for msglen in msglens: + msg = bytearray(msglen) + if msglen > 0: + msg[0] = b"_"[0] # Random message must not start with '!' + for i in range(1, msglen): + msg[i] = random.getrandbits(8) + echo_test(e, peer, msg, sync) + + +# Initialise the wifi and espnow hardware and software +def init(sta_active=True, ap_active=False): + wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]] + e = espnow.ESPNow() + e.active(True) + e.set_pmk(default_pmk) + wlans[0].active(sta_active) + wlans[1].active(ap_active) + wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP + return e + + +# Server +def instance0(): + e = init(True, False) + macs = [network.WLAN(i).config("mac") for i in (0, 1)] + print("Server Start") + multitest.globals(PEERS=macs) + multitest.next() + echo_server(e) + print("Server Done") + e.active(False) + + +# Client +def instance1(): + e = init(True, False) + multitest.next() + peer = PEERS[0] + e.add_peer(peer) + echo_test(e, peer, b"start", True) + # Wait long enough for the server to set the lmk + time.sleep(0.1) + e.del_peer(peer) + e.add_peer(peer, default_lmk) + echo_client(e, peer, [250]) + echo_test(e, peer, b"!done", True) + e.active(False) diff --git a/tests/multi_espnow/30_lmk_echo.py.exp b/tests/multi_espnow/30_lmk_echo.py.exp new file mode 100644 index 000000000..cd05fe6c7 --- /dev/null +++ b/tests/multi_espnow/30_lmk_echo.py.exp @@ -0,0 +1,8 @@ +--- instance0 --- +Server Start +Server Done +--- instance1 --- +TEST: send/recv(msglen=5,sync=True): OK +TEST: send/recv(msglen=250,sync=True): OK +TEST: send/recv(msglen=250,sync=False): OK +TEST: send/recv(msglen=5,sync=True): OK diff --git a/tests/multi_espnow/40_recv_test.py b/tests/multi_espnow/40_recv_test.py new file mode 100644 index 000000000..46f4f78df --- /dev/null +++ b/tests/multi_espnow/40_recv_test.py @@ -0,0 +1,113 @@ +# Test of a ESPnow echo server and client transferring data. +# This test works with ESP32 or ESP8266 as server or client. +# Explicitly tests the irecv(), rev() and recvinto() methods. + +try: + import network + import random + import espnow +except ImportError: + print("SKIP") + raise SystemExit + +# Set read timeout to 5 seconds +timeout_ms = 5000 +default_pmk = b"MicroPyth0nRules" +sync = True + + +def echo_server(e): + peers = [] + while True: + peer, msg = e.irecv(timeout_ms) + if peer is None: + return + if peer not in peers: + peers.append(peer) + e.add_peer(peer) + + # Echo the MAC and message back to the sender + if not e.send(peer, msg, sync): + print("ERROR: send() failed to", peer) + return + + if msg == b"!done": + return + + +def client_send(e, peer, msg, sync): + print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="") + try: + if not e.send(peer, msg, sync): + print("ERROR: Send failed.") + return + except OSError as exc: + # Don't print exc as it is differs for esp32 and esp8266 + print("ERROR: OSError:") + return + + +def init(sta_active=True, ap_active=False): + wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]] + e = espnow.ESPNow() + e.active(True) + e.set_pmk(default_pmk) + wlans[0].active(sta_active) + wlans[1].active(ap_active) + wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP + return e + + +# Server +def instance0(): + e = init(True, False) + multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)]) + multitest.next() + print("Server Start") + echo_server(e) + print("Server Done") + e.active(False) + + +# Client +def instance1(): + # Instance 1 (the client) + e = init(True, False) + e.config(timeout_ms=timeout_ms) + multitest.next() + peer = PEERS[0] + e.add_peer(peer) + + print("RECVINTO() test...") + msg = bytes([random.getrandbits(8) for _ in range(12)]) + client_send(e, peer, msg, True) + data = [bytearray(espnow.ADDR_LEN), bytearray(espnow.MAX_DATA_LEN)] + n = e.recvinto(data) + print("OK" if data[1] == msg else "ERROR: Received != Sent") + + print("IRECV() test...") + msg = bytes([random.getrandbits(8) for _ in range(12)]) + client_send(e, peer, msg, True) + p2, msg2 = e.irecv() + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + print("RECV() test...") + msg = bytes([random.getrandbits(8) for _ in range(12)]) + client_send(e, peer, msg, True) + p2, msg2 = e.recv() + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + print("ITERATOR() test...") + msg = bytes([random.getrandbits(8) for _ in range(12)]) + client_send(e, peer, msg, True) + p2, msg2 = next(e) + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + # Tell the server to stop + print("DONE") + msg = b"!done" + client_send(e, peer, msg, True) + p2, msg2 = e.irecv() + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + e.active(False) diff --git a/tests/multi_espnow/40_recv_test.py.exp b/tests/multi_espnow/40_recv_test.py.exp new file mode 100644 index 000000000..61f220625 --- /dev/null +++ b/tests/multi_espnow/40_recv_test.py.exp @@ -0,0 +1,14 @@ +--- instance0 --- +Server Start +Server Done +--- instance1 --- +RECVINTO() test... +TEST: send/recv(msglen=12,sync=True): OK +IRECV() test... +TEST: send/recv(msglen=12,sync=True): OK +RECV() test... +TEST: send/recv(msglen=12,sync=True): OK +ITERATOR() test... +TEST: send/recv(msglen=12,sync=True): OK +DONE +TEST: send/recv(msglen=5,sync=True): OK diff --git a/tests/multi_espnow/50_esp32_rssi_test.py b/tests/multi_espnow/50_esp32_rssi_test.py new file mode 100644 index 000000000..6a47b540d --- /dev/null +++ b/tests/multi_espnow/50_esp32_rssi_test.py @@ -0,0 +1,114 @@ +# Test the ESP32 RSSI extensions on instance1. +# Will SKIP test if instance1 is not an ESP32. +# Instance0 may be an ESP32 or ESP8266. + +try: + import time + import network + import random + import espnow +except ImportError: + print("SKIP") + raise SystemExit + +# Set read timeout to 5 seconds +timeout_ms = 5000 +default_pmk = b"MicroPyth0nRules" +sync = True + + +def echo_server(e): + peers = [] + while True: + peer, msg = e.irecv(timeout_ms) + if peer is None: + return + if peer not in peers: + peers.append(peer) + e.add_peer(peer) + + # Echo the MAC and message back to the sender + if not e.send(peer, msg, sync): + print("ERROR: send() failed to", peer) + return + + if msg == b"!done": + return + + +def client_send(e, peer, msg, sync): + print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="") + try: + if not e.send(peer, msg, sync): + print("ERROR: Send failed.") + return + except OSError as exc: + # Don't print exc as it is differs for esp32 and esp8266 + print("ERROR: OSError:") + return + + +def init(sta_active=True, ap_active=False): + wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]] + e = espnow.ESPNow() + e.active(True) + e.set_pmk(default_pmk) + wlans[0].active(sta_active) + wlans[1].active(ap_active) + return e + + +# Server +def instance0(): + e = init(True, False) + multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)]) + multitest.next() + print("Server Start") + echo_server(e) + print("Server Done") + e.active(False) + + +# Client +def instance1(): + # Instance 1 (the client) + e = init(True, False) + if not hasattr(e, "peers_table"): + e.active(False) + print("SKIP") + raise SystemExit + + e.config(timeout_ms=timeout_ms) + multitest.next() + peer = PEERS[0] + e.add_peer(peer) + + # assert len(e.peers) == 1 + print("IRECV() test...") + msg = bytes([random.getrandbits(8) for _ in range(12)]) + client_send(e, peer, msg, True) + p2, msg2 = e.irecv() + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + print("RSSI test...") + if len(e.peers_table) != 1: + print("ERROR: len(ESPNow.peers_table()) != 1. ESPNow.peers_table()=", peers) + elif list(e.peers_table.keys())[0] != peer: + print("ERROR: ESPNow.peers_table().keys[0] != peer. ESPNow.peers_table()=", peers) + else: + rssi, time_ms = e.peers_table[peer] + if not -127 < rssi < 0: + print("ERROR: Invalid rssi value:", rssi) + elif time.ticks_diff(time.ticks_ms(), time_ms) > 5000: + print("ERROR: Unexpected time_ms value:", time_ms) + else: + print("OK") + + # Tell the server to stop + print("DONE") + msg = b"!done" + client_send(e, peer, msg, True) + p2, msg2 = e.irecv() + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + e.active(False) diff --git a/tests/multi_espnow/50_esp32_rssi_test.py.exp b/tests/multi_espnow/50_esp32_rssi_test.py.exp new file mode 100644 index 000000000..22fc4f028 --- /dev/null +++ b/tests/multi_espnow/50_esp32_rssi_test.py.exp @@ -0,0 +1,10 @@ +--- instance0 --- +Server Start +Server Done +--- instance1 --- +IRECV() test... +TEST: send/recv(msglen=12,sync=True): OK +RSSI test... +OK +DONE +TEST: send/recv(msglen=5,sync=True): OK diff --git a/tests/multi_espnow/60_irq_test.py b/tests/multi_espnow/60_irq_test.py new file mode 100644 index 000000000..37fc57ce4 --- /dev/null +++ b/tests/multi_espnow/60_irq_test.py @@ -0,0 +1,117 @@ +# Test of a ESPnow echo server and client transferring data. +# Test the ESP32 extemnsions. Assumes instance1 is an ESP32. +# Instance0 may be an ESP32 or ESP8266 + +try: + import network + import random + import time + import espnow +except ImportError: + print("SKIP") + raise SystemExit + +# Set read timeout to 5 seconds +timeout_ms = 5000 +default_pmk = b"MicroPyth0nRules" +sync = True + + +def echo_server(e): + peers = [] + while True: + peer, msg = e.irecv(timeout_ms) + if peer is None: + return + if peer not in peers: + peers.append(peer) + e.add_peer(peer) + + # Echo the MAC and message back to the sender + if not e.send(peer, msg, sync): + print("ERROR: send() failed to", peer) + return + + if msg == b"!done": + return + + +def client_send(e, peer, msg, sync): + print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="") + try: + if not e.send(peer, msg, sync): + print("ERROR: Send failed.") + return + except OSError as exc: + # Don't print exc as it is differs for esp32 and esp8266 + print("ERROR: OSError:") + return + + +def init(sta_active=True, ap_active=False): + wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]] + e = espnow.ESPNow() + e.active(True) + e.set_pmk(default_pmk) + wlans[0].active(sta_active) + wlans[1].active(ap_active) + wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP + return e + + +# Server +def instance0(): + e = init(True, False) + multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)]) + multitest.next() + print("Server Start") + echo_server(e) + print("Server Done") + e.active(False) + + +done = False + + +# Client +def instance1(): + # Instance 1 (the client) + e = init(True, False) + try: + e.irq(None) + except AttributeError: + print("SKIP") + raise SystemExit + + e.config(timeout_ms=timeout_ms) + multitest.next() + peer = PEERS[0] + e.add_peer(peer) + + def on_recv_cb(e): + global done + p2, msg2 = e.irecv(0) + print("OK" if msg2 == msg else "ERROR: Received != Sent") + done = True + + global done + print("IRQ() test...") + e.irq(on_recv_cb) + done = False + msg = bytes([random.getrandbits(8) for _ in range(12)]) + client_send(e, peer, msg, True) + start = time.ticks_ms() + while not done: + if time.ticks_diff(time.ticks_ms(), start) > timeout_ms: + print("Timeout waiting for response.") + raise SystemExit + e.irq(None) + + # Tell the server to stop + print("DONE") + msg = b"!done" + client_send(e, peer, msg, True) + p2, msg2 = e.irecv() + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + e.active(False) diff --git a/tests/multi_espnow/60_irq_test.py.exp b/tests/multi_espnow/60_irq_test.py.exp new file mode 100644 index 000000000..c2be2dccf --- /dev/null +++ b/tests/multi_espnow/60_irq_test.py.exp @@ -0,0 +1,8 @@ +--- instance0 --- +Server Start +Server Done +--- instance1 --- +IRQ() test... +TEST: send/recv(msglen=12,sync=True): OK +DONE +TEST: send/recv(msglen=5,sync=True): OK diff --git a/tests/multi_espnow/80_uasyncio_client.py b/tests/multi_espnow/80_uasyncio_client.py new file mode 100644 index 000000000..fa2918cc0 --- /dev/null +++ b/tests/multi_espnow/80_uasyncio_client.py @@ -0,0 +1,110 @@ +# Test of a ESPnow echo server and asyncio client transferring data. +# Test will SKIP if instance1 (asyncio client) does not support asyncio. +# - eg. ESP8266 with 1MB flash. +# Instance0 is not required to support asyncio. + +try: + import network + import random + import espnow +except ImportError: + print("SKIP") + raise SystemExit + +# Set read timeout to 5 seconds +timeout_ms = 5000 +default_pmk = b"MicroPyth0nRules" +sync = True + + +def echo_server(e): + peers = [] + while True: + peer, msg = e.irecv(timeout_ms) + if peer is None: + return + if peer not in peers: + peers.append(peer) + e.add_peer(peer) + + # Echo the MAC and message back to the sender + if not e.send(peer, msg, sync): + print("ERROR: send() failed to", peer) + return + + if msg == b"!done": + return + + +def client_send(e, peer, msg, sync): + print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="") + try: + if not e.send(peer, msg, sync): + print("ERROR: Send failed.") + return + except OSError as exc: + # Don't print exc as it is differs for esp32 and esp8266 + print("ERROR: OSError:") + return + print("OK") + + +def init(e, sta_active=True, ap_active=False): + wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]] + e.active(True) + e.set_pmk(default_pmk) + wlans[0].active(sta_active) + wlans[1].active(ap_active) + wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP + return e + + +async def client(e): + init(e, True, False) + e.config(timeout_ms=timeout_ms) + peer = PEERS[0] + e.add_peer(peer) + multitest.next() + + print("airecv() test...") + msgs = [] + for i in range(5): + # Send messages to the peer who will echo it back + msgs.append(bytes([random.getrandbits(8) for _ in range(12)])) + client_send(e, peer, msgs[i], True) + + for i in range(5): + mac, reply = await e.airecv() + print("OK" if reply == msgs[i] else "ERROR: Received != Sent") + + # Tell the server to stop + print("DONE") + msg = b"!done" + client_send(e, peer, msg, True) + mac, reply = await e.airecv() + print("OK" if reply == msg else "ERROR: Received != Sent") + + e.active(False) + + +# Server +def instance0(): + e = espnow.ESPNow() + init(e, True, False) + multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)]) + multitest.next() + print("Server Start") + echo_server(e) + print("Server Done") + e.active(False) + + +# Client +def instance1(): + try: + import uasyncio as asyncio + from aioespnow import AIOESPNow + except ImportError: + print("SKIP") + raise SystemExit + asyncio.run(client(AIOESPNow())) diff --git a/tests/multi_espnow/80_uasyncio_client.py.exp b/tests/multi_espnow/80_uasyncio_client.py.exp new file mode 100644 index 000000000..05fdf8aca --- /dev/null +++ b/tests/multi_espnow/80_uasyncio_client.py.exp @@ -0,0 +1,18 @@ +--- instance0 --- +Server Start +Server Done +--- instance1 --- +airecv() test... +TEST: send/recv(msglen=12,sync=True): OK +TEST: send/recv(msglen=12,sync=True): OK +TEST: send/recv(msglen=12,sync=True): OK +TEST: send/recv(msglen=12,sync=True): OK +TEST: send/recv(msglen=12,sync=True): OK +OK +OK +OK +OK +OK +DONE +TEST: send/recv(msglen=5,sync=True): OK +OK diff --git a/tests/multi_espnow/81_uasyncio_server.py b/tests/multi_espnow/81_uasyncio_server.py new file mode 100644 index 000000000..ee098b7f3 --- /dev/null +++ b/tests/multi_espnow/81_uasyncio_server.py @@ -0,0 +1,96 @@ +# Test of a ESPnow asyncio echo server and client transferring data. +# Test will SKIP if instance0 (asyncio echo server) does not support asyncio. +# - eg. ESP8266 with 1MB flash. +# Instance1 is not required to support asyncio. + +try: + import network + import random + import espnow +except ImportError: + print("SKIP") + raise SystemExit + +# Set read timeout to 5 seconds +timeout_ms = 5000 +default_pmk = b"MicroPyth0nRules" +sync = True + + +def client_send(e, peer, msg, sync): + print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="") + try: + if not e.send(peer, msg, sync): + print("ERROR: Send failed.") + return + except OSError as exc: + # Don't print exc as it is differs for esp32 and esp8266 + print("ERROR: OSError:") + return + + +def init(e, sta_active=True, ap_active=False): + wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]] + e.active(True) + e.set_pmk(default_pmk) + wlans[0].active(sta_active) + wlans[1].active(ap_active) + wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP + return e + + +async def echo_server(e): + peers = [] + async for peer, msg in e: + if peer not in peers: + peers.append(peer) + e.add_peer(peer) + + # Echo the message back to the sender + if not await e.asend(peer, msg, sync): + print("ERROR: asend() failed to", peer) + return + + if msg == b"!done": + return + + +# Server +def instance0(): + try: + import uasyncio as asyncio + from aioespnow import AIOESPNow + except ImportError: + print("SKIP") + raise SystemExit + e = AIOESPNow() + init(e, True, False) + multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)]) + multitest.next() + print("Server Start") + asyncio.run(echo_server(e)) + print("Server Done") + e.active(False) + + +def instance1(): + e = espnow.ESPNow() + init(e, True, False) + peer = PEERS[0] + e.add_peer(peer) + multitest.next() + + for i in range(5): + msg = bytes([random.getrandbits(8) for _ in range(12)]) + client_send(e, peer, msg, True) + p2, msg2 = e.irecv(timeout_ms) + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + # Tell the server to stop + print("DONE") + msg = b"!done" + client_send(e, peer, msg, True) + p2, msg2 = e.irecv(timeout_ms) + print("OK" if msg2 == msg else "ERROR: Received != Sent") + + e.active(False) diff --git a/tests/multi_espnow/81_uasyncio_server.py.exp b/tests/multi_espnow/81_uasyncio_server.py.exp new file mode 100644 index 000000000..abe34fc42 --- /dev/null +++ b/tests/multi_espnow/81_uasyncio_server.py.exp @@ -0,0 +1,11 @@ +--- instance0 --- +Server Start +Server Done +--- instance1 --- +TEST: send/recv(msglen=12,sync=True): OK +TEST: send/recv(msglen=12,sync=True): OK +TEST: send/recv(msglen=12,sync=True): OK +TEST: send/recv(msglen=12,sync=True): OK +TEST: send/recv(msglen=12,sync=True): OK +DONE +TEST: send/recv(msglen=5,sync=True): OK diff --git a/tests/multi_espnow/90_memory_test.py b/tests/multi_espnow/90_memory_test.py new file mode 100644 index 000000000..5e80eb0fd --- /dev/null +++ b/tests/multi_espnow/90_memory_test.py @@ -0,0 +1,108 @@ +# Test of a ESPnow echo server and client transferring data. +# This test works with ESP32 or ESP8266 as server or client. + +try: + import network + import random + import espnow +except ImportError: + print("SKIP") + raise SystemExit + +# Set read timeout to 5 seconds +timeout_ms = 5000 +default_pmk = b"MicroPyth0nRules" +sync = True + + +def echo_server(e): + peers = [] + i = 0 + while True: + peer, msg = e.irecv(timeout_ms) + i += 1 + if i % 10 == 0: + print("OK:", i) + if peer is None: + return + if peer not in peers: + peers.append(peer) + e.add_peer(peer) + + # Echo the MAC and message back to the sender + if not e.send(peer, msg, sync): + print("ERROR: send() failed to", peer) + return + + if msg == b"!done": + return + + +def echo_test(e, peer, msg, sync): + try: + if not e.send(peer, msg, sync): + print("ERROR: Send failed.") + return + except OSError as exc: + # Don't print exc as it is differs for esp32 and esp8266 + print("ERROR: OSError:") + return + + p2, msg2 = e.irecv(timeout_ms) + if msg2 != msg: + print("ERROR: Received != Sent") + + +def echo_client(e, peer, msglens): + for sync in [True]: + for msglen in msglens: + msg = bytearray(msglen) + if msglen > 0: + msg[0] = b"_"[0] # Random message must not start with '!' + for i in range(1, msglen): + msg[i] = random.getrandbits(8) + echo_test(e, peer, msg, sync) + + +def init(sta_active=True, ap_active=False): + wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]] + e = espnow.ESPNow() + e.active(True) + e.set_pmk(default_pmk) + wlans[0].active(sta_active) + wlans[1].active(ap_active) + wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP + return e + + +# Server +def instance0(): + e = init(True, False) + multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)]) + multitest.next() + print("Server Start") + echo_server(e) + print("Server Done") + e.active(False) + + +# Client +def instance1(): + e = init(True, False) + multitest.next() + peer = PEERS[0] + e.add_peer(peer) + echo_test(e, peer, b"ping", True) + gc.collect() + mem_start = gc.mem_alloc() + for i in range(10): + echo_client(e, peer, [250] * 10) + print("OK:", (i + 1) * 10) + echo_test(e, peer, b"!done", True) + gc.collect() + mem_end = gc.mem_alloc() + if mem_end - mem_start < 1024: + print("OK: Less than 1024 bytes consumed") + else: + print("Error: Memory consumed is", mem_end - mem_start) + e.active(False) diff --git a/tests/multi_espnow/90_memory_test.py.exp b/tests/multi_espnow/90_memory_test.py.exp new file mode 100644 index 000000000..1ea8c2959 --- /dev/null +++ b/tests/multi_espnow/90_memory_test.py.exp @@ -0,0 +1,25 @@ +--- instance0 --- +Server Start +OK: 10 +OK: 20 +OK: 30 +OK: 40 +OK: 50 +OK: 60 +OK: 70 +OK: 80 +OK: 90 +OK: 100 +Server Done +--- instance1 --- +OK: 10 +OK: 20 +OK: 30 +OK: 40 +OK: 50 +OK: 60 +OK: 70 +OK: 80 +OK: 90 +OK: 100 +OK: Less than 1024 bytes consumed |
