diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..b84285a --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,32 @@ +name: Tests CI +run-name: Tests CI +on: + pull_request: + branches: + - main + push: + branches: + - main + workflow_dispatch: + +jobs: + tests: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11"] + fail-fast: false + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Display Python version + run: python -c "import sys; print(sys.version)" + - name: Install dependencies + run: python -m pip install --upgrade pip setuptools wheel + - name: Installation + run: python -m pip install ".[test]" + - name: Unit Tests + run: pytest -v ./tests diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..40b1dff --- /dev/null +++ b/.gitignore @@ -0,0 +1,67 @@ +### Python template +__pycache__/ + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDEs +.spyderproject +.spyproject +.idea/ +.vscode/ +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ diff --git a/honeypots/dhcp_server.py b/honeypots/dhcp_server.py index 759d8d9..89d0fdb 100644 --- a/honeypots/dhcp_server.py +++ b/honeypots/dhcp_server.py @@ -9,22 +9,19 @@ // contributors list qeeqbox/honeypots/graphs/contributors // ------------------------------------------------------------- ''' - from warnings import filterwarnings filterwarnings(action='ignore', module='.*OpenSSL.*') filterwarnings(action='ignore', module='.*socket.*') from twisted.internet.protocol import DatagramProtocol from twisted.internet import reactor -from time import time from twisted.python import log as tlog -from struct import unpack +from struct import unpack, error as StructError from socket import inet_aton from subprocess import Popen from os import path, getenv from honeypots.helper import close_port_wrapper, get_free_port, kill_server_wrapper, server_arguments, setup_logger, disable_logger, set_local_vars, check_if_server_is_running from uuid import uuid4 -from contextlib import suppress class QDHCPServer(): @@ -95,7 +92,10 @@ def parse_options(self, raw): return options def datagramReceived(self, data, addr): - mac_address = unpack('!28x6s', data[:34])[0].hex(':') + try: + mac_address = unpack('!28x6s', data[:34])[0].hex(':') + except StructError: + mac_address = "None" data = self.parse_options(data[240:]) data.update({'mac_address': mac_address}) _q_s.logs.info({'server': 'dhcp_server', 'action': 'query', 'status': 'success', 'src_ip': addr[0], 'src_port': addr[1], 'dest_ip': _q_s.ip, 'dest_port': _q_s.port, 'data': data}) diff --git a/honeypots/https_server.py b/honeypots/https_server.py index 05c3a72..30b4d85 100644 --- a/honeypots/https_server.py +++ b/honeypots/https_server.py @@ -177,7 +177,7 @@ def check_bytes(string): request.responseHeaders.addRawHeader('Server', _q_s.mocking_server) if request.method == b'GET' or request.method == b'POST': - _q_s.logs.info({'server': 'https_server', 'action': request.method.decode(), 'src_ip': request.client_ip, 'src_port': request.getClientAddress().port, 'dest_ip': _q_s.ip, 'dest_port': _q_s.port}) + _q_s.logs.info({'server': 'https_server', 'action': request.method.decode(), 'src_ip': client_ip, 'src_port': request.getClientAddress().port, 'dest_ip': _q_s.ip, 'dest_port': _q_s.port}) if request.method == b'GET': if request.uri == b'/login.html': diff --git a/setup.py b/setup.py index 859031d..4d7afde 100644 --- a/setup.py +++ b/setup.py @@ -4,37 +4,44 @@ long_description = f.read() setup( - name='honeypots', - author='QeeqBox', - author_email='gigaqeeq@gmail.com', + name="honeypots", + author="QeeqBox", + author_email="gigaqeeq@gmail.com", description=r"30 different honeypots in one package! (dhcp, dns, elastic, ftp, http proxy, https proxy, http, https, imap, ipp, irc, ldap, memcache, mssql, mysql, ntp, oracle, pjl, pop3, postgres, rdp, redis, sip, smb, smtp, snmp, socks5, ssh, telnet, vnc)", long_description=long_description, - version='0.64', + version="0.64", license="AGPL-3.0", - license_files=('LICENSE'), + license_files=("LICENSE"), url="https://github.com/qeeqbox/honeypots", - packages=['honeypots'], - entry_points={ - "console_scripts": [ - 'honeypots=honeypots.__main__:main_logic' - ] - }, + packages=["honeypots"], + entry_points={"console_scripts": ["honeypots=honeypots.__main__:main_logic"]}, include_package_data=True, install_requires=[ - 'twisted==21.7.0', - 'psutil==5.9.0', - 'psycopg2-binary==2.9.3', - 'pycrypto==2.6.1', - 'requests==2.28.2', - 'requests[socks]==2.28.2', - 'impacket==0.9.24', - 'paramiko==3.1.0', - 'scapy==2.4.5', - 'service_identity==21.1.0', - 'netifaces==0.11.0' + "twisted==21.7.0", + "psutil==5.9.0", + "psycopg2-binary==2.9.3", + "pycryptodome==3.19.0", + "requests==2.28.2", + "requests[socks]==2.28.2", + "impacket==0.9.24", + "paramiko==3.1.0", + "scapy==2.4.5", + "service_identity==21.1.0", + "netifaces==0.11.0", ], extras_require={ - 'test': ['redis', 'mysql-connector', 'elasticsearch', 'pymssql', 'ldap3', 'pysnmp'] + "test": [ + "redis", + "mysql-connector", + "dnspython==2.4.2", + "elasticsearch", + "pymssql", + "ldap3", + "pysnmplib", + "pytest", + "redis", + "vncdotool", + ] }, - python_requires='>=3.5' + python_requires=">=3.8", ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..2358328 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import json +from contextlib import contextmanager +from multiprocessing import Process +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import Iterator + +import pytest + +from .utils import IP, PASSWORD, USERNAME + + +@contextmanager +def config_for_testing(custom_config: dict) -> Iterator[Path]: + with TemporaryDirectory() as tmp_dir: + config = Path(tmp_dir) / "config.json" + logs_output_dir = Path(tmp_dir) / "logs" + logs_output_dir.mkdir() + testing_config = { + "logs": "file,terminal,json", + "logs_location": str(logs_output_dir.absolute()), + **custom_config, + } + config.write_text(json.dumps(testing_config)) + yield config + + +@pytest.fixture +def server_logs(request): + custom_config = request.param.get("custom_config", {}) + with config_for_testing(custom_config) as config_file: + _server = request.param["server"]( + ip=IP, + port=request.param["port"], + username=USERNAME, + password=PASSWORD, + options="", + config=str(config_file.absolute()), + ) + server_process = Process(target=_server.run_server) + server_process.start() + yield config_file.parent / "logs" + server_process.terminate() + server_process.join() diff --git a/tests/test_dhcp_server.py b/tests/test_dhcp_server.py new file mode 100644 index 0000000..c43511f --- /dev/null +++ b/tests/test_dhcp_server.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from time import sleep + +import pytest + +from honeypots import QDHCPServer +from .utils import ( + connect_to, + EXPECTED_KEYS, + IP, + load_logs_from_file, +) + +PORT = "50067" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QDHCPServer, "port": PORT}], + indirect=True, +) +def test_dhcp_server(server_logs): + sleep(1) # give the server some time to start + + with connect_to(IP, PORT, udp=True) as connection: + connection.send(b"\x03" * 240) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 1 + (query,) = logs + assert all(k in query for k in EXPECTED_KEYS) + assert query["action"] == "query" + assert query["status"] == "success" + assert query["data"] == {"mac_address": "03:03:03:03:03:03"} diff --git a/tests/test_dns_server.py b/tests/test_dns_server.py new file mode 100644 index 0000000..6ecde93 --- /dev/null +++ b/tests/test_dns_server.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from time import sleep + +import pytest +from dns.resolver import Resolver + +from honeypots import QDNSServer +from .utils import ( + assert_connect_is_logged, + IP, + load_logs_from_file, +) + +PORT = "50053" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QDNSServer, "port": PORT}], + indirect=True, +) +def test_dns_server(server_logs): + sleep(1) # give the server some time to start + + resolver = Resolver(configure=False) + resolver.nameservers = [IP] + resolver.port = int(PORT) + domain = "example.org" + response = resolver.resolve(domain, "a") + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, query = logs + assert_connect_is_logged(connect, PORT) + + assert query["action"] == "query" + assert "data" in query + assert "\x01,\x0cA \x00\xff\xff\x7f\x08\x00" + "\x00\x01\x00\x00\xc8\x00J\x00\x00\x14\x00AA\xa7C\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 \x00\x00 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x01" + f"(DESCRIPTION=(CONNECT_DATA=(SERVICE_NAME={SERVICE})(CID=(PROGRAM={PROGRAM})(HOST=xxxxxxxxxxxxxx)" + f"(USER={USERNAME}))(CONNECTION_ID=xxxxxxxxxxxxxxxxxxxxxxxx))(ADDRESS=(PROTOCOL=tcp)(HOST={IP})(PORT={PORT})))" + ) + with connect_to(IP, PORT) as connection: + connection.send(payload.encode()) + response, _ = connection.recvfrom(10000) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + + assert login["action"] == "login" + assert login["data"] == {"local_user": USERNAME, "program": PROGRAM, "service_name": SERVICE} + + assert response == b"\x00\x08\x00\x00\x04\x00\x00\x00" diff --git a/tests/test_pjl_server.py b/tests/test_pjl_server.py new file mode 100644 index 0000000..7940060 --- /dev/null +++ b/tests/test_pjl_server.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from time import sleep + +import pytest + +from honeypots import QPJLServer +from .utils import ( + assert_connect_is_logged, + connect_to, + IP, + load_logs_from_file, +) + +PORT = "59100" +SERVER_CONFIG = { + "honeypots": { + "pjl": { + "options": ["capture_commands"], + }, + } +} + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QPJLServer, "port": PORT, "custom_config": SERVER_CONFIG}], + indirect=True, +) +def test_pjl_server(server_logs): + sleep(1) # give the server some time to start + + with connect_to(IP, PORT) as connection: + connection.send(b"\x1b%-12345X@PJL prodinfo") + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, query = logs + assert_connect_is_logged(connect, PORT) + + assert query["action"] == "query" + assert query["data"] == {"command": "@PJL prodinfo"} + assert query["status"] == "success" diff --git a/tests/test_pop3_server.py b/tests/test_pop3_server.py new file mode 100644 index 0000000..220a02c --- /dev/null +++ b/tests/test_pop3_server.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from contextlib import suppress +from poplib import error_proto, POP3 +from time import sleep + +import pytest + +from honeypots import QPOP3Server +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "50110" +SERVER_CONFIG = { + "honeypots": { + "pop3": { + "options": ["capture_commands"], + }, + } +} + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QPOP3Server, "port": PORT, "custom_config": SERVER_CONFIG}], + indirect=True, +) +def test_pop3_server(server_logs): + sleep(1) # give the server some time to start + + with suppress(error_proto): + client = POP3(IP, int(PORT)) + client.user(USERNAME) + client.pass_(PASSWORD) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 4 + connect, cmd1, cmd2, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) + + assert cmd1["action"] == "command" + assert cmd1["data"] == {"args": "testing", "cmd": "USER"} + assert cmd2["action"] == "command" + assert cmd2["data"] == {"args": "testing", "cmd": "PASS"} diff --git a/tests/test_postgres_server.py b/tests/test_postgres_server.py new file mode 100644 index 0000000..0e84791 --- /dev/null +++ b/tests/test_postgres_server.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from contextlib import suppress +from time import sleep + +import pytest +from psycopg2 import connect, OperationalError + +from honeypots import QPostgresServer +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "55432" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QPostgresServer, "port": PORT}], + indirect=True, +) +def test_postgres_server(server_logs): + sleep(1) # give the server some time to start + + with suppress(OperationalError): + db = connect(host=IP, port=PORT, user=USERNAME, password=PASSWORD) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect_, login = logs + assert_connect_is_logged(connect_, PORT) + assert_login_is_logged(login) diff --git a/tests/test_rdp_server.py b/tests/test_rdp_server.py new file mode 100644 index 0000000..774c878 --- /dev/null +++ b/tests/test_rdp_server.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from time import sleep + +import pytest + +from honeypots import QRDPServer +from .utils import ( + assert_connect_is_logged, + connect_to, + IP, + load_logs_from_file, +) + +PORT = "53389" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QRDPServer, "port": PORT}], + indirect=True, +) +def test_rdp_server(server_logs): + sleep(1) # give the server some time to start + + with connect_to(IP, PORT) as connection: + connection.send(b"test") + connection.send( + b"\x03\x00\x00*%\xe0\x00\x00\x00\x00\x00Cookie: mstshash=foobar\r\n\x01\x00\x08\x00\x03\x00\x00" + ) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, stshash = logs + assert_connect_is_logged(connect, PORT) + + assert stshash["action"] == "stshash" + assert stshash["mstshash"] == "success" + assert "stshash" in stshash["data"] + assert "foobar" in stshash["data"]["stshash"] diff --git a/tests/test_redis_server.py b/tests/test_redis_server.py new file mode 100644 index 0000000..4ab3294 --- /dev/null +++ b/tests/test_redis_server.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from contextlib import suppress +from time import sleep + +import pytest +from redis import AuthenticationError, StrictRedis + +from honeypots import QRedisServer +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "56379" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QRedisServer, "port": PORT}], + indirect=True, +) +def test_redis_server(server_logs): + with suppress(AuthenticationError): + redis = StrictRedis.from_url(f"redis://{USERNAME}:{PASSWORD}@{IP}:{PORT}/1") + for _ in redis.scan_iter("user:*"): + pass + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/test_sip_server.py b/tests/test_sip_server.py new file mode 100644 index 0000000..111cb6d --- /dev/null +++ b/tests/test_sip_server.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from time import sleep + +import pytest + +from honeypots import QSIPServer +from .utils import connect_to, IP, load_logs_from_file + +PORT = "55060" +EXPECTED_KEYS = ("action", "server", "src_ip", "src_port", "timestamp") +CALL_ID = "1@0.0.0.0" +CONTACT = "sip:user_3@test.test.test" +FROM = f"{CONTACT};tag=none" +TO = "" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSIPServer, "port": PORT}], + indirect=True, +) +def test_sip_server(server_logs): + sleep(1) # give the server some time to start + + with connect_to(IP, PORT, udp=True) as connection: + payload = ( + "INVITE sip:user_1@test.test SIP/2.0\r\n" + f"To: {TO}\r\n" + f"From: {FROM}\r\n" + f"Call-ID: {CALL_ID}\r\n" + "CSeq: 1 INVITE\r\n" + f"Contact: {CONTACT}\r\n" + "Via: SIP/2.0/TCP 0.0.0.0;branch=34uiddhjczqw3mq23\r\n" + "Content-Length: 1\r\n" + "\r\n" + "T" + ) + connection.send(payload.encode()) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, request = logs + + assert all(k in connect for k in EXPECTED_KEYS) + assert connect["action"] == "connection" + assert connect["server"] == "sip_server" + + assert request["action"] == "request" + assert request["src_ip"] == IP + assert "data" in request + assert request["data"]["call-id"] == CALL_ID + assert request["data"]["contact"] == CONTACT + assert request["data"]["from"] == FROM + assert request["data"]["to"] == TO diff --git a/tests/test_smb_server.py b/tests/test_smb_server.py new file mode 100644 index 0000000..883871e --- /dev/null +++ b/tests/test_smb_server.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from time import sleep + +import pytest +from impacket.smbconnection import SMBConnection + +from honeypots import QSMBServer +from .utils import ( + assert_connect_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "50445" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSMBServer, "port": PORT}], + indirect=True, +) +def test_smb_server(server_logs): + sleep(5) # give the server some time to start + + smb_client = SMBConnection(IP, IP, sess_port=PORT) + smb_client.login(USERNAME, PASSWORD) + smb_client.close() + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 3 + for entry in logs: + assert_connect_is_logged(entry, PORT) + + assert "Incoming connection" in logs[0]["data"] + assert "AUTHENTICATE_MESSAGE" in logs[1]["data"] + assert "authenticated successfully" in logs[2]["data"] diff --git a/tests/test_smtp_server.py b/tests/test_smtp_server.py new file mode 100644 index 0000000..ed041c0 --- /dev/null +++ b/tests/test_smtp_server.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from base64 import b64decode +from smtplib import SMTP +from time import sleep + +import pytest + +from honeypots import QSMTPServer +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "50025" +SERVER_CONFIG = { + "honeypots": { + "smtp": { + "options": ["capture_commands"], + }, + } +} +EXPECTED_DATA = [ + {"arg": "FROM:", "command": "MAIL", "data": "None"}, + {"arg": "TO:", "command": "RCPT", "data": "None"}, + {"arg": "None", "command": "DATA", "data": "None"}, + {"arg": "None", "command": "NOTHING", "data": "None"}, + {"arg": "None", "command": "QUIT", "data": "None"}, +] + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSMTPServer, "port": PORT, "custom_config": SERVER_CONFIG}], + indirect=True, +) +def test_smtp_server(server_logs): + sleep(1) # give server time to start + + client = SMTP(IP, int(PORT)) + client.ehlo() + client.login(USERNAME, PASSWORD) + client.sendmail("fromtest", "totest", "Nothing") + client.quit() + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 8 + connect, auth, login, *additional = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) + + assert auth["data"]["command"] == "AUTH" + assert b64decode(auth["data"]["data"]).decode() == f"\x00{USERNAME}\x00{PASSWORD}" + + for entry, expected in zip(additional, EXPECTED_DATA): + assert entry diff --git a/tests/test_snmp_server.py b/tests/test_snmp_server.py new file mode 100644 index 0000000..2ad05c7 --- /dev/null +++ b/tests/test_snmp_server.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import pytest +from pysnmp.hlapi import CommunityData, ContextData, getCmd, ObjectIdentity, ObjectType, SnmpEngine, UdpTransportTarget + +from honeypots import QSNMPServer +from .utils import ( + assert_connect_is_logged, + IP, + load_logs_from_file, +) + +PORT = "50161" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSNMPServer, "port": PORT}], + indirect=True, +) +def test_snmp_server(server_logs): + g = getCmd( + SnmpEngine(), + CommunityData("public"), + UdpTransportTarget((IP, int(PORT))), + ContextData(), + ObjectType(ObjectIdentity("1.3.6.1.4.1.9.9.618.1.4.1.0")), + ) + next(g) + + logs = load_logs_from_file(server_logs) + + assert len(logs) >= 2 + connect, query, *_ = logs + assert_connect_is_logged(connect, PORT) + + assert query["action"] == "query" + assert query["data"] == {"community": "public", "oids": "1.3.6.1.4.1.9.9.618.1.4.1.0", "version": "1"} diff --git a/tests/test_socks5_server.py b/tests/test_socks5_server.py new file mode 100644 index 0000000..b8de9a9 --- /dev/null +++ b/tests/test_socks5_server.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from contextlib import suppress +from time import sleep + +import pytest +import requests + +from honeypots import QSOCKS5Server +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "51080" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSOCKS5Server, "port": PORT}], + indirect=True, +) +def test_socks5_server(server_logs): + with suppress(requests.exceptions.ConnectionError): + requests.get( + "http://127.0.0.1/", + proxies={"http": f"socks5://{USERNAME}:{PASSWORD}@{IP}:{PORT}"}, + ) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/test_ssh_server.py b/tests/test_ssh_server.py new file mode 100644 index 0000000..5f32cfc --- /dev/null +++ b/tests/test_ssh_server.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from time import sleep + +import pytest +from paramiko import AutoAddPolicy, SSHClient + +from honeypots import QSSHServer +from .utils import assert_connect_is_logged, IP, load_logs_from_file, PASSWORD, USERNAME + +PORT = 50022 +SERVER_CONFIG = { + "honeypots": { + "ssh": { + "options": ["capture_commands"], + }, + } +} + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSSHServer, "port": str(PORT), "custom_config": SERVER_CONFIG}], + indirect=True, +) +def test_ssh_server(server_logs): + sleep(1) # give the server some time to start + + ssh = SSHClient() + ssh.set_missing_host_key_policy(AutoAddPolicy()) + ssh.connect(IP, port=PORT, username=USERNAME, password=PASSWORD) + ssh.close() + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, str(PORT)) + + assert login["action"] == "login" + assert login["username"] == USERNAME diff --git a/tests/test_telnet_server.py b/tests/test_telnet_server.py new file mode 100644 index 0000000..a21d339 --- /dev/null +++ b/tests/test_telnet_server.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from telnetlib import Telnet +from time import sleep + +import pytest + +from honeypots import QTelnetServer +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "50023" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QTelnetServer, "port": PORT}], + indirect=True, +) +def test_telnet_server(server_logs): + telnet_client = Telnet(IP, int(PORT)) + telnet_client.read_until(b"login: ") + telnet_client.write(USERNAME.encode() + b"\n") + telnet_client.read_until(b"Password: ") + telnet_client.write(PASSWORD.encode() + b"\n") + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/test_vnc_server.py b/tests/test_vnc_server.py new file mode 100644 index 0000000..834f40f --- /dev/null +++ b/tests/test_vnc_server.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from multiprocessing import Process +from time import sleep + +import pytest +from vncdotool import api + +from honeypots import QVNCServer +from .utils import assert_connect_is_logged, IP, load_logs_from_file, PASSWORD + +PORT = "55900" + + +def _connect_to_vnc(): + client = api.connect(f"{IP}::{PORT}", password=PASSWORD) + client.disconnect() + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QVNCServer, "port": PORT}], + indirect=True, +) +def test_vnc_server(server_logs): + # This VNC API creates a blocking daemon thread that can't be trivially stopped, + # so we just run it in a process and terminate that instead + process = Process(target=_connect_to_vnc) + process.start() + sleep(1) # give the server process some time to write logs + process.terminate() + process.join(timeout=5) + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 1 + assert_connect_is_logged(logs[0], PORT) diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..a273a82 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import json +from contextlib import contextmanager +from pathlib import Path +from socket import AF_INET, IPPROTO_UDP, SOCK_DGRAM, SOCK_STREAM, socket + +IP = "127.0.0.1" +USERNAME = "testing" +PASSWORD = "testing" +EXPECTED_KEYS = ("action", "dest_ip", "dest_port", "server", "src_ip", "src_port", "timestamp") + + +def load_logs_from_file(log_folder: Path) -> list[dict]: + log_files = [f for f in log_folder.iterdir()] + assert len(log_files) == 1 + log_file = log_files[0] + logs = [] + for line in log_file.read_text().splitlines(): + if not line: + continue + logs.append(json.loads(line)) + return logs + + +@contextmanager +def connect_to(host: str, port: str, udp: bool = False) -> socket: + connection = None + try: + if udp: + connection = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) + else: + connection = socket(AF_INET, SOCK_STREAM) + connection.connect((host, int(port))) + yield connection + finally: + if connection: + connection.close() + + +def assert_connect_is_logged( + connect: dict[str, str], port: str, expected_keys: list[str] | tuple[str, ...] = EXPECTED_KEYS +): + assert all(k in connect for k in expected_keys) + assert connect["dest_ip"] == IP + assert connect["dest_port"] == port + assert connect["action"] == "connection" + + +def assert_login_is_logged(login: dict[str, str]): + assert all(k in login for k in ("username", "password")) + assert login["action"] == "login" + assert login["username"] == USERNAME + assert login["password"] == PASSWORD + assert login["status"] == "success"