summaryrefslogtreecommitdiff
path: root/tools/pydfu.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/pydfu.py')
-rwxr-xr-xtools/pydfu.py52
1 files changed, 42 insertions, 10 deletions
diff --git a/tools/pydfu.py b/tools/pydfu.py
index 030f56bf8..8b8e33898 100755
--- a/tools/pydfu.py
+++ b/tools/pydfu.py
@@ -22,6 +22,8 @@ import sys
import usb.core
import usb.util
import zlib
+import time
+import math
# VID/PID
__VID = 0x0483
@@ -29,6 +31,8 @@ __PID = 0xDF11
# USB request __TIMEOUT
__TIMEOUT = 4000
+__NEXT_TIMEOUT = 0
+__STATUS_TIMEOUT = 20000
# DFU commands
__DFU_DETACH = 0
@@ -95,6 +99,13 @@ else:
return usb.util.get_string(dev, index)
+def timeout():
+ global __NEXT_TIMEOUT
+ t = max(__TIMEOUT, __NEXT_TIMEOUT)
+ __NEXT_TIMEOUT = 0
+ return int(math.ceil(t))
+
+
def find_dfu_cfg_descr(descr):
if len(descr) == 9 and descr[0] == 9 and descr[1] == _DFU_DESCRIPTOR_TYPE:
nt = collections.namedtuple(
@@ -150,17 +161,32 @@ def init():
def abort_request():
"""Sends an abort request."""
- __dev.ctrl_transfer(0x21, __DFU_ABORT, 0, __DFU_INTERFACE, None, __TIMEOUT)
+ __dev.ctrl_transfer(0x21, __DFU_ABORT, 0, __DFU_INTERFACE, None, timeout())
def clr_status():
"""Clears any error status (perhaps left over from a previous session)."""
- __dev.ctrl_transfer(0x21, __DFU_CLRSTATUS, 0, __DFU_INTERFACE, None, __TIMEOUT)
+ __dev.ctrl_transfer(0x21, __DFU_CLRSTATUS, 0, __DFU_INTERFACE, None, timeout())
def get_status():
"""Get the status of the last operation."""
- stat = __dev.ctrl_transfer(0xA1, __DFU_GETSTATUS, 0, __DFU_INTERFACE, 6, 20000)
+ _timeout = time.time() + max(__STATUS_TIMEOUT, timeout())
+ stat = None
+ while time.time() < _timeout:
+ try:
+ stat = __dev.ctrl_transfer(
+ 0xA1, __DFU_GETSTATUS, 0, __DFU_INTERFACE, 6, int(_timeout - time.time())
+ )
+ break
+ except usb.core.USBError as ex:
+ # If the firmware is blocked the transfer can timeout much quicker than
+ # the supplied timeout. If so, retry until the overall timeout is used up.
+ if "Operation timed out" not in str(ex):
+ raise
+
+ if stat is None:
+ raise SystemExit("DFU: get_status timed out")
# firmware can provide an optional string for any error
if stat[5]:
@@ -168,6 +194,12 @@ def get_status():
if message:
print(message)
+ # firmware can send a longer timeout request while it's performing slow operation eg. erase
+ timeout_ms = stat[1] << 16 | stat[2] << 8 | stat[3]
+ if timeout_ms:
+ global __NEXT_TIMEOUT
+ __NEXT_TIMEOUT = __TIMEOUT + timeout_ms
+
return stat[4]
@@ -180,9 +212,9 @@ def check_status(stage, expected):
def mass_erase():
"""Performs a MASS erase (i.e. erases the entire device)."""
# Send DNLOAD with first byte=0x41
- __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, "\x41", __TIMEOUT)
+ __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, "\x41", timeout())
- # Execute last command
+ # Execute erase and wait until complete
check_status("erase", __DFU_STATE_DFU_DOWNLOAD_BUSY)
# Check command state
@@ -196,7 +228,7 @@ def page_erase(addr):
# Send DNLOAD with first byte=0x41 and page address
buf = struct.pack("<BI", 0x41, addr)
- __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, __TIMEOUT)
+ __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, timeout())
# Execute last command
check_status("erase", __DFU_STATE_DFU_DOWNLOAD_BUSY)
@@ -209,7 +241,7 @@ def set_address(addr):
"""Sets the address for the next operation."""
# Send DNLOAD with first byte=0x21 and page address
buf = struct.pack("<BI", 0x21, addr)
- __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, __TIMEOUT)
+ __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, timeout())
# Execute last command
check_status("set address", __DFU_STATE_DFU_DOWNLOAD_BUSY)
@@ -243,7 +275,7 @@ def write_memory(addr, buf, progress=None, progress_addr=0, progress_size=0):
# Send DNLOAD with fw data
chunk = min(__cfg_descr.wTransferSize, xfer_total - xfer_bytes)
__dev.ctrl_transfer(
- 0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf[xfer_bytes : xfer_bytes + chunk], __TIMEOUT
+ 0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf[xfer_bytes : xfer_bytes + chunk], timeout()
)
# Execute last command
@@ -267,7 +299,7 @@ def write_page(buf, xfer_offset):
set_address(xfer_base + xfer_offset)
# Send DNLOAD with fw data
- __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf, __TIMEOUT)
+ __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf, timeout())
# Execute last command
check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_BUSY)
@@ -285,7 +317,7 @@ def exit_dfu():
set_address(0x08000000)
# Send DNLOAD with 0 length to exit DFU
- __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, None, __TIMEOUT)
+ __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, None, timeout())
try:
# Execute last command