summaryrefslogtreecommitdiff
path: root/tools/pyboard.py
diff options
context:
space:
mode:
authorJim Mussared <jim.mussared@gmail.com>2022-12-21 11:36:18 +1100
committerDamien George <damien@micropython.org>2023-01-13 16:38:34 +1100
commitaa64280666d40208ade0183503fe683fbb32c2ab (patch)
tree06ecd876a6443bbc7d48d8efea4e069b75c9dfad /tools/pyboard.py
parent6013d27dd59dd111041f72106a5a5eddfb02a792 (diff)
tools/pyboard.py: Add fs_{listdir,readfile,writefile,stat}.
These are for working with the filesystem when using pyboard.py as a library, rather than at the command line. - fs_listdir returns a list of tuples, in the same format as os.ilistdir(). - fs_readfile returns the contents of a file as a bytes object. - fs_writefile allows writing a bytes object to a file. - fs_stat returns an os.statresult. All raise FileNotFoundError (or OSError(ENOENT) on Python 2) if the file is not found (or PyboardError on other errors). Updated fs_cp and fs_get to use fs_stat to compute file size. This work was funded through GitHub Sponsors. Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
Diffstat (limited to 'tools/pyboard.py')
-rwxr-xr-xtools/pyboard.py69
1 files changed, 66 insertions, 3 deletions
diff --git a/tools/pyboard.py b/tools/pyboard.py
index b45cc1919..d1abd2b78 100755
--- a/tools/pyboard.py
+++ b/tools/pyboard.py
@@ -73,6 +73,8 @@ import struct
import sys
import time
+from collections import namedtuple
+
try:
stdout = sys.stdout.buffer
except AttributeError:
@@ -87,7 +89,15 @@ def stdout_write_bytes(b):
class PyboardError(Exception):
- pass
+ def convert(self, info):
+ if len(self.args) >= 3:
+ if b"OSError" in self.args[2] and b"ENOENT" in self.args[2]:
+ return OSError(errno.ENOENT, info)
+
+ return self
+
+
+listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"])
class TelnetToSerial:
@@ -467,6 +477,7 @@ class Pyboard:
ret = ret.strip()
return ret
+ # In Python3, call as pyboard.exec(), see the setattr call below.
def exec_(self, command, data_consumer=None):
ret, ret_err = self.exec_raw(command, data_consumer=data_consumer)
if ret_err:
@@ -497,6 +508,34 @@ class Pyboard:
)
self.exec_(cmd, data_consumer=stdout_write_bytes)
+ def fs_listdir(self, src=""):
+ buf = bytearray()
+
+ def repr_consumer(b):
+ buf.extend(b.replace(b"\x04", b""))
+
+ cmd = "import uos\nfor f in uos.ilistdir(%s):\n" " print(repr(f), end=',')" % (
+ ("'%s'" % src) if src else ""
+ )
+ try:
+ buf.extend(b"[")
+ self.exec_(cmd, data_consumer=repr_consumer)
+ buf.extend(b"]")
+ except PyboardError as e:
+ raise e.convert(src)
+
+ return [
+ listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,)))
+ for f in ast.literal_eval(buf.decode())
+ ]
+
+ def fs_stat(self, src):
+ try:
+ self.exec_("import uos")
+ return os.stat_result(self.eval("uos.stat(%s)" % (("'%s'" % src)), parse=True))
+ except PyboardError as e:
+ raise e.convert(src)
+
def fs_cat(self, src, chunk_size=256):
cmd = (
"with open('%s') as f:\n while 1:\n"
@@ -504,9 +543,33 @@ class Pyboard:
)
self.exec_(cmd, data_consumer=stdout_write_bytes)
+ def fs_readfile(self, src, chunk_size=256):
+ buf = bytearray()
+
+ def repr_consumer(b):
+ buf.extend(b.replace(b"\x04", b""))
+
+ cmd = (
+ "with open('%s', 'rb') as f:\n while 1:\n"
+ " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
+ )
+ try:
+ self.exec_(cmd, data_consumer=repr_consumer)
+ except PyboardError as e:
+ raise e.convert(src)
+ return ast.literal_eval(buf.decode())
+
+ def fs_writefile(self, dest, data, chunk_size=256):
+ self.exec_("f=open('%s','wb')\nw=f.write" % dest)
+ while data:
+ chunk = data[:chunk_size]
+ self.exec_("w(" + repr(chunk) + ")")
+ data = data[len(chunk) :]
+ self.exec_("f.close()")
+
def fs_cp(self, src, dest, chunk_size=256, progress_callback=None):
if progress_callback:
- src_size = int(self.exec_("import os\nprint(os.stat('%s')[6])" % src))
+ src_size = self.fs_stat(src).st_size
written = 0
self.exec_("fr=open('%s','rb')\nr=fr.read\nfw=open('%s','wb')\nw=fw.write" % (src, dest))
while True:
@@ -520,7 +583,7 @@ class Pyboard:
def fs_get(self, src, dest, chunk_size=256, progress_callback=None):
if progress_callback:
- src_size = int(self.exec_("import os\nprint(os.stat('%s')[6])" % src))
+ src_size = self.fs_stat(src).st_size
written = 0
self.exec_("f=open('%s','rb')\nr=f.read" % src)
with open(dest, "wb") as f: