From 81fa631d1696e2a8496b18f09428d2054fcffff7 Mon Sep 17 00:00:00 2001 From: Florian Brandes Date: Sat, 6 Jul 2024 20:14:49 +0200 Subject: [PATCH] add more tests, refactor sign and encrypt Signed-off-by: Florian Brandes --- .gitignore | 2 + devenv.nix | 1 + smtprd_ng/smtprd.py | 26 ++++++++---- tests/privkey.pem | 16 +++++++ tests/signer.pem | 17 ++++++++ tests/test_smtprd_ng.py | 94 ++++++++++++++++++++++++++++++++++++++++- 6 files changed, 145 insertions(+), 11 deletions(-) create mode 100644 tests/privkey.pem create mode 100644 tests/signer.pem diff --git a/.gitignore b/.gitignore index ba9b650..29ee88c 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,8 @@ config.ini *.pfx *.crt *.pem +!tests/privkey.pem +!tests/signer.pem # nix result diff --git a/devenv.nix b/devenv.nix index 4618f7b..fc5c50b 100644 --- a/devenv.nix +++ b/devenv.nix @@ -53,6 +53,7 @@ in # look for code smell pylint.enable = true; detect-private-keys.enable = true; + detect-private-keys.excludes = [ "tests/" ]; }; diff --git a/smtprd_ng/smtprd.py b/smtprd_ng/smtprd.py index 61f7736..074ea46 100644 --- a/smtprd_ng/smtprd.py +++ b/smtprd_ng/smtprd.py @@ -147,7 +147,7 @@ class Config: class SMTPClient(SMTP): """Client part of library - This will connect to an upstream SMTP server to deliver the mal + This will connect to an upstream SMTP server to deliver the mail """ @@ -206,7 +206,7 @@ class SMTPClient(SMTP): message (Message): message object Returns: - bytes: The signed message including the From and To Header + bytes: The signed message """ with open(Path(self._config.smime_cert_private), "rb") as key_data: key = load_pem_private_key(key_data.read(), password=None) @@ -220,12 +220,20 @@ class SMTPClient(SMTP): .add_signer(cert, key, hashes.SHA512(), rsa_padding=padding.PKCS1v15()) .sign(Encoding.SMIME, [pkcs7.PKCS7Options.DetachedSignature]) ) - # Add correct headers - # new = b"From: " + self._config.sender.encode() + b"\r\n" + output - # new = b"To: " + ", ".join(self._config.recipients).encode() + b"\r\n" + new - # new = b"Subject: " + message.get("Subject", "").encode() + b"\r\n" + new - new = self._encrypt(output, message.get("Subject", "")) - return new + return output + + def _encrypt_and_sign(self, message: Message) -> bytes: + """Sign and encrypt the message + + Args: + message (Message): message object + + Returns: + bytes: The signed and encrypted message including the From and To Header + """ + signed = self._sign(message) + encrypt = self._encrypt(signed, message.get("Subject", "")) + return encrypt async def _send_message( self, message: bytes, sender: str, recipients: List[str] @@ -275,7 +283,7 @@ class SMTPClient(SMTP): f"'{message.get('Subject', '')}'" ) if self._config.smime_cert: - message = self._sign(message) + message = self._encrypt_and_sign(message) async with ( self._lock ): # TODO: consumer task from spool queue, reusing connections diff --git a/tests/privkey.pem b/tests/privkey.pem new file mode 100644 index 0000000..65a794b --- /dev/null +++ b/tests/privkey.pem @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBALb6E084QJJVnRGg +Tij6Mc+ZzO3bV8Izt+A2RkpV4aYTjClVc2pLHh8KyjTA/bqnUYXfQCEDTHx2RsE5 +DqmyaYxWejcptw3Ht1YkZO2RW7zLSNQjlwFfcgf92XH1uzUxido/Lbw8LkHnJqCO +4jemU9wP9b82JBgou91VEzqTfHPXAgMBAAECgYAc601d+fAKsMlQXdu8kj6JJy/C +cCZgpTfskeduHEC7tN80MTM6m4C5O0VWLSJs+8DgvbYvAYx3J2Jra48rtu0DYrd/ +gWqijODMDr6l8jgEeA8DN0OwcWhTB6unowHzR7RadijzP0nOIiGyArPtguGk4XKw +NcRoU36bxb/smBCaIQJBAOFMt0xTx6celUUB+b7HKNLe9kZb2shSZXzKIuzqAPJd +8TafC2CSt6iFTfhwr+kARz300r0y/JrGG30a6B8kRb8CQQDP6PvDoY6atLU35h+l +WjuvkOKaR6ny/uarNP3nHapxgC9cNUkySFBDrSIJhNorYSGSMxqhEGUPWHzhH0PG +H0fpAkEAkelpXNl1mFpKOiMJZ/D8E3Wq8e5TRyF18NfIvr7eVhlZOxLN/4GFyHJt +CNWSV8iCWzHPuhDnYCWlb+SZKHIJaQJACs4i94HX9XZazLLrBh7wZylyfW4oCPby +agdxAqfqCcgNrg8e5LwZX8sJr9D1vbdolT6Orbw6ZFfG9bQ4Q32wsQJAZzbkDv+l +UBCR9qNqEpOtyOhBV8b8NzKgI/SBjgNbfzOki3r88qkwbJSMOZPZv3kkwmkjLTfz +x7eQLB/6eQFmFw== +-----END PRIVATE KEY----- diff --git a/tests/signer.pem b/tests/signer.pem new file mode 100644 index 0000000..a9340ae --- /dev/null +++ b/tests/signer.pem @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE----- +MIICsDCCAhmgAwIBAgIUF0xm6FBeRujH+4/G1506jAty04UwDQYJKoZIhvcNAQEL +BQAwajELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEjMCEGCSqGSIb3DQEJARYUc2VuZGVy +QGxvY2FsaG9zdC5jb20wHhcNMjQwNzA2MTEyMjU2WhcNMjUwNzA2MTEyMjU2WjBq +MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMSMwIQYJKoZIhvcNAQkBFhRzZW5kZXJAbG9j +YWxob3N0LmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAtvoTTzhAklWd +EaBOKPoxz5nM7dtXwjO34DZGSlXhphOMKVVzakseHwrKNMD9uqdRhd9AIQNMfHZG +wTkOqbJpjFZ6Nym3Dce3ViRk7ZFbvMtI1COXAV9yB/3ZcfW7NTGJ2j8tvDwuQecm +oI7iN6ZT3A/1vzYkGCi73VUTOpN8c9cCAwEAAaNTMFEwHQYDVR0OBBYEFDFFLpTd +y1co5C6OVCLJRgIicstMMB8GA1UdIwQYMBaAFDFFLpTdy1co5C6OVCLJRgIicstM +MA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADgYEATdAHRrnrl2aQ8SI3 +kQgUsTp5hHc3M5b2+ZbNYCsz7SYQBtHniGId9vBh941gUs8R8X16Cdp0pjVAayeU +CCW1Zs47tA9IIT1hslOORibTcSQKr7TI+RprURyky8m2T9PbOSLgmnjlbydbxN7L +hxx8dg1pWRfEKBSvO1gOkcTo7SQ= +-----END CERTIFICATE----- diff --git a/tests/test_smtprd_ng.py b/tests/test_smtprd_ng.py index ceb5486..2f35d34 100644 --- a/tests/test_smtprd_ng.py +++ b/tests/test_smtprd_ng.py @@ -21,13 +21,36 @@ Tests for smtprd_ng # pylint: disable=protected-access import configparser +import email +import email.message from pathlib import Path +from M2Crypto import BIO, SMIME, X509 + from smtprd_ng import smtprd +def config() -> smtprd.ClientConfig: + """Return method for settings used here in test""" + cfg = smtprd.ClientConfig( + hostname="localhost", + port=1234, + username="user", + password="pass", + use_tls=False, + start_tls=False, + sender="sender", + recipients=["recipient"], + set_reply_to="", + smime_to_cert=Path("tests", "signer.pem"), + smime_cert=Path("tests", "signer.pem"), + smime_cert_private=Path("tests", "privkey.pem"), + ) + return cfg + + def test_config_from_config(): - """Test opening and readig the config file""" + """Test opening and reading the config file""" config_parser: configparser.ConfigParser = configparser.ConfigParser() with open(Path("config.example"), "r", encoding="utf8") as fp: config_parser.read_file(fp) @@ -35,12 +58,79 @@ def test_config_from_config(): assert cfg.server.hostname == "localhost" assert cfg.client.port == 465 assert cfg.client.smime_cert == "" + assert cfg.client.use_tls is True def test_config_from_ini(): - """Test parsing the config file and using fallbacks""" + """Test parsing the config file and using fallbacks. + This is mostly redundant to the above test""" cfg = smtprd.Config.from_ini(Path("config.example")) assert cfg.server.hostname == "localhost" assert cfg.client.port == 465 assert cfg.client.smime_cert == "" assert cfg.client.use_tls is True + + +def test_client_encrypt(): + """Test encryption. Keys were generated with + openssl req -newkey rsa:1024 -nodes -x509 -days 365 -out signer.pem + """ + + client = smtprd.SMTPClient(config()) + encrypted = client._encrypt(b"abc", "subject") + lines = encrypted.decode().splitlines() + # Test format of header + assert lines[0] == "From: " + str(config().sender) + assert lines[1] == "To: " + str(config().recipients[0]) + assert lines[2] == "Subject: " + "subject" + assert lines[3] == "MIME-Version: 1.0" + assert lines[4] == 'Content-Disposition: attachment; filename="smime.p7m"' + assert ( + lines[5] + == 'Content-Type: application/x-pkcs7-mime; smime-type=enveloped-data; name="smime.p7m"' + ) + assert lines[6] == "Content-Transfer-Encoding: base64" + # new line is important to seperate header from body + assert lines[7] == "" + assert ( + lines[8] == "MIIBdgYJKoZIhvcNAQcDoIIBZzCCAWMCAQAxggEeMIIBGgIBADCBgjBqMQswCQYD" + ) + + s = SMIME.SMIME() + s.load_key(config().smime_cert_private, config().smime_cert) + buf = BIO.MemoryBuffer(encrypted) + p7, _ = SMIME.smime_load_pkcs7_bio(buf) + out = s.decrypt(p7) + assert b"abc" in out + + +def test_client_sign(): + """Test signing""" + client = smtprd.SMTPClient(config()) + message = email.message.EmailMessage() + message.set_content("Test to sign") + # header are needed, because email.message.Emailmessage will add a + # newline between the boundary and the content if no header is found + message.add_header("From", config().sender) + message.add_header("To", config().recipients[0]) + message.add_header("Subject", "Test") + signed = client._sign(message) + assert "This is an S/MIME signed message" in signed.decode() + assert "Test to sign" in signed.decode() + + s = SMIME.SMIME() + # Load the signer's cert. + x509 = X509.load_cert(Path(config().smime_cert)) + sk = X509.X509_Stack() + sk.push(x509) + s.set_x509_stack(sk) + # Load the signer's CA cert. In this case, because the signer's + # cert is self-signed, it is the signer's cert itself. + st = X509.X509_Store() + st.load_info(str(config().smime_cert)) + s.set_x509_store(st) + # Load the data, verify it. + buf = BIO.MemoryBuffer(signed) + p7, data = SMIME.smime_load_pkcs7_bio(buf) + v = s.verify(p7, data) + assert "Test to sign" in v.decode()