diff options
| author | Damien George <damien@micropython.org> | 2021-05-29 17:12:54 +1000 |
|---|---|---|
| committer | Damien George <damien@micropython.org> | 2021-05-29 17:17:22 +1000 |
| commit | a60ad3364132b9c4a30b20cd91fd5cd1ac965618 (patch) | |
| tree | 16d10d4e3dc5ba3dfa5469c403313f540a9fefa5 | |
| parent | e4ba57c5cd6f68306726891c45f36b5129b633ec (diff) | |
tools/mpremote: Add new CLI utility to interact with remote device.
This has been under development since April 2017. See #3034 and #6375.
Signed-off-by: Damien George <damien@micropython.org>
| -rw-r--r-- | tools/mpremote/LICENSE | 21 | ||||
| -rw-r--r-- | tools/mpremote/README.md | 71 | ||||
| -rwxr-xr-x | tools/mpremote/mpremote.py | 6 | ||||
| -rw-r--r-- | tools/mpremote/mpremote/__init__.py | 1 | ||||
| -rw-r--r-- | tools/mpremote/mpremote/console.py | 162 | ||||
| -rw-r--r-- | tools/mpremote/mpremote/main.py | 477 | ||||
| -rw-r--r-- | tools/mpremote/mpremote/pyboardextended.py | 621 | ||||
| -rw-r--r-- | tools/mpremote/pyproject.toml | 6 | ||||
| -rw-r--r-- | tools/mpremote/setup.cfg | 25 |
9 files changed, 1390 insertions, 0 deletions
diff --git a/tools/mpremote/LICENSE b/tools/mpremote/LICENSE new file mode 100644 index 000000000..5ec904cca --- /dev/null +++ b/tools/mpremote/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2021 Damien P. George + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/tools/mpremote/README.md b/tools/mpremote/README.md new file mode 100644 index 000000000..a6aaa1755 --- /dev/null +++ b/tools/mpremote/README.md @@ -0,0 +1,71 @@ +# mpremote -- MicroPython remote control + +This CLI tool provides an integrated set of utilities to remotely interact with +and automate a MicroPython device over a serial connection. + +The simplest way to use this tool is: + + mpremote + +This will automatically connect to the device and provide an interactive REPL. + +The full list of supported commands are: + + mpremote connect <device> -- connect to given device + device may be: list, auto, id:x, port:x + or any valid device name/path + mpremote disconnect -- disconnect current device + mpremote mount <local-dir> -- mount local directory on device + mpremote eval <string> -- evaluate and print the string + mpremote exec <string> -- execute the string + mpremote run <file> -- run the given local script + mpremote fs <command> <args...> -- execute filesystem commands on the device + command may be: cat, ls, cp, rm, mkdir, rmdir + use ":" as a prefix to specify a file on the device + mpremote repl -- enter REPL + options: + --capture <file> + --inject-code <string> + --inject-file <file> + +Multiple commands can be specified and they will be run sequentially. Connection +and disconnection will be done automatically at the start and end of the execution +of the tool, if such commands are not explicitly given. Automatic connection will +search for the first available serial device. If no action is specified then the +REPL will be entered. + +Shortcuts can be defined using the macro system. Built-in shortcuts are: + +- a0, a1, a2, a3: connect to `/dev/ttyACM?` +- u0, u1, u2, u3: connect to `/dev/ttyUSB?` +- c0, c1, c2, c3: connect to `COM?` +- cat, ls, cp, rm, mkdir, rmdir, df: filesystem commands +- reset: reset the device +- bootloader: make the device enter its bootloader + +Any user configuration, including user-defined shortcuts, can be placed in +.config/mpremote/config.py. For example: + + # Custom macro commands + commands = { + "c33": "connect id:334D335C3138", + "bl": "bootloader", + "double x=4": "eval x*2", + } + +Examples: + + mpremote + mpremote a1 + mpremote connect /dev/ttyUSB0 repl + mpremote ls + mpremote a1 ls + mpremote exec "import micropython; micropython.mem_info()" + mpremote eval 1/2 eval 3/4 + mpremote mount . + mpremote mount . exec "import local_script" + mpremote ls + mpremote cat boot.py + mpremote cp :main.py . + mpremote cp main.py : + mpremote cp -r dir/ : diff --git a/tools/mpremote/mpremote.py b/tools/mpremote/mpremote.py new file mode 100755 index 000000000..a91ff67b1 --- /dev/null +++ b/tools/mpremote/mpremote.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +import sys +from mpremote import main + +sys.exit(main.main()) diff --git a/tools/mpremote/mpremote/__init__.py b/tools/mpremote/mpremote/__init__.py new file mode 100644 index 000000000..1bb8bf6d7 --- /dev/null +++ b/tools/mpremote/mpremote/__init__.py @@ -0,0 +1 @@ +# empty diff --git a/tools/mpremote/mpremote/console.py b/tools/mpremote/mpremote/console.py new file mode 100644 index 000000000..646bbfa22 --- /dev/null +++ b/tools/mpremote/mpremote/console.py @@ -0,0 +1,162 @@ +import sys + +try: + import select, termios +except ImportError: + termios = None + select = None + import msvcrt + + +class ConsolePosix: + def __init__(self): + self.infd = sys.stdin.fileno() + self.infile = sys.stdin.buffer.raw + self.outfile = sys.stdout.buffer.raw + self.orig_attr = termios.tcgetattr(self.infd) + + def enter(self): + # attr is: [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] + attr = termios.tcgetattr(self.infd) + attr[0] &= ~( + termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON + ) + attr[1] = 0 + attr[2] = attr[2] & ~(termios.CSIZE | termios.PARENB) | termios.CS8 + attr[3] = 0 + attr[6][termios.VMIN] = 1 + attr[6][termios.VTIME] = 0 + termios.tcsetattr(self.infd, termios.TCSANOW, attr) + + def exit(self): + termios.tcsetattr(self.infd, termios.TCSANOW, self.orig_attr) + + def waitchar(self): + # TODO pyb.serial might not have fd + select.select([console_in.infd, pyb.serial.fd], [], []) + + def readchar(self): + res = select.select([self.infd], [], [], 0) + if res[0]: + return self.infile.read(1) + else: + return None + + def write(self, buf): + self.outfile.write(buf) + + +class ConsoleWindows: + KEY_MAP = { + b"H": b"A", # UP + b"P": b"B", # DOWN + b"M": b"C", # RIGHT + b"K": b"D", # LEFT + b"G": b"H", # POS1 + b"O": b"F", # END + b"Q": b"6~", # PGDN + b"I": b"5~", # PGUP + b"s": b"1;5D", # CTRL-LEFT, + b"t": b"1;5C", # CTRL-RIGHT, + b"\x8d": b"1;5A", # CTRL-UP, + b"\x91": b"1;5B", # CTRL-DOWN, + b"w": b"1;5H", # CTRL-POS1 + b"u": b"1;5F", # CTRL-END + b"\x98": b"1;3A", # ALT-UP, + b"\xa0": b"1;3B", # ALT-DOWN, + b"\x9d": b"1;3C", # ALT-RIGHT, + b"\x9b": b"1;3D", # ALT-LEFT, + b"\x97": b"1;3H", # ALT-POS1, + b"\x9f": b"1;3F", # ALT-END, + b"S": b"3~", # DEL, + b"\x93": b"3;5~", # CTRL-DEL + b"R": b"2~", # INS + b"\x92": b"2;5~", # CTRL-INS + b"\x94": b"Z", # Ctrl-Tab = BACKTAB, + } + + def enter(self): + pass + + def exit(self): + pass + + def inWaiting(self): + return 1 if msvcrt.kbhit() else 0 + + def waitchar(self): + while not (self.inWaiting() or pyb.serial.inWaiting()): + time.sleep(0.01) + + def readchar(self): + if msvcrt.kbhit(): + ch = msvcrt.getch() + while ch in b"\x00\xe0": # arrow or function key prefix? + if not msvcrt.kbhit(): + return None + ch = msvcrt.getch() # second call returns the actual key code + try: + ch = b"\x1b[" + self.KEY_MAP[ch] + except KeyError: + return None + return ch + + def write(self, buf): + buf = buf.decode() if isinstance(buf, bytes) else buf + sys.stdout.write(buf) + sys.stdout.flush() + # for b in buf: + # if isinstance(b, bytes): + # msvcrt.putch(b) + # else: + # msvcrt.putwch(b) + + +if termios: + Console = ConsolePosix + VT_ENABLED = True +else: + Console = ConsoleWindows + + # Windows VT mode ( >= win10 only) + # https://bugs.python.org/msg291732 + import ctypes + from ctypes import wintypes + + kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) + + ERROR_INVALID_PARAMETER = 0x0057 + ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + + def _check_bool(result, func, args): + if not result: + raise ctypes.WinError(ctypes.get_last_error()) + return args + + LPDWORD = ctypes.POINTER(wintypes.DWORD) + kernel32.GetConsoleMode.errcheck = _check_bool + kernel32.GetConsoleMode.argtypes = (wintypes.HANDLE, LPDWORD) + kernel32.SetConsoleMode.errcheck = _check_bool + kernel32.SetConsoleMode.argtypes = (wintypes.HANDLE, wintypes.DWORD) + + def set_conout_mode(new_mode, mask=0xFFFFFFFF): + # don't assume StandardOutput is a console. + # open CONOUT$ instead + fdout = os.open("CONOUT$", os.O_RDWR) + try: + hout = msvcrt.get_osfhandle(fdout) + old_mode = wintypes.DWORD() + kernel32.GetConsoleMode(hout, ctypes.byref(old_mode)) + mode = (new_mode & mask) | (old_mode.value & ~mask) + kernel32.SetConsoleMode(hout, mode) + return old_mode.value + finally: + os.close(fdout) + + # def enable_vt_mode(): + mode = mask = ENABLE_VIRTUAL_TERMINAL_PROCESSING + try: + set_conout_mode(mode, mask) + VT_ENABLED = True + except WindowsError as e: + VT_ENABLED = False diff --git a/tools/mpremote/mpremote/main.py b/tools/mpremote/mpremote/main.py new file mode 100644 index 000000000..1ce00305e --- /dev/null +++ b/tools/mpremote/mpremote/main.py @@ -0,0 +1,477 @@ +""" +MicroPython Remote - Interaction and automation tool for MicroPython +MIT license; Copyright (c) 2019-2021 Damien P. George + +This program provides a set of utilities to interact with and automate a +MicroPython device over a serial connection. Commands supported are: + + mpremote -- auto-detect, connect and enter REPL + mpremote <device-shortcut> -- connect to given device + mpremote connect <device> -- connect to given device + mpremote disconnect -- disconnect current device + mpremote mount <local-dir> -- mount local directory on device + mpremote eval <string> -- evaluate and print the string + mpremote exec <string> -- execute the string + mpremote run <script> -- run the given local script + mpremote fs <command> <args...> -- execute filesystem commands on the device + mpremote repl -- enter REPL +""" + +import os, select, sys, time +import serial.tools.list_ports + +from . import pyboardextended as pyboard +from .console import Console, ConsolePosix + +_PROG = "mpremote" + +_AUTO_CONNECT_SEARCH_LIST = [ + "/dev/ttyACM0", + "/dev/ttyACM1", + "/dev/ttyACM2", + "/dev/ttyACM3", + "/dev/ttyUSB0", + "/dev/ttyUSB1", + "/dev/ttyUSB2", + "/dev/ttyUSB3", + "COM0", + "COM1", + "COM2", + "COM3", +] + +_BUILTIN_COMMAND_EXPANSIONS = { + # Device connection shortcuts. + "a0": "connect /dev/ttyACM0", + "a1": "connect /dev/ttyACM1", + "a2": "connect /dev/ttyACM2", + "a3": "connect /dev/ttyACM3", + "u0": "connect /dev/ttyUSB0", + "u1": "connect /dev/ttyUSB1", + "u2": "connect /dev/ttyUSB2", + "u3": "connect /dev/ttyUSB3", + "c0": "connect COM0", + "c1": "connect COM1", + "c2": "connect COM2", + "c3": "connect COM3", + # Filesystem shortcuts. + "cat": "fs cat", + "ls": "fs ls", + "cp": "fs cp", + "rm": "fs rm", + "mkdir": "fs mkdir", + "rmdir": "fs rmdir", + "df": [ + "exec", + "import uos\nprint('mount \\tsize \\tused \\tavail \\tuse%')\nfor _m in [''] + uos.listdir('/'):\n _s = uos.stat('/' + _m)\n if not _s[0] & 1 << 14: continue\n _s = uos.statvfs(_m)\n if _s[0]:\n _size = _s[0] * _s[2]; _free = _s[0] * _s[3]; print(_m, _size, _size - _free, _free, int(100 * (_size - _free) / _size), sep='\\t')", + ], + # Other shortcuts. + "reset t_ms=100": [ + "exec", + "--no-follow", + "import utime, umachine; utime.sleep_ms(t_ms); umachine.reset()", + ], + "bootloader t_ms=100": [ + "exec", + "--no-follow", + "import utime, umachine; utime.sleep_ms(t_ms); umachine.bootloader()", + ], + "setrtc": [ + "exec", + "import machine; machine.RTC().datetime((2020, 1, 1, 0, 10, 0, 0, 0))", + ], +} + + +def load_user_config(): + # Create empty config object. + config = __build_class__(lambda: None, "Config")() + config.commands = {} + + # Get config file name. + path = os.getenv("XDG_CONFIG_HOME") + if path is None: + path = os.getenv("HOME") + if path is None: + return config + path = os.path.join(path, ".config") + path = os.path.join(path, _PROG) + config_file = os.path.join(path, "config.py") + + # Check if config file exists. + if not os.path.exists(config_file): + return config + + # Exec the config file in its directory. + with open(config_file) as f: + config_data = f.read() + prev_cwd = os.getcwd() + os.chdir(path) + exec(config_data, config.__dict__) + os.chdir(prev_cwd) + + return config + + +def prepare_command_expansions(config): + global _command_expansions + + _command_expansions = {} + + for command_set in (_BUILTIN_COMMAND_EXPANSIONS, config.commands): + for cmd, sub in command_set.items(): + cmd = cmd.split() + if len(cmd) == 1: + args = () + else: + args = tuple(c.split("=") for c in cmd[1:]) + if isinstance(sub, str): + sub = sub.split() + _command_expansions[cmd[0]] = (args, sub) + + +def do_command_expansion(args): + def usage_error(cmd, exp_args, msg): + print(f"Command {cmd} {msg}; signature is:") + print(" ", cmd, " ".join("=".join(a) for a in exp_args)) + sys.exit(1) + + last_arg_idx = len(args) + pre = [] + while args and args[0] in _command_expansions: + cmd = args.pop(0) + exp_args, exp_sub = _command_expansions[cmd] + for exp_arg in exp_args: + exp_arg_name = exp_arg[0] + if args and "=" not in args[0]: + # Argument given without a name. + value = args.pop(0) + elif args and args[0].startswith(exp_arg_name + "="): + # Argument given with correct name. + value = args.pop(0).split("=", 1)[1] + else: + # No argument given, or argument given with a different name. + if len(exp_arg) == 1: + # Required argument (it has no default). + usage_error(cmd, exp_args, f"missing argument {exp_arg_name}") + else: + # Optional argument with a default. + value = exp_arg[1] + pre.append(f"{exp_arg_name}={value}") + + args[0:0] = exp_sub + last_arg_idx = len(exp_sub) + + if last_arg_idx < len(args) and "=" in args[last_arg_idx]: + # Extra unknown arguments given. + arg = args[last_arg_idx].split("=", 1)[0] + usage_error(cmd, exp_args, f"given unexpected argument {arg}") + sys.exit(1) + + # Insert expansion with optional setting of arguments. + if pre: + args[0:0] = ["exec", ";".join(pre)] + + +def do_connect(args): + dev = args.pop(0) + try: + if dev == "list": + # List attached devices. + for p in sorted(serial.tools.list_ports.comports()): + print( + "{} {} {:04x}:{:04x} {} {}".format( + p.device, p.serial_number, p.pid, p.vid, p.manufacturer, p.product + ) + ) + return None + elif dev == "auto": + # Auto-detect and auto-connect to the first available device. + ports = serial.tools.list_ports.comports() + for dev in _AUTO_CONNECT_SEARCH_LIST: + if any(p.device == dev for p in ports): + try: + return pyboard.PyboardExtended(dev, baudrate=115200) + except pyboard.PyboardError as er: + if not er.args[0].startswith("failed to access"): + raise er + raise pyboard.PyboardError("no device found") + elif dev.startswith("id:"): + # Search for a device with the given serial number. + serial_number = dev[len("id:") :] + dev = None + for p in serial.tools.list_ports.comports(): + if p.serial_number == serial_number: + return pyboard.PyboardExtended(p.device, baudrate=115200) + raise pyboard.PyboardError("no device with serial number {}".format(serial_number)) + else: + # Connect to the given device. + if dev.startswith("port:"): + dev = dev[len("port:") :] + return pyboard.PyboardExtended(dev, baudrate=115200) + except pyboard.PyboardError as er: + msg = er.args[0] + if msg.startswith("failed to access"): + msg += " (it may be in use by another program)" + print(msg) + sys.exit(1) + + +def do_disconnect(pyb): + try: + if pyb.mounted: + if not pyb.in_raw_repl: + pyb.enter_raw_repl(soft_reset=False) + pyb.umount_local() + if pyb.in_raw_repl: + pyb.exit_raw_repl() + except OSError: + # Ignore any OSError exceptions when shutting down, eg: + # - pyboard.filesystem_command will close the connecton if it had an error + # - umounting will fail if serial port disappeared + pass + pyb.close() + + +def do_filesystem(pyb, args): + def _list_recursive(files, path): + if os.path.isdir(path): + for entry in os.listdir(path): + _list_recursive(files, os.path.join(path, entry)) + else: + files.append(os.path.split(path)) + + if args[0] == "cp" and args[1] == "-r": + args.pop(0) + args.pop(0) + assert args[-1] == ":" + args.pop() + src_files = [] + for path in args: + _list_recursive(src_files, path) + known_dirs = {""} + pyb.exec_("import uos") + for dir, file in src_files: + dir_parts = dir.split("/") + for i in range(len(dir_parts)): + d = "/".join(dir_parts[: i + 1]) + if d not in known_dirs: + pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d) + known_dirs.add(d) + pyboard.filesystem_command(pyb, ["cp", os.path.join(dir, file), ":" + dir + "/"]) + else: + pyboard.filesystem_command(pyb, args) + args.clear() + + +def do_repl_main_loop(pyb, console_in, console_out_write, *, code_to_inject, file_to_inject): + while True: + if isinstance(console_in, ConsolePosix): + # TODO pyb.serial might not have fd + select.select([console_in.infd, pyb.serial.fd], [], []) + else: + while not (console_in.inWaiting() or pyb.serial.inWaiting()): + time.sleep(0.01) + c = console_in.readchar() + if c: + if c == b"\x1d": # ctrl-], quit + break + elif c == b"\x04": # ctrl-D + # do a soft reset and reload the filesystem hook + pyb.soft_reset_with_mount(console_out_write) + elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code + pyb.serial.write(code_to_inject) + elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script + console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8")) + pyb.enter_raw_repl(soft_reset=False) + with open(file_to_inject, "rb") as f: + pyfile = f.read() + try: + pyb.exec_raw_no_follow(pyfile) + except pyboard.PyboardError as er: + console_out_write(b"Error:\r\n") + console_out_write(er) + pyb.exit_raw_repl() + else: + pyb.serial.write(c) + + try: + n = pyb.serial.inWaiting() + except OSError as er: + if er.args[0] == 5: # IO error, device disappeared + print("device disconnected") + break + + if n > 0: + c = pyb.serial.read(1) + if c is not None: + # pass character through to the console + oc = ord(c) + if oc in (8, 9, 10, 13, 27) or 32 <= oc <= 126: + console_out_write(c) + else: + console_out_write(b"[%02x]" % ord(c)) + + +def do_repl(pyb, args): + capture_file = None + code_to_inject = None + file_to_inject = None + + while len(args): + if args[0] == "--capture": + args.pop(0) + capture_file = args.pop(0) + elif args[0] == "--inject-code": + args.pop(0) + code_to_inject = bytes(args.pop(0).replace("\\n", "\r\n"), "utf8") + elif args[0] == "--inject-file": + args.pop(0) + file_to_inject = args.pop(0) + else: + break + + print("Connected to MicroPython at %s" % pyb.device_name) + print("Use Ctrl-] to exit this shell") + if capture_file is not None: + print('Capturing session to file "%s"' % capture_file) + capture_file = open(capture_file, "wb") + if code_to_inject is not None: + print("Use Ctrl-J to inject", code_to_inject) + if file_to_inject is not None: + print('Use Ctrl-K to inject file "%s"' % file_to_inject) + + console = Console() + console.enter() + + def console_out_write(b): + console.write(b) + if capture_file is not None: + capture_file.write(b) + capture_file.flush() + + try: + do_repl_main_loop( + pyb, + console, + console_out_write, + code_to_inject=code_to_inject, + file_to_inject=file_to_inject, + ) + finally: + console.exit() + if capture_file is not None: + capture_file.close() + + +def execbuffer(pyb, buf, follow): + ret_val = 0 + try: + pyb.exec_raw_no_follow(buf) + if follow: + ret, ret_err = pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes) + if ret_err: + pyboard.stdout_write_bytes(ret_err) + ret_val = 1 + except pyboard.PyboardError as er: + print(er) + ret_val = 1 + except KeyboardInterrupt: + ret_val = 1 + return ret_val + + +def main(): + config = load_user_config() + prepare_command_expansions(config) + + args = sys.argv[1:] + pyb = None + did_action = False + + try: + while args: + do_command_expansion(args) + + cmds = { + "connect": (False, False, 1), + "disconnect": (False, False, 0), + "mount": (True, False, 1), + "repl": (False, True, 0), + "eval": (True, True, 1), + "exec": (True, True, 1), + "run": (True, True, 1), + "fs": (True, True, 1), + } + cmd = args.pop(0) + try: + need_raw_repl, is_action, num_args_min = cmds[cmd] + except KeyError: + print(f"{_PROG}: '{cmd}' is not a command") + return 1 + + if len(args) < num_args_min: + print(f"{_PROG}: '{cmd}' neads at least {num_args_min} argument(s)") + return 1 + + if cmd == "connect": + if pyb is not None: + do_disconnect(pyb) + pyb = do_connect(args) + if pyb is None: + did_action = True + continue + + if pyb is None: + pyb = do_connect(["auto"]) + + if need_raw_repl: + if not pyb.in_raw_repl: + pyb.enter_raw_repl() + else: + if pyb.in_raw_repl: + pyb.exit_raw_repl() + if is_action: + did_action = True + + if cmd == "disconnect": + do_disconnect(pyb) + pyb = None + elif cmd == "mount": + path = args.pop(0) + pyb.mount_local(path) + print(f"Local directory {path} is mounted at /remote") + elif cmd in ("exec", "eval", "run"): + follow = True + if args[0] == "--no-follow": + args.pop(0) + follow = False + if cmd == "exec": + buf = args.pop(0) + elif cmd == "eval": + buf = "print(" + args.pop(0) + ")" + else: + filename = args.pop(0) + try: + with open(filename, "rb") as f: + buf = f.read() + except OSError: + print(f"{_PROG}: could not read file '{filename}'") + return 1 + ret = execbuffer(pyb, buf, follow) + if ret: + return ret + elif cmd == "fs": + do_filesystem(pyb, args) + elif cmd == "repl": + do_repl(pyb, args) + + if not did_action: + if pyb is None: + pyb = do_connect(["auto"]) + if pyb.in_raw_repl: + pyb.exit_raw_repl() + do_repl(pyb, args) + finally: + if pyb is not None: + do_disconnect(pyb) diff --git a/tools/mpremote/mpremote/pyboardextended.py b/tools/mpremote/mpremote/pyboardextended.py new file mode 100644 index 000000000..e3f259b39 --- /dev/null +++ b/tools/mpremote/mpremote/pyboardextended.py @@ -0,0 +1,621 @@ +import os, re, serial, struct, time +from errno import EPERM +from .console import VT_ENABLED + +try: + from .pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command +except ImportError: + import sys + + sys.path.append(os.path.dirname(__file__) + "/../..") + from pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command + +fs_hook_cmds = { + "CMD_STAT": 1, + "CMD_ILISTDIR_START": 2, + "CMD_ILISTDIR_NEXT": 3, + "CMD_OPEN": 4, + "CMD_CLOSE": 5, + "CMD_READ": 6, + "CMD_WRITE": 7, + "CMD_SEEK": 8, + "CMD_REMOVE": 9, + "CMD_RENAME": 10, +} + +fs_hook_code = """\ +import uos, uio, ustruct, micropython, usys + + +class RemoteCommand: + def __init__(self, use_second_port): + self.buf4 = bytearray(4) + try: + import pyb + self.fout = pyb.USB_VCP() + if use_second_port: + self.fin = pyb.USB_VCP(1) + else: + self.fin = pyb.USB_VCP() + except: + import usys + self.fout = usys.stdout.buffer + self.fin = usys.stdin.buffer + import select + self.poller = select.poll() + self.poller.register(self.fin, select.POLLIN) + + def poll_in(self): + for _ in self.poller.ipoll(1000): + return + self.end() + raise Exception('timeout waiting for remote') + + def rd(self, n): + buf = bytearray(n) + self.rd_into(buf, n) + return buf + + def rd_into(self, buf, n): + # implement reading with a timeout in case other side disappears + if n == 0: + return + self.poll_in() + r = self.fin.readinto(buf, n) + if r < n: + mv = memoryview(buf) + while r < n: + self.poll_in() + r += self.fin.readinto(mv[r:], n - r) + + def begin(self, type): + micropython.kbd_intr(-1) + buf4 = self.buf4 + buf4[0] = 0x18 + buf4[1] = type + self.fout.write(buf4, 2) + # Wait for sync byte 0x18, but don't get stuck forever + for i in range(30): + self.poller.poll(1000) + self.fin.readinto(buf4, 1) + if buf4[0] == 0x18: + break + + def end(self): + micropython.kbd_intr(3) + + def rd_s8(self): + self.rd_into(self.buf4, 1) + n = self.buf4[0] + if n & 0x80: + n -= 0x100 + return n + + def rd_s32(self): + buf4 = self.buf4 + self.rd_into(buf4, 4) + n = buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24 + if buf4[3] & 0x80: + n -= 0x100000000 + return n + + def rd_u32(self): + buf4 = self.buf4 + self.rd_into(buf4, 4) + return buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24 + + def rd_bytes(self, buf): + # TODO if n is large (eg >256) then we may miss bytes on stdin + n = self.rd_s32() + if buf is None: + ret = buf = bytearray(n) + else: + ret = n + self.rd_into(buf, n) + return ret + + def rd_str(self): + n = self.rd_s32() + if n == 0: + return '' + else: + return str(self.rd(n), 'utf8') + + def wr_s8(self, i): + self.buf4[0] = i + self.fout.write(self.buf4, 1) + + def wr_s32(self, i): + ustruct.pack_into('<i', self.buf4, 0, i) + self.fout.write(self.buf4) + + def wr_bytes(self, b): + self.wr_s32(len(b)) + self.fout.write(b) + + # str and bytes act the same in MicroPython + wr_str = wr_bytes + + +class RemoteFile(uio.IOBase): + def __init__(self, cmd, fd, is_text): + self.cmd = cmd + self.fd = fd + self.is_text = is_text + + def __enter__(self): + return self + + def __exit__(self, a, b, c): + self.close() + + def ioctl(self, request, arg): + if request == 4: # CLOSE + self.close() + return 0 + + def flush(self): + pass + + def close(self): + if self.fd is None: + return + c = self.cmd + c.begin(CMD_CLOSE) + c.wr_s8(self.fd) + c.end() + self.fd = None + + def read(self, n=-1): + c = self.cmd + c.begin(CMD_READ) + c.wr_s8(self.fd) + c.wr_s32(n) + data = c.rd_bytes(None) + c.end() + if self.is_text: + data = str(data, 'utf8') + else: + data = bytes(data) + return data + + def readinto(self, buf): + c = self.cmd + c.begin(CMD_READ) + c.wr_s8(self.fd) + c.wr_s32(len(buf)) + n = c.rd_bytes(buf) + c.end() + return n + + def readline(self): + l = '' + while 1: + c = self.read(1) + l += c + if c == '\\n' or c == '': + return l + + def readlines(self): + ls = [] + while 1: + l = self.readline() + if not l: + return ls + ls.append(l) + + def write(self, buf): + c = self.cmd + c.begin(CMD_WRITE) + c.wr_s8(self.fd) + c.wr_bytes(buf) + n = c.rd_s32() + c.end() + return n + + def seek(self, n): + c = self.cmd + c.begin(CMD_SEEK) + c.wr_s8(self.fd) + c.wr_s32(n) + n = c.rd_s32() + c.end() + return n + + +class RemoteFS: + def __init__(self, cmd): + self.cmd = cmd + + def mount(self, readonly, mkfs): + pass + + def umount(self): + pass + + def chdir(self, path): + if not path.startswith("/"): + path = self.path + path + if not path.endswith("/"): + path += "/" + if path != "/": + self.stat(path) + self.path = path + + def getcwd(self): + return self.path + + def remove(self, path): + c = self.cmd + c.begin(CMD_REMOVE) + c.wr_str(self.path + path) + res = c.rd_s32() + c.end() + if res < 0: + raise OSError(-res) + + def rename(self, old, new): + c = self.cmd + c.begin(CMD_RENAME) + c.wr_str(self.path + old) + c.wr_str(self.path + new) + res = c.rd_s32() + c.end() + if res < 0: + raise OSError(-res) + + def stat(self, path): + c = self.cmd + c.begin(CMD_STAT) + c.wr_str(self.path + path) + res = c.rd_s8() + if res < 0: + c.end() + raise OSError(-res) + mode = c.rd_u32() + size = c.rd_u32() + atime = c.rd_u32() + mtime = c.rd_u32() + ctime = c.rd_u32() + c.end() + return mode, 0, 0, 0, 0, 0, size, atime, mtime, ctime + + def ilistdir(self, path): + c = self.cmd + c.begin(CMD_ILISTDIR_START) + c.wr_str(self.path + path) + res = c.rd_s8() + c.end() + if res < 0: + raise OSError(-res) + def next(): + while True: + c.begin(CMD_ILISTDIR_NEXT) + name = c.rd_str() + if name: + type = c.rd_u32() + c.end() + yield (name, type, 0) + else: + c.end() + break + return next() + + def open(self, path, mode): + c = self.cmd + c.begin(CMD_OPEN) + c.wr_str(self.path + path) + c.wr_str(mode) + fd = c.rd_s8() + c.end() + if fd < 0: + raise OSError(-fd) + return RemoteFile(c, fd, mode.find('b') == -1) + + +def __mount(use_second_port): + uos.mount(RemoteFS(RemoteCommand(use_second_port)), '/remote') + uos.chdir('/remote') +""" + +# Apply basic compression on hook code. +for key, value in fs_hook_cmds.items(): + fs_hook_code = re.sub(key, str(value), fs_hook_code) +fs_hook_code = re.sub(" *#.*$", "", fs_hook_code, flags=re.MULTILINE) +fs_hook_code = re.sub("\n\n+", "\n", fs_hook_code) +fs_hook_code = re.sub(" ", " ", fs_hook_code) +fs_hook_code = re.sub("rd_", "r", fs_hook_code) +fs_hook_code = re.sub("wr_", "w", fs_hook_code) +fs_hook_code = re.sub("buf4", "b4", fs_hook_code) + + +class PyboardCommand: + def __init__(self, fin, fout, path): + self.fin = fin + self.fout = fout + self.root = path + "/" + self.data_ilistdir = ["", []] + self.data_files = [] + + def rd_s8(self): + return struct.unpack("<b", self.fin.read(1))[0] + + def rd_s32(self): + return struct.unpack("<i", self.fin.read(4))[0] + + def rd_bytes(self): + n = self.rd_s32() + return self.fin.read(n) + + def rd_str(self): + n = self.rd_s32() + if n == 0: + return "" + else: + return str(self.fin.read(n), "utf8") + + def wr_s8(self, i): + self.fout.write(struct.pack("<b", i)) + + def wr_s32(self, i): + self.fout.write(struct.pack("<i", i)) + + def wr_u32(self, i): + self.fout.write(struct.pack("<I", i)) + + def wr_bytes(self, b): + self.wr_s32(len(b)) + self.fout.write(b) + + def wr_str(self, s): + b = bytes(s, "utf8") + self.wr_s32(len(b)) + self.fout.write(b) + + def log_cmd(self, msg): + print(f"[{msg}]", end="\r\n") + + def path_check(self, path): + parent = os.path.realpath(self.root) + child = os.path.realpath(path) + if parent != os.path.commonpath([parent, child]): + raise OSError(EPERM, "") # File is outside mounted dir + + def do_stat(self): + path = self.root + self.rd_str() + # self.log_cmd(f"stat {path}") + try: + self.path_check(path) + stat = os.stat(path) + except OSError as er: + self.wr_s8(-abs(er.errno)) + else: + self.wr_s8(0) + # Note: st_ino would need to be 64-bit if added here + self.wr_u32(stat.st_mode) + self.wr_u32(stat.st_size) + self.wr_u32(int(stat.st_atime)) + self.wr_u32(int(stat.st_mtime)) + self.wr_u32(int(stat.st_ctime)) + + def do_ilistdir_start(self): + path = self.root + self.rd_str() + try: + self.path_check(path) + self.wr_s8(0) + except OSError as er: + self.wr_s8(-abs(er.errno)) + else: + self.data_ilistdir[0] = path + self.data_ilistdir[1] = os.listdir(path) + + def do_ilistdir_next(self): + if self.data_ilistdir[1]: + entry = self.data_ilistdir[1].pop(0) + try: + stat = os.lstat(self.data_ilistdir[0] + "/" + entry) + mode = stat.st_mode & 0xC000 + except OSError as er: + mode = 0 + self.wr_str(entry) + self.wr_u32(mode) + else: + self.wr_str("") + + def do_open(self): + path = self.root + self.rd_str() + mode = self.rd_str() + # self.log_cmd(f"open {path} {mode}") + try: + self.path_check(path) + f = open(path, mode) + except OSError as er: + self.wr_s8(-abs(er.errno)) + else: + is_text = mode.find("b") == -1 + try: + fd = self.data_files.index(None) + self.data_files[fd] = (f, is_text) + except ValueError: + fd = len(self.data_files) + self.data_files.append((f, is_text)) + self.wr_s8(fd) + + def do_close(self): + fd = self.rd_s8() + # self.log_cmd(f"close {fd}") + self.data_files[fd][0].close() + self.data_files[fd] = None + + def do_read(self): + fd = self.rd_s8() + n = self.rd_s32() + buf = self.data_files[fd][0].read(n) + if self.data_files[fd][1]: + buf = bytes(buf, "utf8") + self.wr_bytes(buf) + # self.log_cmd(f"read {fd} {n} -> {len(buf)}") + + def do_seek(self): + fd = self.rd_s8() + n = self.rd_s32() + # self.log_cmd(f"seek {fd} {n}") + self.data_files[fd][0].seek(n) + self.wr_s32(n) + + def do_write(self): + fd = self.rd_s8() + buf = self.rd_bytes() + if self.data_files[fd][1]: + buf = str(buf, "utf8") + n = self.data_files[fd][0].write(buf) + self.wr_s32(n) + # self.log_cmd(f"write {fd} {len(buf)} -> {n}") + + def do_remove(self): + path = self.root + self.rd_str() + # self.log_cmd(f"remove {path}") + try: + self.path_check(path) + os.remove(path) + ret = 0 + except OSError as er: + ret = -abs(er.errno) + self.wr_s32(ret) + + def do_rename(self): + old = self.root + self.rd_str() + new = self.root + self.rd_str() + # self.log_cmd(f"rename {old} {new}") + try: + self.path_check(old) + self.path_check(new) + os.rename(old, new) + ret = 0 + except OSError as er: + ret = -abs(er.errno) + self.wr_s32(ret) + + cmd_table = { + fs_hook_cmds["CMD_STAT"]: do_stat, + fs_hook_cmds["CMD_ILISTDIR_START"]: do_ilistdir_start, + fs_hook_cmds["CMD_ILISTDIR_NEXT"]: do_ilistdir_next, + fs_hook_cmds["CMD_OPEN"]: do_open, + fs_hook_cmds["CMD_CLOSE"]: do_close, + fs_hook_cmds["CMD_READ"]: do_read, + fs_hook_cmds["CMD_WRITE"]: do_write, + fs_hook_cmds["CMD_SEEK"]: do_seek, + fs_hook_cmds["CMD_REMOVE"]: do_remove, + fs_hook_cmds["CMD_RENAME"]: do_rename, + } + + +class SerialIntercept: + def __init__(self, serial, cmd): + self.orig_serial = serial + self.cmd = cmd + self.buf = b"" + self.orig_serial.timeout = 5.0 + + def _check_input(self, blocking): + if blocking or self.orig_serial.inWaiting() > 0: + c = self.orig_serial.read(1) + if c == b"\x18": + # a special command + c = self.orig_serial.read(1)[0] + self.orig_serial.write(b"\x18") # Acknowledge command + PyboardCommand.cmd_table[c](self.cmd) + elif not VT_ENABLED and c == b"\x1b": + # ESC code, ignore these on windows + esctype = self.orig_serial.read(1) + if esctype == b"[": # CSI + while not (0x40 < self.orig_serial.read(1)[0] < 0x7E): + # Looking for "final byte" of escape sequence + pass + else: + self.buf += c + + @property + def fd(self): + return self.orig_serial.fd + + def close(self): + self.orig_serial.close() + + def inWaiting(self): + self._check_input(False) + return len(self.buf) + + def read(self, n): + while len(self.buf) < n: + self._check_input(True) + out = self.buf[:n] + self.buf = self.buf[n:] + return out + + def write(self, buf): + self.orig_serial.write(buf) + + +class PyboardExtended(Pyboard): + def __init__(self, dev, *args, **kwargs): + super().__init__(dev, *args, **kwargs) + self.device_name = dev + self.mounted = False + + def mount_local(self, path, dev_out=None): + fout = self.serial + if dev_out is not None: + try: + fout = serial.Serial(dev_out) + except serial.SerialException: + port = list(serial.tools.list_ports.grep(dev_out)) + if not port: + raise + for p in port: + try: + fout = serial.Serial(p.device) + break + except serial.SerialException: + pass + self.mounted = True + if self.eval('"RemoteFS" in globals()') == b"False": + self.exec_(fs_hook_code) + self.exec_("__mount(%s)" % (dev_out is not None)) + self.cmd = PyboardCommand(self.serial, fout, path) + self.serial = SerialIntercept(self.serial, self.cmd) + self.dev_out = dev_out + + def soft_reset_with_mount(self, out_callback): + self.serial.write(b"\x04") + if not self.mounted: + return + + # Wait for a response to the soft-reset command. + for i in range(10): + if self.serial.inWaiting(): + break + time.sleep(0.05) + else: + # Device didn't respond so it wasn't in a state to do a soft reset. + return + + out_callback(self.serial.read(1)) + self.serial = self.serial.orig_serial + n = self.serial.inWaiting() + while n > 0: + buf = self.serial.read(n) + out_callback(buf) + time.sleep(0.1) + n = self.serial.inWaiting() + self.serial.write(b"\x01") + self.exec_(fs_hook_code) + self.exec_("__mount(%s)" % (self.dev_out is not None)) + self.exit_raw_repl() + self.read_until(4, b">>> ") + self.serial = SerialIntercept(self.serial, self.cmd) + + def umount_local(self): + if self.mounted: + self.exec_('uos.umount("/remote")') + self.mounted = False diff --git a/tools/mpremote/pyproject.toml b/tools/mpremote/pyproject.toml new file mode 100644 index 000000000..374b58cbf --- /dev/null +++ b/tools/mpremote/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = [ + "setuptools>=42", + "wheel" +] +build-backend = "setuptools.build_meta" diff --git a/tools/mpremote/setup.cfg b/tools/mpremote/setup.cfg new file mode 100644 index 000000000..4a1bf27c1 --- /dev/null +++ b/tools/mpremote/setup.cfg @@ -0,0 +1,25 @@ +[metadata] +name = mpremote +version = 0.0.4 +author = Damien George +author_email = damien@micropython.org +description = Tool for interacting remotely with MicroPython +long_description = file: README.md +long_description_content_type = text/markdown +url = https://github.com/micropython/micropython +project_urls = + Bug Tracker = https://github.com/micropython/micropython/issues +classifiers = + Programming Language :: Python :: 3 + License :: OSI Approved :: MIT License + Operating System :: OS Independent + +[options] +packages = mpremote +python_requires = >= 3.4 +install_requires = + pyserial >= 3.3 + +[options.entry_points] +console_scripts = + mpremote = mpremote.main:main |
