summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xtests/run-multitests.py396
1 files changed, 396 insertions, 0 deletions
diff --git a/tests/run-multitests.py b/tests/run-multitests.py
new file mode 100755
index 000000000..f27af3b0e
--- /dev/null
+++ b/tests/run-multitests.py
@@ -0,0 +1,396 @@
+#!/usr/bin/env python3
+
+# This file is part of the MicroPython project, http://micropython.org/
+# The MIT License (MIT)
+# Copyright (c) 2020 Damien P. George
+
+import sys, os, time, re, select
+import argparse
+import subprocess
+
+sys.path.append("../tools")
+import pyboard
+
+if os.name == "nt":
+ CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3.exe")
+ MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/windows/micropython.exe")
+else:
+ CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3")
+ MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/unix/micropython")
+
+PYTHON_TRUTH = CPYTHON3
+
+INSTANCE_READ_TIMEOUT_S = 10
+
+APPEND_CODE_TEMPLATE = """
+import sys
+class multitest:
+ @staticmethod
+ def flush():
+ if hasattr(sys.stdout, "flush"):
+ sys.stdout.flush()
+ @staticmethod
+ def skip():
+ print("SKIP")
+ multitest.flush()
+ raise SystemExit
+ @staticmethod
+ def next():
+ print("NEXT")
+ multitest.flush()
+ @staticmethod
+ def globals(**gs):
+ for g in gs:
+ print("SET {{}} = {{!r}}".format(g, gs[g]))
+ multitest.flush()
+ @staticmethod
+ def get_network_ip():
+ try:
+ import network
+ ip = network.WLAN().ifconfig()[0]
+ except:
+ ip = "127.0.0.1"
+ return ip
+
+{}
+
+instance{}()
+multitest.flush()
+"""
+
+
+class PyInstance:
+ def __init__(self):
+ pass
+
+ def close(self):
+ pass
+
+ def prepare_script_from_file(self, filename, prepend, append):
+ with open(filename, "rb") as f:
+ script = f.read()
+ if prepend:
+ script = bytes(prepend, "ascii") + b"\n" + script
+ if append:
+ script += b"\n" + bytes(append, "ascii")
+ return script
+
+ def run_file(self, filename, prepend="", append=""):
+ return self.run_script(self.prepare_script_from_file(filename, prepend, append))
+
+ def start_file(self, filename, prepend="", append=""):
+ return self.start_script(self.prepare_script_from_file(filename, prepend, append))
+
+
+class PyInstanceSubProcess(PyInstance):
+ def __init__(self, cmd):
+ self.cmd = cmd
+ self.popen = None
+ self.finished = True
+
+ def __str__(self):
+ return self.cmd[0].rsplit("/")[-1]
+
+ def run_script(self, script):
+ output = b""
+ err = None
+ try:
+ p = subprocess.run(
+ self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, input=script
+ )
+ output = p.stdout
+ except subprocess.CalledProcessError as er:
+ err = er
+ return str(output.strip(), "ascii"), err
+
+ def start_script(self, script):
+ self.popen = subprocess.Popen(
+ self.cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+ )
+ self.popen.stdin.write(script)
+ self.popen.stdin.close()
+ self.finished = False
+
+ def stop(self):
+ if self.popen and self.popen.poll() is None:
+ self.popen.terminate()
+
+ def readline(self):
+ sel = select.select([self.popen.stdout.raw], [], [], 0.1)
+ if not sel[0]:
+ self.finished = self.popen.poll() is not None
+ return None, None
+ out = self.popen.stdout.raw.readline()
+ if out == b"":
+ self.finished = self.popen.poll() is not None
+ return None, None
+ else:
+ return str(out.rstrip(), "ascii"), None
+
+ def is_finished(self):
+ return self.finished
+
+ def wait_finished(self):
+ self.popen.wait()
+ out = self.popen.stdout.read()
+ return str(out, "ascii"), ""
+
+
+class PyInstancePyboard(PyInstance):
+ def __init__(self, device):
+ self.device = device
+ self.pyb = pyboard.Pyboard(device)
+ self.pyb.enter_raw_repl()
+ self.finished = True
+
+ def __str__(self):
+ return self.device.rsplit("/")[-1]
+
+ def close(self):
+ self.pyb.exit_raw_repl()
+ self.pyb.close()
+
+ def run_script(self, script):
+ output = b""
+ err = None
+ try:
+ self.pyb.enter_raw_repl()
+ output = self.pyb.exec_(script)
+ except pyboard.PyboardError as er:
+ err = er
+ return str(output.strip(), "ascii"), err
+
+ def start_script(self, script):
+ self.pyb.enter_raw_repl()
+ self.pyb.exec_raw_no_follow(script)
+ self.finished = False
+
+ def stop(self):
+ self.pyb.serial.write(b"\r\x03")
+
+ def readline(self):
+ if self.finished:
+ return None, None
+ if self.pyb.serial.inWaiting() == 0:
+ return None, None
+ out = self.pyb.read_until(1, (b"\r\n", b"\x04"))
+ if out.endswith(b"\x04"):
+ self.finished = True
+ out = out[:-1]
+ err = str(self.pyb.read_until(1, b"\x04"), "ascii")
+ err = err[:-1]
+ if not out and not err:
+ return None, None
+ else:
+ err = None
+ return str(out.rstrip(), "ascii"), err
+
+ def is_finished(self):
+ return self.finished
+
+ def wait_finished(self):
+ out, err = self.pyb.follow(10, None)
+ return str(out, "ascii"), str(err, "ascii")
+
+
+def prepare_test_file_list(test_files):
+ test_files2 = []
+ for test_file in sorted(test_files):
+ num_instances = 0
+ with open(test_file) as f:
+ for line in f:
+ m = re.match(r"def instance([0-9]+)\(\):", line)
+ if m:
+ num_instances = max(num_instances, int(m.group(1)) + 1)
+ test_files2.append((test_file, num_instances))
+ return test_files2
+
+
+def trace_instance_output(instance_idx, line):
+ if cmd_args.trace_output:
+ t_ms = round((time.time() - trace_t0) * 1000)
+ print("{:6} i{} :".format(t_ms, instance_idx), line)
+
+
+def run_test_on_instances(test_file, num_instances, instances):
+ global trace_t0
+ trace_t0 = time.time()
+
+ error = False
+ skip = False
+ injected_globals = ""
+ output = [[] for _ in range(num_instances)]
+
+ # Start all instances running, in order, waiting until they signal they are ready
+ for idx in range(num_instances):
+ append_code = APPEND_CODE_TEMPLATE.format(injected_globals, idx)
+ instance = instances[idx]
+ instance.start_file(test_file, append=append_code)
+ last_read_time = time.time()
+ while True:
+ if instance.is_finished():
+ break
+ out, err = instance.readline()
+ if out is None and err is None:
+ if time.time() > last_read_time + INSTANCE_READ_TIMEOUT_S:
+ output[idx].append("TIMEOUT")
+ error = True
+ break
+ time.sleep(0.1)
+ continue
+ last_read_time = time.time()
+ if out is not None:
+ trace_instance_output(idx, out)
+ if out.startswith("SET "):
+ injected_globals += out[4:] + "\n"
+ elif out == "SKIP":
+ skip = True
+ break
+ elif out == "NEXT":
+ break
+ else:
+ output[idx].append(out)
+ if err is not None:
+ trace_instance_output(idx, err)
+ output[idx].append(err)
+ error = True
+
+ if error or skip:
+ break
+
+ if not error and not skip:
+ # Capture output and wait for all instances to finish running
+ last_read_time = [time.time() for _ in range(num_instances)]
+ while True:
+ num_running = 0
+ num_output = 0
+ for idx in range(num_instances):
+ instance = instances[idx]
+ if instance.is_finished():
+ continue
+ num_running += 1
+ out, err = instance.readline()
+ if out is None and err is None:
+ if time.time() > last_read_time[idx] + INSTANCE_READ_TIMEOUT_S:
+ output[idx].append("TIMEOUT")
+ error = True
+ continue
+ num_output += 1
+ last_read_time[idx] = time.time()
+ if out is not None:
+ trace_instance_output(idx, out)
+ output[idx].append(out)
+ if err is not None:
+ trace_instance_output(idx, err)
+ output[idx].append(err)
+ error = True
+
+ if not num_output:
+ time.sleep(0.1)
+ if not num_running or error:
+ break
+
+ # Stop all instances
+ for idx in range(num_instances):
+ instances[idx].stop()
+
+ output_str = ""
+ for idx, lines in enumerate(output):
+ output_str += "--- instance{} ---\n".format(idx)
+ output_str += "\n".join(lines) + "\n"
+
+ return error, skip, output_str
+
+
+def run_tests(test_files, instances_truth, instances_test):
+ for test_file, num_instances in test_files:
+ instances_str = "|".join(str(instances_test[i]) for i in range(num_instances))
+ print("{} on {}: ".format(test_file, instances_str), end="")
+ if cmd_args.show_output or cmd_args.trace_output:
+ print()
+ sys.stdout.flush()
+
+ # Run test on test instances
+ error, skip, output_test = run_test_on_instances(test_file, num_instances, instances_test)
+
+ if cmd_args.show_output:
+ print("### TEST ###")
+ print(output_test, end="")
+
+ if not skip:
+ # Check if truth exists in a file, and read it in
+ test_file_expected = test_file + ".exp"
+ if os.path.isfile(test_file_expected):
+ with open(test_file_expected) as f:
+ output_truth = f.read()
+ else:
+ # Run test on truth instances to get expected output
+ _, _, output_truth = run_test_on_instances(
+ test_file, num_instances, instances_truth
+ )
+ if cmd_args.show_output:
+ print("### TRUTH ###")
+ print(output_truth, end="")
+
+ # Print result of test
+ if skip:
+ print("skip")
+ elif output_test == output_truth:
+ print("pass")
+ else:
+ print("FAIL")
+ if not cmd_args.show_output:
+ print("### TEST ###")
+ print(output_test, end="")
+ print("### TRUTH ###")
+ print(output_truth, end="")
+
+ if cmd_args.show_output:
+ print()
+
+
+def main():
+ global cmd_args
+
+ cmd_parser = argparse.ArgumentParser(description="Run network tests for MicroPython")
+ cmd_parser.add_argument(
+ "-s", "--show-output", action="store_true", help="show test output after running"
+ )
+ cmd_parser.add_argument(
+ "-t", "--trace-output", action="store_true", help="trace test output while running"
+ )
+ cmd_parser.add_argument(
+ "-i", "--instance", action="append", default=[], help="instance(s) to run the tests on"
+ )
+ cmd_parser.add_argument("files", nargs="+", help="input test files")
+ cmd_args = cmd_parser.parse_args()
+
+ test_files = prepare_test_file_list(cmd_args.files)
+ max_instances = max(t[1] for t in test_files)
+
+ instances_truth = [PyInstanceSubProcess([PYTHON_TRUTH]) for _ in range(max_instances)]
+
+ instances_test = []
+ for i in cmd_args.instance:
+ if i.startswith("exec:"):
+ instances_test.append(PyInstanceSubProcess([i[len("exec:") :]]))
+ elif i.startswith("pyb:"):
+ instances_test.append(PyInstancePyboard(i[len("pyb:") :]))
+ else:
+ print("unknown instance string: {}".format(i), file=sys.stderr)
+ sys.exit(1)
+
+ for _ in range(max_instances - len(instances_test)):
+ instances_test.append(PyInstanceSubProcess([MICROPYTHON]))
+
+ try:
+ run_tests(test_files, instances_truth, instances_test)
+ finally:
+ for i in instances_truth:
+ i.close()
+ for i in instances_test:
+ i.close()
+
+
+if __name__ == "__main__":
+ main()