diff options
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/multi_net/asyncio_tls_server_client.py | 73 | ||||
| -rw-r--r-- | tests/multi_net/asyncio_tls_server_client.py.exp | 8 | ||||
| -rw-r--r-- | tests/multi_net/asyncio_tls_server_client_cert_required_error.py | 76 | ||||
| -rw-r--r-- | tests/multi_net/asyncio_tls_server_client_cert_required_error.py.exp | 10 | ||||
| -rw-r--r-- | tests/multi_net/asyncio_tls_server_client_readline.py | 77 | ||||
| -rw-r--r-- | tests/multi_net/asyncio_tls_server_client_readline.py.exp | 10 | ||||
| -rw-r--r-- | tests/multi_net/asyncio_tls_server_client_verify_error.py | 77 | ||||
| -rw-r--r-- | tests/multi_net/asyncio_tls_server_client_verify_error.py.exp | 12 | ||||
| -rw-r--r-- | tests/net_inet/asyncio_tls_open_connection_readline.py | 59 | ||||
| -rw-r--r-- | tests/net_inet/isrg.der | bin | 0 -> 1391 bytes |
10 files changed, 402 insertions, 0 deletions
diff --git a/tests/multi_net/asyncio_tls_server_client.py b/tests/multi_net/asyncio_tls_server_client.py new file mode 100644 index 000000000..996cdb3e0 --- /dev/null +++ b/tests/multi_net/asyncio_tls_server_client.py @@ -0,0 +1,73 @@ +# Test asyncio TCP server and client with TLS, transferring some data. + +try: + import os + import asyncio + import ssl +except ImportError: + print("SKIP") + raise SystemExit + +PORT = 8000 + +# These are test certificates. See tests/README.md for details. +cert = cafile = "multi_net/rsa_cert.der" +key = "multi_net/rsa_key.der" + +try: + os.stat(cafile) + os.stat(key) +except OSError: + print("SKIP") + raise SystemExit + + +async def handle_connection(reader, writer): + data = await reader.read(100) + print("echo:", data) + writer.write(data) + await writer.drain() + + print("close") + writer.close() + await writer.wait_closed() + + print("done") + ev.set() + + +async def tcp_server(): + global ev + + server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + server_ctx.load_cert_chain(cert, key) + ev = asyncio.Event() + server = await asyncio.start_server(handle_connection, "0.0.0.0", PORT, ssl=server_ctx) + print("server running") + multitest.next() + async with server: + await asyncio.wait_for(ev.wait(), 10) + + +async def tcp_client(message): + client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + client_ctx.verify_mode = ssl.CERT_REQUIRED + client_ctx.load_verify_locations(cafile=cafile) + reader, writer = await asyncio.open_connection( + IP, PORT, ssl=client_ctx, server_hostname="micropython.local" + ) + print("write:", message) + writer.write(message) + await writer.drain() + data = await reader.read(100) + print("read:", data) + + +def instance0(): + multitest.globals(IP=multitest.get_network_ip()) + asyncio.run(tcp_server()) + + +def instance1(): + multitest.next() + asyncio.run(tcp_client(b"client data")) diff --git a/tests/multi_net/asyncio_tls_server_client.py.exp b/tests/multi_net/asyncio_tls_server_client.py.exp new file mode 100644 index 000000000..6dc6a9bbc --- /dev/null +++ b/tests/multi_net/asyncio_tls_server_client.py.exp @@ -0,0 +1,8 @@ +--- instance0 --- +server running +echo: b'client data' +close +done +--- instance1 --- +write: b'client data' +read: b'client data' diff --git a/tests/multi_net/asyncio_tls_server_client_cert_required_error.py b/tests/multi_net/asyncio_tls_server_client_cert_required_error.py new file mode 100644 index 000000000..bd4d7b82e --- /dev/null +++ b/tests/multi_net/asyncio_tls_server_client_cert_required_error.py @@ -0,0 +1,76 @@ +# Test asyncio TCP server and client with TLS, giving a cert required error. + +try: + import os + import asyncio + import ssl +except ImportError: + print("SKIP") + raise SystemExit + +PORT = 8000 + +# These are test certificates. See tests/README.md for details. +cert = cafile = "multi_net/rsa_cert.der" +key = "multi_net/rsa_key.der" + +try: + os.stat(cafile) + os.stat(key) +except OSError: + print("SKIP") + raise SystemExit + + +async def handle_connection(reader, writer): + print("handle connection") + try: + data = await reader.read(100) + except Exception as e: + print(e) + ev.set() + + +async def tcp_server(): + global ev + + server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + server_ctx.load_cert_chain(cert, key) + server_ctx.verify_mode = ssl.CERT_REQUIRED + server_ctx.load_verify_locations(cafile=cert) + ev = asyncio.Event() + server = await asyncio.start_server(handle_connection, "0.0.0.0", PORT, ssl=server_ctx) + print("server running") + multitest.next() + async with server: + await asyncio.wait_for(ev.wait(), 10) + multitest.wait("finished") + print("server done") + + +async def tcp_client(message): + client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + client_ctx.verify_mode = ssl.CERT_REQUIRED + client_ctx.load_verify_locations(cafile=cafile) + reader, writer = await asyncio.open_connection( + IP, PORT, ssl=client_ctx, server_hostname="micropython.local" + ) + try: + print("write:", message) + writer.write(message) + print("drain") + await writer.drain() + except Exception as e: + print(e) + print("client done") + multitest.broadcast("finished") + + +def instance0(): + multitest.globals(IP=multitest.get_network_ip()) + asyncio.run(tcp_server()) + + +def instance1(): + multitest.next() + asyncio.run(tcp_client(b"client data")) diff --git a/tests/multi_net/asyncio_tls_server_client_cert_required_error.py.exp b/tests/multi_net/asyncio_tls_server_client_cert_required_error.py.exp new file mode 100644 index 000000000..0f905d0d2 --- /dev/null +++ b/tests/multi_net/asyncio_tls_server_client_cert_required_error.py.exp @@ -0,0 +1,10 @@ +--- instance0 --- +server running +handle connection +(-29824, 'MBEDTLS_ERR_SSL_NO_CLIENT_CERTIFICATE') +server done +--- instance1 --- +write: b'client data' +drain +(-30592, 'MBEDTLS_ERR_SSL_FATAL_ALERT_MESSAGE') +client done diff --git a/tests/multi_net/asyncio_tls_server_client_readline.py b/tests/multi_net/asyncio_tls_server_client_readline.py new file mode 100644 index 000000000..28add38f5 --- /dev/null +++ b/tests/multi_net/asyncio_tls_server_client_readline.py @@ -0,0 +1,77 @@ +# Test asyncio TCP server and client with TLS, using readline() to read data. + +try: + import os + import asyncio + import ssl +except ImportError: + print("SKIP") + raise SystemExit + +PORT = 8000 + +# These are test certificates. See tests/README.md for details. +cert = cafile = "multi_net/rsa_cert.der" +key = "multi_net/rsa_key.der" + +try: + os.stat(cafile) + os.stat(key) +except OSError: + print("SKIP") + raise SystemExit + + +async def handle_connection(reader, writer): + data = await reader.readline() + print("echo:", data) + data2 = await reader.readline() + print("echo:", data2) + writer.write(data + data2) + await writer.drain() + + print("close") + writer.close() + await writer.wait_closed() + + print("done") + ev.set() + + +async def tcp_server(): + global ev + + server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + server_ctx.load_cert_chain(cert, key) + ev = asyncio.Event() + server = await asyncio.start_server(handle_connection, "0.0.0.0", PORT, ssl=server_ctx) + print("server running") + multitest.next() + async with server: + await asyncio.wait_for(ev.wait(), 10) + + +async def tcp_client(message): + client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + client_ctx.verify_mode = ssl.CERT_REQUIRED + client_ctx.load_verify_locations(cafile=cafile) + reader, writer = await asyncio.open_connection( + IP, PORT, ssl=client_ctx, server_hostname="micropython.local" + ) + print("write:", message) + writer.write(message) + await writer.drain() + data = await reader.readline() + print("read:", data) + data2 = await reader.readline() + print("read:", data2) + + +def instance0(): + multitest.globals(IP=multitest.get_network_ip()) + asyncio.run(tcp_server()) + + +def instance1(): + multitest.next() + asyncio.run(tcp_client(b"client data\nclient data2\n")) diff --git a/tests/multi_net/asyncio_tls_server_client_readline.py.exp b/tests/multi_net/asyncio_tls_server_client_readline.py.exp new file mode 100644 index 000000000..4c93c5729 --- /dev/null +++ b/tests/multi_net/asyncio_tls_server_client_readline.py.exp @@ -0,0 +1,10 @@ +--- instance0 --- +server running +echo: b'client data\n' +echo: b'client data2\n' +close +done +--- instance1 --- +write: b'client data\nclient data2\n' +read: b'client data\n' +read: b'client data2\n' diff --git a/tests/multi_net/asyncio_tls_server_client_verify_error.py b/tests/multi_net/asyncio_tls_server_client_verify_error.py new file mode 100644 index 000000000..46a476add --- /dev/null +++ b/tests/multi_net/asyncio_tls_server_client_verify_error.py @@ -0,0 +1,77 @@ +# Test asyncio TCP server and client with TLS, and an incorrect server_hostname. + +try: + import os + import asyncio + import ssl +except ImportError: + print("SKIP") + raise SystemExit + +PORT = 8000 + +# These are test certificates. See tests/README.md for details. +cert = cafile = "multi_net/rsa_cert.der" +key = "multi_net/rsa_key.der" + +try: + os.stat(cafile) + os.stat(key) +except OSError: + print("SKIP") + raise SystemExit + + +async def handle_connection(reader, writer): + print("handle connection") + try: + data = await reader.read(100) + except Exception as e: + print(e) + ev.set() + + +async def tcp_server(): + global ev + + server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + server_ctx.load_cert_chain(cert, key) + ev = asyncio.Event() + server = await asyncio.start_server(handle_connection, "0.0.0.0", PORT, ssl=server_ctx) + print("server running") + multitest.next() + async with server: + await asyncio.wait_for(ev.wait(), 10) + print("server done") + multitest.broadcast("finished") + + +async def tcp_client(message): + client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + client_ctx.verify_mode = ssl.CERT_REQUIRED + client_ctx.load_verify_locations(cafile=cafile) + reader, writer = await asyncio.open_connection( + IP, + PORT, + ssl=client_ctx, + server_hostname="foobar.local", # incorrect server_hostname + ) + try: + print("write:", message) + writer.write(message) + print("drain") + await writer.drain() + except Exception as e: + print(e) + multitest.wait("finished") + print("client done") + + +def instance0(): + multitest.globals(IP=multitest.get_network_ip()) + asyncio.run(tcp_server()) + + +def instance1(): + multitest.next() + asyncio.run(tcp_client(b"client data")) diff --git a/tests/multi_net/asyncio_tls_server_client_verify_error.py.exp b/tests/multi_net/asyncio_tls_server_client_verify_error.py.exp new file mode 100644 index 000000000..36d0ab00f --- /dev/null +++ b/tests/multi_net/asyncio_tls_server_client_verify_error.py.exp @@ -0,0 +1,12 @@ +--- instance0 --- +server running +handle connection +(-30592, 'MBEDTLS_ERR_SSL_FATAL_ALERT_MESSAGE') +server done +--- instance1 --- +write: b'client data' +drain + +The certificate Common Name (CN) does not match with the expected CN + +client done diff --git a/tests/net_inet/asyncio_tls_open_connection_readline.py b/tests/net_inet/asyncio_tls_open_connection_readline.py new file mode 100644 index 000000000..70145d91a --- /dev/null +++ b/tests/net_inet/asyncio_tls_open_connection_readline.py @@ -0,0 +1,59 @@ +import ssl +import os +import asyncio + +# This certificate was obtained from micropython.org using openssl: +# $ openssl s_client -showcerts -connect micropython.org:443 </dev/null 2>/dev/null +# The certificate is from Let's Encrypt: +# 1 s:/C=US/O=Let's Encrypt/CN=R3 +# i:/C=US/O=Internet Security Research Group/CN=ISRG Root X1 +# Validity +# Not Before: Sep 4 00:00:00 2020 GMT +# Not After : Sep 15 16:00:00 2025 GMT +# Copy PEM content to a file (certmpy.pem) and convert to DER e.g. +# $ openssl x509 -in certmpy.pem -out certmpy.der -outform DER +# Then convert to hex format, eg using binascii.hexlify(data). + +# Note that the instructions above is to obtain an intermediate +# root CA cert that works for MicroPython. However CPython needs the ultimate root CA +# cert from ISRG + +ca_cert_chain = "isrg.der" + +try: + os.stat(ca_cert_chain) +except OSError: + print("SKIP") + raise SystemExit + +with open(ca_cert_chain, "rb") as ca: + cadata = ca.read() + +client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) +client_ctx.verify_mode = ssl.CERT_REQUIRED +client_ctx.load_verify_locations(cadata=cadata) + + +async def http_get(url, port, sslctx): + reader, writer = await asyncio.open_connection(url, port, ssl=sslctx) + + print("write GET") + writer.write(b"GET / HTTP/1.0\r\n\r\n") + await writer.drain() + + print("read response") + while True: + data = await reader.readline() + # avoid printing datetime which makes the test fail + if b"GMT" not in data: + print("read:", data) + if not data: + break + + print("close") + writer.close() + await writer.wait_closed() + print("done") + + +asyncio.run(http_get("micropython.org", 443, client_ctx)) diff --git a/tests/net_inet/isrg.der b/tests/net_inet/isrg.der Binary files differnew file mode 100644 index 000000000..9d2132e7f --- /dev/null +++ b/tests/net_inet/isrg.der |
