summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tools/mpremote/LICENSE21
-rw-r--r--tools/mpremote/README.md71
-rwxr-xr-xtools/mpremote/mpremote.py6
-rw-r--r--tools/mpremote/mpremote/__init__.py1
-rw-r--r--tools/mpremote/mpremote/console.py162
-rw-r--r--tools/mpremote/mpremote/main.py477
-rw-r--r--tools/mpremote/mpremote/pyboardextended.py621
-rw-r--r--tools/mpremote/pyproject.toml6
-rw-r--r--tools/mpremote/setup.cfg25
9 files changed, 1390 insertions, 0 deletions
diff --git a/tools/mpremote/LICENSE b/tools/mpremote/LICENSE
new file mode 100644
index 000000000..5ec904cca
--- /dev/null
+++ b/tools/mpremote/LICENSE
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2021 Damien P. George
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/tools/mpremote/README.md b/tools/mpremote/README.md
new file mode 100644
index 000000000..a6aaa1755
--- /dev/null
+++ b/tools/mpremote/README.md
@@ -0,0 +1,71 @@
+# mpremote -- MicroPython remote control
+
+This CLI tool provides an integrated set of utilities to remotely interact with
+and automate a MicroPython device over a serial connection.
+
+The simplest way to use this tool is:
+
+ mpremote
+
+This will automatically connect to the device and provide an interactive REPL.
+
+The full list of supported commands are:
+
+ mpremote connect <device> -- connect to given device
+ device may be: list, auto, id:x, port:x
+ or any valid device name/path
+ mpremote disconnect -- disconnect current device
+ mpremote mount <local-dir> -- mount local directory on device
+ mpremote eval <string> -- evaluate and print the string
+ mpremote exec <string> -- execute the string
+ mpremote run <file> -- run the given local script
+ mpremote fs <command> <args...> -- execute filesystem commands on the device
+ command may be: cat, ls, cp, rm, mkdir, rmdir
+ use ":" as a prefix to specify a file on the device
+ mpremote repl -- enter REPL
+ options:
+ --capture <file>
+ --inject-code <string>
+ --inject-file <file>
+
+Multiple commands can be specified and they will be run sequentially. Connection
+and disconnection will be done automatically at the start and end of the execution
+of the tool, if such commands are not explicitly given. Automatic connection will
+search for the first available serial device. If no action is specified then the
+REPL will be entered.
+
+Shortcuts can be defined using the macro system. Built-in shortcuts are:
+
+- a0, a1, a2, a3: connect to `/dev/ttyACM?`
+- u0, u1, u2, u3: connect to `/dev/ttyUSB?`
+- c0, c1, c2, c3: connect to `COM?`
+- cat, ls, cp, rm, mkdir, rmdir, df: filesystem commands
+- reset: reset the device
+- bootloader: make the device enter its bootloader
+
+Any user configuration, including user-defined shortcuts, can be placed in
+.config/mpremote/config.py. For example:
+
+ # Custom macro commands
+ commands = {
+ "c33": "connect id:334D335C3138",
+ "bl": "bootloader",
+ "double x=4": "eval x*2",
+ }
+
+Examples:
+
+ mpremote
+ mpremote a1
+ mpremote connect /dev/ttyUSB0 repl
+ mpremote ls
+ mpremote a1 ls
+ mpremote exec "import micropython; micropython.mem_info()"
+ mpremote eval 1/2 eval 3/4
+ mpremote mount .
+ mpremote mount . exec "import local_script"
+ mpremote ls
+ mpremote cat boot.py
+ mpremote cp :main.py .
+ mpremote cp main.py :
+ mpremote cp -r dir/ :
diff --git a/tools/mpremote/mpremote.py b/tools/mpremote/mpremote.py
new file mode 100755
index 000000000..a91ff67b1
--- /dev/null
+++ b/tools/mpremote/mpremote.py
@@ -0,0 +1,6 @@
+#!/usr/bin/env python3
+
+import sys
+from mpremote import main
+
+sys.exit(main.main())
diff --git a/tools/mpremote/mpremote/__init__.py b/tools/mpremote/mpremote/__init__.py
new file mode 100644
index 000000000..1bb8bf6d7
--- /dev/null
+++ b/tools/mpremote/mpremote/__init__.py
@@ -0,0 +1 @@
+# empty
diff --git a/tools/mpremote/mpremote/console.py b/tools/mpremote/mpremote/console.py
new file mode 100644
index 000000000..646bbfa22
--- /dev/null
+++ b/tools/mpremote/mpremote/console.py
@@ -0,0 +1,162 @@
+import sys
+
+try:
+ import select, termios
+except ImportError:
+ termios = None
+ select = None
+ import msvcrt
+
+
+class ConsolePosix:
+ def __init__(self):
+ self.infd = sys.stdin.fileno()
+ self.infile = sys.stdin.buffer.raw
+ self.outfile = sys.stdout.buffer.raw
+ self.orig_attr = termios.tcgetattr(self.infd)
+
+ def enter(self):
+ # attr is: [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
+ attr = termios.tcgetattr(self.infd)
+ attr[0] &= ~(
+ termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON
+ )
+ attr[1] = 0
+ attr[2] = attr[2] & ~(termios.CSIZE | termios.PARENB) | termios.CS8
+ attr[3] = 0
+ attr[6][termios.VMIN] = 1
+ attr[6][termios.VTIME] = 0
+ termios.tcsetattr(self.infd, termios.TCSANOW, attr)
+
+ def exit(self):
+ termios.tcsetattr(self.infd, termios.TCSANOW, self.orig_attr)
+
+ def waitchar(self):
+ # TODO pyb.serial might not have fd
+ select.select([console_in.infd, pyb.serial.fd], [], [])
+
+ def readchar(self):
+ res = select.select([self.infd], [], [], 0)
+ if res[0]:
+ return self.infile.read(1)
+ else:
+ return None
+
+ def write(self, buf):
+ self.outfile.write(buf)
+
+
+class ConsoleWindows:
+ KEY_MAP = {
+ b"H": b"A", # UP
+ b"P": b"B", # DOWN
+ b"M": b"C", # RIGHT
+ b"K": b"D", # LEFT
+ b"G": b"H", # POS1
+ b"O": b"F", # END
+ b"Q": b"6~", # PGDN
+ b"I": b"5~", # PGUP
+ b"s": b"1;5D", # CTRL-LEFT,
+ b"t": b"1;5C", # CTRL-RIGHT,
+ b"\x8d": b"1;5A", # CTRL-UP,
+ b"\x91": b"1;5B", # CTRL-DOWN,
+ b"w": b"1;5H", # CTRL-POS1
+ b"u": b"1;5F", # CTRL-END
+ b"\x98": b"1;3A", # ALT-UP,
+ b"\xa0": b"1;3B", # ALT-DOWN,
+ b"\x9d": b"1;3C", # ALT-RIGHT,
+ b"\x9b": b"1;3D", # ALT-LEFT,
+ b"\x97": b"1;3H", # ALT-POS1,
+ b"\x9f": b"1;3F", # ALT-END,
+ b"S": b"3~", # DEL,
+ b"\x93": b"3;5~", # CTRL-DEL
+ b"R": b"2~", # INS
+ b"\x92": b"2;5~", # CTRL-INS
+ b"\x94": b"Z", # Ctrl-Tab = BACKTAB,
+ }
+
+ def enter(self):
+ pass
+
+ def exit(self):
+ pass
+
+ def inWaiting(self):
+ return 1 if msvcrt.kbhit() else 0
+
+ def waitchar(self):
+ while not (self.inWaiting() or pyb.serial.inWaiting()):
+ time.sleep(0.01)
+
+ def readchar(self):
+ if msvcrt.kbhit():
+ ch = msvcrt.getch()
+ while ch in b"\x00\xe0": # arrow or function key prefix?
+ if not msvcrt.kbhit():
+ return None
+ ch = msvcrt.getch() # second call returns the actual key code
+ try:
+ ch = b"\x1b[" + self.KEY_MAP[ch]
+ except KeyError:
+ return None
+ return ch
+
+ def write(self, buf):
+ buf = buf.decode() if isinstance(buf, bytes) else buf
+ sys.stdout.write(buf)
+ sys.stdout.flush()
+ # for b in buf:
+ # if isinstance(b, bytes):
+ # msvcrt.putch(b)
+ # else:
+ # msvcrt.putwch(b)
+
+
+if termios:
+ Console = ConsolePosix
+ VT_ENABLED = True
+else:
+ Console = ConsoleWindows
+
+ # Windows VT mode ( >= win10 only)
+ # https://bugs.python.org/msg291732
+ import ctypes
+ from ctypes import wintypes
+
+ kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
+
+ ERROR_INVALID_PARAMETER = 0x0057
+ ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
+
+ def _check_bool(result, func, args):
+ if not result:
+ raise ctypes.WinError(ctypes.get_last_error())
+ return args
+
+ LPDWORD = ctypes.POINTER(wintypes.DWORD)
+ kernel32.GetConsoleMode.errcheck = _check_bool
+ kernel32.GetConsoleMode.argtypes = (wintypes.HANDLE, LPDWORD)
+ kernel32.SetConsoleMode.errcheck = _check_bool
+ kernel32.SetConsoleMode.argtypes = (wintypes.HANDLE, wintypes.DWORD)
+
+ def set_conout_mode(new_mode, mask=0xFFFFFFFF):
+ # don't assume StandardOutput is a console.
+ # open CONOUT$ instead
+ fdout = os.open("CONOUT$", os.O_RDWR)
+ try:
+ hout = msvcrt.get_osfhandle(fdout)
+ old_mode = wintypes.DWORD()
+ kernel32.GetConsoleMode(hout, ctypes.byref(old_mode))
+ mode = (new_mode & mask) | (old_mode.value & ~mask)
+ kernel32.SetConsoleMode(hout, mode)
+ return old_mode.value
+ finally:
+ os.close(fdout)
+
+ # def enable_vt_mode():
+ mode = mask = ENABLE_VIRTUAL_TERMINAL_PROCESSING
+ try:
+ set_conout_mode(mode, mask)
+ VT_ENABLED = True
+ except WindowsError as e:
+ VT_ENABLED = False
diff --git a/tools/mpremote/mpremote/main.py b/tools/mpremote/mpremote/main.py
new file mode 100644
index 000000000..1ce00305e
--- /dev/null
+++ b/tools/mpremote/mpremote/main.py
@@ -0,0 +1,477 @@
+"""
+MicroPython Remote - Interaction and automation tool for MicroPython
+MIT license; Copyright (c) 2019-2021 Damien P. George
+
+This program provides a set of utilities to interact with and automate a
+MicroPython device over a serial connection. Commands supported are:
+
+ mpremote -- auto-detect, connect and enter REPL
+ mpremote <device-shortcut> -- connect to given device
+ mpremote connect <device> -- connect to given device
+ mpremote disconnect -- disconnect current device
+ mpremote mount <local-dir> -- mount local directory on device
+ mpremote eval <string> -- evaluate and print the string
+ mpremote exec <string> -- execute the string
+ mpremote run <script> -- run the given local script
+ mpremote fs <command> <args...> -- execute filesystem commands on the device
+ mpremote repl -- enter REPL
+"""
+
+import os, select, sys, time
+import serial.tools.list_ports
+
+from . import pyboardextended as pyboard
+from .console import Console, ConsolePosix
+
+_PROG = "mpremote"
+
+_AUTO_CONNECT_SEARCH_LIST = [
+ "/dev/ttyACM0",
+ "/dev/ttyACM1",
+ "/dev/ttyACM2",
+ "/dev/ttyACM3",
+ "/dev/ttyUSB0",
+ "/dev/ttyUSB1",
+ "/dev/ttyUSB2",
+ "/dev/ttyUSB3",
+ "COM0",
+ "COM1",
+ "COM2",
+ "COM3",
+]
+
+_BUILTIN_COMMAND_EXPANSIONS = {
+ # Device connection shortcuts.
+ "a0": "connect /dev/ttyACM0",
+ "a1": "connect /dev/ttyACM1",
+ "a2": "connect /dev/ttyACM2",
+ "a3": "connect /dev/ttyACM3",
+ "u0": "connect /dev/ttyUSB0",
+ "u1": "connect /dev/ttyUSB1",
+ "u2": "connect /dev/ttyUSB2",
+ "u3": "connect /dev/ttyUSB3",
+ "c0": "connect COM0",
+ "c1": "connect COM1",
+ "c2": "connect COM2",
+ "c3": "connect COM3",
+ # Filesystem shortcuts.
+ "cat": "fs cat",
+ "ls": "fs ls",
+ "cp": "fs cp",
+ "rm": "fs rm",
+ "mkdir": "fs mkdir",
+ "rmdir": "fs rmdir",
+ "df": [
+ "exec",
+ "import uos\nprint('mount \\tsize \\tused \\tavail \\tuse%')\nfor _m in [''] + uos.listdir('/'):\n _s = uos.stat('/' + _m)\n if not _s[0] & 1 << 14: continue\n _s = uos.statvfs(_m)\n if _s[0]:\n _size = _s[0] * _s[2]; _free = _s[0] * _s[3]; print(_m, _size, _size - _free, _free, int(100 * (_size - _free) / _size), sep='\\t')",
+ ],
+ # Other shortcuts.
+ "reset t_ms=100": [
+ "exec",
+ "--no-follow",
+ "import utime, umachine; utime.sleep_ms(t_ms); umachine.reset()",
+ ],
+ "bootloader t_ms=100": [
+ "exec",
+ "--no-follow",
+ "import utime, umachine; utime.sleep_ms(t_ms); umachine.bootloader()",
+ ],
+ "setrtc": [
+ "exec",
+ "import machine; machine.RTC().datetime((2020, 1, 1, 0, 10, 0, 0, 0))",
+ ],
+}
+
+
+def load_user_config():
+ # Create empty config object.
+ config = __build_class__(lambda: None, "Config")()
+ config.commands = {}
+
+ # Get config file name.
+ path = os.getenv("XDG_CONFIG_HOME")
+ if path is None:
+ path = os.getenv("HOME")
+ if path is None:
+ return config
+ path = os.path.join(path, ".config")
+ path = os.path.join(path, _PROG)
+ config_file = os.path.join(path, "config.py")
+
+ # Check if config file exists.
+ if not os.path.exists(config_file):
+ return config
+
+ # Exec the config file in its directory.
+ with open(config_file) as f:
+ config_data = f.read()
+ prev_cwd = os.getcwd()
+ os.chdir(path)
+ exec(config_data, config.__dict__)
+ os.chdir(prev_cwd)
+
+ return config
+
+
+def prepare_command_expansions(config):
+ global _command_expansions
+
+ _command_expansions = {}
+
+ for command_set in (_BUILTIN_COMMAND_EXPANSIONS, config.commands):
+ for cmd, sub in command_set.items():
+ cmd = cmd.split()
+ if len(cmd) == 1:
+ args = ()
+ else:
+ args = tuple(c.split("=") for c in cmd[1:])
+ if isinstance(sub, str):
+ sub = sub.split()
+ _command_expansions[cmd[0]] = (args, sub)
+
+
+def do_command_expansion(args):
+ def usage_error(cmd, exp_args, msg):
+ print(f"Command {cmd} {msg}; signature is:")
+ print(" ", cmd, " ".join("=".join(a) for a in exp_args))
+ sys.exit(1)
+
+ last_arg_idx = len(args)
+ pre = []
+ while args and args[0] in _command_expansions:
+ cmd = args.pop(0)
+ exp_args, exp_sub = _command_expansions[cmd]
+ for exp_arg in exp_args:
+ exp_arg_name = exp_arg[0]
+ if args and "=" not in args[0]:
+ # Argument given without a name.
+ value = args.pop(0)
+ elif args and args[0].startswith(exp_arg_name + "="):
+ # Argument given with correct name.
+ value = args.pop(0).split("=", 1)[1]
+ else:
+ # No argument given, or argument given with a different name.
+ if len(exp_arg) == 1:
+ # Required argument (it has no default).
+ usage_error(cmd, exp_args, f"missing argument {exp_arg_name}")
+ else:
+ # Optional argument with a default.
+ value = exp_arg[1]
+ pre.append(f"{exp_arg_name}={value}")
+
+ args[0:0] = exp_sub
+ last_arg_idx = len(exp_sub)
+
+ if last_arg_idx < len(args) and "=" in args[last_arg_idx]:
+ # Extra unknown arguments given.
+ arg = args[last_arg_idx].split("=", 1)[0]
+ usage_error(cmd, exp_args, f"given unexpected argument {arg}")
+ sys.exit(1)
+
+ # Insert expansion with optional setting of arguments.
+ if pre:
+ args[0:0] = ["exec", ";".join(pre)]
+
+
+def do_connect(args):
+ dev = args.pop(0)
+ try:
+ if dev == "list":
+ # List attached devices.
+ for p in sorted(serial.tools.list_ports.comports()):
+ print(
+ "{} {} {:04x}:{:04x} {} {}".format(
+ p.device, p.serial_number, p.pid, p.vid, p.manufacturer, p.product
+ )
+ )
+ return None
+ elif dev == "auto":
+ # Auto-detect and auto-connect to the first available device.
+ ports = serial.tools.list_ports.comports()
+ for dev in _AUTO_CONNECT_SEARCH_LIST:
+ if any(p.device == dev for p in ports):
+ try:
+ return pyboard.PyboardExtended(dev, baudrate=115200)
+ except pyboard.PyboardError as er:
+ if not er.args[0].startswith("failed to access"):
+ raise er
+ raise pyboard.PyboardError("no device found")
+ elif dev.startswith("id:"):
+ # Search for a device with the given serial number.
+ serial_number = dev[len("id:") :]
+ dev = None
+ for p in serial.tools.list_ports.comports():
+ if p.serial_number == serial_number:
+ return pyboard.PyboardExtended(p.device, baudrate=115200)
+ raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
+ else:
+ # Connect to the given device.
+ if dev.startswith("port:"):
+ dev = dev[len("port:") :]
+ return pyboard.PyboardExtended(dev, baudrate=115200)
+ except pyboard.PyboardError as er:
+ msg = er.args[0]
+ if msg.startswith("failed to access"):
+ msg += " (it may be in use by another program)"
+ print(msg)
+ sys.exit(1)
+
+
+def do_disconnect(pyb):
+ try:
+ if pyb.mounted:
+ if not pyb.in_raw_repl:
+ pyb.enter_raw_repl(soft_reset=False)
+ pyb.umount_local()
+ if pyb.in_raw_repl:
+ pyb.exit_raw_repl()
+ except OSError:
+ # Ignore any OSError exceptions when shutting down, eg:
+ # - pyboard.filesystem_command will close the connecton if it had an error
+ # - umounting will fail if serial port disappeared
+ pass
+ pyb.close()
+
+
+def do_filesystem(pyb, args):
+ def _list_recursive(files, path):
+ if os.path.isdir(path):
+ for entry in os.listdir(path):
+ _list_recursive(files, os.path.join(path, entry))
+ else:
+ files.append(os.path.split(path))
+
+ if args[0] == "cp" and args[1] == "-r":
+ args.pop(0)
+ args.pop(0)
+ assert args[-1] == ":"
+ args.pop()
+ src_files = []
+ for path in args:
+ _list_recursive(src_files, path)
+ known_dirs = {""}
+ pyb.exec_("import uos")
+ for dir, file in src_files:
+ dir_parts = dir.split("/")
+ for i in range(len(dir_parts)):
+ d = "/".join(dir_parts[: i + 1])
+ if d not in known_dirs:
+ pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d)
+ known_dirs.add(d)
+ pyboard.filesystem_command(pyb, ["cp", os.path.join(dir, file), ":" + dir + "/"])
+ else:
+ pyboard.filesystem_command(pyb, args)
+ args.clear()
+
+
+def do_repl_main_loop(pyb, console_in, console_out_write, *, code_to_inject, file_to_inject):
+ while True:
+ if isinstance(console_in, ConsolePosix):
+ # TODO pyb.serial might not have fd
+ select.select([console_in.infd, pyb.serial.fd], [], [])
+ else:
+ while not (console_in.inWaiting() or pyb.serial.inWaiting()):
+ time.sleep(0.01)
+ c = console_in.readchar()
+ if c:
+ if c == b"\x1d": # ctrl-], quit
+ break
+ elif c == b"\x04": # ctrl-D
+ # do a soft reset and reload the filesystem hook
+ pyb.soft_reset_with_mount(console_out_write)
+ elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code
+ pyb.serial.write(code_to_inject)
+ elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script
+ console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
+ pyb.enter_raw_repl(soft_reset=False)
+ with open(file_to_inject, "rb") as f:
+ pyfile = f.read()
+ try:
+ pyb.exec_raw_no_follow(pyfile)
+ except pyboard.PyboardError as er:
+ console_out_write(b"Error:\r\n")
+ console_out_write(er)
+ pyb.exit_raw_repl()
+ else:
+ pyb.serial.write(c)
+
+ try:
+ n = pyb.serial.inWaiting()
+ except OSError as er:
+ if er.args[0] == 5: # IO error, device disappeared
+ print("device disconnected")
+ break
+
+ if n > 0:
+ c = pyb.serial.read(1)
+ if c is not None:
+ # pass character through to the console
+ oc = ord(c)
+ if oc in (8, 9, 10, 13, 27) or 32 <= oc <= 126:
+ console_out_write(c)
+ else:
+ console_out_write(b"[%02x]" % ord(c))
+
+
+def do_repl(pyb, args):
+ capture_file = None
+ code_to_inject = None
+ file_to_inject = None
+
+ while len(args):
+ if args[0] == "--capture":
+ args.pop(0)
+ capture_file = args.pop(0)
+ elif args[0] == "--inject-code":
+ args.pop(0)
+ code_to_inject = bytes(args.pop(0).replace("\\n", "\r\n"), "utf8")
+ elif args[0] == "--inject-file":
+ args.pop(0)
+ file_to_inject = args.pop(0)
+ else:
+ break
+
+ print("Connected to MicroPython at %s" % pyb.device_name)
+ print("Use Ctrl-] to exit this shell")
+ if capture_file is not None:
+ print('Capturing session to file "%s"' % capture_file)
+ capture_file = open(capture_file, "wb")
+ if code_to_inject is not None:
+ print("Use Ctrl-J to inject", code_to_inject)
+ if file_to_inject is not None:
+ print('Use Ctrl-K to inject file "%s"' % file_to_inject)
+
+ console = Console()
+ console.enter()
+
+ def console_out_write(b):
+ console.write(b)
+ if capture_file is not None:
+ capture_file.write(b)
+ capture_file.flush()
+
+ try:
+ do_repl_main_loop(
+ pyb,
+ console,
+ console_out_write,
+ code_to_inject=code_to_inject,
+ file_to_inject=file_to_inject,
+ )
+ finally:
+ console.exit()
+ if capture_file is not None:
+ capture_file.close()
+
+
+def execbuffer(pyb, buf, follow):
+ ret_val = 0
+ try:
+ pyb.exec_raw_no_follow(buf)
+ if follow:
+ ret, ret_err = pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
+ if ret_err:
+ pyboard.stdout_write_bytes(ret_err)
+ ret_val = 1
+ except pyboard.PyboardError as er:
+ print(er)
+ ret_val = 1
+ except KeyboardInterrupt:
+ ret_val = 1
+ return ret_val
+
+
+def main():
+ config = load_user_config()
+ prepare_command_expansions(config)
+
+ args = sys.argv[1:]
+ pyb = None
+ did_action = False
+
+ try:
+ while args:
+ do_command_expansion(args)
+
+ cmds = {
+ "connect": (False, False, 1),
+ "disconnect": (False, False, 0),
+ "mount": (True, False, 1),
+ "repl": (False, True, 0),
+ "eval": (True, True, 1),
+ "exec": (True, True, 1),
+ "run": (True, True, 1),
+ "fs": (True, True, 1),
+ }
+ cmd = args.pop(0)
+ try:
+ need_raw_repl, is_action, num_args_min = cmds[cmd]
+ except KeyError:
+ print(f"{_PROG}: '{cmd}' is not a command")
+ return 1
+
+ if len(args) < num_args_min:
+ print(f"{_PROG}: '{cmd}' neads at least {num_args_min} argument(s)")
+ return 1
+
+ if cmd == "connect":
+ if pyb is not None:
+ do_disconnect(pyb)
+ pyb = do_connect(args)
+ if pyb is None:
+ did_action = True
+ continue
+
+ if pyb is None:
+ pyb = do_connect(["auto"])
+
+ if need_raw_repl:
+ if not pyb.in_raw_repl:
+ pyb.enter_raw_repl()
+ else:
+ if pyb.in_raw_repl:
+ pyb.exit_raw_repl()
+ if is_action:
+ did_action = True
+
+ if cmd == "disconnect":
+ do_disconnect(pyb)
+ pyb = None
+ elif cmd == "mount":
+ path = args.pop(0)
+ pyb.mount_local(path)
+ print(f"Local directory {path} is mounted at /remote")
+ elif cmd in ("exec", "eval", "run"):
+ follow = True
+ if args[0] == "--no-follow":
+ args.pop(0)
+ follow = False
+ if cmd == "exec":
+ buf = args.pop(0)
+ elif cmd == "eval":
+ buf = "print(" + args.pop(0) + ")"
+ else:
+ filename = args.pop(0)
+ try:
+ with open(filename, "rb") as f:
+ buf = f.read()
+ except OSError:
+ print(f"{_PROG}: could not read file '{filename}'")
+ return 1
+ ret = execbuffer(pyb, buf, follow)
+ if ret:
+ return ret
+ elif cmd == "fs":
+ do_filesystem(pyb, args)
+ elif cmd == "repl":
+ do_repl(pyb, args)
+
+ if not did_action:
+ if pyb is None:
+ pyb = do_connect(["auto"])
+ if pyb.in_raw_repl:
+ pyb.exit_raw_repl()
+ do_repl(pyb, args)
+ finally:
+ if pyb is not None:
+ do_disconnect(pyb)
diff --git a/tools/mpremote/mpremote/pyboardextended.py b/tools/mpremote/mpremote/pyboardextended.py
new file mode 100644
index 000000000..e3f259b39
--- /dev/null
+++ b/tools/mpremote/mpremote/pyboardextended.py
@@ -0,0 +1,621 @@
+import os, re, serial, struct, time
+from errno import EPERM
+from .console import VT_ENABLED
+
+try:
+ from .pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command
+except ImportError:
+ import sys
+
+ sys.path.append(os.path.dirname(__file__) + "/../..")
+ from pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command
+
+fs_hook_cmds = {
+ "CMD_STAT": 1,
+ "CMD_ILISTDIR_START": 2,
+ "CMD_ILISTDIR_NEXT": 3,
+ "CMD_OPEN": 4,
+ "CMD_CLOSE": 5,
+ "CMD_READ": 6,
+ "CMD_WRITE": 7,
+ "CMD_SEEK": 8,
+ "CMD_REMOVE": 9,
+ "CMD_RENAME": 10,
+}
+
+fs_hook_code = """\
+import uos, uio, ustruct, micropython, usys
+
+
+class RemoteCommand:
+ def __init__(self, use_second_port):
+ self.buf4 = bytearray(4)
+ try:
+ import pyb
+ self.fout = pyb.USB_VCP()
+ if use_second_port:
+ self.fin = pyb.USB_VCP(1)
+ else:
+ self.fin = pyb.USB_VCP()
+ except:
+ import usys
+ self.fout = usys.stdout.buffer
+ self.fin = usys.stdin.buffer
+ import select
+ self.poller = select.poll()
+ self.poller.register(self.fin, select.POLLIN)
+
+ def poll_in(self):
+ for _ in self.poller.ipoll(1000):
+ return
+ self.end()
+ raise Exception('timeout waiting for remote')
+
+ def rd(self, n):
+ buf = bytearray(n)
+ self.rd_into(buf, n)
+ return buf
+
+ def rd_into(self, buf, n):
+ # implement reading with a timeout in case other side disappears
+ if n == 0:
+ return
+ self.poll_in()
+ r = self.fin.readinto(buf, n)
+ if r < n:
+ mv = memoryview(buf)
+ while r < n:
+ self.poll_in()
+ r += self.fin.readinto(mv[r:], n - r)
+
+ def begin(self, type):
+ micropython.kbd_intr(-1)
+ buf4 = self.buf4
+ buf4[0] = 0x18
+ buf4[1] = type
+ self.fout.write(buf4, 2)
+ # Wait for sync byte 0x18, but don't get stuck forever
+ for i in range(30):
+ self.poller.poll(1000)
+ self.fin.readinto(buf4, 1)
+ if buf4[0] == 0x18:
+ break
+
+ def end(self):
+ micropython.kbd_intr(3)
+
+ def rd_s8(self):
+ self.rd_into(self.buf4, 1)
+ n = self.buf4[0]
+ if n & 0x80:
+ n -= 0x100
+ return n
+
+ def rd_s32(self):
+ buf4 = self.buf4
+ self.rd_into(buf4, 4)
+ n = buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24
+ if buf4[3] & 0x80:
+ n -= 0x100000000
+ return n
+
+ def rd_u32(self):
+ buf4 = self.buf4
+ self.rd_into(buf4, 4)
+ return buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24
+
+ def rd_bytes(self, buf):
+ # TODO if n is large (eg >256) then we may miss bytes on stdin
+ n = self.rd_s32()
+ if buf is None:
+ ret = buf = bytearray(n)
+ else:
+ ret = n
+ self.rd_into(buf, n)
+ return ret
+
+ def rd_str(self):
+ n = self.rd_s32()
+ if n == 0:
+ return ''
+ else:
+ return str(self.rd(n), 'utf8')
+
+ def wr_s8(self, i):
+ self.buf4[0] = i
+ self.fout.write(self.buf4, 1)
+
+ def wr_s32(self, i):
+ ustruct.pack_into('<i', self.buf4, 0, i)
+ self.fout.write(self.buf4)
+
+ def wr_bytes(self, b):
+ self.wr_s32(len(b))
+ self.fout.write(b)
+
+ # str and bytes act the same in MicroPython
+ wr_str = wr_bytes
+
+
+class RemoteFile(uio.IOBase):
+ def __init__(self, cmd, fd, is_text):
+ self.cmd = cmd
+ self.fd = fd
+ self.is_text = is_text
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, a, b, c):
+ self.close()
+
+ def ioctl(self, request, arg):
+ if request == 4: # CLOSE
+ self.close()
+ return 0
+
+ def flush(self):
+ pass
+
+ def close(self):
+ if self.fd is None:
+ return
+ c = self.cmd
+ c.begin(CMD_CLOSE)
+ c.wr_s8(self.fd)
+ c.end()
+ self.fd = None
+
+ def read(self, n=-1):
+ c = self.cmd
+ c.begin(CMD_READ)
+ c.wr_s8(self.fd)
+ c.wr_s32(n)
+ data = c.rd_bytes(None)
+ c.end()
+ if self.is_text:
+ data = str(data, 'utf8')
+ else:
+ data = bytes(data)
+ return data
+
+ def readinto(self, buf):
+ c = self.cmd
+ c.begin(CMD_READ)
+ c.wr_s8(self.fd)
+ c.wr_s32(len(buf))
+ n = c.rd_bytes(buf)
+ c.end()
+ return n
+
+ def readline(self):
+ l = ''
+ while 1:
+ c = self.read(1)
+ l += c
+ if c == '\\n' or c == '':
+ return l
+
+ def readlines(self):
+ ls = []
+ while 1:
+ l = self.readline()
+ if not l:
+ return ls
+ ls.append(l)
+
+ def write(self, buf):
+ c = self.cmd
+ c.begin(CMD_WRITE)
+ c.wr_s8(self.fd)
+ c.wr_bytes(buf)
+ n = c.rd_s32()
+ c.end()
+ return n
+
+ def seek(self, n):
+ c = self.cmd
+ c.begin(CMD_SEEK)
+ c.wr_s8(self.fd)
+ c.wr_s32(n)
+ n = c.rd_s32()
+ c.end()
+ return n
+
+
+class RemoteFS:
+ def __init__(self, cmd):
+ self.cmd = cmd
+
+ def mount(self, readonly, mkfs):
+ pass
+
+ def umount(self):
+ pass
+
+ def chdir(self, path):
+ if not path.startswith("/"):
+ path = self.path + path
+ if not path.endswith("/"):
+ path += "/"
+ if path != "/":
+ self.stat(path)
+ self.path = path
+
+ def getcwd(self):
+ return self.path
+
+ def remove(self, path):
+ c = self.cmd
+ c.begin(CMD_REMOVE)
+ c.wr_str(self.path + path)
+ res = c.rd_s32()
+ c.end()
+ if res < 0:
+ raise OSError(-res)
+
+ def rename(self, old, new):
+ c = self.cmd
+ c.begin(CMD_RENAME)
+ c.wr_str(self.path + old)
+ c.wr_str(self.path + new)
+ res = c.rd_s32()
+ c.end()
+ if res < 0:
+ raise OSError(-res)
+
+ def stat(self, path):
+ c = self.cmd
+ c.begin(CMD_STAT)
+ c.wr_str(self.path + path)
+ res = c.rd_s8()
+ if res < 0:
+ c.end()
+ raise OSError(-res)
+ mode = c.rd_u32()
+ size = c.rd_u32()
+ atime = c.rd_u32()
+ mtime = c.rd_u32()
+ ctime = c.rd_u32()
+ c.end()
+ return mode, 0, 0, 0, 0, 0, size, atime, mtime, ctime
+
+ def ilistdir(self, path):
+ c = self.cmd
+ c.begin(CMD_ILISTDIR_START)
+ c.wr_str(self.path + path)
+ res = c.rd_s8()
+ c.end()
+ if res < 0:
+ raise OSError(-res)
+ def next():
+ while True:
+ c.begin(CMD_ILISTDIR_NEXT)
+ name = c.rd_str()
+ if name:
+ type = c.rd_u32()
+ c.end()
+ yield (name, type, 0)
+ else:
+ c.end()
+ break
+ return next()
+
+ def open(self, path, mode):
+ c = self.cmd
+ c.begin(CMD_OPEN)
+ c.wr_str(self.path + path)
+ c.wr_str(mode)
+ fd = c.rd_s8()
+ c.end()
+ if fd < 0:
+ raise OSError(-fd)
+ return RemoteFile(c, fd, mode.find('b') == -1)
+
+
+def __mount(use_second_port):
+ uos.mount(RemoteFS(RemoteCommand(use_second_port)), '/remote')
+ uos.chdir('/remote')
+"""
+
+# Apply basic compression on hook code.
+for key, value in fs_hook_cmds.items():
+ fs_hook_code = re.sub(key, str(value), fs_hook_code)
+fs_hook_code = re.sub(" *#.*$", "", fs_hook_code, flags=re.MULTILINE)
+fs_hook_code = re.sub("\n\n+", "\n", fs_hook_code)
+fs_hook_code = re.sub(" ", " ", fs_hook_code)
+fs_hook_code = re.sub("rd_", "r", fs_hook_code)
+fs_hook_code = re.sub("wr_", "w", fs_hook_code)
+fs_hook_code = re.sub("buf4", "b4", fs_hook_code)
+
+
+class PyboardCommand:
+ def __init__(self, fin, fout, path):
+ self.fin = fin
+ self.fout = fout
+ self.root = path + "/"
+ self.data_ilistdir = ["", []]
+ self.data_files = []
+
+ def rd_s8(self):
+ return struct.unpack("<b", self.fin.read(1))[0]
+
+ def rd_s32(self):
+ return struct.unpack("<i", self.fin.read(4))[0]
+
+ def rd_bytes(self):
+ n = self.rd_s32()
+ return self.fin.read(n)
+
+ def rd_str(self):
+ n = self.rd_s32()
+ if n == 0:
+ return ""
+ else:
+ return str(self.fin.read(n), "utf8")
+
+ def wr_s8(self, i):
+ self.fout.write(struct.pack("<b", i))
+
+ def wr_s32(self, i):
+ self.fout.write(struct.pack("<i", i))
+
+ def wr_u32(self, i):
+ self.fout.write(struct.pack("<I", i))
+
+ def wr_bytes(self, b):
+ self.wr_s32(len(b))
+ self.fout.write(b)
+
+ def wr_str(self, s):
+ b = bytes(s, "utf8")
+ self.wr_s32(len(b))
+ self.fout.write(b)
+
+ def log_cmd(self, msg):
+ print(f"[{msg}]", end="\r\n")
+
+ def path_check(self, path):
+ parent = os.path.realpath(self.root)
+ child = os.path.realpath(path)
+ if parent != os.path.commonpath([parent, child]):
+ raise OSError(EPERM, "") # File is outside mounted dir
+
+ def do_stat(self):
+ path = self.root + self.rd_str()
+ # self.log_cmd(f"stat {path}")
+ try:
+ self.path_check(path)
+ stat = os.stat(path)
+ except OSError as er:
+ self.wr_s8(-abs(er.errno))
+ else:
+ self.wr_s8(0)
+ # Note: st_ino would need to be 64-bit if added here
+ self.wr_u32(stat.st_mode)
+ self.wr_u32(stat.st_size)
+ self.wr_u32(int(stat.st_atime))
+ self.wr_u32(int(stat.st_mtime))
+ self.wr_u32(int(stat.st_ctime))
+
+ def do_ilistdir_start(self):
+ path = self.root + self.rd_str()
+ try:
+ self.path_check(path)
+ self.wr_s8(0)
+ except OSError as er:
+ self.wr_s8(-abs(er.errno))
+ else:
+ self.data_ilistdir[0] = path
+ self.data_ilistdir[1] = os.listdir(path)
+
+ def do_ilistdir_next(self):
+ if self.data_ilistdir[1]:
+ entry = self.data_ilistdir[1].pop(0)
+ try:
+ stat = os.lstat(self.data_ilistdir[0] + "/" + entry)
+ mode = stat.st_mode & 0xC000
+ except OSError as er:
+ mode = 0
+ self.wr_str(entry)
+ self.wr_u32(mode)
+ else:
+ self.wr_str("")
+
+ def do_open(self):
+ path = self.root + self.rd_str()
+ mode = self.rd_str()
+ # self.log_cmd(f"open {path} {mode}")
+ try:
+ self.path_check(path)
+ f = open(path, mode)
+ except OSError as er:
+ self.wr_s8(-abs(er.errno))
+ else:
+ is_text = mode.find("b") == -1
+ try:
+ fd = self.data_files.index(None)
+ self.data_files[fd] = (f, is_text)
+ except ValueError:
+ fd = len(self.data_files)
+ self.data_files.append((f, is_text))
+ self.wr_s8(fd)
+
+ def do_close(self):
+ fd = self.rd_s8()
+ # self.log_cmd(f"close {fd}")
+ self.data_files[fd][0].close()
+ self.data_files[fd] = None
+
+ def do_read(self):
+ fd = self.rd_s8()
+ n = self.rd_s32()
+ buf = self.data_files[fd][0].read(n)
+ if self.data_files[fd][1]:
+ buf = bytes(buf, "utf8")
+ self.wr_bytes(buf)
+ # self.log_cmd(f"read {fd} {n} -> {len(buf)}")
+
+ def do_seek(self):
+ fd = self.rd_s8()
+ n = self.rd_s32()
+ # self.log_cmd(f"seek {fd} {n}")
+ self.data_files[fd][0].seek(n)
+ self.wr_s32(n)
+
+ def do_write(self):
+ fd = self.rd_s8()
+ buf = self.rd_bytes()
+ if self.data_files[fd][1]:
+ buf = str(buf, "utf8")
+ n = self.data_files[fd][0].write(buf)
+ self.wr_s32(n)
+ # self.log_cmd(f"write {fd} {len(buf)} -> {n}")
+
+ def do_remove(self):
+ path = self.root + self.rd_str()
+ # self.log_cmd(f"remove {path}")
+ try:
+ self.path_check(path)
+ os.remove(path)
+ ret = 0
+ except OSError as er:
+ ret = -abs(er.errno)
+ self.wr_s32(ret)
+
+ def do_rename(self):
+ old = self.root + self.rd_str()
+ new = self.root + self.rd_str()
+ # self.log_cmd(f"rename {old} {new}")
+ try:
+ self.path_check(old)
+ self.path_check(new)
+ os.rename(old, new)
+ ret = 0
+ except OSError as er:
+ ret = -abs(er.errno)
+ self.wr_s32(ret)
+
+ cmd_table = {
+ fs_hook_cmds["CMD_STAT"]: do_stat,
+ fs_hook_cmds["CMD_ILISTDIR_START"]: do_ilistdir_start,
+ fs_hook_cmds["CMD_ILISTDIR_NEXT"]: do_ilistdir_next,
+ fs_hook_cmds["CMD_OPEN"]: do_open,
+ fs_hook_cmds["CMD_CLOSE"]: do_close,
+ fs_hook_cmds["CMD_READ"]: do_read,
+ fs_hook_cmds["CMD_WRITE"]: do_write,
+ fs_hook_cmds["CMD_SEEK"]: do_seek,
+ fs_hook_cmds["CMD_REMOVE"]: do_remove,
+ fs_hook_cmds["CMD_RENAME"]: do_rename,
+ }
+
+
+class SerialIntercept:
+ def __init__(self, serial, cmd):
+ self.orig_serial = serial
+ self.cmd = cmd
+ self.buf = b""
+ self.orig_serial.timeout = 5.0
+
+ def _check_input(self, blocking):
+ if blocking or self.orig_serial.inWaiting() > 0:
+ c = self.orig_serial.read(1)
+ if c == b"\x18":
+ # a special command
+ c = self.orig_serial.read(1)[0]
+ self.orig_serial.write(b"\x18") # Acknowledge command
+ PyboardCommand.cmd_table[c](self.cmd)
+ elif not VT_ENABLED and c == b"\x1b":
+ # ESC code, ignore these on windows
+ esctype = self.orig_serial.read(1)
+ if esctype == b"[": # CSI
+ while not (0x40 < self.orig_serial.read(1)[0] < 0x7E):
+ # Looking for "final byte" of escape sequence
+ pass
+ else:
+ self.buf += c
+
+ @property
+ def fd(self):
+ return self.orig_serial.fd
+
+ def close(self):
+ self.orig_serial.close()
+
+ def inWaiting(self):
+ self._check_input(False)
+ return len(self.buf)
+
+ def read(self, n):
+ while len(self.buf) < n:
+ self._check_input(True)
+ out = self.buf[:n]
+ self.buf = self.buf[n:]
+ return out
+
+ def write(self, buf):
+ self.orig_serial.write(buf)
+
+
+class PyboardExtended(Pyboard):
+ def __init__(self, dev, *args, **kwargs):
+ super().__init__(dev, *args, **kwargs)
+ self.device_name = dev
+ self.mounted = False
+
+ def mount_local(self, path, dev_out=None):
+ fout = self.serial
+ if dev_out is not None:
+ try:
+ fout = serial.Serial(dev_out)
+ except serial.SerialException:
+ port = list(serial.tools.list_ports.grep(dev_out))
+ if not port:
+ raise
+ for p in port:
+ try:
+ fout = serial.Serial(p.device)
+ break
+ except serial.SerialException:
+ pass
+ self.mounted = True
+ if self.eval('"RemoteFS" in globals()') == b"False":
+ self.exec_(fs_hook_code)
+ self.exec_("__mount(%s)" % (dev_out is not None))
+ self.cmd = PyboardCommand(self.serial, fout, path)
+ self.serial = SerialIntercept(self.serial, self.cmd)
+ self.dev_out = dev_out
+
+ def soft_reset_with_mount(self, out_callback):
+ self.serial.write(b"\x04")
+ if not self.mounted:
+ return
+
+ # Wait for a response to the soft-reset command.
+ for i in range(10):
+ if self.serial.inWaiting():
+ break
+ time.sleep(0.05)
+ else:
+ # Device didn't respond so it wasn't in a state to do a soft reset.
+ return
+
+ out_callback(self.serial.read(1))
+ self.serial = self.serial.orig_serial
+ n = self.serial.inWaiting()
+ while n > 0:
+ buf = self.serial.read(n)
+ out_callback(buf)
+ time.sleep(0.1)
+ n = self.serial.inWaiting()
+ self.serial.write(b"\x01")
+ self.exec_(fs_hook_code)
+ self.exec_("__mount(%s)" % (self.dev_out is not None))
+ self.exit_raw_repl()
+ self.read_until(4, b">>> ")
+ self.serial = SerialIntercept(self.serial, self.cmd)
+
+ def umount_local(self):
+ if self.mounted:
+ self.exec_('uos.umount("/remote")')
+ self.mounted = False
diff --git a/tools/mpremote/pyproject.toml b/tools/mpremote/pyproject.toml
new file mode 100644
index 000000000..374b58cbf
--- /dev/null
+++ b/tools/mpremote/pyproject.toml
@@ -0,0 +1,6 @@
+[build-system]
+requires = [
+ "setuptools>=42",
+ "wheel"
+]
+build-backend = "setuptools.build_meta"
diff --git a/tools/mpremote/setup.cfg b/tools/mpremote/setup.cfg
new file mode 100644
index 000000000..4a1bf27c1
--- /dev/null
+++ b/tools/mpremote/setup.cfg
@@ -0,0 +1,25 @@
+[metadata]
+name = mpremote
+version = 0.0.4
+author = Damien George
+author_email = damien@micropython.org
+description = Tool for interacting remotely with MicroPython
+long_description = file: README.md
+long_description_content_type = text/markdown
+url = https://github.com/micropython/micropython
+project_urls =
+ Bug Tracker = https://github.com/micropython/micropython/issues
+classifiers =
+ Programming Language :: Python :: 3
+ License :: OSI Approved :: MIT License
+ Operating System :: OS Independent
+
+[options]
+packages = mpremote
+python_requires = >= 3.4
+install_requires =
+ pyserial >= 3.3
+
+[options.entry_points]
+console_scripts =
+ mpremote = mpremote.main:main