summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/multi_espnow/10_simple_data.py57
-rw-r--r--tests/multi_espnow/10_simple_data.py.exp6
-rw-r--r--tests/multi_espnow/20_send_echo.py93
-rw-r--r--tests/multi_espnow/20_send_echo.py.exp21
-rw-r--r--tests/multi_espnow/30_lmk_echo.py130
-rw-r--r--tests/multi_espnow/30_lmk_echo.py.exp8
-rw-r--r--tests/multi_espnow/40_recv_test.py113
-rw-r--r--tests/multi_espnow/40_recv_test.py.exp14
-rw-r--r--tests/multi_espnow/50_esp32_rssi_test.py114
-rw-r--r--tests/multi_espnow/50_esp32_rssi_test.py.exp10
-rw-r--r--tests/multi_espnow/60_irq_test.py117
-rw-r--r--tests/multi_espnow/60_irq_test.py.exp8
-rw-r--r--tests/multi_espnow/80_uasyncio_client.py110
-rw-r--r--tests/multi_espnow/80_uasyncio_client.py.exp18
-rw-r--r--tests/multi_espnow/81_uasyncio_server.py96
-rw-r--r--tests/multi_espnow/81_uasyncio_server.py.exp11
-rw-r--r--tests/multi_espnow/90_memory_test.py108
-rw-r--r--tests/multi_espnow/90_memory_test.py.exp25
18 files changed, 1059 insertions, 0 deletions
diff --git a/tests/multi_espnow/10_simple_data.py b/tests/multi_espnow/10_simple_data.py
new file mode 100644
index 000000000..1d218fe98
--- /dev/null
+++ b/tests/multi_espnow/10_simple_data.py
@@ -0,0 +1,57 @@
+# Simple test of a ESPnow server and client transferring data.
+# This test works with ESP32 or ESP8266 as server or client.
+
+try:
+ import network
+ import espnow
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# Set read timeout to 5 seconds
+timeout_ms = 5000
+default_pmk = b"MicroPyth0nRules"
+
+
+def init(sta_active=True, ap_active=False):
+ wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]]
+ e = espnow.ESPNow()
+ e.active(True)
+ e.set_pmk(default_pmk)
+ wlans[0].active(sta_active)
+ wlans[1].active(ap_active)
+ wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP
+ e.set_pmk(default_pmk)
+ return e
+
+
+# Server
+def instance0():
+ e = init(True, False)
+ multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)])
+ multitest.next()
+ peer, msg1 = e.recv(timeout_ms)
+ if msg1 is None:
+ print("e.recv({timeout_ms}): Timeout waiting for message.")
+ e.active(False)
+ return
+ print(bytes(msg1))
+ msg2 = b"server to client"
+ e.add_peer(peer)
+ e.send(peer, msg2)
+ print(bytes(msg2))
+ e.active(False)
+
+
+# Client
+def instance1():
+ e = init(True, False)
+ multitest.next()
+ peer = PEERS[0]
+ e.add_peer(peer)
+ msg1 = b"client to server"
+ e.send(peer, msg1)
+ print(bytes(msg1))
+ peer2, msg2 = e.recv(timeout_ms)
+ print(bytes(msg2))
+ e.active(False)
diff --git a/tests/multi_espnow/10_simple_data.py.exp b/tests/multi_espnow/10_simple_data.py.exp
new file mode 100644
index 000000000..71a247f04
--- /dev/null
+++ b/tests/multi_espnow/10_simple_data.py.exp
@@ -0,0 +1,6 @@
+--- instance0 ---
+b'client to server'
+b'server to client'
+--- instance1 ---
+b'client to server'
+b'server to client'
diff --git a/tests/multi_espnow/20_send_echo.py b/tests/multi_espnow/20_send_echo.py
new file mode 100644
index 000000000..4a1d1624d
--- /dev/null
+++ b/tests/multi_espnow/20_send_echo.py
@@ -0,0 +1,93 @@
+# Test of a ESPnow echo server and client transferring data.
+# This test works with ESP32 or ESP8266 as server or client.
+
+try:
+ import network
+ import random
+ import espnow
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# Set read timeout to 5 seconds
+timeout_ms = 5000
+default_pmk = b"MicroPyth0nRules"
+sync = True
+
+
+def echo_server(e):
+ peers = []
+ while True:
+ peer, msg = e.recv(timeout_ms)
+ if peer is None:
+ return
+ if peer not in peers:
+ peers.append(peer)
+ e.add_peer(peer)
+
+ # Echo the MAC and message back to the sender
+ if not e.send(peer, msg, sync):
+ print("ERROR: send() failed to", peer)
+ return
+
+ if msg == b"!done":
+ return
+
+
+def echo_test(e, peer, msg, sync):
+ print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="")
+ try:
+ if not e.send(peer, msg, sync):
+ print("ERROR: Send failed.")
+ return
+ except OSError as exc:
+ # Don't print exc as it is differs for esp32 and esp8266
+ print("ERROR: OSError:")
+ return
+
+ p2, msg2 = e.recv(timeout_ms)
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+
+def echo_client(e, peer, msglens):
+ for sync in [True, False]:
+ for msglen in msglens:
+ msg = bytearray(msglen)
+ if msglen > 0:
+ msg[0] = b"_"[0] # Random message must not start with '!'
+ for i in range(1, msglen):
+ msg[i] = random.getrandbits(8)
+ echo_test(e, peer, msg, sync)
+
+
+def init(sta_active=True, ap_active=False):
+ wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]]
+ e = espnow.ESPNow()
+ e.active(True)
+ e.set_pmk(default_pmk)
+ wlans[0].active(sta_active)
+ wlans[1].active(ap_active)
+ wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP
+ return e
+
+
+# Server
+def instance0():
+ e = init(True, False)
+ multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)])
+ multitest.next()
+ print("Server Start")
+ echo_server(e)
+ print("Server Done")
+ e.active(False)
+
+
+# Client
+def instance1():
+ e = init(True, False)
+ multitest.next()
+ peer = PEERS[0]
+ e.add_peer(peer)
+ echo_client(e, peer, [1, 2, 8, 100, 249, 250, 251, 0])
+ echo_test(e, peer, b"!done", True)
+ e.active(False)
diff --git a/tests/multi_espnow/20_send_echo.py.exp b/tests/multi_espnow/20_send_echo.py.exp
new file mode 100644
index 000000000..e43900bcf
--- /dev/null
+++ b/tests/multi_espnow/20_send_echo.py.exp
@@ -0,0 +1,21 @@
+--- instance0 ---
+Server Start
+Server Done
+--- instance1 ---
+TEST: send/recv(msglen=1,sync=True): OK
+TEST: send/recv(msglen=2,sync=True): OK
+TEST: send/recv(msglen=8,sync=True): OK
+TEST: send/recv(msglen=100,sync=True): OK
+TEST: send/recv(msglen=249,sync=True): OK
+TEST: send/recv(msglen=250,sync=True): OK
+TEST: send/recv(msglen=251,sync=True): ERROR: OSError:
+TEST: send/recv(msglen=0,sync=True): ERROR: OSError:
+TEST: send/recv(msglen=1,sync=False): OK
+TEST: send/recv(msglen=2,sync=False): OK
+TEST: send/recv(msglen=8,sync=False): OK
+TEST: send/recv(msglen=100,sync=False): OK
+TEST: send/recv(msglen=249,sync=False): OK
+TEST: send/recv(msglen=250,sync=False): OK
+TEST: send/recv(msglen=251,sync=False): ERROR: OSError:
+TEST: send/recv(msglen=0,sync=False): ERROR: OSError:
+TEST: send/recv(msglen=5,sync=True): OK
diff --git a/tests/multi_espnow/30_lmk_echo.py b/tests/multi_espnow/30_lmk_echo.py
new file mode 100644
index 000000000..ac8908049
--- /dev/null
+++ b/tests/multi_espnow/30_lmk_echo.py
@@ -0,0 +1,130 @@
+# Test of a ESPnow echo server and client transferring encrypted data.
+# This test works with ESP32 or ESP8266 as server or client.
+
+# First instance (echo server):
+# Set the shared PMK
+# Set the PEERS global to our mac addresses
+# Run the echo server
+# First exchange an unencrypted message from the client (so we
+# can get its MAC address) and echo the message back (unenecrypted).
+# Then set the peer LMK so all further communications are encrypted.
+
+# Second instance (echo client):
+# Set the shared PMK
+# Send an unencrypted message to the server and wait for echo response.
+# Set the LMK for the peer communications so all further comms are encrypted.
+# Send random messages and compare with response from server.
+
+try:
+ import network
+ import random
+ import time
+ import espnow
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+
+# Set read timeout to 5 seconds
+timeout_ms = 5000
+default_pmk = b"MicroPyth0nRules"
+default_lmk = b"0123456789abcdef"
+sync = True
+
+
+def echo_server(e):
+ peers = []
+ while True:
+ # Wait for messages from the client
+ peer, msg = e.recv(timeout_ms)
+ if peer is None:
+ return
+ if peer not in peers:
+ # If this is first message, add the peer unencrypted
+ e.add_peer(peer)
+
+ # Echo the message back to the sender
+ if not e.send(peer, msg, sync):
+ print("ERROR: send() failed to", peer)
+ return
+
+ if peer not in peers:
+ # If this is first message, add the peer encrypted
+ peers.append(peer)
+ e.del_peer(peer)
+ e.add_peer(peer, default_lmk)
+
+ if msg == b"!done":
+ return
+
+
+# Send a message from the client and compare with response from server.
+def echo_test(e, peer, msg, sync):
+ print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="")
+ try:
+ if not e.send(peer, msg, sync):
+ print("ERROR: Send failed.")
+ return
+ except OSError as exc:
+ # Don't print exc as it is differs for esp32 and esp8266
+ print("ERROR: OSError:")
+ return
+
+ p2, msg2 = e.recv(timeout_ms)
+ if p2 is None:
+ print("ERROR: No response from server.")
+ raise SystemExit
+
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+
+# Send some random messages to server and check the responses
+def echo_client(e, peer, msglens):
+ for sync in [True, False]:
+ for msglen in msglens:
+ msg = bytearray(msglen)
+ if msglen > 0:
+ msg[0] = b"_"[0] # Random message must not start with '!'
+ for i in range(1, msglen):
+ msg[i] = random.getrandbits(8)
+ echo_test(e, peer, msg, sync)
+
+
+# Initialise the wifi and espnow hardware and software
+def init(sta_active=True, ap_active=False):
+ wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]]
+ e = espnow.ESPNow()
+ e.active(True)
+ e.set_pmk(default_pmk)
+ wlans[0].active(sta_active)
+ wlans[1].active(ap_active)
+ wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP
+ return e
+
+
+# Server
+def instance0():
+ e = init(True, False)
+ macs = [network.WLAN(i).config("mac") for i in (0, 1)]
+ print("Server Start")
+ multitest.globals(PEERS=macs)
+ multitest.next()
+ echo_server(e)
+ print("Server Done")
+ e.active(False)
+
+
+# Client
+def instance1():
+ e = init(True, False)
+ multitest.next()
+ peer = PEERS[0]
+ e.add_peer(peer)
+ echo_test(e, peer, b"start", True)
+ # Wait long enough for the server to set the lmk
+ time.sleep(0.1)
+ e.del_peer(peer)
+ e.add_peer(peer, default_lmk)
+ echo_client(e, peer, [250])
+ echo_test(e, peer, b"!done", True)
+ e.active(False)
diff --git a/tests/multi_espnow/30_lmk_echo.py.exp b/tests/multi_espnow/30_lmk_echo.py.exp
new file mode 100644
index 000000000..cd05fe6c7
--- /dev/null
+++ b/tests/multi_espnow/30_lmk_echo.py.exp
@@ -0,0 +1,8 @@
+--- instance0 ---
+Server Start
+Server Done
+--- instance1 ---
+TEST: send/recv(msglen=5,sync=True): OK
+TEST: send/recv(msglen=250,sync=True): OK
+TEST: send/recv(msglen=250,sync=False): OK
+TEST: send/recv(msglen=5,sync=True): OK
diff --git a/tests/multi_espnow/40_recv_test.py b/tests/multi_espnow/40_recv_test.py
new file mode 100644
index 000000000..46f4f78df
--- /dev/null
+++ b/tests/multi_espnow/40_recv_test.py
@@ -0,0 +1,113 @@
+# Test of a ESPnow echo server and client transferring data.
+# This test works with ESP32 or ESP8266 as server or client.
+# Explicitly tests the irecv(), rev() and recvinto() methods.
+
+try:
+ import network
+ import random
+ import espnow
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# Set read timeout to 5 seconds
+timeout_ms = 5000
+default_pmk = b"MicroPyth0nRules"
+sync = True
+
+
+def echo_server(e):
+ peers = []
+ while True:
+ peer, msg = e.irecv(timeout_ms)
+ if peer is None:
+ return
+ if peer not in peers:
+ peers.append(peer)
+ e.add_peer(peer)
+
+ # Echo the MAC and message back to the sender
+ if not e.send(peer, msg, sync):
+ print("ERROR: send() failed to", peer)
+ return
+
+ if msg == b"!done":
+ return
+
+
+def client_send(e, peer, msg, sync):
+ print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="")
+ try:
+ if not e.send(peer, msg, sync):
+ print("ERROR: Send failed.")
+ return
+ except OSError as exc:
+ # Don't print exc as it is differs for esp32 and esp8266
+ print("ERROR: OSError:")
+ return
+
+
+def init(sta_active=True, ap_active=False):
+ wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]]
+ e = espnow.ESPNow()
+ e.active(True)
+ e.set_pmk(default_pmk)
+ wlans[0].active(sta_active)
+ wlans[1].active(ap_active)
+ wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP
+ return e
+
+
+# Server
+def instance0():
+ e = init(True, False)
+ multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)])
+ multitest.next()
+ print("Server Start")
+ echo_server(e)
+ print("Server Done")
+ e.active(False)
+
+
+# Client
+def instance1():
+ # Instance 1 (the client)
+ e = init(True, False)
+ e.config(timeout_ms=timeout_ms)
+ multitest.next()
+ peer = PEERS[0]
+ e.add_peer(peer)
+
+ print("RECVINTO() test...")
+ msg = bytes([random.getrandbits(8) for _ in range(12)])
+ client_send(e, peer, msg, True)
+ data = [bytearray(espnow.ADDR_LEN), bytearray(espnow.MAX_DATA_LEN)]
+ n = e.recvinto(data)
+ print("OK" if data[1] == msg else "ERROR: Received != Sent")
+
+ print("IRECV() test...")
+ msg = bytes([random.getrandbits(8) for _ in range(12)])
+ client_send(e, peer, msg, True)
+ p2, msg2 = e.irecv()
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+ print("RECV() test...")
+ msg = bytes([random.getrandbits(8) for _ in range(12)])
+ client_send(e, peer, msg, True)
+ p2, msg2 = e.recv()
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+ print("ITERATOR() test...")
+ msg = bytes([random.getrandbits(8) for _ in range(12)])
+ client_send(e, peer, msg, True)
+ p2, msg2 = next(e)
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+ # Tell the server to stop
+ print("DONE")
+ msg = b"!done"
+ client_send(e, peer, msg, True)
+ p2, msg2 = e.irecv()
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+ e.active(False)
diff --git a/tests/multi_espnow/40_recv_test.py.exp b/tests/multi_espnow/40_recv_test.py.exp
new file mode 100644
index 000000000..61f220625
--- /dev/null
+++ b/tests/multi_espnow/40_recv_test.py.exp
@@ -0,0 +1,14 @@
+--- instance0 ---
+Server Start
+Server Done
+--- instance1 ---
+RECVINTO() test...
+TEST: send/recv(msglen=12,sync=True): OK
+IRECV() test...
+TEST: send/recv(msglen=12,sync=True): OK
+RECV() test...
+TEST: send/recv(msglen=12,sync=True): OK
+ITERATOR() test...
+TEST: send/recv(msglen=12,sync=True): OK
+DONE
+TEST: send/recv(msglen=5,sync=True): OK
diff --git a/tests/multi_espnow/50_esp32_rssi_test.py b/tests/multi_espnow/50_esp32_rssi_test.py
new file mode 100644
index 000000000..6a47b540d
--- /dev/null
+++ b/tests/multi_espnow/50_esp32_rssi_test.py
@@ -0,0 +1,114 @@
+# Test the ESP32 RSSI extensions on instance1.
+# Will SKIP test if instance1 is not an ESP32.
+# Instance0 may be an ESP32 or ESP8266.
+
+try:
+ import time
+ import network
+ import random
+ import espnow
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# Set read timeout to 5 seconds
+timeout_ms = 5000
+default_pmk = b"MicroPyth0nRules"
+sync = True
+
+
+def echo_server(e):
+ peers = []
+ while True:
+ peer, msg = e.irecv(timeout_ms)
+ if peer is None:
+ return
+ if peer not in peers:
+ peers.append(peer)
+ e.add_peer(peer)
+
+ # Echo the MAC and message back to the sender
+ if not e.send(peer, msg, sync):
+ print("ERROR: send() failed to", peer)
+ return
+
+ if msg == b"!done":
+ return
+
+
+def client_send(e, peer, msg, sync):
+ print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="")
+ try:
+ if not e.send(peer, msg, sync):
+ print("ERROR: Send failed.")
+ return
+ except OSError as exc:
+ # Don't print exc as it is differs for esp32 and esp8266
+ print("ERROR: OSError:")
+ return
+
+
+def init(sta_active=True, ap_active=False):
+ wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]]
+ e = espnow.ESPNow()
+ e.active(True)
+ e.set_pmk(default_pmk)
+ wlans[0].active(sta_active)
+ wlans[1].active(ap_active)
+ return e
+
+
+# Server
+def instance0():
+ e = init(True, False)
+ multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)])
+ multitest.next()
+ print("Server Start")
+ echo_server(e)
+ print("Server Done")
+ e.active(False)
+
+
+# Client
+def instance1():
+ # Instance 1 (the client)
+ e = init(True, False)
+ if not hasattr(e, "peers_table"):
+ e.active(False)
+ print("SKIP")
+ raise SystemExit
+
+ e.config(timeout_ms=timeout_ms)
+ multitest.next()
+ peer = PEERS[0]
+ e.add_peer(peer)
+
+ # assert len(e.peers) == 1
+ print("IRECV() test...")
+ msg = bytes([random.getrandbits(8) for _ in range(12)])
+ client_send(e, peer, msg, True)
+ p2, msg2 = e.irecv()
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+ print("RSSI test...")
+ if len(e.peers_table) != 1:
+ print("ERROR: len(ESPNow.peers_table()) != 1. ESPNow.peers_table()=", peers)
+ elif list(e.peers_table.keys())[0] != peer:
+ print("ERROR: ESPNow.peers_table().keys[0] != peer. ESPNow.peers_table()=", peers)
+ else:
+ rssi, time_ms = e.peers_table[peer]
+ if not -127 < rssi < 0:
+ print("ERROR: Invalid rssi value:", rssi)
+ elif time.ticks_diff(time.ticks_ms(), time_ms) > 5000:
+ print("ERROR: Unexpected time_ms value:", time_ms)
+ else:
+ print("OK")
+
+ # Tell the server to stop
+ print("DONE")
+ msg = b"!done"
+ client_send(e, peer, msg, True)
+ p2, msg2 = e.irecv()
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+ e.active(False)
diff --git a/tests/multi_espnow/50_esp32_rssi_test.py.exp b/tests/multi_espnow/50_esp32_rssi_test.py.exp
new file mode 100644
index 000000000..22fc4f028
--- /dev/null
+++ b/tests/multi_espnow/50_esp32_rssi_test.py.exp
@@ -0,0 +1,10 @@
+--- instance0 ---
+Server Start
+Server Done
+--- instance1 ---
+IRECV() test...
+TEST: send/recv(msglen=12,sync=True): OK
+RSSI test...
+OK
+DONE
+TEST: send/recv(msglen=5,sync=True): OK
diff --git a/tests/multi_espnow/60_irq_test.py b/tests/multi_espnow/60_irq_test.py
new file mode 100644
index 000000000..37fc57ce4
--- /dev/null
+++ b/tests/multi_espnow/60_irq_test.py
@@ -0,0 +1,117 @@
+# Test of a ESPnow echo server and client transferring data.
+# Test the ESP32 extemnsions. Assumes instance1 is an ESP32.
+# Instance0 may be an ESP32 or ESP8266
+
+try:
+ import network
+ import random
+ import time
+ import espnow
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# Set read timeout to 5 seconds
+timeout_ms = 5000
+default_pmk = b"MicroPyth0nRules"
+sync = True
+
+
+def echo_server(e):
+ peers = []
+ while True:
+ peer, msg = e.irecv(timeout_ms)
+ if peer is None:
+ return
+ if peer not in peers:
+ peers.append(peer)
+ e.add_peer(peer)
+
+ # Echo the MAC and message back to the sender
+ if not e.send(peer, msg, sync):
+ print("ERROR: send() failed to", peer)
+ return
+
+ if msg == b"!done":
+ return
+
+
+def client_send(e, peer, msg, sync):
+ print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="")
+ try:
+ if not e.send(peer, msg, sync):
+ print("ERROR: Send failed.")
+ return
+ except OSError as exc:
+ # Don't print exc as it is differs for esp32 and esp8266
+ print("ERROR: OSError:")
+ return
+
+
+def init(sta_active=True, ap_active=False):
+ wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]]
+ e = espnow.ESPNow()
+ e.active(True)
+ e.set_pmk(default_pmk)
+ wlans[0].active(sta_active)
+ wlans[1].active(ap_active)
+ wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP
+ return e
+
+
+# Server
+def instance0():
+ e = init(True, False)
+ multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)])
+ multitest.next()
+ print("Server Start")
+ echo_server(e)
+ print("Server Done")
+ e.active(False)
+
+
+done = False
+
+
+# Client
+def instance1():
+ # Instance 1 (the client)
+ e = init(True, False)
+ try:
+ e.irq(None)
+ except AttributeError:
+ print("SKIP")
+ raise SystemExit
+
+ e.config(timeout_ms=timeout_ms)
+ multitest.next()
+ peer = PEERS[0]
+ e.add_peer(peer)
+
+ def on_recv_cb(e):
+ global done
+ p2, msg2 = e.irecv(0)
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+ done = True
+
+ global done
+ print("IRQ() test...")
+ e.irq(on_recv_cb)
+ done = False
+ msg = bytes([random.getrandbits(8) for _ in range(12)])
+ client_send(e, peer, msg, True)
+ start = time.ticks_ms()
+ while not done:
+ if time.ticks_diff(time.ticks_ms(), start) > timeout_ms:
+ print("Timeout waiting for response.")
+ raise SystemExit
+ e.irq(None)
+
+ # Tell the server to stop
+ print("DONE")
+ msg = b"!done"
+ client_send(e, peer, msg, True)
+ p2, msg2 = e.irecv()
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+ e.active(False)
diff --git a/tests/multi_espnow/60_irq_test.py.exp b/tests/multi_espnow/60_irq_test.py.exp
new file mode 100644
index 000000000..c2be2dccf
--- /dev/null
+++ b/tests/multi_espnow/60_irq_test.py.exp
@@ -0,0 +1,8 @@
+--- instance0 ---
+Server Start
+Server Done
+--- instance1 ---
+IRQ() test...
+TEST: send/recv(msglen=12,sync=True): OK
+DONE
+TEST: send/recv(msglen=5,sync=True): OK
diff --git a/tests/multi_espnow/80_uasyncio_client.py b/tests/multi_espnow/80_uasyncio_client.py
new file mode 100644
index 000000000..fa2918cc0
--- /dev/null
+++ b/tests/multi_espnow/80_uasyncio_client.py
@@ -0,0 +1,110 @@
+# Test of a ESPnow echo server and asyncio client transferring data.
+# Test will SKIP if instance1 (asyncio client) does not support asyncio.
+# - eg. ESP8266 with 1MB flash.
+# Instance0 is not required to support asyncio.
+
+try:
+ import network
+ import random
+ import espnow
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# Set read timeout to 5 seconds
+timeout_ms = 5000
+default_pmk = b"MicroPyth0nRules"
+sync = True
+
+
+def echo_server(e):
+ peers = []
+ while True:
+ peer, msg = e.irecv(timeout_ms)
+ if peer is None:
+ return
+ if peer not in peers:
+ peers.append(peer)
+ e.add_peer(peer)
+
+ # Echo the MAC and message back to the sender
+ if not e.send(peer, msg, sync):
+ print("ERROR: send() failed to", peer)
+ return
+
+ if msg == b"!done":
+ return
+
+
+def client_send(e, peer, msg, sync):
+ print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="")
+ try:
+ if not e.send(peer, msg, sync):
+ print("ERROR: Send failed.")
+ return
+ except OSError as exc:
+ # Don't print exc as it is differs for esp32 and esp8266
+ print("ERROR: OSError:")
+ return
+ print("OK")
+
+
+def init(e, sta_active=True, ap_active=False):
+ wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]]
+ e.active(True)
+ e.set_pmk(default_pmk)
+ wlans[0].active(sta_active)
+ wlans[1].active(ap_active)
+ wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP
+ return e
+
+
+async def client(e):
+ init(e, True, False)
+ e.config(timeout_ms=timeout_ms)
+ peer = PEERS[0]
+ e.add_peer(peer)
+ multitest.next()
+
+ print("airecv() test...")
+ msgs = []
+ for i in range(5):
+ # Send messages to the peer who will echo it back
+ msgs.append(bytes([random.getrandbits(8) for _ in range(12)]))
+ client_send(e, peer, msgs[i], True)
+
+ for i in range(5):
+ mac, reply = await e.airecv()
+ print("OK" if reply == msgs[i] else "ERROR: Received != Sent")
+
+ # Tell the server to stop
+ print("DONE")
+ msg = b"!done"
+ client_send(e, peer, msg, True)
+ mac, reply = await e.airecv()
+ print("OK" if reply == msg else "ERROR: Received != Sent")
+
+ e.active(False)
+
+
+# Server
+def instance0():
+ e = espnow.ESPNow()
+ init(e, True, False)
+ multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)])
+ multitest.next()
+ print("Server Start")
+ echo_server(e)
+ print("Server Done")
+ e.active(False)
+
+
+# Client
+def instance1():
+ try:
+ import uasyncio as asyncio
+ from aioespnow import AIOESPNow
+ except ImportError:
+ print("SKIP")
+ raise SystemExit
+ asyncio.run(client(AIOESPNow()))
diff --git a/tests/multi_espnow/80_uasyncio_client.py.exp b/tests/multi_espnow/80_uasyncio_client.py.exp
new file mode 100644
index 000000000..05fdf8aca
--- /dev/null
+++ b/tests/multi_espnow/80_uasyncio_client.py.exp
@@ -0,0 +1,18 @@
+--- instance0 ---
+Server Start
+Server Done
+--- instance1 ---
+airecv() test...
+TEST: send/recv(msglen=12,sync=True): OK
+TEST: send/recv(msglen=12,sync=True): OK
+TEST: send/recv(msglen=12,sync=True): OK
+TEST: send/recv(msglen=12,sync=True): OK
+TEST: send/recv(msglen=12,sync=True): OK
+OK
+OK
+OK
+OK
+OK
+DONE
+TEST: send/recv(msglen=5,sync=True): OK
+OK
diff --git a/tests/multi_espnow/81_uasyncio_server.py b/tests/multi_espnow/81_uasyncio_server.py
new file mode 100644
index 000000000..ee098b7f3
--- /dev/null
+++ b/tests/multi_espnow/81_uasyncio_server.py
@@ -0,0 +1,96 @@
+# Test of a ESPnow asyncio echo server and client transferring data.
+# Test will SKIP if instance0 (asyncio echo server) does not support asyncio.
+# - eg. ESP8266 with 1MB flash.
+# Instance1 is not required to support asyncio.
+
+try:
+ import network
+ import random
+ import espnow
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# Set read timeout to 5 seconds
+timeout_ms = 5000
+default_pmk = b"MicroPyth0nRules"
+sync = True
+
+
+def client_send(e, peer, msg, sync):
+ print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="")
+ try:
+ if not e.send(peer, msg, sync):
+ print("ERROR: Send failed.")
+ return
+ except OSError as exc:
+ # Don't print exc as it is differs for esp32 and esp8266
+ print("ERROR: OSError:")
+ return
+
+
+def init(e, sta_active=True, ap_active=False):
+ wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]]
+ e.active(True)
+ e.set_pmk(default_pmk)
+ wlans[0].active(sta_active)
+ wlans[1].active(ap_active)
+ wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP
+ return e
+
+
+async def echo_server(e):
+ peers = []
+ async for peer, msg in e:
+ if peer not in peers:
+ peers.append(peer)
+ e.add_peer(peer)
+
+ # Echo the message back to the sender
+ if not await e.asend(peer, msg, sync):
+ print("ERROR: asend() failed to", peer)
+ return
+
+ if msg == b"!done":
+ return
+
+
+# Server
+def instance0():
+ try:
+ import uasyncio as asyncio
+ from aioespnow import AIOESPNow
+ except ImportError:
+ print("SKIP")
+ raise SystemExit
+ e = AIOESPNow()
+ init(e, True, False)
+ multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)])
+ multitest.next()
+ print("Server Start")
+ asyncio.run(echo_server(e))
+ print("Server Done")
+ e.active(False)
+
+
+def instance1():
+ e = espnow.ESPNow()
+ init(e, True, False)
+ peer = PEERS[0]
+ e.add_peer(peer)
+ multitest.next()
+
+ for i in range(5):
+ msg = bytes([random.getrandbits(8) for _ in range(12)])
+ client_send(e, peer, msg, True)
+ p2, msg2 = e.irecv(timeout_ms)
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+ # Tell the server to stop
+ print("DONE")
+ msg = b"!done"
+ client_send(e, peer, msg, True)
+ p2, msg2 = e.irecv(timeout_ms)
+ print("OK" if msg2 == msg else "ERROR: Received != Sent")
+
+ e.active(False)
diff --git a/tests/multi_espnow/81_uasyncio_server.py.exp b/tests/multi_espnow/81_uasyncio_server.py.exp
new file mode 100644
index 000000000..abe34fc42
--- /dev/null
+++ b/tests/multi_espnow/81_uasyncio_server.py.exp
@@ -0,0 +1,11 @@
+--- instance0 ---
+Server Start
+Server Done
+--- instance1 ---
+TEST: send/recv(msglen=12,sync=True): OK
+TEST: send/recv(msglen=12,sync=True): OK
+TEST: send/recv(msglen=12,sync=True): OK
+TEST: send/recv(msglen=12,sync=True): OK
+TEST: send/recv(msglen=12,sync=True): OK
+DONE
+TEST: send/recv(msglen=5,sync=True): OK
diff --git a/tests/multi_espnow/90_memory_test.py b/tests/multi_espnow/90_memory_test.py
new file mode 100644
index 000000000..5e80eb0fd
--- /dev/null
+++ b/tests/multi_espnow/90_memory_test.py
@@ -0,0 +1,108 @@
+# Test of a ESPnow echo server and client transferring data.
+# This test works with ESP32 or ESP8266 as server or client.
+
+try:
+ import network
+ import random
+ import espnow
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# Set read timeout to 5 seconds
+timeout_ms = 5000
+default_pmk = b"MicroPyth0nRules"
+sync = True
+
+
+def echo_server(e):
+ peers = []
+ i = 0
+ while True:
+ peer, msg = e.irecv(timeout_ms)
+ i += 1
+ if i % 10 == 0:
+ print("OK:", i)
+ if peer is None:
+ return
+ if peer not in peers:
+ peers.append(peer)
+ e.add_peer(peer)
+
+ # Echo the MAC and message back to the sender
+ if not e.send(peer, msg, sync):
+ print("ERROR: send() failed to", peer)
+ return
+
+ if msg == b"!done":
+ return
+
+
+def echo_test(e, peer, msg, sync):
+ try:
+ if not e.send(peer, msg, sync):
+ print("ERROR: Send failed.")
+ return
+ except OSError as exc:
+ # Don't print exc as it is differs for esp32 and esp8266
+ print("ERROR: OSError:")
+ return
+
+ p2, msg2 = e.irecv(timeout_ms)
+ if msg2 != msg:
+ print("ERROR: Received != Sent")
+
+
+def echo_client(e, peer, msglens):
+ for sync in [True]:
+ for msglen in msglens:
+ msg = bytearray(msglen)
+ if msglen > 0:
+ msg[0] = b"_"[0] # Random message must not start with '!'
+ for i in range(1, msglen):
+ msg[i] = random.getrandbits(8)
+ echo_test(e, peer, msg, sync)
+
+
+def init(sta_active=True, ap_active=False):
+ wlans = [network.WLAN(i) for i in [network.STA_IF, network.AP_IF]]
+ e = espnow.ESPNow()
+ e.active(True)
+ e.set_pmk(default_pmk)
+ wlans[0].active(sta_active)
+ wlans[1].active(ap_active)
+ wlans[0].disconnect() # Force esp8266 STA interface to disconnect from AP
+ return e
+
+
+# Server
+def instance0():
+ e = init(True, False)
+ multitest.globals(PEERS=[network.WLAN(i).config("mac") for i in (0, 1)])
+ multitest.next()
+ print("Server Start")
+ echo_server(e)
+ print("Server Done")
+ e.active(False)
+
+
+# Client
+def instance1():
+ e = init(True, False)
+ multitest.next()
+ peer = PEERS[0]
+ e.add_peer(peer)
+ echo_test(e, peer, b"ping", True)
+ gc.collect()
+ mem_start = gc.mem_alloc()
+ for i in range(10):
+ echo_client(e, peer, [250] * 10)
+ print("OK:", (i + 1) * 10)
+ echo_test(e, peer, b"!done", True)
+ gc.collect()
+ mem_end = gc.mem_alloc()
+ if mem_end - mem_start < 1024:
+ print("OK: Less than 1024 bytes consumed")
+ else:
+ print("Error: Memory consumed is", mem_end - mem_start)
+ e.active(False)
diff --git a/tests/multi_espnow/90_memory_test.py.exp b/tests/multi_espnow/90_memory_test.py.exp
new file mode 100644
index 000000000..1ea8c2959
--- /dev/null
+++ b/tests/multi_espnow/90_memory_test.py.exp
@@ -0,0 +1,25 @@
+--- instance0 ---
+Server Start
+OK: 10
+OK: 20
+OK: 30
+OK: 40
+OK: 50
+OK: 60
+OK: 70
+OK: 80
+OK: 90
+OK: 100
+Server Done
+--- instance1 ---
+OK: 10
+OK: 20
+OK: 30
+OK: 40
+OK: 50
+OK: 60
+OK: 70
+OK: 80
+OK: 90
+OK: 100
+OK: Less than 1024 bytes consumed