summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tools/mpremote/mpremote/commands.py261
-rw-r--r--tools/mpremote/mpremote/main.py557
-rw-r--r--tools/mpremote/mpremote/repl.py101
3 files changed, 510 insertions, 409 deletions
diff --git a/tools/mpremote/mpremote/commands.py b/tools/mpremote/mpremote/commands.py
new file mode 100644
index 000000000..60a625d5e
--- /dev/null
+++ b/tools/mpremote/mpremote/commands.py
@@ -0,0 +1,261 @@
+import os
+import sys
+import tempfile
+
+import serial.tools.list_ports
+
+from . import pyboardextended as pyboard
+
+
+class CommandError(Exception):
+ pass
+
+ do_disconnect(state)
+
+ try:
+ if dev == "list":
+ # List attached devices.
+ for p in sorted(serial.tools.list_ports.comports()):
+ print(
+ "{} {} {:04x}:{:04x} {} {}".format(
+ p.device,
+ p.serial_number,
+ p.vid if isinstance(p.vid, int) else 0,
+ p.pid if isinstance(p.pid, int) else 0,
+ p.manufacturer,
+ p.product,
+ )
+ )
+ # Don't do implicit REPL command.
+ state.did_action()
+ elif dev == "auto":
+ # Auto-detect and auto-connect to the first available device.
+ for p in sorted(serial.tools.list_ports.comports()):
+ try:
+ state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200)
+ return
+ except pyboard.PyboardError as er:
+ if not er.args[0].startswith("failed to access"):
+ raise er
+ raise pyboard.PyboardError("no device found")
+ elif dev.startswith("id:"):
+ # Search for a device with the given serial number.
+ serial_number = dev[len("id:") :]
+ dev = None
+ for p in serial.tools.list_ports.comports():
+ if p.serial_number == serial_number:
+ state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200)
+ return
+ raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
+ else:
+ # Connect to the given device.
+ if dev.startswith("port:"):
+ dev = dev[len("port:") :]
+ state.pyb = pyboard.PyboardExtended(dev, baudrate=115200)
+ return
+ except pyboard.PyboardError as er:
+ msg = er.args[0]
+ if msg.startswith("failed to access"):
+ msg += " (it may be in use by another program)"
+ print(msg)
+ sys.exit(1)
+
+
+def do_disconnect(state, _args=None):
+ if not state.pyb:
+ return
+
+ try:
+ if state.pyb.mounted:
+ if not state.pyb.in_raw_repl:
+ state.pyb.enter_raw_repl(soft_reset=False)
+ state.pyb.umount_local()
+ if state.pyb.in_raw_repl:
+ state.pyb.exit_raw_repl()
+ except OSError:
+ # Ignore any OSError exceptions when shutting down, eg:
+ # - pyboard.filesystem_command will close the connecton if it had an error
+ # - umounting will fail if serial port disappeared
+ pass
+ state.pyb.close()
+ state.pyb = None
+ state._auto_soft_reset = True
+
+
+def show_progress_bar(size, total_size, op="copying"):
+ if not sys.stdout.isatty():
+ return
+ verbose_size = 2048
+ bar_length = 20
+ if total_size < verbose_size:
+ return
+ elif size >= total_size:
+ # Clear progress bar when copy completes
+ print("\r" + " " * (13 + len(op) + bar_length) + "\r", end="")
+ else:
+ bar = size * bar_length // total_size
+ progress = size * 100 // total_size
+ print(
+ "\r ... {} {:3d}% [{}{}]".format(op, progress, "#" * bar, "-" * (bar_length - bar)),
+ end="",
+ )
+
+
+# Get all args up to the terminator ("+").
+# The passed args will be updated with these ones removed.
+def _get_fs_args(args):
+ n = 0
+ for src in args:
+ if src == "+":
+ break
+ n += 1
+ fs_args = args[:n]
+ args[:] = args[n + 1 :]
+ return fs_args
+
+
+def do_filesystem(state, args):
+ state.ensure_raw_repl()
+ state.did_action()
+
+ def _list_recursive(files, path):
+ if os.path.isdir(path):
+ for entry in os.listdir(path):
+ _list_recursive(files, "/".join((path, entry)))
+ else:
+ files.append(os.path.split(path))
+
+ fs_args = _get_fs_args(args)
+
+ # Don't be verbose when using cat, so output can be redirected to something.
+ verbose = fs_args[0] != "cat"
+
+ if fs_args[0] == "cp" and fs_args[1] == "-r":
+ fs_args.pop(0)
+ fs_args.pop(0)
+ if fs_args[-1] != ":":
+ print(f"{_PROG}: 'cp -r' destination must be ':'")
+ sys.exit(1)
+ fs_args.pop()
+ src_files = []
+ for path in fs_args:
+ if path.startswith(":"):
+ raise CommandError("'cp -r' source files must be local")
+ _list_recursive(src_files, path)
+ known_dirs = {""}
+ state.pyb.exec_("import uos")
+ for dir, file in src_files:
+ dir_parts = dir.split("/")
+ for i in range(len(dir_parts)):
+ d = "/".join(dir_parts[: i + 1])
+ if d not in known_dirs:
+ state.pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d)
+ known_dirs.add(d)
+ pyboard.filesystem_command(
+ state.pyb,
+ ["cp", "/".join((dir, file)), ":" + dir + "/"],
+ progress_callback=show_progress_bar,
+ verbose=verbose,
+ )
+ else:
+ try:
+ pyboard.filesystem_command(
+ state.pyb, fs_args, progress_callback=show_progress_bar, verbose=verbose
+ )
+ except OSError as er:
+ raise CommandError(er)
+
+
+def do_edit(state, args):
+ state.ensure_raw_repl()
+ state.did_action()
+
+ if not os.getenv("EDITOR"):
+ raise pyboard.PyboardError("edit: $EDITOR not set")
+ for src in _get_fs_args(args):
+ src = src.lstrip(":")
+ dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src))
+ try:
+ print("edit :%s" % (src,))
+ os.close(dest_fd)
+ state.pyb.fs_touch(src)
+ state.pyb.fs_get(src, dest, progress_callback=show_progress_bar)
+ if os.system("$EDITOR '%s'" % (dest,)) == 0:
+ state.pyb.fs_put(dest, src, progress_callback=show_progress_bar)
+ finally:
+ os.unlink(dest)
+
+
+def _get_follow_arg(args):
+ if args[0] == "--no-follow":
+ args.pop(0)
+ return False
+ else:
+ return True
+
+
+def _do_execbuffer(state, buf, follow):
+ state.ensure_raw_repl()
+ state.did_action()
+
+ try:
+ state.pyb.exec_raw_no_follow(buf)
+ if follow:
+ ret, ret_err = state.pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
+ if ret_err:
+ pyboard.stdout_write_bytes(ret_err)
+ sys.exit(1)
+ except pyboard.PyboardError as er:
+ print(er)
+ sys.exit(1)
+ except KeyboardInterrupt:
+ sys.exit(1)
+
+
+def do_exec(state, args):
+ follow = _get_follow_arg(args)
+ buf = args.pop(0)
+ _do_execbuffer(state, buf, follow)
+
+
+def do_eval(state, args):
+ follow = _get_follow_arg(args)
+ buf = "print(" + args.pop(0) + ")"
+ _do_execbuffer(state, buf, follow)
+
+
+def do_run(state, args):
+ follow = _get_follow_arg(args)
+ filename = args.pop(0)
+ try:
+ with open(filename, "rb") as f:
+ buf = f.read()
+ except OSError:
+ raise CommandError(f"could not read file '{filename}'")
+ sys.exit(1)
+ _do_execbuffer(state, buf, follow)
+
+
+def do_mount(state, args):
+ state.ensure_raw_repl()
+
+ unsafe_links = False
+ if args[0] == "--unsafe-links" or args[0] == "-l":
+ args.pop(0)
+ unsafe_links = True
+ path = args.pop(0)
+ state.pyb.mount_local(path, unsafe_links=unsafe_links)
+ print(f"Local directory {path} is mounted at /remote")
+
+
+def do_umount(state, path):
+ state.ensure_raw_repl()
+ state.pyb.umount_local()
+
+
+def do_resume(state, _args=None):
+ state._auto_soft_reset = False
+
+
+def do_soft_reset(state, _args=None):
+ state.ensure_raw_repl(soft_reset=True)
diff --git a/tools/mpremote/mpremote/main.py b/tools/mpremote/mpremote/main.py
index bd98da882..b96e3f46b 100644
--- a/tools/mpremote/mpremote/main.py
+++ b/tools/mpremote/mpremote/main.py
@@ -19,45 +19,101 @@ MicroPython device over a serial connection. Commands supported are:
import os, sys
from collections.abc import Mapping
-import tempfile
from textwrap import dedent
-import serial.tools.list_ports
-
-from . import pyboardextended as pyboard
-from .console import Console, ConsolePosix
+from .commands import (
+ CommandError,
+ do_connect,
+ do_disconnect,
+ do_edit,
+ do_filesystem,
+ do_mount,
+ do_umount,
+ do_exec,
+ do_eval,
+ do_run,
+ do_resume,
+ do_soft_reset,
+)
+from .repl import do_repl
_PROG = "mpremote"
-# (need_raw_repl, is_action, num_args_min, help_text)
+
+def do_help(state, _args=None):
+ def print_commands_help(cmds, help_idx):
+ max_command_len = max(len(cmd) for cmd in cmds.keys())
+ for cmd in sorted(cmds.keys()):
+ help_message_lines = dedent(cmds[cmd][help_idx]).split("\n")
+ help_message = help_message_lines[0]
+ for line in help_message_lines[1:]:
+ help_message = "{}\n{}{}".format(help_message, " " * (max_command_len + 4), line)
+ print(" ", cmd, " " * (max_command_len - len(cmd) + 2), help_message, sep="")
+
+ print(_PROG, "-- MicroPython remote control")
+ print("See https://docs.micropython.org/en/latest/reference/mpremote.html")
+
+ print("\nList of commands:")
+ print_commands_help(_COMMANDS, 1)
+
+ print("\nList of shortcuts:")
+ print_commands_help(_command_expansions, 2)
+
+ sys.exit(0)
+
+
+def do_version(state, _args=None):
+ from . import __version__
+
+ print(f"{_PROG} {__version__}")
+ sys.exit(0)
+
+
+# Map of "command" to tuple of (num_args_min, help_text, handler).
_COMMANDS = {
"connect": (
- False,
- False,
1,
"""\
connect to given device
device may be: list, auto, id:x, port:x
or any valid device name/path""",
+ do_connect,
+ ),
+ "disconnect": (
+ 0,
+ "disconnect current device",
+ do_disconnect,
+ ),
+ "edit": (
+ 1,
+ "edit files on the device",
+ do_edit,
+ ),
+ "resume": (
+ 0,
+ "resume a previous mpremote session (will not auto soft-reset)",
+ do_resume,
+ ),
+ "soft-reset": (
+ 0,
+ "perform a soft-reset of the device",
+ do_soft_reset,
),
- "disconnect": (False, False, 0, "disconnect current device"),
- "edit": (True, True, 1, "edit files on the device"),
- "resume": (False, False, 0, "resume a previous mpremote session (will not auto soft-reset)"),
- "soft-reset": (False, True, 0, "perform a soft-reset of the device"),
"mount": (
- True,
- False,
1,
"""\
mount local directory on device
options:
--unsafe-links, -l
follow symbolic links pointing outside of local directory""",
+ do_mount,
+ ),
+ "umount": (
+ 0,
+ "unmount the local directory",
+ do_umount,
),
- "umount": (True, False, 0, "unmount the local directory"),
"repl": (
- False,
- True,
0,
"""\
enter REPL
@@ -65,15 +121,45 @@ _COMMANDS = {
--capture <file>
--inject-code <string>
--inject-file <file>""",
+ do_repl,
+ ),
+ "eval": (
+ 1,
+ "evaluate and print the string",
+ do_eval,
+ ),
+ "exec": (
+ 1,
+ "execute the string",
+ do_exec,
+ ),
+ "run": (
+ 1,
+ "run the given local script",
+ do_run,
+ ),
+ "fs": (
+ 1,
+ "execute filesystem commands on the device",
+ do_filesystem,
+ ),
+ "help": (
+ 0,
+ "print help and exit",
+ do_help,
+ ),
+ "version": (
+ 0,
+ "print version and exit",
+ do_version,
),
- "eval": (True, True, 1, "evaluate and print the string"),
- "exec": (True, True, 1, "execute the string"),
- "run": (True, True, 1, "run the given local script"),
- "fs": (True, True, 1, "execute filesystem commands on the device"),
- "help": (False, False, 0, "print help and exit"),
- "version": (False, False, 0, "print version and exit"),
}
+# Additional commands aliases.
+# The value can either be:
+# - A command string.
+# - A list of command strings, each command will be executed sequentially.
+# - A dict of command: { [], help: ""}
_BUILTIN_COMMAND_EXPANSIONS = {
# Device connection shortcuts.
"devs": {
@@ -117,6 +203,8 @@ _BUILTIN_COMMAND_EXPANSIONS = {
"--version": "version",
}
+# Add "a0", "a1", ..., "u0", "u1", ..., "c0", "c1", ... as aliases
+# for "connect /dev/ttyACMn" (and /dev/ttyUSBn, COMn) etc.
for port_num in range(4):
for prefix, port in [("a", "/dev/ttyACM"), ("u", "/dev/ttyUSB"), ("c", "COM")]:
_BUILTIN_COMMAND_EXPANSIONS["{}{}".format(prefix, port_num)] = {
@@ -220,307 +308,33 @@ def do_command_expansion(args):
args[0:0] = ["exec", ";".join(pre)]
-def do_connect(args):
- dev = args.pop(0)
- try:
- if dev == "list":
- # List attached devices.
- for p in sorted(serial.tools.list_ports.comports()):
- print(
- "{} {} {:04x}:{:04x} {} {}".format(
- p.device,
- p.serial_number,
- p.vid if isinstance(p.vid, int) else 0,
- p.pid if isinstance(p.pid, int) else 0,
- p.manufacturer,
- p.product,
- )
- )
- return None
- elif dev == "auto":
- # Auto-detect and auto-connect to the first available device.
- for p in sorted(serial.tools.list_ports.comports()):
- try:
- return pyboard.PyboardExtended(p.device, baudrate=115200)
- except pyboard.PyboardError as er:
- if not er.args[0].startswith("failed to access"):
- raise er
- raise pyboard.PyboardError("no device found")
- elif dev.startswith("id:"):
- # Search for a device with the given serial number.
- serial_number = dev[len("id:") :]
- dev = None
- for p in serial.tools.list_ports.comports():
- if p.serial_number == serial_number:
- return pyboard.PyboardExtended(p.device, baudrate=115200)
- raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
- else:
- # Connect to the given device.
- if dev.startswith("port:"):
- dev = dev[len("port:") :]
- return pyboard.PyboardExtended(dev, baudrate=115200)
- except pyboard.PyboardError as er:
- msg = er.args[0]
- if msg.startswith("failed to access"):
- msg += " (it may be in use by another program)"
- print(msg)
- sys.exit(1)
+class State:
+ def __init__(self):
+ self.pyb = None
+ self._did_action = False
+ self._auto_soft_reset = True
+ def did_action(self):
+ self._did_action = True
-def do_disconnect(pyb):
- try:
- if pyb.mounted:
- if not pyb.in_raw_repl:
- pyb.enter_raw_repl(soft_reset=False)
- pyb.umount_local()
- if pyb.in_raw_repl:
- pyb.exit_raw_repl()
- except OSError:
- # Ignore any OSError exceptions when shutting down, eg:
- # - pyboard.filesystem_command will close the connecton if it had an error
- # - umounting will fail if serial port disappeared
- pass
- pyb.close()
-
-
-def show_progress_bar(size, total_size):
- if not sys.stdout.isatty():
- return
- verbose_size = 2048
- bar_length = 20
- if total_size < verbose_size:
- return
- elif size >= total_size:
- # Clear progress bar when copy completes
- print("\r" + " " * (20 + bar_length) + "\r", end="")
- else:
- progress = size / total_size
- bar = round(progress * bar_length)
- print(
- "\r ... copying {:3.0f}% [{}{}]".format(
- progress * 100, "#" * bar, "-" * (bar_length - bar)
- ),
- end="",
- )
-
-
-# Get all args up to the terminator ("+").
-# The passed args will be updated with these ones removed.
-def get_fs_args(args):
- n = 0
- for src in args:
- if src == "+":
- break
- n += 1
- fs_args = args[:n]
- args[:] = args[n + 1 :]
- return fs_args
-
-
-def do_filesystem(pyb, args):
- def _list_recursive(files, path):
- if os.path.isdir(path):
- for entry in os.listdir(path):
- _list_recursive(files, "/".join((path, entry)))
- else:
- files.append(os.path.split(path))
-
- fs_args = get_fs_args(args)
-
- # Don't be verbose when using cat, so output can be redirected to something.
- verbose = fs_args[0] != "cat"
-
- if fs_args[0] == "cp" and fs_args[1] == "-r":
- fs_args.pop(0)
- fs_args.pop(0)
- if fs_args[-1] != ":":
- print(f"{_PROG}: 'cp -r' destination must be ':'")
- sys.exit(1)
- fs_args.pop()
- src_files = []
- for path in fs_args:
- if path.startswith(":"):
- print(f"{_PROG}: 'cp -r' source files must be local")
- sys.exit(1)
- _list_recursive(src_files, path)
- known_dirs = {""}
- pyb.exec_("import uos")
- for dir, file in src_files:
- dir_parts = dir.split("/")
- for i in range(len(dir_parts)):
- d = "/".join(dir_parts[: i + 1])
- if d not in known_dirs:
- pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d)
- known_dirs.add(d)
- pyboard.filesystem_command(
- pyb,
- ["cp", "/".join((dir, file)), ":" + dir + "/"],
- progress_callback=show_progress_bar,
- verbose=verbose,
- )
- else:
- try:
- pyboard.filesystem_command(
- pyb, fs_args, progress_callback=show_progress_bar, verbose=verbose
- )
- except OSError as er:
- print(f"{_PROG}: {er}")
- sys.exit(1)
-
-
-def do_edit(pyb, args):
- if not os.getenv("EDITOR"):
- raise pyboard.PyboardError("edit: $EDITOR not set")
- for src in get_fs_args(args):
- src = src.lstrip(":")
- dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src))
- try:
- print("edit :%s" % (src,))
- os.close(dest_fd)
- pyb.fs_touch(src)
- pyb.fs_get(src, dest, progress_callback=show_progress_bar)
- if os.system("$EDITOR '%s'" % (dest,)) == 0:
- pyb.fs_put(dest, src, progress_callback=show_progress_bar)
- finally:
- os.unlink(dest)
-
-
-def do_repl_main_loop(pyb, console_in, console_out_write, *, code_to_inject, file_to_inject):
- while True:
- console_in.waitchar(pyb.serial)
- c = console_in.readchar()
- if c:
- if c == b"\x1d": # ctrl-], quit
- break
- elif c == b"\x04": # ctrl-D
- # special handling needed for ctrl-D if filesystem is mounted
- pyb.write_ctrl_d(console_out_write)
- elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code
- pyb.serial.write(code_to_inject)
- elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script
- console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
- pyb.enter_raw_repl(soft_reset=False)
- with open(file_to_inject, "rb") as f:
- pyfile = f.read()
- try:
- pyb.exec_raw_no_follow(pyfile)
- except pyboard.PyboardError as er:
- console_out_write(b"Error:\r\n")
- console_out_write(er)
- pyb.exit_raw_repl()
- else:
- pyb.serial.write(c)
-
- try:
- n = pyb.serial.inWaiting()
- except OSError as er:
- if er.args[0] == 5: # IO error, device disappeared
- print("device disconnected")
- break
-
- if n > 0:
- c = pyb.serial.read(1)
- if c is not None:
- # pass character through to the console
- oc = ord(c)
- if oc in (8, 9, 10, 13, 27) or 32 <= oc <= 126:
- console_out_write(c)
- else:
- console_out_write(b"[%02x]" % ord(c))
-
-
-def do_repl(pyb, args):
- capture_file = None
- code_to_inject = None
- file_to_inject = None
-
- while len(args):
- if args[0] == "--capture":
- args.pop(0)
- capture_file = args.pop(0)
- elif args[0] == "--inject-code":
- args.pop(0)
- code_to_inject = bytes(args.pop(0).replace("\\n", "\r\n"), "utf8")
- elif args[0] == "--inject-file":
- args.pop(0)
- file_to_inject = args.pop(0)
- else:
- break
-
- print("Connected to MicroPython at %s" % pyb.device_name)
- print("Use Ctrl-] to exit this shell")
- if capture_file is not None:
- print('Capturing session to file "%s"' % capture_file)
- capture_file = open(capture_file, "wb")
- if code_to_inject is not None:
- print("Use Ctrl-J to inject", code_to_inject)
- if file_to_inject is not None:
- print('Use Ctrl-K to inject file "%s"' % file_to_inject)
-
- console = Console()
- console.enter()
-
- def console_out_write(b):
- console.write(b)
- if capture_file is not None:
- capture_file.write(b)
- capture_file.flush()
+ def run_repl_on_completion(self):
+ return not self._did_action
- try:
- do_repl_main_loop(
- pyb,
- console,
- console_out_write,
- code_to_inject=code_to_inject,
- file_to_inject=file_to_inject,
- )
- finally:
- console.exit()
- if capture_file is not None:
- capture_file.close()
+ def ensure_connected(self):
+ if self.pyb is None:
+ do_connect(self, ["auto"])
+ def ensure_raw_repl(self, soft_reset=None):
+ self.ensure_connected()
+ soft_reset = self._auto_soft_reset if soft_reset is None else soft_reset
+ if soft_reset or not self.pyb.in_raw_repl:
+ self.pyb.enter_raw_repl(soft_reset=soft_reset)
+ self._auto_soft_reset = False
-def execbuffer(pyb, buf, follow):
- ret_val = 0
- try:
- pyb.exec_raw_no_follow(buf)
- if follow:
- ret, ret_err = pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
- if ret_err:
- pyboard.stdout_write_bytes(ret_err)
- ret_val = 1
- except pyboard.PyboardError as er:
- print(er)
- ret_val = 1
- except KeyboardInterrupt:
- ret_val = 1
- return ret_val
-
-
-def print_help():
- def print_commands_help(cmds, help_idx):
- max_command_len = max(len(cmd) for cmd in cmds.keys())
- for cmd in sorted(cmds.keys()):
- help_message_lines = dedent(cmds[cmd][help_idx]).split("\n")
- help_message = help_message_lines[0]
- for line in help_message_lines[1:]:
- help_message = "{}\n{}{}".format(help_message, " " * (max_command_len + 4), line)
- print(" ", cmd, " " * (max_command_len - len(cmd) + 2), help_message, sep="")
-
- print(_PROG, "-- MicroPython remote control")
- print("See https://docs.micropython.org/en/latest/reference/mpremote.html")
-
- print("\nList of commands:")
- print_commands_help(_COMMANDS, 3)
-
- print("\nList of shortcuts:")
- print_commands_help(_command_expansions, 2)
-
-
-def print_version():
- from . import __version__
-
- print(f"{_PROG} {__version__}")
+ def ensure_friendly_repl(self):
+ self.ensure_connected()
+ if self.pyb.in_raw_repl:
+ self.pyb.exit_raw_repl()
def main():
@@ -528,106 +342,31 @@ def main():
prepare_command_expansions(config)
args = sys.argv[1:]
- pyb = None
- auto_soft_reset = True
- did_action = False
+ state = State()
try:
while args:
do_command_expansion(args)
cmd = args.pop(0)
try:
- need_raw_repl, is_action, num_args_min, _ = _COMMANDS[cmd]
+ num_args_min, _help, handler = _COMMANDS[cmd]
except KeyError:
- print(f"{_PROG}: '{cmd}' is not a command")
- return 1
+ raise CommandError(f"'{cmd}' is not a command")
if len(args) < num_args_min:
print(f"{_PROG}: '{cmd}' neads at least {num_args_min} argument(s)")
return 1
- if cmd == "connect":
- if pyb is not None:
- do_disconnect(pyb)
- pyb = do_connect(args)
- if pyb is None:
- did_action = True
- continue
- elif cmd == "help":
- print_help()
- sys.exit(0)
- elif cmd == "version":
- print_version()
- sys.exit(0)
- elif cmd == "resume":
- auto_soft_reset = False
- continue
-
- # The following commands need a connection, and either a raw or friendly REPL.
-
- if pyb is None:
- pyb = do_connect(["auto"])
-
- if need_raw_repl:
- if not pyb.in_raw_repl:
- pyb.enter_raw_repl(soft_reset=auto_soft_reset)
- auto_soft_reset = False
- else:
- if pyb.in_raw_repl:
- pyb.exit_raw_repl()
- if is_action:
- did_action = True
-
- if cmd == "disconnect":
- do_disconnect(pyb)
- pyb = None
- auto_soft_reset = True
- elif cmd == "soft-reset":
- pyb.enter_raw_repl(soft_reset=True)
- auto_soft_reset = False
- elif cmd == "mount":
- unsafe_links = False
- if args[0] == "--unsafe-links" or args[0] == "-l":
- args.pop(0)
- unsafe_links = True
- path = args.pop(0)
- pyb.mount_local(path, unsafe_links=unsafe_links)
- print(f"Local directory {path} is mounted at /remote")
- elif cmd == "umount":
- pyb.umount_local()
- elif cmd in ("exec", "eval", "run"):
- follow = True
- if args[0] == "--no-follow":
- args.pop(0)
- follow = False
- if cmd == "exec":
- buf = args.pop(0)
- elif cmd == "eval":
- buf = "print(" + args.pop(0) + ")"
- else:
- filename = args.pop(0)
- try:
- with open(filename, "rb") as f:
- buf = f.read()
- except OSError:
- print(f"{_PROG}: could not read file '{filename}'")
- return 1
- ret = execbuffer(pyb, buf, follow)
- if ret:
- return ret
- elif cmd == "fs":
- do_filesystem(pyb, args)
- elif cmd == "edit":
- do_edit(pyb, args)
- elif cmd == "repl":
- do_repl(pyb, args)
-
- if not did_action:
- if pyb is None:
- pyb = do_connect(["auto"])
- if pyb.in_raw_repl:
- pyb.exit_raw_repl()
- do_repl(pyb, args)
+ handler(state, args)
+
+
+ # If no commands were "actions" then implicitly finish with the REPL.
+ if state.run_repl_on_completion():
+ do_repl(state, args)
+
+ return 0
+ except CommandError as e:
+ print(f"{_PROG}: {e}", file=sys.stderr)
+ return 1
finally:
- if pyb is not None:
- do_disconnect(pyb)
+ do_disconnect(state)
diff --git a/tools/mpremote/mpremote/repl.py b/tools/mpremote/mpremote/repl.py
new file mode 100644
index 000000000..f92d20ae7
--- /dev/null
+++ b/tools/mpremote/mpremote/repl.py
@@ -0,0 +1,101 @@
+from .console import Console, ConsolePosix
+
+from . import pyboardextended as pyboard
+
+
+def do_repl_main_loop(state, console_in, console_out_write, *, code_to_inject, file_to_inject):
+ while True:
+ console_in.waitchar(state.pyb.serial)
+ c = console_in.readchar()
+ if c:
+ if c == b"\x1d": # ctrl-], quit
+ break
+ elif c == b"\x04": # ctrl-D
+ # special handling needed for ctrl-D if filesystem is mounted
+ state.pyb.write_ctrl_d(console_out_write)
+ elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code
+ state.pyb.serial.write(code_to_inject)
+ elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script
+ console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
+ state.pyb.enter_raw_repl(soft_reset=False)
+ with open(file_to_inject, "rb") as f:
+ pyfile = f.read()
+ try:
+ state.pyb.exec_raw_no_follow(pyfile)
+ except pyboard.PyboardError as er:
+ console_out_write(b"Error:\r\n")
+ console_out_write(er)
+ state.pyb.exit_raw_repl()
+ else:
+ state.pyb.serial.write(c)
+
+ try:
+ n = state.pyb.serial.inWaiting()
+ except OSError as er:
+ if er.args[0] == 5: # IO error, device disappeared
+ print("device disconnected")
+ break
+
+ if n > 0:
+ c = state.pyb.serial.read(1)
+ if c is not None:
+ # pass character through to the console
+ oc = ord(c)
+ if oc in (8, 9, 10, 13, 27) or 32 <= oc <= 126:
+ console_out_write(c)
+ else:
+ console_out_write(b"[%02x]" % ord(c))
+
+
+def do_repl(state, args):
+ state.ensure_friendly_repl()
+ state.did_action()
+
+ capture_file = None
+ code_to_inject = None
+ file_to_inject = None
+
+ while len(args):
+ if args[0] == "--capture":
+ args.pop(0)
+ capture_file = args.pop(0)
+ elif args[0] == "--inject-code":
+ args.pop(0)
+ code_to_inject = bytes(args.pop(0).replace("\\n", "\r\n"), "utf8")
+ elif args[0] == "--inject-file":
+ args.pop(0)
+ file_to_inject = args.pop(0)
+ else:
+ break
+
+ print("Connected to MicroPython at %s" % state.pyb.device_name)
+ print("Use Ctrl-] to exit this shell")
+ if capture_file is not None:
+ print('Capturing session to file "%s"' % capture_file)
+ capture_file = open(capture_file, "wb")
+ if code_to_inject is not None:
+ print("Use Ctrl-J to inject", code_to_inject)
+ if file_to_inject is not None:
+ print('Use Ctrl-K to inject file "%s"' % file_to_inject)
+
+ console = Console()
+ console.enter()
+
+ def console_out_write(b):
+ console.write(b)
+ if capture_file is not None:
+ capture_file.write(b)
+ capture_file.flush()
+
+ try:
+ do_repl_main_loop(
+ state,
+ console,
+ console_out_write,
+ code_to_inject=code_to_inject,
+ file_to_inject=file_to_inject,
+ )
+ finally:
+ console.exit()
+ if capture_file is not None:
+ capture_file.close()