summaryrefslogtreecommitdiff
path: root/tests/extmod/deflate_compress.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/extmod/deflate_compress.py')
-rw-r--r--tests/extmod/deflate_compress.py148
1 files changed, 148 insertions, 0 deletions
diff --git a/tests/extmod/deflate_compress.py b/tests/extmod/deflate_compress.py
new file mode 100644
index 000000000..612af663e
--- /dev/null
+++ b/tests/extmod/deflate_compress.py
@@ -0,0 +1,148 @@
+try:
+ # Check if deflate is available.
+ import deflate
+ import io
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# Check if compression is enabled.
+if not hasattr(deflate.DeflateIO, "write"):
+ print("SKIP")
+ raise SystemExit
+
+# Simple compression & decompression.
+b = io.BytesIO()
+g = deflate.DeflateIO(b, deflate.RAW)
+data = b"micropython"
+N = 10
+for i in range(N):
+ g.write(data)
+g.close()
+result_raw = b.getvalue()
+print(len(result_raw) < len(data) * N)
+b = io.BytesIO(result_raw)
+g = deflate.DeflateIO(b, deflate.RAW)
+print(g.read())
+
+# Same, but using a context manager.
+b = io.BytesIO()
+with deflate.DeflateIO(b, deflate.RAW) as g:
+ for i in range(N):
+ g.write(data)
+result_raw = b.getvalue()
+print(len(result_raw) < len(data) * N)
+b = io.BytesIO(result_raw)
+with deflate.DeflateIO(b, deflate.RAW) as g:
+ print(g.read())
+
+# Writing to a closed underlying stream.
+b = io.BytesIO()
+g = deflate.DeflateIO(b, deflate.RAW)
+g.write(b"micropython")
+b.close()
+try:
+ g.write(b"micropython")
+except ValueError:
+ print("ValueError")
+
+# Writing to a closed DeflateIO.
+b = io.BytesIO()
+g = deflate.DeflateIO(b, deflate.RAW)
+g.write(b"micropython")
+g.close()
+try:
+ g.write(b"micropython")
+except OSError:
+ print("OSError")
+
+
+def decompress(data, *args):
+ buf = io.BytesIO(data)
+ with deflate.DeflateIO(buf, *args) as g:
+ return g.read()
+
+
+def compress(data, *args):
+ b = io.BytesIO()
+ with deflate.DeflateIO(b, *args) as g:
+ g.write(data)
+ return b.getvalue()
+
+
+def compress_error(data, *args):
+ try:
+ compress(data, *args)
+ except OSError:
+ print("OSError")
+ except ValueError:
+ print("ValueError")
+
+
+# More test patterns.
+PATTERNS_RAW = (
+ (b"0", b"3\x00\x00"),
+ (b"a", b"K\x04\x00"),
+ (b"0" * 100, b"3\xa0\x03\x00\x00"),
+ (
+ bytes(range(64)),
+ b"c`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00",
+ ),
+)
+for unpacked, packed in PATTERNS_RAW:
+ print(compress(unpacked) == packed)
+ print(compress(unpacked, deflate.RAW) == packed)
+
+# Verify header and checksum format.
+unpacked = b"hello"
+packed = b"\xcbH\xcd\xc9\xc9\x07\x00"
+
+
+def check_header(n, a, b):
+ if a == b:
+ print(n)
+ else:
+ print(n, a, b)
+
+
+check_header("RAW", compress(unpacked, deflate.RAW), packed)
+check_header(
+ "ZLIB(9)", compress(unpacked, deflate.ZLIB, 9), b"\x18\x95" + packed + b"\x06,\x02\x15"
+)
+check_header(
+ "ZLIB(15)", compress(unpacked, deflate.ZLIB, 15), b"\x78\x9c" + packed + b"\x06,\x02\x15"
+)
+check_header(
+ "GZIP",
+ compress(unpacked, deflate.GZIP, 9),
+ b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03" + packed + b"\x86\xa6\x106\x05\x00\x00\x00",
+)
+
+# Valid wbits values.
+compress_error(unpacked, deflate.RAW, -1)
+print(len(compress(unpacked, deflate.RAW, 0)))
+compress_error(unpacked, deflate.RAW, 1)
+compress_error(unpacked, deflate.RAW, 4)
+for i in range(5, 16):
+ print(len(compress(unpacked, deflate.RAW, i)))
+compress_error(unpacked, deflate.RAW, 16)
+
+# Invalid values for format.
+compress_error(unpacked, -1)
+compress_error(unpacked, 5)
+
+# Fill buf with a predictable pseudorandom sequence.
+buf = bytearray(1024)
+lfsr = 1 << 15 | 1
+for i in range(len(buf)):
+ bit = (lfsr ^ (lfsr >> 1) ^ (lfsr >> 3) ^ (lfsr >> 12)) & 1
+ lfsr = (lfsr >> 1) | (bit << 15)
+ buf[i] = lfsr & 0xFF
+
+# Verify that compression improves as the window size increases.
+prev_len = len(buf)
+for wbits in range(5, 10):
+ result = compress(buf, deflate.RAW, wbits)
+ next_len = len(result)
+ print(next_len < prev_len and decompress(result, deflate.RAW, wbits) == buf)
+ prev_len = next_len