fix pylint issues

Signed-off-by: Florian Brandes <florian.brandes@posteo.de>
This commit is contained in:
2024-07-05 12:41:22 +02:00
parent f4e6356792
commit 9cc0e2be50
3 changed files with 727 additions and 21 deletions

107
smtprd.py
View File

@@ -33,17 +33,22 @@ from cryptography.hazmat.primitives.serialization import (
pkcs7,
)
from cryptography.x509 import load_pem_x509_certificate
from envelope import Envelope
from envelope import Envelope as EnvelopeEnvelope
@dataclass(frozen=True)
class ServerConfig:
"""Serverconfig"""
hostname: str
port: int
# pylint: disable=too-many-instance-attributes
@dataclass(frozen=True)
class ClientConfig:
"""Clientconfig"""
hostname: str
port: int
sender: str
@@ -60,6 +65,13 @@ class ClientConfig:
@dataclass(frozen=True)
class Config:
"""Config main module
Raises:
RuntimeError: When config file cannot be parsed
"""
server: ServerConfig
client: ClientConfig
@@ -94,9 +106,20 @@ class Config:
@classmethod
def from_ini(cls, filename: str) -> "Config":
"""Opens and reads .INI config file
Args:
filename (str): config file path
Raises:
RuntimeError: on error in config file
Returns:
Config: ConfigParser object
"""
try:
config_parser: configparser.ConfigParser = configparser.ConfigParser()
with open(filename, "r") as fp:
with open(filename, "r", encoding="utf8") as fp:
config_parser.read_file(fp)
return cls._from_config(config_parser)
except (OSError, configparser.Error, UnicodeDecodeError) as e:
@@ -104,6 +127,12 @@ class Config:
class SMTPClient(SMTP):
"""Client part of library
This will connect to an upstream SMTP server to deliver the mal
"""
def __init__(self, config: ClientConfig) -> None:
self._config: ClientConfig = config
self._lock: asyncio.Lock = asyncio.Lock()
@@ -116,10 +145,10 @@ class SMTPClient(SMTP):
start_tls=self._config.start_tls,
)
def _encrypt_and_sign(self, message: Message, recipients: List[str]) -> Message:
def _encrypt_and_sign(self, message: Message) -> Message:
# Currently does NOT work
new_message = (
Envelope()
EnvelopeEnvelope()
.smime()
.load(message)
.sign(
@@ -168,8 +197,8 @@ class SMTPClient(SMTP):
) -> None:
# if we use send_message, it expects a message object
# This will add a newline after the signing process. The message object is fine
# but as soon as _as_bytes or _as_string is called, it will add a new line between the boundary of the
# signed message, which will ruin the signature
# but as soon as _as_bytes or _as_string is called, it will add a new line
# between the boundary of the signed message, which will ruin the signature
failed_recipients: Set[str] = set(
(
await self.sendmail(
@@ -179,7 +208,7 @@ class SMTPClient(SMTP):
)
)[0].keys()
)
if len(failed_recipients): # raise also when not all have been refused
if len(failed_recipients) > 0: # raise also when not all have been refused
raise SMTPRecipientsRefused(
[SMTPRecipientRefused(0, "", _) for _ in failed_recipients]
)
@@ -187,6 +216,18 @@ class SMTPClient(SMTP):
async def forward_message(
self, message: Message, sender: Optional[str], recipients: List[str]
) -> None:
"""Modify message to be passed on.
Remove original From, To and Reply-To and replace with "Original-X" header.
Call _sign function, if there is a cert saved in the config.
Pass it on to the actual send function
Args:
message (Message): message
sender (Optional[str]): Original sender
recipients (List[str]): Original recipients
"""
del message["From"]
del message["To"]
del message["Reply-To"]
@@ -216,13 +257,30 @@ class SMTPClient(SMTP):
self.close()
# pylint: disable=too-few-public-methods
class Handler:
"""Reimplement Handler"""
def __init__(self, client: SMTPClient) -> None:
"""Impelent our SMTPClient"""
self._client: SMTPClient = client
async def handle_DATA(
self, server: Server, session: Session, envelope: Envelope
async def handle_DATA( # pylint: disable=invalid-name
self,
server: Server,
session: Session, # pylint: disable=unused-argument
envelope: Envelope,
) -> str:
"""Forward the message from our server to our client function
Args:
server (Server): Our server
session (Session): Current session (Not used here)
envelope (Envelope): message envelope
Returns:
str: Status to client
"""
try:
await self._client.forward_message(
self._prepare_message(server, envelope),
@@ -232,17 +290,15 @@ class Handler:
except (RuntimeError, ValueError) as e:
print(f"Cannot forward: {str(e)}", file=sys.stderr)
return f"550 {e.__cause__.__class__.__name__ if e.__cause__ is not None else e.__class__.__name__}"
else:
return "250 OK"
return "250 OK"
@classmethod
def _parse_message(cls, data: Optional[Union[bytes, str]]) -> Message:
if isinstance(data, bytes):
return message_from_bytes(data, policy=EmailMessagePolicy)
elif isinstance(data, str):
if isinstance(data, str):
return message_from_string(data, policy=EmailMessagePolicy)
else:
raise ValueError(str(type(data)))
raise ValueError(str(type(data)))
def _prepare_message(self, server: Server, envelope: Envelope) -> Message:
message: Message = self._parse_message(envelope.content)
@@ -267,6 +323,8 @@ class Handler:
class SMTPServer(Controller):
"""Implementation of local SMTP Server Controller"""
def __init__(self, config: ServerConfig, handler: Handler) -> None:
super().__init__(
handler=handler,
@@ -276,9 +334,14 @@ class SMTPServer(Controller):
)
def run(self) -> bool:
"""Run routine
Returns:
bool: True on finished routine
"""
shutdown_requested: threading.Event = threading.Event()
def _handler(signum: int, frame) -> None:
def _handler(signum: int, frame) -> None: # pylint: disable=unused-argument
shutdown_requested.set()
signal.signal(signal.SIGINT, _handler)
@@ -293,6 +356,11 @@ class SMTPServer(Controller):
def main() -> int:
"""Main routine
Returns:
int: exit code
"""
parser = argparse.ArgumentParser(
description=__doc__.strip(),
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
@@ -312,11 +380,10 @@ def main() -> int:
except RuntimeError as e:
print(str(e), file=sys.stderr)
return 1
else:
controller = SMTPServer(
config=config.server, handler=Handler(SMTPClient(config.client))
)
return 0 if controller.run() else 1
controller = SMTPServer(
config=config.server, handler=Handler(SMTPClient(config.client))
)
return 0 if controller.run() else 1
if __name__ == "__main__":