summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tools/mpremote/mpremote/commands.py68
-rw-r--r--tools/mpremote/mpremote/main.py12
-rw-r--r--tools/mpremote/mpremote/mip.py42
-rw-r--r--tools/mpremote/mpremote/repl.py24
-rw-r--r--tools/mpremote/mpremote/transport.py33
-rw-r--r--tools/mpremote/mpremote/transport_serial.py (renamed from tools/mpremote/mpremote/pyboardextended.py)711
-rw-r--r--tools/mpremote/pyproject.toml6
7 files changed, 708 insertions, 188 deletions
diff --git a/tools/mpremote/mpremote/commands.py b/tools/mpremote/mpremote/commands.py
index 805fcd4fc..9e05aca37 100644
--- a/tools/mpremote/mpremote/commands.py
+++ b/tools/mpremote/mpremote/commands.py
@@ -4,7 +4,8 @@ import tempfile
import serial.tools.list_ports
-from . import pyboardextended as pyboard
+from .transport import TransportError
+from .transport_serial import SerialTransport
class CommandError(Exception):
@@ -36,28 +37,28 @@ def do_connect(state, args=None):
for p in sorted(serial.tools.list_ports.comports()):
if p.vid is not None and p.pid is not None:
try:
- state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200)
+ state.transport = SerialTransport(p.device, baudrate=115200)
return
- except pyboard.PyboardError as er:
+ except TransportError as er:
if not er.args[0].startswith("failed to access"):
raise er
- raise pyboard.PyboardError("no device found")
+ raise TransportError("no device found")
elif dev.startswith("id:"):
# Search for a device with the given serial number.
serial_number = dev[len("id:") :]
dev = None
for p in serial.tools.list_ports.comports():
if p.serial_number == serial_number:
- state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200)
+ state.transport = SerialTransport(p.device, baudrate=115200)
return
- raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
+ raise TransportError("no device with serial number {}".format(serial_number))
else:
# Connect to the given device.
if dev.startswith("port:"):
dev = dev[len("port:") :]
- state.pyb = pyboard.PyboardExtended(dev, baudrate=115200)
+ state.transport = SerialTransport(dev, baudrate=115200)
return
- except pyboard.PyboardError as er:
+ except TransportError as er:
msg = er.args[0]
if msg.startswith("failed to access"):
msg += " (it may be in use by another program)"
@@ -66,23 +67,23 @@ def do_connect(state, args=None):
def do_disconnect(state, _args=None):
- if not state.pyb:
+ if not state.transport:
return
try:
- if state.pyb.mounted:
- if not state.pyb.in_raw_repl:
- state.pyb.enter_raw_repl(soft_reset=False)
- state.pyb.umount_local()
- if state.pyb.in_raw_repl:
- state.pyb.exit_raw_repl()
+ if state.transport.mounted:
+ if not state.transport.in_raw_repl:
+ state.transport.enter_raw_repl(soft_reset=False)
+ state.transport.umount_local()
+ if state.transport.in_raw_repl:
+ state.transport.exit_raw_repl()
except OSError:
# Ignore any OSError exceptions when shutting down, eg:
# - pyboard.filesystem_command will close the connection if it had an error
# - umounting will fail if serial port disappeared
pass
- state.pyb.close()
- state.pyb = None
+ state.transport.close()
+ state.transport = None
state._auto_soft_reset = True
@@ -136,16 +137,17 @@ def do_filesystem(state, args):
raise CommandError("'cp -r' source files must be local")
_list_recursive(src_files, path)
known_dirs = {""}
- state.pyb.exec_("import uos")
+ state.transport.exec_("import uos")
for dir, file in src_files:
dir_parts = dir.split("/")
for i in range(len(dir_parts)):
d = "/".join(dir_parts[: i + 1])
if d not in known_dirs:
- state.pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d)
+ state.transport.exec_(
+ "try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d
+ )
known_dirs.add(d)
- pyboard.filesystem_command(
- state.pyb,
+ state.transport.filesystem_command(
["cp", "/".join((dir, file)), ":" + dir + "/"],
progress_callback=show_progress_bar,
verbose=verbose,
@@ -154,8 +156,8 @@ def do_filesystem(state, args):
if args.recursive:
raise CommandError("'-r' only supported for 'cp'")
try:
- pyboard.filesystem_command(
- state.pyb, [command] + paths, progress_callback=show_progress_bar, verbose=verbose
+ state.transport.filesystem_command(
+ [command] + paths, progress_callback=show_progress_bar, verbose=verbose
)
except OSError as er:
raise CommandError(er)
@@ -166,17 +168,17 @@ def do_edit(state, args):
state.did_action()
if not os.getenv("EDITOR"):
- raise pyboard.PyboardError("edit: $EDITOR not set")
+ raise TransportError("edit: $EDITOR not set")
for src in args.files:
src = src.lstrip(":")
dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src))
try:
print("edit :%s" % (src,))
os.close(dest_fd)
- state.pyb.fs_touch(src)
- state.pyb.fs_get(src, dest, progress_callback=show_progress_bar)
+ state.transport.fs_touch(src)
+ state.transport.fs_get(src, dest, progress_callback=show_progress_bar)
if os.system('%s "%s"' % (os.getenv("EDITOR"), dest)) == 0:
- state.pyb.fs_put(dest, src, progress_callback=show_progress_bar)
+ state.transport.fs_put(dest, src, progress_callback=show_progress_bar)
finally:
os.unlink(dest)
@@ -186,13 +188,15 @@ def _do_execbuffer(state, buf, follow):
state.did_action()
try:
- state.pyb.exec_raw_no_follow(buf)
+ state.transport.exec_raw_no_follow(buf)
if follow:
- ret, ret_err = state.pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
+ ret, ret_err = state.transport.follow(
+ timeout=None, data_consumer=pyboard.stdout_write_bytes
+ )
if ret_err:
pyboard.stdout_write_bytes(ret_err)
sys.exit(1)
- except pyboard.PyboardError as er:
+ except TransportError as er:
print(er)
sys.exit(1)
except KeyboardInterrupt:
@@ -221,13 +225,13 @@ def do_run(state, args):
def do_mount(state, args):
state.ensure_raw_repl()
path = args.path[0]
- state.pyb.mount_local(path, unsafe_links=args.unsafe_links)
+ state.transport.mount_local(path, unsafe_links=args.unsafe_links)
print(f"Local directory {path} is mounted at /remote")
def do_umount(state, path):
state.ensure_raw_repl()
- state.pyb.umount_local()
+ state.transport.umount_local()
def do_resume(state, _args=None):
diff --git a/tools/mpremote/mpremote/main.py b/tools/mpremote/mpremote/main.py
index cb96369a5..668926609 100644
--- a/tools/mpremote/mpremote/main.py
+++ b/tools/mpremote/mpremote/main.py
@@ -449,7 +449,7 @@ def do_command_expansion(args):
class State:
def __init__(self):
- self.pyb = None
+ self.transport = None
self._did_action = False
self._auto_soft_reset = True
@@ -460,20 +460,20 @@ class State:
return not self._did_action
def ensure_connected(self):
- if self.pyb is None:
+ if self.transport is None:
do_connect(self)
def ensure_raw_repl(self, soft_reset=None):
self.ensure_connected()
soft_reset = self._auto_soft_reset if soft_reset is None else soft_reset
- if soft_reset or not self.pyb.in_raw_repl:
- self.pyb.enter_raw_repl(soft_reset=soft_reset)
+ if soft_reset or not self.transport.in_raw_repl:
+ self.transport.enter_raw_repl(soft_reset=soft_reset)
self._auto_soft_reset = False
def ensure_friendly_repl(self):
self.ensure_connected()
- if self.pyb.in_raw_repl:
- self.pyb.exit_raw_repl()
+ if self.transport.in_raw_repl:
+ self.transport.exit_raw_repl()
def main():
diff --git a/tools/mpremote/mpremote/mip.py b/tools/mpremote/mpremote/mip.py
index 99ca9ff7e..f42c7a0b4 100644
--- a/tools/mpremote/mpremote/mip.py
+++ b/tools/mpremote/mpremote/mip.py
@@ -16,7 +16,7 @@ _CHUNK_SIZE = 128
# This implements os.makedirs(os.dirname(path))
-def _ensure_path_exists(pyb, path):
+def _ensure_path_exists(transport, path):
import os
split = path.split("/")
@@ -29,8 +29,8 @@ def _ensure_path_exists(pyb, path):
prefix = ""
for i in range(len(split) - 1):
prefix += split[i]
- if not pyb.fs_exists(prefix):
- pyb.fs_mkdir(prefix)
+ if not transport.fs_exists(prefix):
+ transport.fs_mkdir(prefix)
prefix += "/"
@@ -68,7 +68,7 @@ def _rewrite_url(url, branch=None):
return url
-def _download_file(pyb, url, dest):
+def _download_file(transport, url, dest):
try:
with urllib.request.urlopen(url) as src:
fd, path = tempfile.mkstemp()
@@ -76,8 +76,8 @@ def _download_file(pyb, url, dest):
print("Installing:", dest)
with os.fdopen(fd, "wb") as f:
_chunk(src, f.write, src.length)
- _ensure_path_exists(pyb, dest)
- pyb.fs_put(path, dest, progress_callback=show_progress_bar)
+ _ensure_path_exists(transport, dest)
+ transport.fs_put(path, dest, progress_callback=show_progress_bar)
finally:
os.unlink(path)
except urllib.error.HTTPError as e:
@@ -89,7 +89,7 @@ def _download_file(pyb, url, dest):
raise CommandError(f"{e.reason} requesting {url}")
-def _install_json(pyb, package_json_url, index, target, version, mpy):
+def _install_json(transport, package_json_url, index, target, version, mpy):
try:
with urllib.request.urlopen(_rewrite_url(package_json_url, version)) as response:
package_json = json.load(response)
@@ -103,15 +103,15 @@ def _install_json(pyb, package_json_url, index, target, version, mpy):
for target_path, short_hash in package_json.get("hashes", ()):
fs_target_path = target + "/" + target_path
file_url = f"{index}/file/{short_hash[:2]}/{short_hash}"
- _download_file(pyb, file_url, fs_target_path)
+ _download_file(transport, file_url, fs_target_path)
for target_path, url in package_json.get("urls", ()):
fs_target_path = target + "/" + target_path
- _download_file(pyb, _rewrite_url(url, version), fs_target_path)
+ _download_file(transport, _rewrite_url(url, version), fs_target_path)
for dep, dep_version in package_json.get("deps", ()):
- _install_package(pyb, dep, index, target, dep_version, mpy)
+ _install_package(transport, dep, index, target, dep_version, mpy)
-def _install_package(pyb, package, index, target, version, mpy):
+def _install_package(transport, package, index, target, version, mpy):
if (
package.startswith("http://")
or package.startswith("https://")
@@ -120,7 +120,7 @@ def _install_package(pyb, package, index, target, version, mpy):
if package.endswith(".py") or package.endswith(".mpy"):
print(f"Downloading {package} to {target}")
_download_file(
- pyb, _rewrite_url(package, version), target + "/" + package.rsplit("/")[-1]
+ transport, _rewrite_url(package, version), target + "/" + package.rsplit("/")[-1]
)
return
else:
@@ -136,14 +136,15 @@ def _install_package(pyb, package, index, target, version, mpy):
mpy_version = "py"
if mpy:
- pyb.exec("import sys")
+ transport.exec("import sys")
mpy_version = (
- int(pyb.eval("getattr(sys.implementation, '_mpy', 0) & 0xFF").decode()) or "py"
+ int(transport.eval("getattr(sys.implementation, '_mpy', 0) & 0xFF").decode())
+ or "py"
)
package = f"{index}/package/{mpy_version}/{package}/{version}.json"
- _install_json(pyb, package, index, target, version, mpy)
+ _install_json(transport, package, index, target, version, mpy)
def do_mip(state, args):
@@ -163,9 +164,9 @@ def do_mip(state, args):
args.index = _PACKAGE_INDEX
if args.target is None:
- state.pyb.exec("import sys")
+ state.transport.exec("import sys")
lib_paths = (
- state.pyb.eval("'\\n'.join(p for p in sys.path if p.endswith('/lib'))")
+ state.transport.eval("'\\n'.join(p for p in sys.path if p.endswith('/lib'))")
.decode()
.split("\n")
)
@@ -181,7 +182,12 @@ def do_mip(state, args):
try:
_install_package(
- state.pyb, package, args.index.rstrip("/"), args.target, version, args.mpy
+ state.transport,
+ package,
+ args.index.rstrip("/"),
+ args.target,
+ version,
+ args.mpy,
)
except CommandError:
print("Package may be partially installed")
diff --git a/tools/mpremote/mpremote/repl.py b/tools/mpremote/mpremote/repl.py
index 239e267d7..d24a7774a 100644
--- a/tools/mpremote/mpremote/repl.py
+++ b/tools/mpremote/mpremote/repl.py
@@ -1,45 +1,45 @@
from .console import Console, ConsolePosix
-from . import pyboardextended as pyboard
+from .transport import TransportError
def do_repl_main_loop(
state, console_in, console_out_write, *, escape_non_printable, code_to_inject, file_to_inject
):
while True:
- console_in.waitchar(state.pyb.serial)
+ console_in.waitchar(state.transport.serial)
c = console_in.readchar()
if c:
if c in (b"\x1d", b"\x18"): # ctrl-] or ctrl-x, quit
break
elif c == b"\x04": # ctrl-D
# special handling needed for ctrl-D if filesystem is mounted
- state.pyb.write_ctrl_d(console_out_write)
+ state.transport.write_ctrl_d(console_out_write)
elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code
- state.pyb.serial.write(code_to_inject)
+ state.transport.serial.write(code_to_inject)
elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script
console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
- state.pyb.enter_raw_repl(soft_reset=False)
+ state.transport.enter_raw_repl(soft_reset=False)
with open(file_to_inject, "rb") as f:
pyfile = f.read()
try:
- state.pyb.exec_raw_no_follow(pyfile)
- except pyboard.PyboardError as er:
+ state.transport.exec_raw_no_follow(pyfile)
+ except TransportError as er:
console_out_write(b"Error:\r\n")
console_out_write(er)
- state.pyb.exit_raw_repl()
+ state.transport.exit_raw_repl()
else:
- state.pyb.serial.write(c)
+ state.transport.serial.write(c)
try:
- n = state.pyb.serial.inWaiting()
+ n = state.transport.serial.inWaiting()
except OSError as er:
if er.args[0] == 5: # IO error, device disappeared
print("device disconnected")
break
if n > 0:
- dev_data_in = state.pyb.serial.read(n)
+ dev_data_in = state.transport.serial.read(n)
if dev_data_in is not None:
if escape_non_printable:
# Pass data through to the console, with escaping of non-printables.
@@ -63,7 +63,7 @@ def do_repl(state, args):
code_to_inject = args.inject_code
file_to_inject = args.inject_file
- print("Connected to MicroPython at %s" % state.pyb.device_name)
+ print("Connected to MicroPython at %s" % state.transport.device_name)
print("Use Ctrl-] or Ctrl-x to exit this shell")
if escape_non_printable:
print("Escaping non-printable bytes/characters by printing their hex code")
diff --git a/tools/mpremote/mpremote/transport.py b/tools/mpremote/mpremote/transport.py
new file mode 100644
index 000000000..6e9a77b2b
--- /dev/null
+++ b/tools/mpremote/mpremote/transport.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+#
+# This file is part of the MicroPython project, http://micropython.org/
+#
+# The MIT License (MIT)
+#
+# Copyright (c) 2023 Jim Mussared
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+
+class TransportError(Exception):
+ pass
+
+
+class Transport:
+ pass
diff --git a/tools/mpremote/mpremote/pyboardextended.py b/tools/mpremote/mpremote/transport_serial.py
index b9dbd1e3b..84822fe69 100644
--- a/tools/mpremote/mpremote/pyboardextended.py
+++ b/tools/mpremote/mpremote/transport_serial.py
@@ -1,14 +1,604 @@
-import io, os, re, struct, time
+#!/usr/bin/env python3
+#
+# This file is part of the MicroPython project, http://micropython.org/
+#
+# The MIT License (MIT)
+#
+# Copyright (c) 2014-2021 Damien P. George
+# Copyright (c) 2017 Paul Sokolovsky
+# Copyright (c) 2023 Jim Mussared
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+# This is based on the serial-only parts of tools/pyboard.py, with Python 2
+# support removed, and is currently in the process of being refactored to
+# support multiple transports (webrepl, socket, BLE, etc). At the moment,
+# SerialTransport is just the old Pyboard+PyboardExtended class without any
+# of this refactoring. The API is going to change significantly.
+
+# Once the API is stabilised, the idea is that mpremote can be used both
+# as a command line tool and a library for interacting with devices.
+
+import ast, io, errno, os, re, struct, sys, time
+from collections import namedtuple
from errno import EPERM
from .console import VT_ENABLED
+from .transport import TransportError, Transport
-try:
- from .pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command
-except ImportError:
- import sys
- sys.path.append(os.path.dirname(__file__) + "/../..")
- from pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command
+def stdout_write_bytes(b):
+ b = b.replace(b"\x04", b"")
+ sys.stdout.buffer.write(b)
+ sys.stdout.buffer.flush()
+
+
+listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"])
+
+
+def reraise_filesystem_error(e, info):
+ if len(e.args) >= 3:
+ if b"OSError" in e.args[2] and b"ENOENT" in e.args[2]:
+ raise FileNotFoundError(info)
+ raise
+
+
+class SerialTransport(Transport):
+ def __init__(self, device, baudrate=115200, wait=0, exclusive=True):
+ self.in_raw_repl = False
+ self.use_raw_paste = True
+ self.device_name = device
+ self.mounted = False
+
+ import serial
+ import serial.tools.list_ports
+
+ # Set options, and exclusive if pyserial supports it
+ serial_kwargs = {"baudrate": baudrate, "interCharTimeout": 1}
+ if serial.__version__ >= "3.3":
+ serial_kwargs["exclusive"] = exclusive
+
+ delayed = False
+ for attempt in range(wait + 1):
+ try:
+ if os.name == "nt":
+ self.serial = serial.Serial(**serial_kwargs)
+ self.serial.port = device
+ portinfo = list(serial.tools.list_ports.grep(device)) # type: ignore
+ if portinfo and portinfo[0].manufacturer != "Microsoft":
+ # ESP8266/ESP32 boards use RTS/CTS for flashing and boot mode selection.
+ # DTR False: to avoid using the reset button will hang the MCU in bootloader mode
+ # RTS False: to prevent pulses on rts on serial.close() that would POWERON_RESET an ESPxx
+ self.serial.dtr = False # DTR False = gpio0 High = Normal boot
+ self.serial.rts = False # RTS False = EN High = MCU enabled
+ self.serial.open()
+ else:
+ self.serial = serial.Serial(device, **serial_kwargs)
+ break
+ except OSError:
+ if wait == 0:
+ continue
+ if attempt == 0:
+ sys.stdout.write("Waiting {} seconds for pyboard ".format(wait))
+ delayed = True
+ time.sleep(1)
+ sys.stdout.write(".")
+ sys.stdout.flush()
+ else:
+ if delayed:
+ print("")
+ raise TransportError("failed to access " + device)
+ if delayed:
+ print("")
+
+ def close(self):
+ self.serial.close()
+
+ def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
+ # if data_consumer is used then data is not accumulated and the ending must be 1 byte long
+ assert data_consumer is None or len(ending) == 1
+
+ data = self.serial.read(min_num_bytes)
+ if data_consumer:
+ data_consumer(data)
+ timeout_count = 0
+ while True:
+ if data.endswith(ending):
+ break
+ elif self.serial.inWaiting() > 0:
+ new_data = self.serial.read(1)
+ if data_consumer:
+ data_consumer(new_data)
+ data = new_data
+ else:
+ data = data + new_data
+ timeout_count = 0
+ else:
+ timeout_count += 1
+ if timeout is not None and timeout_count >= 100 * timeout:
+ break
+ time.sleep(0.01)
+ return data
+
+ def enter_raw_repl(self, soft_reset=True):
+ self.serial.write(b"\r\x03\x03") # ctrl-C twice: interrupt any running program
+
+ # flush input (without relying on serial.flushInput())
+ n = self.serial.inWaiting()
+ while n > 0:
+ self.serial.read(n)
+ n = self.serial.inWaiting()
+
+ self.serial.write(b"\r\x01") # ctrl-A: enter raw REPL
+
+ if soft_reset:
+ data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n>")
+ if not data.endswith(b"raw REPL; CTRL-B to exit\r\n>"):
+ print(data)
+ raise TransportError("could not enter raw repl")
+
+ self.serial.write(b"\x04") # ctrl-D: soft reset
+
+ # Waiting for "soft reboot" independently to "raw REPL" (done below)
+ # allows boot.py to print, which will show up after "soft reboot"
+ # and before "raw REPL".
+ data = self.read_until(1, b"soft reboot\r\n")
+ if not data.endswith(b"soft reboot\r\n"):
+ print(data)
+ raise TransportError("could not enter raw repl")
+
+ data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n")
+ if not data.endswith(b"raw REPL; CTRL-B to exit\r\n"):
+ print(data)
+ raise TransportError("could not enter raw repl")
+
+ self.in_raw_repl = True
+
+ def exit_raw_repl(self):
+ self.serial.write(b"\r\x02") # ctrl-B: enter friendly REPL
+ self.in_raw_repl = False
+
+ def follow(self, timeout, data_consumer=None):
+ # wait for normal output
+ data = self.read_until(1, b"\x04", timeout=timeout, data_consumer=data_consumer)
+ if not data.endswith(b"\x04"):
+ raise TransportError("timeout waiting for first EOF reception")
+ data = data[:-1]
+
+ # wait for error output
+ data_err = self.read_until(1, b"\x04", timeout=timeout)
+ if not data_err.endswith(b"\x04"):
+ raise TransportError("timeout waiting for second EOF reception")
+ data_err = data_err[:-1]
+
+ # return normal and error output
+ return data, data_err
+
+ def raw_paste_write(self, command_bytes):
+ # Read initial header, with window size.
+ data = self.serial.read(2)
+ window_size = struct.unpack("<H", data)[0]
+ window_remain = window_size
+
+ # Write out the command_bytes data.
+ i = 0
+ while i < len(command_bytes):
+ while window_remain == 0 or self.serial.inWaiting():
+ data = self.serial.read(1)
+ if data == b"\x01":
+ # Device indicated that a new window of data can be sent.
+ window_remain += window_size
+ elif data == b"\x04":
+ # Device indicated abrupt end. Acknowledge it and finish.
+ self.serial.write(b"\x04")
+ return
+ else:
+ # Unexpected data from device.
+ raise TransportError("unexpected read during raw paste: {}".format(data))
+ # Send out as much data as possible that fits within the allowed window.
+ b = command_bytes[i : min(i + window_remain, len(command_bytes))]
+ self.serial.write(b)
+ window_remain -= len(b)
+ i += len(b)
+
+ # Indicate end of data.
+ self.serial.write(b"\x04")
+
+ # Wait for device to acknowledge end of data.
+ data = self.read_until(1, b"\x04")
+ if not data.endswith(b"\x04"):
+ raise TransportError("could not complete raw paste: {}".format(data))
+
+ def exec_raw_no_follow(self, command):
+ if isinstance(command, bytes):
+ command_bytes = command
+ else:
+ command_bytes = bytes(command, encoding="utf8")
+
+ # check we have a prompt
+ data = self.read_until(1, b">")
+ if not data.endswith(b">"):
+ raise TransportError("could not enter raw repl")
+
+ if self.use_raw_paste:
+ # Try to enter raw-paste mode.
+ self.serial.write(b"\x05A\x01")
+ data = self.serial.read(2)
+ if data == b"R\x00":
+ # Device understood raw-paste command but doesn't support it.
+ pass
+ elif data == b"R\x01":
+ # Device supports raw-paste mode, write out the command using this mode.
+ return self.raw_paste_write(command_bytes)
+ else:
+ # Device doesn't support raw-paste, fall back to normal raw REPL.
+ data = self.read_until(1, b"w REPL; CTRL-B to exit\r\n>")
+ if not data.endswith(b"w REPL; CTRL-B to exit\r\n>"):
+ print(data)
+ raise TransportError("could not enter raw repl")
+ # Don't try to use raw-paste mode again for this connection.
+ self.use_raw_paste = False
+
+ # Write command using standard raw REPL, 256 bytes every 10ms.
+ for i in range(0, len(command_bytes), 256):
+ self.serial.write(command_bytes[i : min(i + 256, len(command_bytes))])
+ time.sleep(0.01)
+ self.serial.write(b"\x04")
+
+ # check if we could exec command
+ data = self.serial.read(2)
+ if data != b"OK":
+ raise TransportError("could not exec command (response: %r)" % data)
+
+ def exec_raw(self, command, timeout=10, data_consumer=None):
+ self.exec_raw_no_follow(command)
+ return self.follow(timeout, data_consumer)
+
+ def eval(self, expression, parse=False):
+ if parse:
+ ret = self.exec("print(repr({}))".format(expression))
+ ret = ret.strip()
+ return ast.literal_eval(ret.decode())
+ else:
+ ret = self.exec("print({})".format(expression))
+ ret = ret.strip()
+ return ret
+
+ def exec(self, command, data_consumer=None):
+ ret, ret_err = self.exec_raw(command, data_consumer=data_consumer)
+ if ret_err:
+ raise TransportError("exception", ret, ret_err)
+ return ret
+
+ def execfile(self, filename):
+ with open(filename, "rb") as f:
+ pyfile = f.read()
+ return self.exec(pyfile)
+
+ def fs_exists(self, src):
+ try:
+ self.exec("import uos\nuos.stat(%s)" % (("'%s'" % src) if src else ""))
+ return True
+ except TransportError:
+ return False
+
+ def fs_ls(self, src):
+ cmd = (
+ "import uos\nfor f in uos.ilistdir(%s):\n"
+ " print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))"
+ % (("'%s'" % src) if src else "")
+ )
+ self.exec(cmd, data_consumer=stdout_write_bytes)
+
+ def fs_listdir(self, src=""):
+ buf = bytearray()
+
+ def repr_consumer(b):
+ buf.extend(b.replace(b"\x04", b""))
+
+ cmd = "import uos\nfor f in uos.ilistdir(%s):\n" " print(repr(f), end=',')" % (
+ ("'%s'" % src) if src else ""
+ )
+ try:
+ buf.extend(b"[")
+ self.exec(cmd, data_consumer=repr_consumer)
+ buf.extend(b"]")
+ except TransportError as e:
+ reraise_filesystem_error(e, src)
+
+ return [
+ listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,)))
+ for f in ast.literal_eval(buf.decode())
+ ]
+
+ def fs_stat(self, src):
+ try:
+ self.exec("import uos")
+ return os.stat_result(self.eval("uos.stat(%s)" % (("'%s'" % src)), parse=True))
+ except TransportError as e:
+ reraise_filesystem_error(e, src)
+
+ def fs_cat(self, src, chunk_size=256):
+ cmd = (
+ "with open('%s') as f:\n while 1:\n"
+ " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
+ )
+ self.exec(cmd, data_consumer=stdout_write_bytes)
+
+ def fs_readfile(self, src, chunk_size=256):
+ buf = bytearray()
+
+ def repr_consumer(b):
+ buf.extend(b.replace(b"\x04", b""))
+
+ cmd = (
+ "with open('%s', 'rb') as f:\n while 1:\n"
+ " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
+ )
+ try:
+ self.exec(cmd, data_consumer=repr_consumer)
+ except TransportError as e:
+ reraise_filesystem_error(e, src)
+ return ast.literal_eval(buf.decode())
+
+ def fs_writefile(self, dest, data, chunk_size=256):
+ self.exec("f=open('%s','wb')\nw=f.write" % dest)
+ while data:
+ chunk = data[:chunk_size]
+ self.exec("w(" + repr(chunk) + ")")
+ data = data[len(chunk) :]
+ self.exec("f.close()")
+
+ def fs_cp(self, src, dest, chunk_size=256, progress_callback=None):
+ if progress_callback:
+ src_size = self.fs_stat(src).st_size
+ written = 0
+ self.exec("fr=open('%s','rb')\nr=fr.read\nfw=open('%s','wb')\nw=fw.write" % (src, dest))
+ while True:
+ data_len = int(self.exec("d=r(%u)\nw(d)\nprint(len(d))" % chunk_size))
+ if not data_len:
+ break
+ if progress_callback:
+ written += data_len
+ progress_callback(written, src_size)
+ self.exec("fr.close()\nfw.close()")
+
+ def fs_get(self, src, dest, chunk_size=256, progress_callback=None):
+ if progress_callback:
+ src_size = self.fs_stat(src).st_size
+ written = 0
+ self.exec("f=open('%s','rb')\nr=f.read" % src)
+ with open(dest, "wb") as f:
+ while True:
+ data = bytearray()
+ self.exec("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d))
+ assert data.endswith(b"\r\n\x04")
+ try:
+ data = ast.literal_eval(str(data[:-3], "ascii"))
+ if not isinstance(data, bytes):
+ raise ValueError("Not bytes")
+ except (UnicodeError, ValueError) as e:
+ raise TransportError("fs_get: Could not interpret received data: %s" % str(e))
+ if not data:
+ break
+ f.write(data)
+ if progress_callback:
+ written += len(data)
+ progress_callback(written, src_size)
+ self.exec("f.close()")
+
+ def fs_put(self, src, dest, chunk_size=256, progress_callback=None):
+ if progress_callback:
+ src_size = os.path.getsize(src)
+ written = 0
+ self.exec("f=open('%s','wb')\nw=f.write" % dest)
+ with open(src, "rb") as f:
+ while True:
+ data = f.read(chunk_size)
+ if not data:
+ break
+ if sys.version_info < (3,):
+ self.exec("w(b" + repr(data) + ")")
+ else:
+ self.exec("w(" + repr(data) + ")")
+ if progress_callback:
+ written += len(data)
+ progress_callback(written, src_size)
+ self.exec("f.close()")
+
+ def fs_mkdir(self, dir):
+ self.exec("import uos\nuos.mkdir('%s')" % dir)
+
+ def fs_rmdir(self, dir):
+ self.exec("import uos\nuos.rmdir('%s')" % dir)
+
+ def fs_rm(self, src):
+ self.exec("import uos\nuos.remove('%s')" % src)
+
+ def fs_touch(self, src):
+ self.exec("f=open('%s','a')\nf.close()" % src)
+
+ def filesystem_command(self, args, progress_callback=None, verbose=False):
+ def fname_remote(src):
+ if src.startswith(":"):
+ src = src[1:]
+ # Convert all path separators to "/", because that's what a remote device uses.
+ return src.replace(os.path.sep, "/")
+
+ def fname_cp_dest(src, dest):
+ _, src = os.path.split(src)
+ if dest is None or dest == "":
+ dest = src
+ elif dest == ".":
+ dest = "./" + src
+ elif dest.endswith("/"):
+ dest += src
+ return dest
+
+ cmd = args[0]
+ args = args[1:]
+ try:
+ if cmd == "cp":
+ srcs = args[:-1]
+ dest = args[-1]
+ if dest.startswith(":"):
+ op_remote_src = self.fs_cp
+ op_local_src = self.fs_put
+ else:
+ op_remote_src = self.fs_get
+ op_local_src = lambda src, dest, **_: __import__("shutil").copy(src, dest)
+ for src in srcs:
+ if verbose:
+ print("cp %s %s" % (src, dest))
+ if src.startswith(":"):
+ op = op_remote_src
+ else:
+ op = op_local_src
+ src2 = fname_remote(src)
+ dest2 = fname_cp_dest(src2, fname_remote(dest))
+ op(src2, dest2, progress_callback=progress_callback)
+ else:
+ ops = {
+ "cat": self.fs_cat,
+ "ls": self.fs_ls,
+ "mkdir": self.fs_mkdir,
+ "rm": self.fs_rm,
+ "rmdir": self.fs_rmdir,
+ "touch": self.fs_touch,
+ }
+ if cmd not in ops:
+ raise TransportError("'{}' is not a filesystem command".format(cmd))
+ if cmd == "ls" and not args:
+ args = [""]
+ for src in args:
+ src = fname_remote(src)
+ if verbose:
+ print("%s :%s" % (cmd, src))
+ ops[cmd](src)
+ except TransportError as er:
+ if len(er.args) > 1:
+ print(str(er.args[2], "ascii"))
+ else:
+ print(er)
+ self.exit_raw_repl()
+ self.close()
+ sys.exit(1)
+
+ def mount_local(self, path, unsafe_links=False):
+ fout = self.serial
+ if self.eval('"RemoteFS" in globals()') == b"False":
+ self.exec(fs_hook_code)
+ self.exec("__mount()")
+ self.mounted = True
+ self.cmd = PyboardCommand(self.serial, fout, path, unsafe_links=unsafe_links)
+ self.serial = SerialIntercept(self.serial, self.cmd)
+
+ def write_ctrl_d(self, out_callback):
+ self.serial.write(b"\x04")
+ if not self.mounted:
+ return
+
+ # Read response from the device until it is quiet (with a timeout).
+ INITIAL_TIMEOUT = 0.5
+ BANNER_TIMEOUT = 2
+ QUIET_TIMEOUT = 0.1
+ FULL_TIMEOUT = 5
+ t_start = t_last_activity = time.monotonic()
+ data_all = b""
+ soft_reboot_started = False
+ soft_reboot_banner = False
+ while True:
+ t = time.monotonic()
+ n = self.serial.inWaiting()
+ if n > 0:
+ data = self.serial.read(n)
+ out_callback(data)
+ data_all += data
+ t_last_activity = t
+ else:
+ if len(data_all) == 0:
+ if t - t_start > INITIAL_TIMEOUT:
+ return
+ else:
+ if t - t_start > FULL_TIMEOUT:
+ if soft_reboot_started:
+ break
+ return
+
+ next_data_timeout = QUIET_TIMEOUT
+
+ if not soft_reboot_started and data_all.find(b"MPY: soft reboot") != -1:
+ soft_reboot_started = True
+
+ if soft_reboot_started and not soft_reboot_banner:
+ # Once soft reboot has been initiated, give some more time for the startup
+ # banner to be shown
+ if data_all.find(b"\nMicroPython ") != -1:
+ soft_reboot_banner = True
+ elif data_all.find(b"\nraw REPL; CTRL-B to exit\r\n") != -1:
+ soft_reboot_banner = True
+ else:
+ next_data_timeout = BANNER_TIMEOUT
+
+ if t - t_last_activity > next_data_timeout:
+ break
+
+ if not soft_reboot_started:
+ return
+
+ if not soft_reboot_banner:
+ out_callback(b"Warning: Could not remount local filesystem\r\n")
+ return
+
+ # Determine type of prompt
+ if data_all.endswith(b">"):
+ in_friendly_repl = False
+ prompt = b">"
+ else:
+ in_friendly_repl = True
+ prompt = data_all.rsplit(b"\r\n", 1)[-1]
+
+ # Clear state while board remounts, it will be re-set once mounted.
+ self.mounted = False
+ self.serial = self.serial.orig_serial
+
+ # Provide a message about the remount.
+ out_callback(bytes(f"\r\nRemount local directory {self.cmd.root} at /remote\r\n", "utf8"))
+
+ # Enter raw REPL and re-mount the remote filesystem.
+ self.serial.write(b"\x01")
+ self.exec(fs_hook_code)
+ self.exec("__mount()")
+ self.mounted = True
+
+ # Exit raw REPL if needed, and wait for the friendly REPL prompt.
+ if in_friendly_repl:
+ self.exit_raw_repl()
+ self.read_until(len(prompt), prompt)
+ out_callback(prompt)
+ self.serial = SerialIntercept(self.serial, self.cmd)
+
+ def umount_local(self):
+ if self.mounted:
+ self.exec('uos.umount("/remote")')
+ self.mounted = False
+ self.serial = self.serial.orig_serial
+
fs_hook_cmds = {
"CMD_STAT": 1,
@@ -617,110 +1207,3 @@ class SerialIntercept:
def write(self, buf):
self.orig_serial.write(buf)
-
-
-class PyboardExtended(Pyboard):
- def __init__(self, dev, *args, **kwargs):
- super().__init__(dev, *args, **kwargs)
- self.device_name = dev
- self.mounted = False
-
- def mount_local(self, path, unsafe_links=False):
- fout = self.serial
- if self.eval('"RemoteFS" in globals()') == b"False":
- self.exec_(fs_hook_code)
- self.exec_("__mount()")
- self.mounted = True
- self.cmd = PyboardCommand(self.serial, fout, path, unsafe_links=unsafe_links)
- self.serial = SerialIntercept(self.serial, self.cmd)
-
- def write_ctrl_d(self, out_callback):
- self.serial.write(b"\x04")
- if not self.mounted:
- return
-
- # Read response from the device until it is quiet (with a timeout).
- INITIAL_TIMEOUT = 0.5
- BANNER_TIMEOUT = 2
- QUIET_TIMEOUT = 0.1
- FULL_TIMEOUT = 5
- t_start = t_last_activity = time.monotonic()
- data_all = b""
- soft_reboot_started = False
- soft_reboot_banner = False
- while True:
- t = time.monotonic()
- n = self.serial.inWaiting()
- if n > 0:
- data = self.serial.read(n)
- out_callback(data)
- data_all += data
- t_last_activity = t
- else:
- if len(data_all) == 0:
- if t - t_start > INITIAL_TIMEOUT:
- return
- else:
- if t - t_start > FULL_TIMEOUT:
- if soft_reboot_started:
- break
- return
-
- next_data_timeout = QUIET_TIMEOUT
-
- if not soft_reboot_started and data_all.find(b"MPY: soft reboot") != -1:
- soft_reboot_started = True
-
- if soft_reboot_started and not soft_reboot_banner:
- # Once soft reboot has been initiated, give some more time for the startup
- # banner to be shown
- if data_all.find(b"\nMicroPython ") != -1:
- soft_reboot_banner = True
- elif data_all.find(b"\nraw REPL; CTRL-B to exit\r\n") != -1:
- soft_reboot_banner = True
- else:
- next_data_timeout = BANNER_TIMEOUT
-
- if t - t_last_activity > next_data_timeout:
- break
-
- if not soft_reboot_started:
- return
-
- if not soft_reboot_banner:
- out_callback(b"Warning: Could not remount local filesystem\r\n")
- return
-
- # Determine type of prompt
- if data_all.endswith(b">"):
- in_friendly_repl = False
- prompt = b">"
- else:
- in_friendly_repl = True
- prompt = data_all.rsplit(b"\r\n", 1)[-1]
-
- # Clear state while board remounts, it will be re-set once mounted.
- self.mounted = False
- self.serial = self.serial.orig_serial
-
- # Provide a message about the remount.
- out_callback(bytes(f"\r\nRemount local directory {self.cmd.root} at /remote\r\n", "utf8"))
-
- # Enter raw REPL and re-mount the remote filesystem.
- self.serial.write(b"\x01")
- self.exec_(fs_hook_code)
- self.exec_("__mount()")
- self.mounted = True
-
- # Exit raw REPL if needed, and wait for the friendly REPL prompt.
- if in_friendly_repl:
- self.exit_raw_repl()
- self.read_until(len(prompt), prompt)
- out_callback(prompt)
- self.serial = SerialIntercept(self.serial, self.cmd)
-
- def umount_local(self):
- if self.mounted:
- self.exec_('uos.umount("/remote")')
- self.mounted = False
- self.serial = self.serial.orig_serial
diff --git a/tools/mpremote/pyproject.toml b/tools/mpremote/pyproject.toml
index 1b6c2173d..b01385c3d 100644
--- a/tools/mpremote/pyproject.toml
+++ b/tools/mpremote/pyproject.toml
@@ -44,11 +44,5 @@ raw-options = { root = "../..", version_scheme = "post-release" }
[tool.hatch.build]
packages = ["mpremote"]
-# Also grab pyboard.py from /tools and add it to the package for both wheel and sdist.
-[tool.hatch.build.force-include]
-"../pyboard.py" = "mpremote/pyboard.py"
-
-# Workaround to allow `python -m build` to work.
[tool.hatch.build.targets.sdist.force-include]
-"../pyboard.py" = "mpremote/pyboard.py"
"requirements.txt" = "requirements.txt"