diff options
-rw-r--r-- | docs/reference/mpremote.rst | 24 | ||||
-rw-r--r-- | tools/mpremote/mpremote/commands.py | 183 | ||||
-rw-r--r-- | tools/mpremote/mpremote/main.py | 31 | ||||
-rw-r--r-- | tools/mpremote/mpremote/romfs.py | 148 |
4 files changed, 385 insertions, 1 deletions
diff --git a/docs/reference/mpremote.rst b/docs/reference/mpremote.rst index d4c698359..ef23cd85c 100644 --- a/docs/reference/mpremote.rst +++ b/docs/reference/mpremote.rst @@ -78,6 +78,7 @@ The full list of supported commands are: - `mip <mpremote_command_mip>` - `mount <mpremote_command_mount>` - `unmount <mpremote_command_unmount>` +- `romfs <mpremote_command_romfs>` - `rtc <mpremote_command_rtc>` - `sleep <mpremote_command_sleep>` - `reset <mpremote_command_reset>` @@ -347,6 +348,29 @@ The full list of supported commands are: This happens automatically when ``mpremote`` terminates, but it can be used in a sequence to unmount an earlier mount before subsequent command are run. +.. _mpremote_command_romfs: + +- **romfs** -- manage ROMFS partitions on the device: + + .. code-block:: bash + + $ mpremote romfs <sub-command> + + ``<sub-command>`` may be: + + - ``romfs query`` to list all the available ROMFS partitions and their size + - ``romfs [-o <output>] build <source>`` to create a ROMFS image from the given + source directory; the default output file is the source appended by ``.romfs`` + - ``romfs [-p <partition>] deploy <source>`` to deploy a ROMFS image to the device; + will also create a temporary ROMFS image if the source is a directory + + The ``build`` and ``deploy`` sub-commands both support the ``-m``/``--mpy`` option + to automatically compile ``.py`` files to ``.mpy`` when creating the ROMFS image. + This option is enabled by default, but only works if the ``mpy_cross`` Python + package has been installed (eg via ``pip install mpy_cross``). If the package is + not installed then a warning is printed and ``.py`` files remain as is. Compiling + of ``.py`` files can be disabled with the ``--no-mpy`` option. + .. _mpremote_command_rtc: - **rtc** -- set/get the device clock (RTC): diff --git a/tools/mpremote/mpremote/commands.py b/tools/mpremote/mpremote/commands.py index f86befd08..7dd448c8b 100644 --- a/tools/mpremote/mpremote/commands.py +++ b/tools/mpremote/mpremote/commands.py @@ -1,12 +1,15 @@ +import binascii import hashlib import os import sys import tempfile +import zlib import serial.tools.list_ports -from .transport import TransportError, stdout_write_bytes +from .transport import TransportError, TransportExecError, stdout_write_bytes from .transport_serial import SerialTransport +from .romfs import make_romfs class CommandError(Exception): @@ -478,3 +481,181 @@ def do_rtc(state, args): state.transport.exec("machine.RTC().datetime({})".format(timetuple)) else: print(state.transport.eval("machine.RTC().datetime()")) + + +def _do_romfs_query(state, args): + state.ensure_raw_repl() + state.did_action() + + # Detect the romfs and get its associated device. + state.transport.exec("import vfs") + if not state.transport.eval("hasattr(vfs,'rom_ioctl')"): + print("ROMFS is not enabled on this device") + return + num_rom_partitions = state.transport.eval("vfs.rom_ioctl(1)") + if num_rom_partitions <= 0: + print("No ROMFS partitions available") + return + + for rom_id in range(num_rom_partitions): + state.transport.exec(f"dev=vfs.rom_ioctl(2,{rom_id})") + has_object = state.transport.eval("hasattr(dev,'ioctl')") + if has_object: + rom_block_count = state.transport.eval("dev.ioctl(4,0)") + rom_block_size = state.transport.eval("dev.ioctl(5,0)") + rom_size = rom_block_count * rom_block_size + print( + f"ROMFS{rom_id} partition has size {rom_size} bytes ({rom_block_count} blocks of {rom_block_size} bytes each)" + ) + else: + rom_size = state.transport.eval("len(dev)") + print(f"ROMFS{rom_id} partition has size {rom_size} bytes") + romfs = state.transport.eval("bytes(memoryview(dev)[:12])") + print(f" Raw contents: {romfs.hex(':')} ...") + if not romfs.startswith(b"\xd2\xcd\x31"): + print(" Not a valid ROMFS") + else: + size = 0 + for value in romfs[3:]: + size = (size << 7) | (value & 0x7F) + if not value & 0x80: + break + print(f" ROMFS image size: {size}") + + +def _do_romfs_build(state, args): + state.did_action() + + if args.path is None: + raise CommandError("romfs build: source path not given") + + input_directory = args.path + + if args.output is None: + output_file = input_directory + ".romfs" + else: + output_file = args.output + + romfs = make_romfs(input_directory, mpy_cross=args.mpy) + + print(f"Writing {len(romfs)} bytes to output file {output_file}") + with open(output_file, "wb") as f: + f.write(romfs) + + +def _do_romfs_deploy(state, args): + state.ensure_raw_repl() + state.did_action() + transport = state.transport + + if args.path is None: + raise CommandError("romfs deploy: source path not given") + + rom_id = args.partition + romfs_filename = args.path + + # Read in or create the ROMFS filesystem image. + if romfs_filename.endswith(".romfs"): + with open(romfs_filename, "rb") as f: + romfs = f.read() + else: + romfs = make_romfs(romfs_filename, mpy_cross=args.mpy) + print(f"Image size is {len(romfs)} bytes") + + # Detect the ROMFS partition and get its associated device. + state.transport.exec("import vfs") + if not state.transport.eval("hasattr(vfs,'rom_ioctl')"): + raise CommandError("ROMFS is not enabled on this device") + transport.exec(f"dev=vfs.rom_ioctl(2,{rom_id})") + if transport.eval("isinstance(dev,int) and dev<0"): + raise CommandError(f"ROMFS{rom_id} partition not found on device") + has_object = transport.eval("hasattr(dev,'ioctl')") + if has_object: + rom_block_count = transport.eval("dev.ioctl(4,0)") + rom_block_size = transport.eval("dev.ioctl(5,0)") + rom_size = rom_block_count * rom_block_size + print( + f"ROMFS{rom_id} partition has size {rom_size} bytes ({rom_block_count} blocks of {rom_block_size} bytes each)" + ) + else: + rom_size = transport.eval("len(dev)") + print(f"ROMFS{rom_id} partition has size {rom_size} bytes") + + # Check if ROMFS filesystem image will fit in the target partition. + if len(romfs) > rom_size: + print("ROMFS image is too big for the target partition") + sys.exit(1) + + # Prepare ROMFS partition for writing. + print(f"Preparing ROMFS{rom_id} partition for writing") + transport.exec("import vfs\ntry:\n vfs.umount('/rom')\nexcept:\n pass") + chunk_size = 4096 + if has_object: + for offset in range(0, len(romfs), rom_block_size): + transport.exec(f"dev.ioctl(6,{offset // rom_block_size})") + chunk_size = min(chunk_size, rom_block_size) + else: + rom_min_write = transport.eval(f"vfs.rom_ioctl(3,{rom_id},{len(romfs)})") + chunk_size = max(chunk_size, rom_min_write) + + # Detect capabilities of the device to use the fastest method of transfer. + has_bytes_fromhex = transport.eval("hasattr(bytes,'fromhex')") + try: + transport.exec("from binascii import a2b_base64") + has_a2b_base64 = True + except TransportExecError: + has_a2b_base64 = False + try: + transport.exec("from io import BytesIO") + transport.exec("from deflate import DeflateIO,RAW") + has_deflate_io = True + except TransportExecError: + has_deflate_io = False + + # Deploy the ROMFS filesystem image to the device. + for offset in range(0, len(romfs), chunk_size): + romfs_chunk = romfs[offset : offset + chunk_size] + romfs_chunk += bytes(chunk_size - len(romfs_chunk)) + if has_deflate_io: + # Needs: binascii.a2b_base64, io.BytesIO, deflate.DeflateIO. + romfs_chunk_compressed = zlib.compress(romfs_chunk, wbits=-9) + buf = binascii.b2a_base64(romfs_chunk_compressed).strip() + transport.exec(f"buf=DeflateIO(BytesIO(a2b_base64({buf})),RAW,9).read()") + elif has_a2b_base64: + # Needs: binascii.a2b_base64. + buf = binascii.b2a_base64(romfs_chunk) + transport.exec(f"buf=a2b_base64({buf})") + elif has_bytes_fromhex: + # Needs: bytes.fromhex. + buf = romfs_chunk.hex() + transport.exec(f"buf=bytes.fromhex('{buf}')") + else: + # Needs nothing special. + transport.exec("buf=" + repr(romfs_chunk)) + print(f"\rWriting at offset {offset}", end="") + if has_object: + transport.exec( + f"dev.writeblocks({offset // rom_block_size},buf,{offset % rom_block_size})" + ) + else: + transport.exec(f"vfs.rom_ioctl(4,{rom_id},{offset},buf)") + + # Complete writing. + if not has_object: + transport.eval(f"vfs.rom_ioctl(5,{rom_id})") + + print() + print("ROMFS image deployed") + + +def do_romfs(state, args): + if args.command[0] == "query": + _do_romfs_query(state, args) + elif args.command[0] == "build": + _do_romfs_build(state, args) + elif args.command[0] == "deploy": + _do_romfs_deploy(state, args) + else: + raise CommandError( + f"romfs: '{args.command[0]}' is not a command; pass romfs --help for a list" + ) diff --git a/tools/mpremote/mpremote/main.py b/tools/mpremote/mpremote/main.py index e6e397020..bdae8f163 100644 --- a/tools/mpremote/mpremote/main.py +++ b/tools/mpremote/mpremote/main.py @@ -36,6 +36,7 @@ from .commands import ( do_resume, do_rtc, do_soft_reset, + do_romfs, ) from .mip import do_mip from .repl import do_repl @@ -228,6 +229,32 @@ def argparse_mip(): return cmd_parser +def argparse_romfs(): + cmd_parser = argparse.ArgumentParser(description="manage ROM partitions") + _bool_flag( + cmd_parser, + "mpy", + "m", + True, + "automatically compile .py files to .mpy when building the ROMFS image (default)", + ) + cmd_parser.add_argument( + "--partition", + "-p", + type=int, + default=0, + help="ROMFS partition to use", + ) + cmd_parser.add_argument( + "--output", + "-o", + help="output file", + ) + cmd_parser.add_argument("command", nargs=1, help="romfs command, one of: query, build, deploy") + cmd_parser.add_argument("path", nargs="?", help="path to directory to deploy") + return cmd_parser + + def argparse_none(description): return lambda: argparse.ArgumentParser(description=description) @@ -302,6 +329,10 @@ _COMMANDS = { do_version, argparse_none("print version and exit"), ), + "romfs": ( + do_romfs, + argparse_romfs, + ), } # Additional commands aliases. diff --git a/tools/mpremote/mpremote/romfs.py b/tools/mpremote/mpremote/romfs.py new file mode 100644 index 000000000..ae781a36d --- /dev/null +++ b/tools/mpremote/mpremote/romfs.py @@ -0,0 +1,148 @@ +# MIT license; Copyright (c) 2022 Damien P. George + +import struct, sys, os + +try: + from mpy_cross import run as mpy_cross_run +except ImportError: + mpy_cross_run = None + + +class VfsRomWriter: + ROMFS_HEADER = b"\xd2\xcd\x31" + + ROMFS_RECORD_KIND_UNUSED = 0 + ROMFS_RECORD_KIND_PADDING = 1 + ROMFS_RECORD_KIND_DATA_VERBATIM = 2 + ROMFS_RECORD_KIND_DATA_POINTER = 3 + ROMFS_RECORD_KIND_DIRECTORY = 4 + ROMFS_RECORD_KIND_FILE = 5 + + def __init__(self): + self._dir_stack = [(None, bytearray())] + + def _encode_uint(self, value): + encoded = [value & 0x7F] + value >>= 7 + while value != 0: + encoded.insert(0, 0x80 | (value & 0x7F)) + value >>= 7 + return bytes(encoded) + + def _pack(self, kind, payload): + return self._encode_uint(kind) + self._encode_uint(len(payload)) + payload + + def _extend(self, data): + buf = self._dir_stack[-1][1] + buf.extend(data) + return len(buf) + + def finalise(self): + _, data = self._dir_stack.pop() + encoded_kind = VfsRomWriter.ROMFS_HEADER + encoded_len = self._encode_uint(len(data)) + if (len(encoded_kind) + len(encoded_len) + len(data)) % 2 == 1: + encoded_len = b"\x80" + encoded_len + data = encoded_kind + encoded_len + data + return data + + def opendir(self, dirname): + self._dir_stack.append((dirname, bytearray())) + + def closedir(self): + dirname, dirdata = self._dir_stack.pop() + dirdata = self._encode_uint(len(dirname)) + bytes(dirname, "ascii") + dirdata + self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DIRECTORY, dirdata)) + + def mkdata(self, data): + assert len(self._dir_stack) == 1 + return self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, data)) - len( + data + ) + + def mkfile(self, filename, filedata): + filename = bytes(filename, "ascii") + payload = self._encode_uint(len(filename)) + payload += filename + if isinstance(filedata, tuple): + sub_payload = self._encode_uint(filedata[0]) + sub_payload += self._encode_uint(filedata[1]) + payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_POINTER, sub_payload) + else: + payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, filedata) + self._dir_stack[-1][1].extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_FILE, payload)) + + +def copy_recursively(vfs, src_dir, print_prefix, mpy_cross): + assert src_dir.endswith("/") + DIR = 1 << 14 + mpy_cross_missed = 0 + dir_contents = sorted(os.listdir(src_dir)) + for name in dir_contents: + src_name = src_dir + name + st = os.stat(src_name) + + if name == dir_contents[-1]: + # Last entry in the directory listing. + print_entry = "\\--" + print_recurse = " " + else: + # Not the last entry in the directory listing. + print_entry = "|--" + print_recurse = "| " + + if st[0] & DIR: + # A directory, enter it and copy its contents recursively. + print(print_prefix + print_entry, name + "/") + vfs.opendir(name) + mpy_cross_missed += copy_recursively( + vfs, src_name + "/", print_prefix + print_recurse, mpy_cross + ) + vfs.closedir() + else: + # A file. + did_mpy = False + name_extra = "" + if mpy_cross and name.endswith(".py"): + name_mpy = name[:-3] + ".mpy" + src_name_mpy = src_dir + name_mpy + if not os.path.isfile(src_name_mpy): + if mpy_cross_run is not None: + did_mpy = True + proc = mpy_cross_run(src_name) + proc.wait() + else: + mpy_cross_missed += 1 + if did_mpy: + name_extra = " -> .mpy" + print(print_prefix + print_entry, name + name_extra) + if did_mpy: + name = name_mpy + src_name = src_name_mpy + with open(src_name, "rb") as src: + vfs.mkfile(name, src.read()) + if did_mpy: + os.remove(src_name_mpy) + return mpy_cross_missed + + +def make_romfs(src_dir, *, mpy_cross): + if not src_dir.endswith("/"): + src_dir += "/" + + vfs = VfsRomWriter() + + # Build the filesystem recursively. + print("Building romfs filesystem, source directory: {}".format(src_dir)) + print("/") + try: + mpy_cross_missed = copy_recursively(vfs, src_dir, "", mpy_cross) + except OSError as er: + print("Error: OSError {}".format(er), file=sys.stderr) + sys.exit(1) + + if mpy_cross_missed: + print("Warning: `mpy_cross` module not found, .py files were not precompiled") + mpy_cross = False + + return vfs.finalise() |