summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJim Mussared <jim.mussared@gmail.com>2023-06-29 15:52:03 +1000
committerDamien George <damien@micropython.org>2023-07-21 19:32:42 +1000
commit8b315ef0d82a31991def597906f7f24bd39c6e23 (patch)
treef0f40f656a1fd2e54671e17a1635aa384c1a4580
parent3533924c36ae85ce6e8bf8598dd71cf16bbdb10b (diff)
tests/extmod: Add deflate.DeflateIO tests.
Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
-rw-r--r--tests/extmod/deflate_compress.py148
-rw-r--r--tests/extmod/deflate_compress.py.exp41
-rw-r--r--tests/extmod/deflate_decompress.py174
-rw-r--r--tests/extmod/deflate_decompress.py.exp80
-rw-r--r--tests/extmod/deflate_stream_error.py89
-rw-r--r--tests/extmod/deflate_stream_error.py.exp25
6 files changed, 557 insertions, 0 deletions
diff --git a/tests/extmod/deflate_compress.py b/tests/extmod/deflate_compress.py
new file mode 100644
index 000000000..612af663e
--- /dev/null
+++ b/tests/extmod/deflate_compress.py
@@ -0,0 +1,148 @@
+try:
+ # Check if deflate is available.
+ import deflate
+ import io
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# Check if compression is enabled.
+if not hasattr(deflate.DeflateIO, "write"):
+ print("SKIP")
+ raise SystemExit
+
+# Simple compression & decompression.
+b = io.BytesIO()
+g = deflate.DeflateIO(b, deflate.RAW)
+data = b"micropython"
+N = 10
+for i in range(N):
+ g.write(data)
+g.close()
+result_raw = b.getvalue()
+print(len(result_raw) < len(data) * N)
+b = io.BytesIO(result_raw)
+g = deflate.DeflateIO(b, deflate.RAW)
+print(g.read())
+
+# Same, but using a context manager.
+b = io.BytesIO()
+with deflate.DeflateIO(b, deflate.RAW) as g:
+ for i in range(N):
+ g.write(data)
+result_raw = b.getvalue()
+print(len(result_raw) < len(data) * N)
+b = io.BytesIO(result_raw)
+with deflate.DeflateIO(b, deflate.RAW) as g:
+ print(g.read())
+
+# Writing to a closed underlying stream.
+b = io.BytesIO()
+g = deflate.DeflateIO(b, deflate.RAW)
+g.write(b"micropython")
+b.close()
+try:
+ g.write(b"micropython")
+except ValueError:
+ print("ValueError")
+
+# Writing to a closed DeflateIO.
+b = io.BytesIO()
+g = deflate.DeflateIO(b, deflate.RAW)
+g.write(b"micropython")
+g.close()
+try:
+ g.write(b"micropython")
+except OSError:
+ print("OSError")
+
+
+def decompress(data, *args):
+ buf = io.BytesIO(data)
+ with deflate.DeflateIO(buf, *args) as g:
+ return g.read()
+
+
+def compress(data, *args):
+ b = io.BytesIO()
+ with deflate.DeflateIO(b, *args) as g:
+ g.write(data)
+ return b.getvalue()
+
+
+def compress_error(data, *args):
+ try:
+ compress(data, *args)
+ except OSError:
+ print("OSError")
+ except ValueError:
+ print("ValueError")
+
+
+# More test patterns.
+PATTERNS_RAW = (
+ (b"0", b"3\x00\x00"),
+ (b"a", b"K\x04\x00"),
+ (b"0" * 100, b"3\xa0\x03\x00\x00"),
+ (
+ bytes(range(64)),
+ b"c`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00",
+ ),
+)
+for unpacked, packed in PATTERNS_RAW:
+ print(compress(unpacked) == packed)
+ print(compress(unpacked, deflate.RAW) == packed)
+
+# Verify header and checksum format.
+unpacked = b"hello"
+packed = b"\xcbH\xcd\xc9\xc9\x07\x00"
+
+
+def check_header(n, a, b):
+ if a == b:
+ print(n)
+ else:
+ print(n, a, b)
+
+
+check_header("RAW", compress(unpacked, deflate.RAW), packed)
+check_header(
+ "ZLIB(9)", compress(unpacked, deflate.ZLIB, 9), b"\x18\x95" + packed + b"\x06,\x02\x15"
+)
+check_header(
+ "ZLIB(15)", compress(unpacked, deflate.ZLIB, 15), b"\x78\x9c" + packed + b"\x06,\x02\x15"
+)
+check_header(
+ "GZIP",
+ compress(unpacked, deflate.GZIP, 9),
+ b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03" + packed + b"\x86\xa6\x106\x05\x00\x00\x00",
+)
+
+# Valid wbits values.
+compress_error(unpacked, deflate.RAW, -1)
+print(len(compress(unpacked, deflate.RAW, 0)))
+compress_error(unpacked, deflate.RAW, 1)
+compress_error(unpacked, deflate.RAW, 4)
+for i in range(5, 16):
+ print(len(compress(unpacked, deflate.RAW, i)))
+compress_error(unpacked, deflate.RAW, 16)
+
+# Invalid values for format.
+compress_error(unpacked, -1)
+compress_error(unpacked, 5)
+
+# Fill buf with a predictable pseudorandom sequence.
+buf = bytearray(1024)
+lfsr = 1 << 15 | 1
+for i in range(len(buf)):
+ bit = (lfsr ^ (lfsr >> 1) ^ (lfsr >> 3) ^ (lfsr >> 12)) & 1
+ lfsr = (lfsr >> 1) | (bit << 15)
+ buf[i] = lfsr & 0xFF
+
+# Verify that compression improves as the window size increases.
+prev_len = len(buf)
+for wbits in range(5, 10):
+ result = compress(buf, deflate.RAW, wbits)
+ next_len = len(result)
+ print(next_len < prev_len and decompress(result, deflate.RAW, wbits) == buf)
+ prev_len = next_len
diff --git a/tests/extmod/deflate_compress.py.exp b/tests/extmod/deflate_compress.py.exp
new file mode 100644
index 000000000..5da70f491
--- /dev/null
+++ b/tests/extmod/deflate_compress.py.exp
@@ -0,0 +1,41 @@
+True
+b'micropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropython'
+True
+b'micropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropython'
+ValueError
+OSError
+True
+True
+True
+True
+True
+True
+True
+True
+RAW
+ZLIB(9)
+ZLIB(15)
+GZIP
+ValueError
+7
+ValueError
+ValueError
+7
+7
+7
+7
+7
+7
+7
+7
+7
+7
+7
+ValueError
+ValueError
+ValueError
+False
+True
+True
+True
+True
diff --git a/tests/extmod/deflate_decompress.py b/tests/extmod/deflate_decompress.py
new file mode 100644
index 000000000..29d3ec2d7
--- /dev/null
+++ b/tests/extmod/deflate_decompress.py
@@ -0,0 +1,174 @@
+try:
+ # Check if deflate is available.
+ import deflate
+ import io
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# zlib.compress(b'micropython hello world hello world micropython', wbits=-9)
+data_raw = b'\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00'
+# zlib.compress(b'micropython hello world hello world micropython', wbits=9)
+data_zlib = b'\x18\x95\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00\xbc\xfa\x12\x91'
+# zlib.compress(b'micropython hello world hello world micropython', wbits=25)
+data_gzip = b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00"\xeb\xc4\x98/\x00\x00\x00'
+
+# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 5)
+data_wbits_5 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\x95\xcfH\xcd\xc9\xc9\x07\x00"
+# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 6)
+data_wbits_6 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xd5\x9f\x91\x9a\x93\x93\x0f\x00"
+# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 8)
+data_wbits_8 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xf5\x7fFjNN>\x00"
+# compress(b'hello' + bytearray(2000) + b'hello', format=deflate.RAW, 10)
+data_wbits_10 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00"
+
+
+def decompress(data, *args):
+ buf = io.BytesIO(data)
+ with deflate.DeflateIO(buf, *args) as g:
+ return g.read()
+
+
+def decompress_error(data, *args):
+ try:
+ decompress(data, *args)
+ except OSError:
+ print("OSError")
+ except EOFError:
+ print("EOFError")
+ except ValueError:
+ print("ValueError")
+
+
+# Basic handling of format and detection.
+print(decompress(data_raw, deflate.RAW))
+print(decompress(data_zlib, deflate.ZLIB))
+print(decompress(data_gzip, deflate.GZIP))
+print(decompress(data_zlib)) # detect zlib/gzip.
+print(decompress(data_gzip)) # detect zlib/gzip.
+
+decompress_error(data_raw) # cannot detect zlib/gzip from raw stream
+decompress_error(data_raw, deflate.ZLIB)
+decompress_error(data_raw, deflate.GZIP)
+decompress_error(data_zlib, deflate.RAW)
+decompress_error(data_zlib, deflate.GZIP)
+decompress_error(data_gzip, deflate.RAW)
+decompress_error(data_gzip, deflate.ZLIB)
+
+# Invalid data stream.
+decompress_error(b"abcef", deflate.RAW)
+
+# Invalid block type. final-block, block-type=3.
+decompress_error(b"\x07", deflate.RAW)
+
+# Truncated stream.
+decompress_error(data_raw[:10], deflate.RAW)
+
+# Partial reads.
+buf = io.BytesIO(data_zlib)
+with deflate.DeflateIO(buf) as g:
+ print(buf.seek(0, 1)) # verify stream is not read until first read of the DeflateIO stream.
+ print(g.read(1))
+ print(buf.seek(0, 1)) # verify that only the minimal amount is read from the source
+ print(g.read(1))
+ print(buf.seek(0, 1))
+ print(g.read(2))
+ print(buf.seek(0, 1))
+ print(g.read())
+ print(buf.seek(0, 1))
+ print(g.read(1))
+ print(buf.seek(0, 1))
+ print(g.read())
+
+# Invalid zlib checksum (+ length for gzip). Note: only checksum errors are
+# currently detected, see the end of uzlib_uncompress_chksum().
+decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00")
+decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00")
+decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00", deflate.ZLIB)
+decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00", deflate.GZIP)
+
+# Reading from a closed underlying stream.
+b = io.BytesIO(data_raw)
+g = deflate.DeflateIO(b, deflate.RAW)
+g.read(4)
+b.close()
+try:
+ g.read(4)
+except ValueError:
+ print("ValueError")
+
+# Reading from a closed DeflateIO.
+b = io.BytesIO(data_raw)
+g = deflate.DeflateIO(b, deflate.RAW)
+g.read(4)
+g.close()
+try:
+ g.read(4)
+except OSError:
+ print("OSError")
+
+# Gzip header with extra flags (FCOMMENT FNAME FEXTRA FHCRC) enabled.
+data_gzip_header_extra = b"\x1f\x8b\x08\x1e}\x9a\x9bd\x02\x00\x00\x00\x00\x00\x00\xff\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcf\x03\x00\xf2KF>\x0b\x00\x00\x00"
+print(decompress(data_gzip_header_extra))
+
+# Test patterns.
+PATTERNS_ZLIB = [
+ # Packed results produced by CPy's zlib.compress()
+ (b"0", b"x\x9c3\x00\x00\x001\x001"),
+ (b"a", b"x\x9cK\x04\x00\x00b\x00b"),
+ (b"0" * 100, b"x\x9c30\xa0=\x00\x00\xb3q\x12\xc1"),
+ (
+ bytes(range(64)),
+ b"x\x9cc`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00\xaa\xe0\x07\xe1",
+ ),
+ (b"hello", b"x\x01\x01\x05\x00\xfa\xffhello\x06,\x02\x15"), # compression level 0
+ # adaptive/dynamic huffman tree
+ (
+ b"13371813150|13764518736|12345678901",
+ b"x\x9c\x05\xc1\x81\x01\x000\x04\x04\xb1\x95\\\x1f\xcfn\x86o\x82d\x06Qq\xc8\x9d\xc5X}<e\xb5g\x83\x0f\x89X\x07\xab",
+ ),
+ # dynamic Huffman tree with "case 17" (repeat code for 3-10 times)
+ (
+ b">I}\x00\x951D>I}\x00\x951D>I}\x00\x951D>I}\x00\x951D",
+ b"x\x9c\x05\xc11\x01\x00\x00\x00\x010\x95\x14py\x84\x12C_\x9bR\x8cV\x8a\xd1J1Z)F\x1fw`\x089",
+ ),
+]
+for unpacked, packed in PATTERNS_ZLIB:
+ print(decompress(packed) == unpacked)
+ print(decompress(packed, deflate.ZLIB) == unpacked)
+
+# Older version's of CPython's zlib module still included the checksum and length (as if it were a zlib/gzip stream).
+# Make sure there're no problem decompressing this.
+data_raw_with_footer = data_raw + b"\x00\x00\x00\x00\x00\x00\x00\x00"
+print(decompress(data_raw_with_footer, deflate.RAW))
+
+# Valid wbits values.
+decompress_error(data_wbits_5, deflate.RAW, -1)
+print(len(decompress(data_wbits_5, deflate.RAW, 0)))
+decompress_error(data_wbits_5, deflate.RAW, 1)
+decompress_error(data_wbits_5, deflate.RAW, 4)
+for i in range(5, 16):
+ print(len(decompress(data_wbits_5, deflate.RAW, i)))
+decompress_error(data_wbits_5, deflate.RAW, 16)
+
+# Invalid values for format.
+decompress_error(data_raw, -1)
+decompress_error(data_raw, 5)
+
+# Data that requires a higher wbits value.
+decompress_error(data_wbits_6, deflate.RAW, 5)
+print(len(decompress(data_wbits_6, deflate.RAW, 6)))
+print(len(decompress(data_wbits_6, deflate.RAW, 7)))
+decompress_error(data_wbits_8, deflate.RAW, 7)
+print(len(decompress(data_wbits_8, deflate.RAW, 8)))
+print(len(decompress(data_wbits_8, deflate.RAW, 9)))
+decompress_error(data_wbits_10, deflate.RAW)
+decompress_error(data_wbits_10, deflate.RAW, 9)
+print(len(decompress(data_wbits_10, deflate.RAW, 10)))
+
+# zlib header sets the size, so works with wbits unset or wbits >= 10.
+data_wbits_10_zlib = b"(\x91\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00[\xbc\x04)"
+print(len(decompress(data_wbits_10_zlib, deflate.ZLIB)))
+decompress_error(data_wbits_10_zlib, deflate.ZLIB, 9)
+print(len(decompress(data_wbits_10_zlib, deflate.ZLIB, 10)))
+print(len(decompress(data_wbits_10_zlib)))
diff --git a/tests/extmod/deflate_decompress.py.exp b/tests/extmod/deflate_decompress.py.exp
new file mode 100644
index 000000000..381f2b068
--- /dev/null
+++ b/tests/extmod/deflate_decompress.py.exp
@@ -0,0 +1,80 @@
+b'micropython hello world hello world micropython'
+b'micropython hello world hello world micropython'
+b'micropython hello world hello world micropython'
+b'micropython hello world hello world micropython'
+b'micropython hello world hello world micropython'
+OSError
+OSError
+OSError
+OSError
+OSError
+OSError
+OSError
+OSError
+OSError
+EOFError
+0
+b'm'
+4
+b'i'
+5
+b'cr'
+7
+b'opython hello world hello world micropython'
+36
+b''
+36
+b''
+OSError
+OSError
+OSError
+OSError
+ValueError
+OSError
+b'micropython'
+True
+True
+True
+True
+True
+True
+True
+True
+True
+True
+True
+True
+True
+True
+b'micropython hello world hello world micropython'
+ValueError
+310
+ValueError
+ValueError
+310
+310
+310
+310
+310
+310
+310
+310
+310
+310
+310
+ValueError
+ValueError
+ValueError
+OSError
+310
+310
+OSError
+310
+310
+OSError
+OSError
+2010
+2010
+OSError
+2010
+2010
diff --git a/tests/extmod/deflate_stream_error.py b/tests/extmod/deflate_stream_error.py
new file mode 100644
index 000000000..aee6b2803
--- /dev/null
+++ b/tests/extmod/deflate_stream_error.py
@@ -0,0 +1,89 @@
+# Test deflate module with stream errors.
+
+try:
+ # Check if deflate & IOBase are available.
+ import deflate, io
+
+ io.IOBase
+except (ImportError, AttributeError):
+ print("SKIP")
+ raise SystemExit
+
+# Check if compression is enabled.
+if not hasattr(deflate.DeflateIO, "write"):
+ print("SKIP")
+ raise SystemExit
+
+formats = (deflate.RAW, deflate.ZLIB, deflate.GZIP)
+
+# Test error on read when decompressing.
+
+
+class Stream(io.IOBase):
+ def readinto(self, buf):
+ print("Stream.readinto", len(buf))
+ return -1
+
+
+try:
+ deflate.DeflateIO(Stream()).read()
+except OSError as er:
+ print(repr(er))
+
+# Test error on write when compressing.
+
+
+class Stream(io.IOBase):
+ def write(self, buf):
+ print("Stream.write", buf)
+ return -1
+
+
+for format in formats:
+ try:
+ deflate.DeflateIO(Stream(), format).write("a")
+ except OSError as er:
+ print(repr(er))
+
+# Test write after close.
+
+
+class Stream(io.IOBase):
+ def write(self, buf):
+ print("Stream.write", buf)
+ return -1
+
+ def ioctl(self, cmd, arg):
+ print("Stream.ioctl", cmd, arg)
+ return 0
+
+
+try:
+ d = deflate.DeflateIO(Stream(), deflate.RAW, 0, True)
+ d.close()
+ d.write("a")
+except OSError as er:
+ print(repr(er))
+
+# Test error on write when closing.
+
+
+class Stream(io.IOBase):
+ def __init__(self):
+ self.num_writes = 0
+
+ def write(self, buf):
+ print("Stream.write", buf)
+ if self.num_writes >= 4:
+ return -1
+ self.num_writes += 1
+ return len(buf)
+
+
+for format in formats:
+ d = deflate.DeflateIO(Stream(), format)
+ d.write("a")
+ try:
+ d.close()
+ except OSError as er:
+ print(repr(er))
diff --git a/tests/extmod/deflate_stream_error.py.exp b/tests/extmod/deflate_stream_error.py.exp
new file mode 100644
index 000000000..4ec90d799
--- /dev/null
+++ b/tests/extmod/deflate_stream_error.py.exp
@@ -0,0 +1,25 @@
+Stream.readinto 1
+OSError(1,)
+Stream.write bytearray(b'K')
+OSError(1,)
+Stream.write bytearray(b'\x18\x95')
+OSError(22,)
+Stream.write bytearray(b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03')
+OSError(22,)
+Stream.ioctl 4 0
+OSError(22,)
+Stream.write bytearray(b'K')
+Stream.write bytearray(b'\x04')
+Stream.write bytearray(b'\x00')
+Stream.write bytearray(b'\x18\x95')
+Stream.write bytearray(b'K')
+Stream.write bytearray(b'\x04')
+Stream.write bytearray(b'\x00')
+Stream.write bytearray(b'\x00b\x00b')
+OSError(1,)
+Stream.write bytearray(b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03')
+Stream.write bytearray(b'K')
+Stream.write bytearray(b'\x04')
+Stream.write bytearray(b'\x00')
+Stream.write bytearray(b'C\xbe\xb7\xe8\x01\x00\x00\x00')
+OSError(1,)