add more tests, refactor sign and encrypt

Signed-off-by: Florian Brandes <florian.brandes@posteo.de>
This commit is contained in:
2024-07-06 20:14:49 +02:00
parent cb5590b2b5
commit 81fa631d16
6 changed files with 145 additions and 11 deletions

2
.gitignore vendored
View File

@@ -12,6 +12,8 @@ config.ini
*.pfx *.pfx
*.crt *.crt
*.pem *.pem
!tests/privkey.pem
!tests/signer.pem
# nix # nix
result result

View File

@@ -53,6 +53,7 @@ in
# look for code smell # look for code smell
pylint.enable = true; pylint.enable = true;
detect-private-keys.enable = true; detect-private-keys.enable = true;
detect-private-keys.excludes = [ "tests/" ];
}; };

View File

@@ -147,7 +147,7 @@ class Config:
class SMTPClient(SMTP): class SMTPClient(SMTP):
"""Client part of library """Client part of library
This will connect to an upstream SMTP server to deliver the mal This will connect to an upstream SMTP server to deliver the mail
""" """
@@ -206,7 +206,7 @@ class SMTPClient(SMTP):
message (Message): message object message (Message): message object
Returns: Returns:
bytes: The signed message including the From and To Header bytes: The signed message
""" """
with open(Path(self._config.smime_cert_private), "rb") as key_data: with open(Path(self._config.smime_cert_private), "rb") as key_data:
key = load_pem_private_key(key_data.read(), password=None) key = load_pem_private_key(key_data.read(), password=None)
@@ -220,12 +220,20 @@ class SMTPClient(SMTP):
.add_signer(cert, key, hashes.SHA512(), rsa_padding=padding.PKCS1v15()) .add_signer(cert, key, hashes.SHA512(), rsa_padding=padding.PKCS1v15())
.sign(Encoding.SMIME, [pkcs7.PKCS7Options.DetachedSignature]) .sign(Encoding.SMIME, [pkcs7.PKCS7Options.DetachedSignature])
) )
# Add correct headers return output
# new = b"From: " + self._config.sender.encode() + b"\r\n" + output
# new = b"To: " + ", ".join(self._config.recipients).encode() + b"\r\n" + new def _encrypt_and_sign(self, message: Message) -> bytes:
# new = b"Subject: " + message.get("Subject", "").encode() + b"\r\n" + new """Sign and encrypt the message
new = self._encrypt(output, message.get("Subject", ""))
return new Args:
message (Message): message object
Returns:
bytes: The signed and encrypted message including the From and To Header
"""
signed = self._sign(message)
encrypt = self._encrypt(signed, message.get("Subject", ""))
return encrypt
async def _send_message( async def _send_message(
self, message: bytes, sender: str, recipients: List[str] self, message: bytes, sender: str, recipients: List[str]
@@ -275,7 +283,7 @@ class SMTPClient(SMTP):
f"'{message.get('Subject', '')}'" f"'{message.get('Subject', '')}'"
) )
if self._config.smime_cert: if self._config.smime_cert:
message = self._sign(message) message = self._encrypt_and_sign(message)
async with ( async with (
self._lock self._lock
): # TODO: consumer task from spool queue, reusing connections ): # TODO: consumer task from spool queue, reusing connections

16
tests/privkey.pem Normal file
View File

@@ -0,0 +1,16 @@
-----BEGIN PRIVATE KEY-----
MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBALb6E084QJJVnRGg
Tij6Mc+ZzO3bV8Izt+A2RkpV4aYTjClVc2pLHh8KyjTA/bqnUYXfQCEDTHx2RsE5
DqmyaYxWejcptw3Ht1YkZO2RW7zLSNQjlwFfcgf92XH1uzUxido/Lbw8LkHnJqCO
4jemU9wP9b82JBgou91VEzqTfHPXAgMBAAECgYAc601d+fAKsMlQXdu8kj6JJy/C
cCZgpTfskeduHEC7tN80MTM6m4C5O0VWLSJs+8DgvbYvAYx3J2Jra48rtu0DYrd/
gWqijODMDr6l8jgEeA8DN0OwcWhTB6unowHzR7RadijzP0nOIiGyArPtguGk4XKw
NcRoU36bxb/smBCaIQJBAOFMt0xTx6celUUB+b7HKNLe9kZb2shSZXzKIuzqAPJd
8TafC2CSt6iFTfhwr+kARz300r0y/JrGG30a6B8kRb8CQQDP6PvDoY6atLU35h+l
WjuvkOKaR6ny/uarNP3nHapxgC9cNUkySFBDrSIJhNorYSGSMxqhEGUPWHzhH0PG
H0fpAkEAkelpXNl1mFpKOiMJZ/D8E3Wq8e5TRyF18NfIvr7eVhlZOxLN/4GFyHJt
CNWSV8iCWzHPuhDnYCWlb+SZKHIJaQJACs4i94HX9XZazLLrBh7wZylyfW4oCPby
agdxAqfqCcgNrg8e5LwZX8sJr9D1vbdolT6Orbw6ZFfG9bQ4Q32wsQJAZzbkDv+l
UBCR9qNqEpOtyOhBV8b8NzKgI/SBjgNbfzOki3r88qkwbJSMOZPZv3kkwmkjLTfz
x7eQLB/6eQFmFw==
-----END PRIVATE KEY-----

17
tests/signer.pem Normal file
View File

@@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICsDCCAhmgAwIBAgIUF0xm6FBeRujH+4/G1506jAty04UwDQYJKoZIhvcNAQEL
BQAwajELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEjMCEGCSqGSIb3DQEJARYUc2VuZGVy
QGxvY2FsaG9zdC5jb20wHhcNMjQwNzA2MTEyMjU2WhcNMjUwNzA2MTEyMjU2WjBq
MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMSMwIQYJKoZIhvcNAQkBFhRzZW5kZXJAbG9j
YWxob3N0LmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAtvoTTzhAklWd
EaBOKPoxz5nM7dtXwjO34DZGSlXhphOMKVVzakseHwrKNMD9uqdRhd9AIQNMfHZG
wTkOqbJpjFZ6Nym3Dce3ViRk7ZFbvMtI1COXAV9yB/3ZcfW7NTGJ2j8tvDwuQecm
oI7iN6ZT3A/1vzYkGCi73VUTOpN8c9cCAwEAAaNTMFEwHQYDVR0OBBYEFDFFLpTd
y1co5C6OVCLJRgIicstMMB8GA1UdIwQYMBaAFDFFLpTdy1co5C6OVCLJRgIicstM
MA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADgYEATdAHRrnrl2aQ8SI3
kQgUsTp5hHc3M5b2+ZbNYCsz7SYQBtHniGId9vBh941gUs8R8X16Cdp0pjVAayeU
CCW1Zs47tA9IIT1hslOORibTcSQKr7TI+RprURyky8m2T9PbOSLgmnjlbydbxN7L
hxx8dg1pWRfEKBSvO1gOkcTo7SQ=
-----END CERTIFICATE-----

View File

@@ -21,13 +21,36 @@ Tests for smtprd_ng
# pylint: disable=protected-access # pylint: disable=protected-access
import configparser import configparser
import email
import email.message
from pathlib import Path from pathlib import Path
from M2Crypto import BIO, SMIME, X509
from smtprd_ng import smtprd from smtprd_ng import smtprd
def config() -> smtprd.ClientConfig:
"""Return method for settings used here in test"""
cfg = smtprd.ClientConfig(
hostname="localhost",
port=1234,
username="user",
password="pass",
use_tls=False,
start_tls=False,
sender="sender",
recipients=["recipient"],
set_reply_to="",
smime_to_cert=Path("tests", "signer.pem"),
smime_cert=Path("tests", "signer.pem"),
smime_cert_private=Path("tests", "privkey.pem"),
)
return cfg
def test_config_from_config(): def test_config_from_config():
"""Test opening and readig the config file""" """Test opening and reading the config file"""
config_parser: configparser.ConfigParser = configparser.ConfigParser() config_parser: configparser.ConfigParser = configparser.ConfigParser()
with open(Path("config.example"), "r", encoding="utf8") as fp: with open(Path("config.example"), "r", encoding="utf8") as fp:
config_parser.read_file(fp) config_parser.read_file(fp)
@@ -35,12 +58,79 @@ def test_config_from_config():
assert cfg.server.hostname == "localhost" assert cfg.server.hostname == "localhost"
assert cfg.client.port == 465 assert cfg.client.port == 465
assert cfg.client.smime_cert == "" assert cfg.client.smime_cert == ""
assert cfg.client.use_tls is True
def test_config_from_ini(): def test_config_from_ini():
"""Test parsing the config file and using fallbacks""" """Test parsing the config file and using fallbacks.
This is mostly redundant to the above test"""
cfg = smtprd.Config.from_ini(Path("config.example")) cfg = smtprd.Config.from_ini(Path("config.example"))
assert cfg.server.hostname == "localhost" assert cfg.server.hostname == "localhost"
assert cfg.client.port == 465 assert cfg.client.port == 465
assert cfg.client.smime_cert == "" assert cfg.client.smime_cert == ""
assert cfg.client.use_tls is True assert cfg.client.use_tls is True
def test_client_encrypt():
"""Test encryption. Keys were generated with
openssl req -newkey rsa:1024 -nodes -x509 -days 365 -out signer.pem
"""
client = smtprd.SMTPClient(config())
encrypted = client._encrypt(b"abc", "subject")
lines = encrypted.decode().splitlines()
# Test format of header
assert lines[0] == "From: " + str(config().sender)
assert lines[1] == "To: " + str(config().recipients[0])
assert lines[2] == "Subject: " + "subject"
assert lines[3] == "MIME-Version: 1.0"
assert lines[4] == 'Content-Disposition: attachment; filename="smime.p7m"'
assert (
lines[5]
== 'Content-Type: application/x-pkcs7-mime; smime-type=enveloped-data; name="smime.p7m"'
)
assert lines[6] == "Content-Transfer-Encoding: base64"
# new line is important to seperate header from body
assert lines[7] == ""
assert (
lines[8] == "MIIBdgYJKoZIhvcNAQcDoIIBZzCCAWMCAQAxggEeMIIBGgIBADCBgjBqMQswCQYD"
)
s = SMIME.SMIME()
s.load_key(config().smime_cert_private, config().smime_cert)
buf = BIO.MemoryBuffer(encrypted)
p7, _ = SMIME.smime_load_pkcs7_bio(buf)
out = s.decrypt(p7)
assert b"abc" in out
def test_client_sign():
"""Test signing"""
client = smtprd.SMTPClient(config())
message = email.message.EmailMessage()
message.set_content("Test to sign")
# header are needed, because email.message.Emailmessage will add a
# newline between the boundary and the content if no header is found
message.add_header("From", config().sender)
message.add_header("To", config().recipients[0])
message.add_header("Subject", "Test")
signed = client._sign(message)
assert "This is an S/MIME signed message" in signed.decode()
assert "Test to sign" in signed.decode()
s = SMIME.SMIME()
# Load the signer's cert.
x509 = X509.load_cert(Path(config().smime_cert))
sk = X509.X509_Stack()
sk.push(x509)
s.set_x509_stack(sk)
# Load the signer's CA cert. In this case, because the signer's
# cert is self-signed, it is the signer's cert itself.
st = X509.X509_Store()
st.load_info(str(config().smime_cert))
s.set_x509_store(st)
# Load the data, verify it.
buf = BIO.MemoryBuffer(signed)
p7, data = SMIME.smime_load_pkcs7_bio(buf)
v = s.verify(p7, data)
assert "Test to sign" in v.decode()