diff options
author | Damien George <damien@micropython.org> | 2024-09-25 17:46:59 +1000 |
---|---|---|
committer | Damien George <damien@micropython.org> | 2025-10-02 11:36:48 +1000 |
commit | 7ef47ef98d8b98f9c2786e8d58a0307f59dceebb (patch) | |
tree | ad6ac5d93f32fedfaad083267116937b8630d925 | |
parent | b2871e0d0aec0c3783fe4bad215fbd8a4d88e5fe (diff) |
tests/serial_test.py: Add test for serial throughput.
This is a test script used to test USB CDC (or USB UART) serial reliability
and throughput.
Run against any MicroPython remote target with:
$ python serial_test.py -t <device>
Signed-off-by: Damien George <damien@micropython.org>
-rw-r--r-- | tests/README.md | 12 | ||||
-rwxr-xr-x | tests/serial_test.py | 263 |
2 files changed, 275 insertions, 0 deletions
diff --git a/tests/README.md b/tests/README.md index 676e9d832..534e7e0a0 100644 --- a/tests/README.md +++ b/tests/README.md @@ -203,6 +203,18 @@ internal_bench/bytebuf: 1 tests performed (3 individual testcases) ``` +## Serial reliability and performance test + +Serial port reliability and performance can be tested using the `serial_test.py` script. +Pass the name of the port to test against, for example: + + $ ./serial_test.py -t /dev/ttyACM0 + +If no port is specified then `/dev/ttyACM0` is used as the default. + +The test will send data out to the target, and receive data from the target, in various +chunk sizes. The throughput of the serial connection will be reported for each sub-test. + ## Test key/certificates SSL/TLS tests in `multi_net` and `net_inet` use self-signed key/cert pairs diff --git a/tests/serial_test.py b/tests/serial_test.py new file mode 100755 index 000000000..1baa2282a --- /dev/null +++ b/tests/serial_test.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python +# +# Performance and reliability test for serial port communication. +# +# Basic usage: +# serial_test.py [-t serial-device] +# +# The `serial-device` will default to /dev/ttyACM0. + +import argparse +import serial +import sys +import time + +run_tests_module = __import__("run-tests") + +read_test_script = """ +bin = True +try: + wr=__import__("pyb").USB_VCP(0).send +except: + import sys + if hasattr(sys.stdout,'buffer'): + wr=sys.stdout.buffer.write + else: + wr=sys.stdout.write + bin = False +b=bytearray(%u) +if bin: + wr('BIN') + for i in range(len(b)): + b[i] = i & 0xff +else: + wr('TXT') + for i in range(len(b)): + b[i] = 0x20 + (i & 0x3f) +for _ in range(%d): + wr(b) +""" + + +write_test_script_verified = """ +import sys +try: + rd=__import__("pyb").USB_VCP(0).recv +except: + rd=sys.stdin.readinto +b=bytearray(%u) +for _ in range(%u): + n = rd(b) + fail = 0 + for i in range(n): + if b[i] != 32 + (i & 0x3f): + fail += 1 + if fail: + sys.stdout.write(b'ER%%05u' %% fail) + else: + sys.stdout.write(b'OK%%05u' %% n) +""" + +write_test_script_unverified = """ +import sys +try: + rd=__import__("pyb").USB_VCP(0).recv +except: + rd=sys.stdin.readinto +b=bytearray(%u) +for _ in range(%u): + n = rd(b) + if n != len(b): + sys.stdout.write(b'ER%%05u' %% n) + else: + sys.stdout.write(b'OK%%05u' %% n) +""" + + +class TestError(Exception): + pass + + +def drain_input(ser): + time.sleep(0.1) + while ser.inWaiting() > 0: + data = ser.read(ser.inWaiting()) + time.sleep(0.1) + + +def send_script(ser, script): + chunk_size = 32 + for i in range(0, len(script), chunk_size): + ser.write(script[i : i + chunk_size]) + time.sleep(0.01) + ser.write(b"\x04") # eof + ser.flush() + response = ser.read(2) + if response != b"OK": + response += ser.read(ser.inWaiting()) + raise TestError("could not send script", response) + + +def read_test(ser_repl, ser_data, bufsize, nbuf): + global test_passed + + assert bufsize % 256 == 0 # for verify to work + + # Load and run the read_test_script. + ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot + drain_input(ser_repl) + script = bytes(read_test_script % (bufsize, nbuf), "ascii") + send_script(ser_repl, script) + + # Read from the device the type of data that it will send (BIN or TXT). + data_type = ser_data.read(3) + + # Read data from the device, check it is correct, and measure throughput. + n = 0 + last_byte = None + t_start = time.time() + remain = nbuf * bufsize + total_data = bytearray(remain) + while remain: + t0 = time.monotonic_ns() + while ser_data.inWaiting() == 0: + if time.monotonic_ns() - t0 > 1e9: + # timeout waiting for data from device + break + time.sleep(0.0001) + if not ser_data.inWaiting(): + test_passed = False + print("ERROR: timeout waiting for data") + print(total_data[:n]) + return 0 + to_read = min(ser_data.inWaiting(), remain) + data = ser_data.read(to_read) + remain -= len(data) + print(f"{n} / {nbuf * bufsize}", end="\r") + total_data[n : n + len(data)] = data + n += len(data) + t_end = time.time() + for i in range(0, len(total_data)): + if data_type == b"BIN": + wanted = i & 0xFF + else: + wanted = 0x20 + (i & 0x3F) + if total_data[i] != wanted: + test_passed = False + print("ERROR: data mismatch:", i, wanted, total_data[i]) + ser_repl.write(b"\x03") # break + t = t_end - t_start + + # Print results. + print( + "DATA IN: bufsize=%u, read %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec" + % (bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t) + ) + + return n / t + + +def write_test(ser_repl, ser_data, bufsize, nbuf, verified): + global test_passed + + # Load and run the write_test_script. + ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot + drain_input(ser_repl) + if verified: + script = write_test_script_verified + else: + script = write_test_script_unverified + script = bytes(script % (bufsize, nbuf), "ascii") + send_script(ser_repl, script) + drain_input(ser_repl) + + # Write data to the device, check it is correct, and measure throughput. + n = 0 + t_start = time.time() + buf = bytearray(bufsize) + for i in range(len(buf)): + buf[i] = 32 + (i & 0x3F) # don't want to send ctrl chars! + for i in range(nbuf): + ser_data.write(buf) + n += len(buf) + print(f"{n} / {nbuf * bufsize}", end="\r") + response = ser_repl.read(7) + if response != b"OK%05u" % bufsize: + test_passed = False + print("ERROR: bad response, expecting OK%05u, got %r" % (bufsize, response)) + t_end = time.time() + ser_repl.write(b"\x03") # break + t = t_end - t_start + + # Print results. + print( + "DATA OUT: verify=%d, bufsize=%u, wrote %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec" + % (verified, bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t) + ) + + return n / t + + +def do_test(dev_repl, dev_data=None, time_per_subtest=1): + if dev_data is None: + print("REPL and data on", dev_repl) + ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1) + ser_data = ser_repl + else: + print("REPL on", dev_repl) + print("data on", dev_data) + ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1) + ser_data = serial.Serial(dev_data, baudrate=115200, timeout=1) + + for test_func, test_args, bufsize in ( + (read_test, (), 256), + (write_test, (True,), 128), + (write_test, (False,), 128), + ): + nbuf = 128 + while bufsize <= 16384: + rate = test_func(ser_repl, ser_data, bufsize, nbuf, *test_args) + bufsize *= 2 + if rate: + # Adjust the amount of data based on the rate, to keep each subtest + # at around time_per_subtest seconds long. + nbuf = max(min(128, int(rate * time_per_subtest / bufsize)), 1) + + ser_repl.close() + ser_data.close() + + +def main(): + global test_passed + + cmd_parser = argparse.ArgumentParser( + description="Test performance and reliability of serial port communication.", + epilog=run_tests_module.test_instance_epilog, + formatter_class=argparse.RawTextHelpFormatter, + ) + cmd_parser.add_argument( + "-t", + "--test-instance", + default="a0", + help="MicroPython instance to test", + ) + cmd_parser.add_argument( + "--time-per-subtest", default="1", help="approximate time to take per subtest (in seconds)" + ) + args = cmd_parser.parse_args() + + dev_repl = run_tests_module.convert_device_shortcut_to_real_device(args.test_instance) + + test_passed = True + try: + do_test(dev_repl, None, float(args.time_per_subtest)) + except TestError as er: + test_passed = False + print("ERROR:", er) + + if not test_passed: + sys.exit(1) + + +if __name__ == "__main__": + main() |