add a configurable list of recipients/cert dict

Signed-off-by: Florian Brandes <florian.brandes@posteo.de>
This commit is contained in:
2024-07-09 22:06:58 +02:00
parent d9053e36bc
commit 14c87f03ce
5 changed files with 105 additions and 47 deletions

17
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,17 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File test",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}",
"env": {"PYTHONPATH": "${workspaceRoot}:${env:PYTHONPATH}"},
}
]
}

7
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@@ -8,10 +8,16 @@ port = 465
username = foo@example.com username = foo@example.com
password = s3cr3t password = s3cr3t
sender = foo@example.com sender = foo@example.com
recipients = monitoring.foo@example.com, John Foobar <foo@baz.org>
# set_reply_to = false # set_reply_to = false
# use_tls = true # use_tls = true
# start_tls = false # start_tls = false
# smime_cert = /path/to/cert # smime_cert = /path/to/cert
# smime_cert_private = /path/to/cert.key # smime_cert_private = /path/to/cert.key
# smime_to_cert = /path/to/recipient_cert # smime_to_cert = /path/to/recipient_cert
[emails]
# If you want to encrypt outgoing email, add the recipient certificate to the email
# if not, add an empty string
# uses email = Path to certificate
# monitoring.foo@example.com = /path/to/recipient_cert
# unencrypted@example.com = ""

View File

@@ -70,7 +70,6 @@ class ClientConfig:
hostname: str hostname: str
port: int port: int
sender: str sender: str
recipients: List[str]
username: Optional[str] username: Optional[str]
password: Optional[str] password: Optional[str]
set_reply_to: bool set_reply_to: bool
@@ -78,7 +77,13 @@ class ClientConfig:
start_tls: bool start_tls: bool
smime_cert: str smime_cert: str
smime_cert_private: str smime_cert_private: str
smime_to_cert: str
@dataclass(frozen=True)
class EmailConfig:
"""email to cert matching"""
email_certs: dict
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -92,9 +97,19 @@ class Config:
server: ServerConfig server: ServerConfig
client: ClientConfig client: ClientConfig
emails: EmailConfig
@classmethod
def _get_emails_and_certs(cls, emailcertlist: list[tuple[str, str]]) -> dict:
"""Take the List from the INI file and convert to dict"""
email_certs = {}
for email, cert in emailcertlist:
email_certs[email] = cert
return email_certs
@classmethod @classmethod
def _from_config(cls, config: configparser.RawConfigParser) -> "Config": def _from_config(cls, config: configparser.RawConfigParser) -> "Config":
email_certs = cls._get_emails_and_certs(config.items("emails"))
return cls( return cls(
server=ServerConfig( server=ServerConfig(
hostname=config.get("server", "hostname", fallback="localhost"), hostname=config.get("server", "hostname", fallback="localhost"),
@@ -104,9 +119,6 @@ class Config:
hostname=config.get("client", "hostname"), hostname=config.get("client", "hostname"),
port=config.getint("client", "port"), port=config.getint("client", "port"),
sender=config.get("client", "sender"), sender=config.get("client", "sender"),
recipients=[
_.strip() for _ in config.get("client", "recipients").split(",")
],
username=config.get("client", "username"), username=config.get("client", "username"),
password=config.get("client", "password"), password=config.get("client", "password"),
set_reply_to=config.getboolean( set_reply_to=config.getboolean(
@@ -118,8 +130,8 @@ class Config:
smime_cert_private=config.get( smime_cert_private=config.get(
"client", "smime_cert_private", fallback="" "client", "smime_cert_private", fallback=""
), ),
smime_to_cert=config.get("client", "smime_to_cert", fallback=""),
), ),
emails=EmailConfig(email_certs=email_certs),
) )
@classmethod @classmethod
@@ -151,8 +163,9 @@ class SMTPClient(SMTP):
""" """
def __init__(self, config: ClientConfig) -> None: def __init__(self, config: ClientConfig, emails: EmailConfig) -> None:
self._config: ClientConfig = config self._config: ClientConfig = config
self._emails: EmailConfig = emails
self._lock: asyncio.Lock = asyncio.Lock() self._lock: asyncio.Lock = asyncio.Lock()
super().__init__( super().__init__(
hostname=self._config.hostname, hostname=self._config.hostname,
@@ -163,11 +176,13 @@ class SMTPClient(SMTP):
start_tls=self._config.start_tls, start_tls=self._config.start_tls,
) )
def _encrypt(self, message: bytes, subject: str) -> bytes: def _encrypt(self, message: bytes, subject: str, recipient: str) -> bytes:
"""Encrypt the message """Encrypt the message
Args: Args:
message (bytes): message in bytes format (can/should be signed) message (bytes): message in bytes format (can/should be signed)
subject (str): Subject of the message
recipient: Recipient of the message
Returns: Returns:
bytes: Encrypted message bytes: Encrypted message
@@ -176,7 +191,7 @@ class SMTPClient(SMTP):
s = SMIME.SMIME() s = SMIME.SMIME()
# Load target cert to encrypt to. # Load target cert to encrypt to.
x509 = X509.load_cert(self._config.smime_to_cert) x509 = X509.load_cert(self._emails.email_certs[recipient])
sk = X509.X509_Stack() sk = X509.X509_Stack()
sk.push(x509) sk.push(x509)
s.set_x509_stack(sk) s.set_x509_stack(sk)
@@ -191,7 +206,7 @@ class SMTPClient(SMTP):
# Add header # Add header
out.write("From: " + self._config.sender + "\r\n") out.write("From: " + self._config.sender + "\r\n")
out.write("To: " + ", ".join(self._config.recipients) + "\r\n") out.write("To: " + recipient + "\r\n")
out.write("Subject: " + subject + "\r\n") out.write("Subject: " + subject + "\r\n")
s.write(out, p7) s.write(out, p7)
@@ -222,7 +237,7 @@ class SMTPClient(SMTP):
) )
return output return output
def _encrypt_and_sign(self, message: Message) -> bytes: def _encrypt_and_sign(self, message: Message, recipient: str) -> bytes:
"""Sign and encrypt the message """Sign and encrypt the message
Args: Args:
@@ -232,7 +247,7 @@ class SMTPClient(SMTP):
bytes: The signed and encrypted message including the From and To Header bytes: The signed and encrypted message including the From and To Header
""" """
signed = self._sign(message) signed = self._sign(message)
encrypt = self._encrypt(signed, message.get("Subject", "")) encrypt = self._encrypt(signed, message.get("Subject", ""), recipient)
return encrypt return encrypt
async def _send_message( async def _send_message(
@@ -276,30 +291,28 @@ class SMTPClient(SMTP):
del message["Reply-To"] del message["Reply-To"]
message["Original-Sender"] = sender if sender is not None else "" message["Original-Sender"] = sender if sender is not None else ""
message["Original-Recipient"] = ", ".join(recipients) message["Original-Recipient"] = ", ".join(recipients)
for recipient in self._emails.email_certs:
print( print(
f"{self._config.sender} ({message['Original-Sender']}) -> " f"{self._config.sender} ({message['Original-Sender']}) -> "
f"{', '.join(self._config.recipients)} ({message['Original-Recipient']}): " f"{recipient} ({message['Original-Recipient']}): "
f"'{message.get('Subject', '')}'" f"'{message.get('Subject', '')}'"
) )
if self._config.smime_cert: if self._config.smime_cert and self._emails.email_certs[recipient] != "":
message = self._encrypt_and_sign(message) message = self._encrypt_and_sign(message, recipient)
async with ( async with (
self._lock self._lock
): # TODO: consumer task from spool queue, reusing connections ): # TODO: consumer task from spool queue, reusing connections
try: try:
await self.connect() await self.connect()
await self._send_message( await self._send_message(message, self._config.sender, recipient)
message, self._config.sender, self._config.recipients except SMTPRecipientsRefused as e:
) raise RuntimeError(
except SMTPRecipientsRefused as e: f"Recipients refused: {', '.join(_.recipient for _ in e.recipients)}"
raise RuntimeError( ) from e
f"Recipients refused: {', '.join(_.recipient for _ in e.recipients)}" except (SMTPException, OSError) as e:
) from e raise RuntimeError(str(e)) from e
except (SMTPException, OSError) as e: finally:
raise RuntimeError(str(e)) from e self.close()
finally:
self.close()
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
@@ -443,7 +456,7 @@ def main(args=None) -> int:
print(str(e), file=sys.stderr) print(str(e), file=sys.stderr)
return 1 return 1
controller = SMTPServer( controller = SMTPServer(
config=config.server, handler=Handler(SMTPClient(config.client)) config=config.server, handler=Handler(SMTPClient(config.client, config.emails))
) )
return 0 if controller.run() else 1 return 0 if controller.run() else 1

View File

@@ -42,15 +42,22 @@ def config() -> smtprd.ClientConfig:
use_tls=False, use_tls=False,
start_tls=False, start_tls=False,
sender="sender", sender="sender",
recipients=["recipient"],
set_reply_to="", set_reply_to="",
smime_to_cert=Path("tests", "signer.pem"),
smime_cert=Path("tests", "signer.pem"), smime_cert=Path("tests", "signer.pem"),
smime_cert_private=Path("tests", "privkey.pem"), smime_cert_private=Path("tests", "privkey.pem"),
) )
return cfg return cfg
def emails() -> smtprd.EmailConfig:
"""Return the email dict"""
cfg = smtprd.EmailConfig(
email_certs={"recipient": Path("tests", "signer.pem")},
)
return cfg
def test_config_from_config(): def test_config_from_config():
"""Test opening and reading the config file""" """Test opening and reading the config file"""
config_parser: configparser.ConfigParser = configparser.ConfigParser() config_parser: configparser.ConfigParser = configparser.ConfigParser()
@@ -73,17 +80,24 @@ def test_config_from_ini():
assert cfg.client.use_tls is True assert cfg.client.use_tls is True
def test_config_emailcerts():
"""test retrieval of email/cert pair"""
email_list = [("email1", "cert1"), ("email2", "cert2")]
cfg = smtprd.Config._get_emails_and_certs(email_list)
assert cfg["email2"] == "cert2"
def test_client_encrypt(): def test_client_encrypt():
"""Test encryption. Keys were generated with """Test encryption. Keys were generated with
openssl req -newkey rsa:1024 -nodes -x509 -days 365 -out signer.pem openssl req -newkey rsa:1024 -nodes -x509 -days 365 -out signer.pem
""" """
client = smtprd.SMTPClient(config()) client = smtprd.SMTPClient(config(), emails())
encrypted = client._encrypt(b"abc", "subject") encrypted = client._encrypt(b"abc", "subject", "recipient")
lines = encrypted.decode().splitlines() lines = encrypted.decode().splitlines()
# Test format of header # Test format of header
assert lines[0] == "From: " + str(config().sender) assert lines[0] == "From: " + str(config().sender)
assert lines[1] == "To: " + str(config().recipients[0]) assert lines[1] == "To: recipient"
assert lines[2] == "Subject: " + "subject" assert lines[2] == "Subject: " + "subject"
assert lines[3] == "MIME-Version: 1.0" assert lines[3] == "MIME-Version: 1.0"
assert lines[4] == 'Content-Disposition: attachment; filename="smime.p7m"' assert lines[4] == 'Content-Disposition: attachment; filename="smime.p7m"'
@@ -98,8 +112,9 @@ def test_client_encrypt():
lines[8] == "MIIBdgYJKoZIhvcNAQcDoIIBZzCCAWMCAQAxggEeMIIBGgIBADCBgjBqMQswCQYD" lines[8] == "MIIBdgYJKoZIhvcNAQcDoIIBZzCCAWMCAQAxggEeMIIBGgIBADCBgjBqMQswCQYD"
) )
# test decryption
s = SMIME.SMIME() s = SMIME.SMIME()
s.load_key(config().smime_cert_private, config().smime_cert) s.load_key(config().smime_cert_private, emails().email_certs["recipient"])
buf = BIO.MemoryBuffer(encrypted) buf = BIO.MemoryBuffer(encrypted)
p7, _ = SMIME.smime_load_pkcs7_bio(buf) p7, _ = SMIME.smime_load_pkcs7_bio(buf)
out = s.decrypt(p7) out = s.decrypt(p7)
@@ -108,13 +123,13 @@ def test_client_encrypt():
def test_client_sign(): def test_client_sign():
"""Test signing""" """Test signing"""
client = smtprd.SMTPClient(config()) client = smtprd.SMTPClient(config(), emails())
message = email.message.EmailMessage() message = email.message.EmailMessage()
message.set_content("Test to sign") message.set_content("Test to sign")
# header are needed, because email.message.Emailmessage will add a # header are needed, because email.message.Emailmessage will add a
# newline between the boundary and the content if no header is found # newline between the boundary and the content if no header is found
message.add_header("From", config().sender) message.add_header("From", config().sender)
message.add_header("To", config().recipients[0]) message.add_header("To", "recipient")
message.add_header("Subject", "Test") message.add_header("Subject", "Test")
signed = client._sign(message) signed = client._sign(message)
assert "This is an S/MIME signed message" in signed.decode() assert "This is an S/MIME signed message" in signed.decode()