summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDamien George <damien@micropython.org>2024-09-25 17:46:59 +1000
committerDamien George <damien@micropython.org>2025-10-02 11:36:48 +1000
commit7ef47ef98d8b98f9c2786e8d58a0307f59dceebb (patch)
treead6ac5d93f32fedfaad083267116937b8630d925
parentb2871e0d0aec0c3783fe4bad215fbd8a4d88e5fe (diff)
tests/serial_test.py: Add test for serial throughput.
This is a test script used to test USB CDC (or USB UART) serial reliability and throughput. Run against any MicroPython remote target with: $ python serial_test.py -t <device> Signed-off-by: Damien George <damien@micropython.org>
-rw-r--r--tests/README.md12
-rwxr-xr-xtests/serial_test.py263
2 files changed, 275 insertions, 0 deletions
diff --git a/tests/README.md b/tests/README.md
index 676e9d832..534e7e0a0 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -203,6 +203,18 @@ internal_bench/bytebuf:
1 tests performed (3 individual testcases)
```
+## Serial reliability and performance test
+
+Serial port reliability and performance can be tested using the `serial_test.py` script.
+Pass the name of the port to test against, for example:
+
+ $ ./serial_test.py -t /dev/ttyACM0
+
+If no port is specified then `/dev/ttyACM0` is used as the default.
+
+The test will send data out to the target, and receive data from the target, in various
+chunk sizes. The throughput of the serial connection will be reported for each sub-test.
+
## Test key/certificates
SSL/TLS tests in `multi_net` and `net_inet` use self-signed key/cert pairs
diff --git a/tests/serial_test.py b/tests/serial_test.py
new file mode 100755
index 000000000..1baa2282a
--- /dev/null
+++ b/tests/serial_test.py
@@ -0,0 +1,263 @@
+#!/usr/bin/env python
+#
+# Performance and reliability test for serial port communication.
+#
+# Basic usage:
+# serial_test.py [-t serial-device]
+#
+# The `serial-device` will default to /dev/ttyACM0.
+
+import argparse
+import serial
+import sys
+import time
+
+run_tests_module = __import__("run-tests")
+
+read_test_script = """
+bin = True
+try:
+ wr=__import__("pyb").USB_VCP(0).send
+except:
+ import sys
+ if hasattr(sys.stdout,'buffer'):
+ wr=sys.stdout.buffer.write
+ else:
+ wr=sys.stdout.write
+ bin = False
+b=bytearray(%u)
+if bin:
+ wr('BIN')
+ for i in range(len(b)):
+ b[i] = i & 0xff
+else:
+ wr('TXT')
+ for i in range(len(b)):
+ b[i] = 0x20 + (i & 0x3f)
+for _ in range(%d):
+ wr(b)
+"""
+
+
+write_test_script_verified = """
+import sys
+try:
+ rd=__import__("pyb").USB_VCP(0).recv
+except:
+ rd=sys.stdin.readinto
+b=bytearray(%u)
+for _ in range(%u):
+ n = rd(b)
+ fail = 0
+ for i in range(n):
+ if b[i] != 32 + (i & 0x3f):
+ fail += 1
+ if fail:
+ sys.stdout.write(b'ER%%05u' %% fail)
+ else:
+ sys.stdout.write(b'OK%%05u' %% n)
+"""
+
+write_test_script_unverified = """
+import sys
+try:
+ rd=__import__("pyb").USB_VCP(0).recv
+except:
+ rd=sys.stdin.readinto
+b=bytearray(%u)
+for _ in range(%u):
+ n = rd(b)
+ if n != len(b):
+ sys.stdout.write(b'ER%%05u' %% n)
+ else:
+ sys.stdout.write(b'OK%%05u' %% n)
+"""
+
+
+class TestError(Exception):
+ pass
+
+
+def drain_input(ser):
+ time.sleep(0.1)
+ while ser.inWaiting() > 0:
+ data = ser.read(ser.inWaiting())
+ time.sleep(0.1)
+
+
+def send_script(ser, script):
+ chunk_size = 32
+ for i in range(0, len(script), chunk_size):
+ ser.write(script[i : i + chunk_size])
+ time.sleep(0.01)
+ ser.write(b"\x04") # eof
+ ser.flush()
+ response = ser.read(2)
+ if response != b"OK":
+ response += ser.read(ser.inWaiting())
+ raise TestError("could not send script", response)
+
+
+def read_test(ser_repl, ser_data, bufsize, nbuf):
+ global test_passed
+
+ assert bufsize % 256 == 0 # for verify to work
+
+ # Load and run the read_test_script.
+ ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot
+ drain_input(ser_repl)
+ script = bytes(read_test_script % (bufsize, nbuf), "ascii")
+ send_script(ser_repl, script)
+
+ # Read from the device the type of data that it will send (BIN or TXT).
+ data_type = ser_data.read(3)
+
+ # Read data from the device, check it is correct, and measure throughput.
+ n = 0
+ last_byte = None
+ t_start = time.time()
+ remain = nbuf * bufsize
+ total_data = bytearray(remain)
+ while remain:
+ t0 = time.monotonic_ns()
+ while ser_data.inWaiting() == 0:
+ if time.monotonic_ns() - t0 > 1e9:
+ # timeout waiting for data from device
+ break
+ time.sleep(0.0001)
+ if not ser_data.inWaiting():
+ test_passed = False
+ print("ERROR: timeout waiting for data")
+ print(total_data[:n])
+ return 0
+ to_read = min(ser_data.inWaiting(), remain)
+ data = ser_data.read(to_read)
+ remain -= len(data)
+ print(f"{n} / {nbuf * bufsize}", end="\r")
+ total_data[n : n + len(data)] = data
+ n += len(data)
+ t_end = time.time()
+ for i in range(0, len(total_data)):
+ if data_type == b"BIN":
+ wanted = i & 0xFF
+ else:
+ wanted = 0x20 + (i & 0x3F)
+ if total_data[i] != wanted:
+ test_passed = False
+ print("ERROR: data mismatch:", i, wanted, total_data[i])
+ ser_repl.write(b"\x03") # break
+ t = t_end - t_start
+
+ # Print results.
+ print(
+ "DATA IN: bufsize=%u, read %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
+ % (bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
+ )
+
+ return n / t
+
+
+def write_test(ser_repl, ser_data, bufsize, nbuf, verified):
+ global test_passed
+
+ # Load and run the write_test_script.
+ ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot
+ drain_input(ser_repl)
+ if verified:
+ script = write_test_script_verified
+ else:
+ script = write_test_script_unverified
+ script = bytes(script % (bufsize, nbuf), "ascii")
+ send_script(ser_repl, script)
+ drain_input(ser_repl)
+
+ # Write data to the device, check it is correct, and measure throughput.
+ n = 0
+ t_start = time.time()
+ buf = bytearray(bufsize)
+ for i in range(len(buf)):
+ buf[i] = 32 + (i & 0x3F) # don't want to send ctrl chars!
+ for i in range(nbuf):
+ ser_data.write(buf)
+ n += len(buf)
+ print(f"{n} / {nbuf * bufsize}", end="\r")
+ response = ser_repl.read(7)
+ if response != b"OK%05u" % bufsize:
+ test_passed = False
+ print("ERROR: bad response, expecting OK%05u, got %r" % (bufsize, response))
+ t_end = time.time()
+ ser_repl.write(b"\x03") # break
+ t = t_end - t_start
+
+ # Print results.
+ print(
+ "DATA OUT: verify=%d, bufsize=%u, wrote %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
+ % (verified, bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
+ )
+
+ return n / t
+
+
+def do_test(dev_repl, dev_data=None, time_per_subtest=1):
+ if dev_data is None:
+ print("REPL and data on", dev_repl)
+ ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1)
+ ser_data = ser_repl
+ else:
+ print("REPL on", dev_repl)
+ print("data on", dev_data)
+ ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1)
+ ser_data = serial.Serial(dev_data, baudrate=115200, timeout=1)
+
+ for test_func, test_args, bufsize in (
+ (read_test, (), 256),
+ (write_test, (True,), 128),
+ (write_test, (False,), 128),
+ ):
+ nbuf = 128
+ while bufsize <= 16384:
+ rate = test_func(ser_repl, ser_data, bufsize, nbuf, *test_args)
+ bufsize *= 2
+ if rate:
+ # Adjust the amount of data based on the rate, to keep each subtest
+ # at around time_per_subtest seconds long.
+ nbuf = max(min(128, int(rate * time_per_subtest / bufsize)), 1)
+
+ ser_repl.close()
+ ser_data.close()
+
+
+def main():
+ global test_passed
+
+ cmd_parser = argparse.ArgumentParser(
+ description="Test performance and reliability of serial port communication.",
+ epilog=run_tests_module.test_instance_epilog,
+ formatter_class=argparse.RawTextHelpFormatter,
+ )
+ cmd_parser.add_argument(
+ "-t",
+ "--test-instance",
+ default="a0",
+ help="MicroPython instance to test",
+ )
+ cmd_parser.add_argument(
+ "--time-per-subtest", default="1", help="approximate time to take per subtest (in seconds)"
+ )
+ args = cmd_parser.parse_args()
+
+ dev_repl = run_tests_module.convert_device_shortcut_to_real_device(args.test_instance)
+
+ test_passed = True
+ try:
+ do_test(dev_repl, None, float(args.time_per_subtest))
+ except TestError as er:
+ test_passed = False
+ print("ERROR:", er)
+
+ if not test_passed:
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()