Skip to content

Commit

Permalink
Fixed smtp for python 3.12
Browse files Browse the repository at this point in the history
  • Loading branch information
giga-a authored Sep 12, 2024
1 parent d452b94 commit 83df0c3
Showing 1 changed file with 58 additions and 75 deletions.
133 changes: 58 additions & 75 deletions honeypots/smtp_server.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
from __future__ import annotations

import socket
from asyncore import loop
from base64 import b64decode
from contextlib import suppress
from smtpd import SMTPChannel, SMTPServer

from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.mail.smtp import ESMTP
from honeypots.base_server import BaseServer
from honeypots.helper import (
server_arguments,
)

from honeypots.helper import check_bytes, server_arguments

class QSMTPServer(BaseServer):
NAME = "smtp_server"
DEFAULT_PORT = 25

def __init__(self, **kwargs):
super().__init__(**kwargs)

def server_main(self): # noqa: C901
_q_s = self

class CustomSMTPChannel(SMTPChannel):
class CustomSMTPProtocol(ESMTP):
def __init__(self, *args, **kwargs):
fun = None
try:
Expand All @@ -29,75 +31,57 @@ def __init__(self, *args, **kwargs):
if fun:
socket.getfqdn = fun

def found_terminator(self):
with suppress(Exception):
if "capture_commands" in _q_s.options:
line = self._emptystring.join(self.received_lines).decode(errors="ignore")
command, *rest = line.split(" ")
arg = rest[0] if rest else None
data = rest[1] if len(rest) > 1 else None
if command.upper() not in {"HELO", "EHLO"}:
_q_s.log(
{
"action": "connection",
"src_ip": self.addr[0],
"src_port": self.addr[1],
"data": {"command": command, "arg": arg, "data": data},
}
)
super().found_terminator()

def smtp_EHLO(self, arg): # noqa: N802
_q_s.log(
{
"action": "connection",
"src_ip": self.addr[0],
"src_port": self.addr[1],
}
)
if not arg:
self.push("501 Syntax: HELO hostname")
if self.seen_greeting:
self.push("503 Duplicate HELO/EHLO")
else:
self.seen_greeting = arg
self.push(f"250-{self.fqdn} Hello {arg}")
self.push("250-8BITMIME")
self.push("250-AUTH LOGIN PLAIN")
self.push("250 STARTTLS")

def smtp_AUTH(self, arg): # noqa: N802
with suppress(Exception):
if arg.startswith("PLAIN "):
_, username, password = (
b64decode(arg.split(" ")[1].strip())
.decode("utf-8", errors="replace")
.split("\0")
)
_q_s.check_login(username, password, *self.addr)

self.push("235 Authentication successful")

def __getattr__(self, name):
self.smtp_QUIT(0)

class CustomSMTPServer(SMTPServer):
def process_message(self, *_, **__):
return

def handle_accept(self):
conn, addr = self.accept()
def connectionMade(self): # noqa: N802
_q_s.log(
{
"action": "connection",
"src_ip": addr[0],
"src_port": addr[1],
"src_ip": self.transport.getPeer().host,
"src_port": self.transport.getPeer().port,
}
)
CustomSMTPChannel(self, conn, addr)

CustomSMTPServer((self.ip, self.port), None)
loop(timeout=1.1, use_poll=True)
super().connectionMade()

def state_COMMAND(self, line):
command, *rest = line.split(b" ")
arg = rest[0] if rest else None
data = rest[1] if len(rest) > 1 else None
if command.upper() not in {b"HELO", b"EHLO"}:
_q_s.log(
{
"action": "connection",
"src_ip": self.transport.getPeer().host,
"src_port": self.transport.getPeer().port,
"data": {"command": command, "arg": arg, "data": data},
}
)
super().state_COMMAND(line)

def do_EHLO(self, arg):
self.sendCode(250, f"ip-127-0-0-1.ec2.internal Hello {arg}\n8BITMIME\nAUTH LOGIN PLAIN\nSTARTTLS".encode())

def ext_AUTH(self, arg):
if arg.startswith(b"PLAIN "):
_, username, password = (
b64decode(arg.split(b" ")[1].strip())
.decode("utf-8", errors="replace")
.split("\0")
)
_q_s.check_login(username, password, self.transport.getPeer().host, self.transport.getPeer().port)
self.sendCode(235,b'Authentication successful.')

class CustomSMTPFactory(Factory):
protocol = CustomSMTPProtocol
portal = None

def buildProtocol(self, address): # noqa: N802,ARG002
p = self.protocol()
p.portal = self.portal
p.factory = self
return p

factory = CustomSMTPFactory()
reactor.listenTCP(port=self.port, factory=factory, interface=self.ip)
reactor.run()

def test_server(self, ip=None, port=None, username=None, password=None):
with suppress(Exception):
Expand All @@ -110,19 +94,18 @@ def test_server(self, ip=None, port=None, username=None, password=None):
s = SMTP(_ip, _port)
s.ehlo()
s.login(_username, _password)
s.sendmail("fromtest", "totest", "Nothing")
s.quit()


if __name__ == "__main__":
parsed = server_arguments()
if parsed.docker or parsed.aws or parsed.custom:
qsmtpserver = QSMTPServer(
QSMTPserver = QSMTPServer(
ip=parsed.ip,
port=parsed.port,
username=parsed.username,
password=parsed.password,
options=parsed.options,
config=parsed.config,
)
qsmtpserver.run_server()
QSMTPserver.run_server()

0 comments on commit 83df0c3

Please sign in to comment.