summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/reference/mpremote.rst24
-rw-r--r--tools/mpremote/mpremote/commands.py183
-rw-r--r--tools/mpremote/mpremote/main.py31
-rw-r--r--tools/mpremote/mpremote/romfs.py148
4 files changed, 385 insertions, 1 deletions
diff --git a/docs/reference/mpremote.rst b/docs/reference/mpremote.rst
index d4c698359..ef23cd85c 100644
--- a/docs/reference/mpremote.rst
+++ b/docs/reference/mpremote.rst
@@ -78,6 +78,7 @@ The full list of supported commands are:
- `mip <mpremote_command_mip>`
- `mount <mpremote_command_mount>`
- `unmount <mpremote_command_unmount>`
+- `romfs <mpremote_command_romfs>`
- `rtc <mpremote_command_rtc>`
- `sleep <mpremote_command_sleep>`
- `reset <mpremote_command_reset>`
@@ -347,6 +348,29 @@ The full list of supported commands are:
This happens automatically when ``mpremote`` terminates, but it can be used
in a sequence to unmount an earlier mount before subsequent command are run.
+.. _mpremote_command_romfs:
+
+- **romfs** -- manage ROMFS partitions on the device:
+
+ .. code-block:: bash
+
+ $ mpremote romfs <sub-command>
+
+ ``<sub-command>`` may be:
+
+ - ``romfs query`` to list all the available ROMFS partitions and their size
+ - ``romfs [-o <output>] build <source>`` to create a ROMFS image from the given
+ source directory; the default output file is the source appended by ``.romfs``
+ - ``romfs [-p <partition>] deploy <source>`` to deploy a ROMFS image to the device;
+ will also create a temporary ROMFS image if the source is a directory
+
+ The ``build`` and ``deploy`` sub-commands both support the ``-m``/``--mpy`` option
+ to automatically compile ``.py`` files to ``.mpy`` when creating the ROMFS image.
+ This option is enabled by default, but only works if the ``mpy_cross`` Python
+ package has been installed (eg via ``pip install mpy_cross``). If the package is
+ not installed then a warning is printed and ``.py`` files remain as is. Compiling
+ of ``.py`` files can be disabled with the ``--no-mpy`` option.
+
.. _mpremote_command_rtc:
- **rtc** -- set/get the device clock (RTC):
diff --git a/tools/mpremote/mpremote/commands.py b/tools/mpremote/mpremote/commands.py
index f86befd08..7dd448c8b 100644
--- a/tools/mpremote/mpremote/commands.py
+++ b/tools/mpremote/mpremote/commands.py
@@ -1,12 +1,15 @@
+import binascii
import hashlib
import os
import sys
import tempfile
+import zlib
import serial.tools.list_ports
-from .transport import TransportError, stdout_write_bytes
+from .transport import TransportError, TransportExecError, stdout_write_bytes
from .transport_serial import SerialTransport
+from .romfs import make_romfs
class CommandError(Exception):
@@ -478,3 +481,181 @@ def do_rtc(state, args):
state.transport.exec("machine.RTC().datetime({})".format(timetuple))
else:
print(state.transport.eval("machine.RTC().datetime()"))
+
+
+def _do_romfs_query(state, args):
+ state.ensure_raw_repl()
+ state.did_action()
+
+ # Detect the romfs and get its associated device.
+ state.transport.exec("import vfs")
+ if not state.transport.eval("hasattr(vfs,'rom_ioctl')"):
+ print("ROMFS is not enabled on this device")
+ return
+ num_rom_partitions = state.transport.eval("vfs.rom_ioctl(1)")
+ if num_rom_partitions <= 0:
+ print("No ROMFS partitions available")
+ return
+
+ for rom_id in range(num_rom_partitions):
+ state.transport.exec(f"dev=vfs.rom_ioctl(2,{rom_id})")
+ has_object = state.transport.eval("hasattr(dev,'ioctl')")
+ if has_object:
+ rom_block_count = state.transport.eval("dev.ioctl(4,0)")
+ rom_block_size = state.transport.eval("dev.ioctl(5,0)")
+ rom_size = rom_block_count * rom_block_size
+ print(
+ f"ROMFS{rom_id} partition has size {rom_size} bytes ({rom_block_count} blocks of {rom_block_size} bytes each)"
+ )
+ else:
+ rom_size = state.transport.eval("len(dev)")
+ print(f"ROMFS{rom_id} partition has size {rom_size} bytes")
+ romfs = state.transport.eval("bytes(memoryview(dev)[:12])")
+ print(f" Raw contents: {romfs.hex(':')} ...")
+ if not romfs.startswith(b"\xd2\xcd\x31"):
+ print(" Not a valid ROMFS")
+ else:
+ size = 0
+ for value in romfs[3:]:
+ size = (size << 7) | (value & 0x7F)
+ if not value & 0x80:
+ break
+ print(f" ROMFS image size: {size}")
+
+
+def _do_romfs_build(state, args):
+ state.did_action()
+
+ if args.path is None:
+ raise CommandError("romfs build: source path not given")
+
+ input_directory = args.path
+
+ if args.output is None:
+ output_file = input_directory + ".romfs"
+ else:
+ output_file = args.output
+
+ romfs = make_romfs(input_directory, mpy_cross=args.mpy)
+
+ print(f"Writing {len(romfs)} bytes to output file {output_file}")
+ with open(output_file, "wb") as f:
+ f.write(romfs)
+
+
+def _do_romfs_deploy(state, args):
+ state.ensure_raw_repl()
+ state.did_action()
+ transport = state.transport
+
+ if args.path is None:
+ raise CommandError("romfs deploy: source path not given")
+
+ rom_id = args.partition
+ romfs_filename = args.path
+
+ # Read in or create the ROMFS filesystem image.
+ if romfs_filename.endswith(".romfs"):
+ with open(romfs_filename, "rb") as f:
+ romfs = f.read()
+ else:
+ romfs = make_romfs(romfs_filename, mpy_cross=args.mpy)
+ print(f"Image size is {len(romfs)} bytes")
+
+ # Detect the ROMFS partition and get its associated device.
+ state.transport.exec("import vfs")
+ if not state.transport.eval("hasattr(vfs,'rom_ioctl')"):
+ raise CommandError("ROMFS is not enabled on this device")
+ transport.exec(f"dev=vfs.rom_ioctl(2,{rom_id})")
+ if transport.eval("isinstance(dev,int) and dev<0"):
+ raise CommandError(f"ROMFS{rom_id} partition not found on device")
+ has_object = transport.eval("hasattr(dev,'ioctl')")
+ if has_object:
+ rom_block_count = transport.eval("dev.ioctl(4,0)")
+ rom_block_size = transport.eval("dev.ioctl(5,0)")
+ rom_size = rom_block_count * rom_block_size
+ print(
+ f"ROMFS{rom_id} partition has size {rom_size} bytes ({rom_block_count} blocks of {rom_block_size} bytes each)"
+ )
+ else:
+ rom_size = transport.eval("len(dev)")
+ print(f"ROMFS{rom_id} partition has size {rom_size} bytes")
+
+ # Check if ROMFS filesystem image will fit in the target partition.
+ if len(romfs) > rom_size:
+ print("ROMFS image is too big for the target partition")
+ sys.exit(1)
+
+ # Prepare ROMFS partition for writing.
+ print(f"Preparing ROMFS{rom_id} partition for writing")
+ transport.exec("import vfs\ntry:\n vfs.umount('/rom')\nexcept:\n pass")
+ chunk_size = 4096
+ if has_object:
+ for offset in range(0, len(romfs), rom_block_size):
+ transport.exec(f"dev.ioctl(6,{offset // rom_block_size})")
+ chunk_size = min(chunk_size, rom_block_size)
+ else:
+ rom_min_write = transport.eval(f"vfs.rom_ioctl(3,{rom_id},{len(romfs)})")
+ chunk_size = max(chunk_size, rom_min_write)
+
+ # Detect capabilities of the device to use the fastest method of transfer.
+ has_bytes_fromhex = transport.eval("hasattr(bytes,'fromhex')")
+ try:
+ transport.exec("from binascii import a2b_base64")
+ has_a2b_base64 = True
+ except TransportExecError:
+ has_a2b_base64 = False
+ try:
+ transport.exec("from io import BytesIO")
+ transport.exec("from deflate import DeflateIO,RAW")
+ has_deflate_io = True
+ except TransportExecError:
+ has_deflate_io = False
+
+ # Deploy the ROMFS filesystem image to the device.
+ for offset in range(0, len(romfs), chunk_size):
+ romfs_chunk = romfs[offset : offset + chunk_size]
+ romfs_chunk += bytes(chunk_size - len(romfs_chunk))
+ if has_deflate_io:
+ # Needs: binascii.a2b_base64, io.BytesIO, deflate.DeflateIO.
+ romfs_chunk_compressed = zlib.compress(romfs_chunk, wbits=-9)
+ buf = binascii.b2a_base64(romfs_chunk_compressed).strip()
+ transport.exec(f"buf=DeflateIO(BytesIO(a2b_base64({buf})),RAW,9).read()")
+ elif has_a2b_base64:
+ # Needs: binascii.a2b_base64.
+ buf = binascii.b2a_base64(romfs_chunk)
+ transport.exec(f"buf=a2b_base64({buf})")
+ elif has_bytes_fromhex:
+ # Needs: bytes.fromhex.
+ buf = romfs_chunk.hex()
+ transport.exec(f"buf=bytes.fromhex('{buf}')")
+ else:
+ # Needs nothing special.
+ transport.exec("buf=" + repr(romfs_chunk))
+ print(f"\rWriting at offset {offset}", end="")
+ if has_object:
+ transport.exec(
+ f"dev.writeblocks({offset // rom_block_size},buf,{offset % rom_block_size})"
+ )
+ else:
+ transport.exec(f"vfs.rom_ioctl(4,{rom_id},{offset},buf)")
+
+ # Complete writing.
+ if not has_object:
+ transport.eval(f"vfs.rom_ioctl(5,{rom_id})")
+
+ print()
+ print("ROMFS image deployed")
+
+
+def do_romfs(state, args):
+ if args.command[0] == "query":
+ _do_romfs_query(state, args)
+ elif args.command[0] == "build":
+ _do_romfs_build(state, args)
+ elif args.command[0] == "deploy":
+ _do_romfs_deploy(state, args)
+ else:
+ raise CommandError(
+ f"romfs: '{args.command[0]}' is not a command; pass romfs --help for a list"
+ )
diff --git a/tools/mpremote/mpremote/main.py b/tools/mpremote/mpremote/main.py
index e6e397020..bdae8f163 100644
--- a/tools/mpremote/mpremote/main.py
+++ b/tools/mpremote/mpremote/main.py
@@ -36,6 +36,7 @@ from .commands import (
do_resume,
do_rtc,
do_soft_reset,
+ do_romfs,
)
from .mip import do_mip
from .repl import do_repl
@@ -228,6 +229,32 @@ def argparse_mip():
return cmd_parser
+def argparse_romfs():
+ cmd_parser = argparse.ArgumentParser(description="manage ROM partitions")
+ _bool_flag(
+ cmd_parser,
+ "mpy",
+ "m",
+ True,
+ "automatically compile .py files to .mpy when building the ROMFS image (default)",
+ )
+ cmd_parser.add_argument(
+ "--partition",
+ "-p",
+ type=int,
+ default=0,
+ help="ROMFS partition to use",
+ )
+ cmd_parser.add_argument(
+ "--output",
+ "-o",
+ help="output file",
+ )
+ cmd_parser.add_argument("command", nargs=1, help="romfs command, one of: query, build, deploy")
+ cmd_parser.add_argument("path", nargs="?", help="path to directory to deploy")
+ return cmd_parser
+
+
def argparse_none(description):
return lambda: argparse.ArgumentParser(description=description)
@@ -302,6 +329,10 @@ _COMMANDS = {
do_version,
argparse_none("print version and exit"),
),
+ "romfs": (
+ do_romfs,
+ argparse_romfs,
+ ),
}
# Additional commands aliases.
diff --git a/tools/mpremote/mpremote/romfs.py b/tools/mpremote/mpremote/romfs.py
new file mode 100644
index 000000000..ae781a36d
--- /dev/null
+++ b/tools/mpremote/mpremote/romfs.py
@@ -0,0 +1,148 @@
+# MIT license; Copyright (c) 2022 Damien P. George
+
+import struct, sys, os
+
+try:
+ from mpy_cross import run as mpy_cross_run
+except ImportError:
+ mpy_cross_run = None
+
+
+class VfsRomWriter:
+ ROMFS_HEADER = b"\xd2\xcd\x31"
+
+ ROMFS_RECORD_KIND_UNUSED = 0
+ ROMFS_RECORD_KIND_PADDING = 1
+ ROMFS_RECORD_KIND_DATA_VERBATIM = 2
+ ROMFS_RECORD_KIND_DATA_POINTER = 3
+ ROMFS_RECORD_KIND_DIRECTORY = 4
+ ROMFS_RECORD_KIND_FILE = 5
+
+ def __init__(self):
+ self._dir_stack = [(None, bytearray())]
+
+ def _encode_uint(self, value):
+ encoded = [value & 0x7F]
+ value >>= 7
+ while value != 0:
+ encoded.insert(0, 0x80 | (value & 0x7F))
+ value >>= 7
+ return bytes(encoded)
+
+ def _pack(self, kind, payload):
+ return self._encode_uint(kind) + self._encode_uint(len(payload)) + payload
+
+ def _extend(self, data):
+ buf = self._dir_stack[-1][1]
+ buf.extend(data)
+ return len(buf)
+
+ def finalise(self):
+ _, data = self._dir_stack.pop()
+ encoded_kind = VfsRomWriter.ROMFS_HEADER
+ encoded_len = self._encode_uint(len(data))
+ if (len(encoded_kind) + len(encoded_len) + len(data)) % 2 == 1:
+ encoded_len = b"\x80" + encoded_len
+ data = encoded_kind + encoded_len + data
+ return data
+
+ def opendir(self, dirname):
+ self._dir_stack.append((dirname, bytearray()))
+
+ def closedir(self):
+ dirname, dirdata = self._dir_stack.pop()
+ dirdata = self._encode_uint(len(dirname)) + bytes(dirname, "ascii") + dirdata
+ self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DIRECTORY, dirdata))
+
+ def mkdata(self, data):
+ assert len(self._dir_stack) == 1
+ return self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, data)) - len(
+ data
+ )
+
+ def mkfile(self, filename, filedata):
+ filename = bytes(filename, "ascii")
+ payload = self._encode_uint(len(filename))
+ payload += filename
+ if isinstance(filedata, tuple):
+ sub_payload = self._encode_uint(filedata[0])
+ sub_payload += self._encode_uint(filedata[1])
+ payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_POINTER, sub_payload)
+ else:
+ payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, filedata)
+ self._dir_stack[-1][1].extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_FILE, payload))
+
+
+def copy_recursively(vfs, src_dir, print_prefix, mpy_cross):
+ assert src_dir.endswith("/")
+ DIR = 1 << 14
+ mpy_cross_missed = 0
+ dir_contents = sorted(os.listdir(src_dir))
+ for name in dir_contents:
+ src_name = src_dir + name
+ st = os.stat(src_name)
+
+ if name == dir_contents[-1]:
+ # Last entry in the directory listing.
+ print_entry = "\\--"
+ print_recurse = " "
+ else:
+ # Not the last entry in the directory listing.
+ print_entry = "|--"
+ print_recurse = "| "
+
+ if st[0] & DIR:
+ # A directory, enter it and copy its contents recursively.
+ print(print_prefix + print_entry, name + "/")
+ vfs.opendir(name)
+ mpy_cross_missed += copy_recursively(
+ vfs, src_name + "/", print_prefix + print_recurse, mpy_cross
+ )
+ vfs.closedir()
+ else:
+ # A file.
+ did_mpy = False
+ name_extra = ""
+ if mpy_cross and name.endswith(".py"):
+ name_mpy = name[:-3] + ".mpy"
+ src_name_mpy = src_dir + name_mpy
+ if not os.path.isfile(src_name_mpy):
+ if mpy_cross_run is not None:
+ did_mpy = True
+ proc = mpy_cross_run(src_name)
+ proc.wait()
+ else:
+ mpy_cross_missed += 1
+ if did_mpy:
+ name_extra = " -> .mpy"
+ print(print_prefix + print_entry, name + name_extra)
+ if did_mpy:
+ name = name_mpy
+ src_name = src_name_mpy
+ with open(src_name, "rb") as src:
+ vfs.mkfile(name, src.read())
+ if did_mpy:
+ os.remove(src_name_mpy)
+ return mpy_cross_missed
+
+
+def make_romfs(src_dir, *, mpy_cross):
+ if not src_dir.endswith("/"):
+ src_dir += "/"
+
+ vfs = VfsRomWriter()
+
+ # Build the filesystem recursively.
+ print("Building romfs filesystem, source directory: {}".format(src_dir))
+ print("/")
+ try:
+ mpy_cross_missed = copy_recursively(vfs, src_dir, "", mpy_cross)
+ except OSError as er:
+ print("Error: OSError {}".format(er), file=sys.stderr)
+ sys.exit(1)
+
+ if mpy_cross_missed:
+ print("Warning: `mpy_cross` module not found, .py files were not precompiled")
+ mpy_cross = False
+
+ return vfs.finalise()