From 374b1d975d94f6bb8c6e6549574144ff6db4e576 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Mon, 22 Jan 2024 11:26:41 +0100 Subject: [PATCH 1/5] improve config error handling --- honeypots/__main__.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/honeypots/__main__.py b/honeypots/__main__.py index fb70227..76d6ed3 100755 --- a/honeypots/__main__.py +++ b/honeypots/__main__.py @@ -7,7 +7,7 @@ from argparse import ArgumentParser, SUPPRESS, Namespace from atexit import register from functools import wraps -from json import loads +from json import JSONDecodeError, loads from os import geteuid from pathlib import Path from signal import alarm, SIGALRM, SIGINT, signal, SIGTERM, SIGTSTP @@ -182,9 +182,14 @@ def _load_config(self): logger.error(f'Config file "{config_path}" not found') sys.exit(1) try: - return loads(config_path.read_text()) - except Exception as error: - logger.exception(f"Unable to load or parse config.json file: {error}") + config_data = loads(config_path.read_text()) + logger.info(f"Successfully loaded config file {config_path}") + return config_data + except FileNotFoundError: + logger.error(f"Unable to load config file: File {config_path} not found") + sys.exit(1) + except JSONDecodeError as error: + logger.error(f"Unable to parse config file as JSON: {error}") sys.exit(1) def _set_up_honeypots(self): # noqa: C901 From 297bdb69f939a1ce678fddeb5d39b36618d92612 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Mon, 22 Jan 2024 11:28:41 +0100 Subject: [PATCH 2/5] http proxy: added custom template option --- honeypots/http_proxy_server.py | 124 ++++++++++++++++---------------- tests/data/test_template.html | 10 +++ tests/test_http_proxy_server.py | 39 +++++++++- 3 files changed, 112 insertions(+), 61 deletions(-) create mode 100644 tests/data/test_template.html diff --git a/honeypots/http_proxy_server.py b/honeypots/http_proxy_server.py index ee1a418..7feb9a2 100644 --- a/honeypots/http_proxy_server.py +++ b/honeypots/http_proxy_server.py @@ -9,19 +9,23 @@ // contributors list qeeqbox/honeypots/graphs/contributors // ------------------------------------------------------------- """ +from __future__ import annotations + from pathlib import Path +from shlex import split -from dns.resolver import query as dsnquery +from dns.resolver import resolve as dsnquery from twisted.internet import reactor from twisted.internet.protocol import Protocol, Factory from subprocess import Popen from email.parser import BytesParser -from os import path, getenv +from os import getenv from honeypots.helper import ( close_port_wrapper, get_free_port, kill_server_wrapper, server_arguments, + set_up_error_logging, setup_logger, set_local_vars, check_if_server_is_running, @@ -34,11 +38,14 @@ class QHTTPProxyServer: + NAME = "http_proxy_server" + def __init__(self, **kwargs): self.auto_disabled = None self.process = None self.uuid = "honeypotslogger" + "_" + __class__.__name__ + "_" + str(uuid4())[:8] self.config = kwargs.get("config", "") + self.template: str | None = None if self.config: self.logs = setup_logger(__class__.__name__, self.uuid, self.config) set_local_vars(self, self.config) @@ -56,6 +63,20 @@ def __init__(self, **kwargs): or getenv("HONEYPOTS_OPTIONS", "") or "" ) + self.logger = set_up_error_logging() + self.template_contents: str | None = self._load_template() + + def _load_template(self) -> str | None: + if self.template: + try: + template_contents = Path(self.template).read_text(errors="replace") + self.logger.debug( + f"[{self.NAME}]: Successfully loaded custom template {self.template}" + ) + return template_contents + except FileNotFoundError: + self.logger.error(f"[{self.NAME}]: Template file {self.template} not found") + return None def http_proxy_server_main(self): _q_s = self @@ -72,7 +93,7 @@ def resolve_domain(self, request_string): host = headers["host"].split(":") _q_s.logs.info( { - "server": "http_proxy_server", + "server": _q_s.NAME, "action": "query", "src_ip": self.transport.getPeer().host, "src_port": self.transport.getPeer().port, @@ -81,14 +102,13 @@ def resolve_domain(self, request_string): "data": host[0], } ) - # return '127.0.0.1' return dsnquery(host[0], "A")[0].address return None - def dataReceived(self, data): + def dataReceived(self, data): # noqa: N802 _q_s.logs.info( { - "server": "http_proxy_server", + "server": _q_s.NAME, "action": "connection", "src_ip": self.transport.getPeer().host, "src_port": self.transport.getPeer().port, @@ -98,7 +118,7 @@ def dataReceived(self, data): ) ip = self.resolve_domain(data) if ip: - self.write(_create_dummy_response(DUMMY_TEMPLATE)) + self.write(_create_dummy_response(_q_s.template_contents or DUMMY_TEMPLATE)) else: self.transport.loseConnection() @@ -110,74 +130,58 @@ def dataReceived(self, data): def write(self, data): self.transport.write(data) - def write(self, data): - self.transport.write(data) - factory = Factory() factory.protocol = CustomProtocolParent reactor.listenTCP(port=self.port, factory=factory, interface=self.ip) reactor.run() - def run_server(self, process=False, auto=False): + def run_server(self, process=False, auto=False) -> bool | None: status = "error" run = False - if process: - if auto and not self.auto_disabled: - port = get_free_port() - if port > 0: - self.port = port - run = True - elif self.close_port() and self.kill_server(): - run = True + if not process: + self.http_proxy_server_main() + return None - if run: - self.process = Popen( - [ - "python3", - path.realpath(__file__), - "--custom", - "--ip", - str(self.ip), - "--port", - str(self.port), - "--options", - str(self.options), - "--config", - str(self.config), - "--uuid", - str(self.uuid), - ] + if auto and not self.auto_disabled: + port = get_free_port() + if port > 0: + self.port = port + run = True + elif self.close_port() and self.kill_server(): + run = True + + if run: + self.process = Popen( + split( + f"python3 {Path(__file__)} --custom --ip {self.ip} --port {self.port} " + f"--options {self.options} --config {self.config} --uuid {self.uuid}" ) - if self.process.poll() is None and check_if_server_is_running(self.uuid): - status = "success" - - self.logs.info( - { - "server": "http_proxy_server", - "action": "process", - "status": status, - "src_ip": self.ip, - "src_port": self.port, - "dest_ip": self.ip, - "dest_port": self.port, - } ) + if self.process.poll() is None and check_if_server_is_running(self.uuid): + status = "success" + + self.logs.info( + { + "server": self.NAME, + "action": "process", + "status": status, + "src_ip": self.ip, + "src_port": self.port, + "dest_ip": self.ip, + "dest_port": self.port, + } + ) - if status == "success": - return True - else: - self.kill_server() - return False - else: - self.http_proxy_server_main() + if status == "success": + return True + self.kill_server() + return False def close_port(self): - ret = close_port_wrapper("http_proxy_server", self.ip, self.port, self.logs) - return ret + return close_port_wrapper(self.NAME, self.ip, self.port, self.logs) def kill_server(self): - ret = kill_server_wrapper("http_proxy_server", self.uuid, self.process) - return ret + return kill_server_wrapper(self.NAME, self.uuid, self.process) def test_server(self, ip=None, port=None, domain=None): with suppress(Exception): diff --git a/tests/data/test_template.html b/tests/data/test_template.html new file mode 100644 index 0000000..36727f6 --- /dev/null +++ b/tests/data/test_template.html @@ -0,0 +1,10 @@ + + + + + Test Template Title + + +

This is a template for testing!

+ + diff --git a/tests/test_http_proxy_server.py b/tests/test_http_proxy_server.py index 4310925..bee0f7b 100644 --- a/tests/test_http_proxy_server.py +++ b/tests/test_http_proxy_server.py @@ -1,5 +1,6 @@ from __future__ import annotations +from pathlib import Path from time import sleep import pytest @@ -40,7 +41,7 @@ def test_http_proxy_server(server_logs): logs = load_logs_from_file(server_logs) - assert len(logs) == 2 + assert len(logs) == 2 # noqa: PLR2004 connect, query = logs assert_connect_is_logged(connect, PORT) @@ -49,3 +50,39 @@ def test_http_proxy_server(server_logs): assert response.ok assert "Example Website" in response.text, "dummy response is missing" + + +CUSTOM_TEMPLATE = Path(__file__).parent / "data" / "test_template.html" +CUSTOM_TEMPLATE_CONFIG = { + "honeypots": { + "httpproxy": { + "template": str(CUSTOM_TEMPLATE.absolute()), + }, + } +} + + +@pytest.mark.parametrize( + "server_logs", + [ + { + "server": QHTTPProxyServer, + "port": str(int(PORT) + 1), + "custom_config": CUSTOM_TEMPLATE_CONFIG, + } + ], + indirect=True, +) +def test_custom_template(server_logs): # noqa: ARG001 + sleep(1) # give the server some time to start + + response = requests.get( + "http://example.com/", + proxies={"http": f"http://{IP}:{int(PORT) + 1!s}"}, + timeout=2, + ) + + sleep(1) # give the server process some time to write logs + + assert response.ok + assert "This is a template for testing!" in response.text, "custom template is missing" From a097421db08b1f625d5caabca0d85ab8a3134795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Mon, 22 Jan 2024 11:29:18 +0100 Subject: [PATCH 3/5] updated readme for custom template option + fixed typos --- README.md | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 3da7fe6..f0ce5b6 100644 --- a/README.md +++ b/README.md @@ -253,19 +253,21 @@ qsshserver.kill_server() - Port: 21/tcp - Lib: Twisted.ftp - Logs: ip, port, username and password (default) - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) - QHTTPProxyServer - Server: HTTP Proxy - Port: 8080/tcp - Lib: Twisted (low level emulation) - Logs: ip, port and data - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) + - Returns a dummy template by default + - A custom template can be provided by setting `"template"` for this server in `config.json` (should be an absolute path) - QHTTPServer - Server: HTTP - Port: 80/tcp - Lib: Twisted.http - Logs: ip, port, username and password - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) - QHTTPSServer - Server: HTTPS - Port: 443/tcp @@ -276,7 +278,7 @@ qsshserver.kill_server() - Port: 143/tcp - Lib: Twisted.imap - Logs: ip, port, username and password (default) - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) - QMysqlServer - Emulator: Mysql - Port: 3306/tcp @@ -287,7 +289,7 @@ qsshserver.kill_server() - Port: 110/tcp - Lib: Twisted.pop3 - Logs: ip, port, username and password (default) - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) - QPostgresServer - Emulator: Postgres - Port: 5432/tcp @@ -308,7 +310,7 @@ qsshserver.kill_server() - Port: 25/tcp - Lib: smtpd - Logs: ip, port, username and password (default) - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) - QSOCKS5Server - Server: SOCK5 - Port: 1080/tcp @@ -319,7 +321,7 @@ qsshserver.kill_server() - Port: 22/tcp - Lib: paramiko - Logs: ip, port, username and password - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) - QTelnetServer - Server: Telnet - Port: 23/tcp @@ -359,7 +361,7 @@ qsshserver.kill_server() - Emulator: Oracle - Port: 1521/tcp - Lib: Twisted (low level emulation) - - Logs: ip, port and connet data + - Logs: ip, port and connect data - QSNMPServer - Emulator: SNMP - Port: 161/udp @@ -370,31 +372,31 @@ qsshserver.kill_server() - Port: 5060/udp - Lib: Twisted.sip - Logs: ip, port and data - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) - QIRCServer - Emulator: IRC - Port: 6667/tcp - Lib: Twisted.irc - Logs: ip, port, username and password - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) - QPJLServer - Emulator: PJL - Port: 9100/tcp - Lib: Twisted - Logs: ip, port - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) - QIPPServer - Emulator: IPP - Port: 631/tcp - Lib: Twisted - - Logs: ip, por - - Options: Capture all threat actor commands and data (avalible) + - Logs: ip, port + - Options: Capture all threat actor commands and data (available) - QRDPServer - Emulator: RDP - Port: 3389/tcp - Lib: Sockets - Logs: ip, port, username and password - - Options: Capture all threat actor commands and data (avalible) + - Options: Capture all threat actor commands and data (available) - QDHCPServer - Emulator: DHCP - Port: 67/udp From 583327728d6b41462d8c38eab87f7d84fcc676ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Mon, 22 Jan 2024 13:39:28 +0100 Subject: [PATCH 4/5] tests: speed up tests by optimizing waiting time --- pyproject.toml | 4 ++++ tests/test_dhcp_server.py | 9 ++------ tests/test_dns_server.py | 26 ++++++++++------------- tests/test_elastic_server.py | 8 ++----- tests/test_ftp_server.py | 17 +++++++-------- tests/test_http_proxy_server.py | 37 ++++++++++++++------------------- tests/test_http_server.py | 14 +++++-------- tests/test_https_server.py | 14 +++++-------- tests/test_imap_server.py | 8 ++----- tests/test_ipp_server.py | 37 +++++++++++++++------------------ tests/test_irc_server.py | 9 ++------ tests/test_ldap_server.py | 8 ++----- tests/test_memcache_server.py | 9 ++------ tests/test_mssql_server.py | 8 ++----- tests/test_mysql_server.py | 8 ++----- tests/test_ntp_server.py | 9 +++----- tests/test_oracle_server.py | 29 ++++++++++++-------------- tests/test_pjl_server.py | 9 ++------ tests/test_pop3_server.py | 8 ++----- tests/test_postgres_server.py | 10 +++------ tests/test_rdp_server.py | 12 ++++------- tests/test_redis_server.py | 6 ++---- tests/test_sip_server.py | 10 ++------- tests/test_smb_server.py | 14 +++++-------- tests/test_smtp_server.py | 19 +++++++---------- tests/test_snmp_server.py | 18 +++++++++------- tests/test_socks5_server.py | 6 ++---- tests/test_ssh_server.py | 24 +++++++++++---------- tests/test_telnet_server.py | 17 +++++++-------- tests/test_vnc_server.py | 9 ++++---- tests/utils.py | 31 +++++++++++++++++++++++++-- 31 files changed, 190 insertions(+), 257 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8e7dbf7..19d98bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,3 +144,7 @@ exclude = [ ] line-length = 99 target-version = "py38" + +[tool.ruff.lint.per-file-ignores] +# don't check for "magic value" in tests +"tests/*" = ["PLR2004"] diff --git a/tests/test_dhcp_server.py b/tests/test_dhcp_server.py index c43511f..743d365 100644 --- a/tests/test_dhcp_server.py +++ b/tests/test_dhcp_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest from honeypots import QDHCPServer @@ -10,6 +8,7 @@ EXPECTED_KEYS, IP, load_logs_from_file, + wait_for_server, ) PORT = "50067" @@ -21,13 +20,9 @@ indirect=True, ) def test_dhcp_server(server_logs): - sleep(1) # give the server some time to start - - with connect_to(IP, PORT, udp=True) as connection: + with wait_for_server(PORT), connect_to(IP, PORT, udp=True) as connection: connection.send(b"\x03" * 240) - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 1 diff --git a/tests/test_dns_server.py b/tests/test_dns_server.py index c8ad214..47ace22 100644 --- a/tests/test_dns_server.py +++ b/tests/test_dns_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest from dns.resolver import Resolver @@ -10,6 +8,7 @@ assert_connect_is_logged, IP, load_logs_from_file, + wait_for_server, ) PORT = "50053" @@ -21,22 +20,19 @@ indirect=True, ) def test_dns_server(server_logs): - sleep(1) # give the server some time to start - - resolver = Resolver(configure=False) - resolver.nameservers = [IP] - resolver.port = int(PORT) - domain = "example.org" - responses = [ - resolver.resolve(domain, "a", tcp=False), - resolver.resolve(domain, "a", tcp=True), - ] - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + resolver = Resolver(configure=False) + resolver.nameservers = [IP] + resolver.port = int(PORT) + domain = "example.org" + responses = [ + resolver.resolve(domain, "a", tcp=False), + resolver.resolve(domain, "a", tcp=True), + ] logs = load_logs_from_file(server_logs) - assert len(logs) == 3 # noqa: PLR2004 + assert len(logs) == 3 connect, *queries = logs assert_connect_is_logged(connect, PORT) diff --git a/tests/test_elastic_server.py b/tests/test_elastic_server.py index 397180c..5d358e9 100644 --- a/tests/test_elastic_server.py +++ b/tests/test_elastic_server.py @@ -1,7 +1,6 @@ from __future__ import annotations from contextlib import suppress -from time import sleep import pytest from elasticsearch import Elasticsearch, NotFoundError @@ -14,6 +13,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "59200" @@ -25,9 +25,7 @@ indirect=True, ) def test_elastic_server(server_logs): - sleep(1) # give the server some time to start - - with suppress(NotFoundError): + with wait_for_server(PORT), suppress(NotFoundError): elastic = Elasticsearch( [f"https://{IP}:{PORT}"], basic_auth=(USERNAME, PASSWORD), @@ -35,8 +33,6 @@ def test_elastic_server(server_logs): ) elastic.search(index="test", body={}, size=99) - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_ftp_server.py b/tests/test_ftp_server.py index afeb808..2361752 100644 --- a/tests/test_ftp_server.py +++ b/tests/test_ftp_server.py @@ -1,7 +1,6 @@ from __future__ import annotations from ftplib import FTP -from time import sleep import pytest @@ -13,6 +12,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "50021" @@ -31,15 +31,12 @@ indirect=True, ) def test_ftp_server(server_logs): - sleep(1) # give the server some time to start - - client = FTP() - client.connect(IP, int(PORT)) - client.login(USERNAME, PASSWORD) - client.pwd() - client.quit() - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + client = FTP() + client.connect(IP, int(PORT)) + client.login(USERNAME, PASSWORD) + client.pwd() + client.quit() logs = load_logs_from_file(server_logs) diff --git a/tests/test_http_proxy_server.py b/tests/test_http_proxy_server.py index bee0f7b..7ccc7b0 100644 --- a/tests/test_http_proxy_server.py +++ b/tests/test_http_proxy_server.py @@ -1,7 +1,6 @@ from __future__ import annotations from pathlib import Path -from time import sleep import pytest import requests @@ -11,9 +10,11 @@ assert_connect_is_logged, IP, load_logs_from_file, + wait_for_server, ) PORT = "58080" +PORT_2 = "58081" SERVER_CONFIG = { "honeypots": { "httpproxy": { @@ -29,19 +30,16 @@ indirect=True, ) def test_http_proxy_server(server_logs): - sleep(1) # give the server some time to start - - response = requests.get( - "http://example.com/", - proxies={"http": f"http://{IP}:{PORT}"}, - timeout=2, - ) - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + response = requests.get( + "http://example.com/", + proxies={"http": f"http://{IP}:{PORT}"}, + timeout=2, + ) logs = load_logs_from_file(server_logs) - assert len(logs) == 2 # noqa: PLR2004 + assert len(logs) == 2 connect, query = logs assert_connect_is_logged(connect, PORT) @@ -67,22 +65,19 @@ def test_http_proxy_server(server_logs): [ { "server": QHTTPProxyServer, - "port": str(int(PORT) + 1), + "port": PORT_2, "custom_config": CUSTOM_TEMPLATE_CONFIG, } ], indirect=True, ) def test_custom_template(server_logs): # noqa: ARG001 - sleep(1) # give the server some time to start - - response = requests.get( - "http://example.com/", - proxies={"http": f"http://{IP}:{int(PORT) + 1!s}"}, - timeout=2, - ) - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT_2): + response = requests.get( + "http://example.com/", + proxies={"http": f"http://{IP}:{PORT_2}"}, + timeout=2, + ) assert response.ok assert "This is a template for testing!" in response.text, "custom template is missing" diff --git a/tests/test_http_server.py b/tests/test_http_server.py index 1064248..2513698 100644 --- a/tests/test_http_server.py +++ b/tests/test_http_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest import requests @@ -13,6 +11,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "50080" @@ -31,13 +30,10 @@ indirect=True, ) def test_http_server(server_logs): - sleep(1) # give the server some time to start - - url = f"http://{IP}:{PORT}" - data = {"username": USERNAME, "password": PASSWORD} - requests.post(f"{url}/login.html", verify=False, data=data) - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + url = f"http://{IP}:{PORT}" + data = {"username": USERNAME, "password": PASSWORD} + requests.post(f"{url}/login.html", verify=False, data=data) logs = load_logs_from_file(server_logs) diff --git a/tests/test_https_server.py b/tests/test_https_server.py index 18a97ba..5f976d3 100644 --- a/tests/test_https_server.py +++ b/tests/test_https_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest import requests @@ -13,6 +11,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "50443" @@ -31,13 +30,10 @@ indirect=True, ) def test_https_server(server_logs): - sleep(1) # give the server some time to start - - url = f"https://{IP}:{PORT}" - data = {"username": USERNAME, "password": PASSWORD} - requests.post(f"{url}/login.html", verify=False, data=data) - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + url = f"https://{IP}:{PORT}" + data = {"username": USERNAME, "password": PASSWORD} + requests.post(f"{url}/login.html", verify=False, data=data) logs = load_logs_from_file(server_logs) diff --git a/tests/test_imap_server.py b/tests/test_imap_server.py index f0a74e4..b3cc410 100644 --- a/tests/test_imap_server.py +++ b/tests/test_imap_server.py @@ -2,7 +2,6 @@ from contextlib import suppress from imaplib import IMAP4 -from time import sleep import pytest @@ -14,6 +13,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "50143" @@ -32,14 +32,10 @@ indirect=True, ) def test_imap_server(server_logs): - sleep(1) # give the server some time to start - - with suppress(IMAP4.error): + with wait_for_server(PORT), suppress(IMAP4.error): imap_test = IMAP4(IP, int(PORT)) imap_test.login(USERNAME, PASSWORD) - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 4 diff --git a/tests/test_ipp_server.py b/tests/test_ipp_server.py index b50ce16..a55b24f 100644 --- a/tests/test_ipp_server.py +++ b/tests/test_ipp_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest import requests @@ -10,6 +8,7 @@ assert_connect_is_logged, IP, load_logs_from_file, + wait_for_server, ) PORT = "50631" @@ -28,22 +27,19 @@ indirect=True, ) def test_ipp_server(server_logs): - sleep(1) # give the server some time to start - - body = ( - b"\x02\x00\x00\x0b\x00\x01/p\x01G\x00\x12attributes-charset\x00\x05utf-8H\x00\x1b" - b"attributes-natural-language\x00\x02enE\x00\x0bprinter-uri\x00\x15" - b"ipp://127.0.0.1:631/D\x00\x14requested-attributes\x00\x03allD\x00\x00\x00\x12media-col-database\x03" - ) - headers = { - "Content-Type": "application/x-www-form-urlencoded", - "Content-Length": f"{len(body)}", - "Host": f"{IP}:{PORT}", - "Connection": "close", - } - requests.post(f"http://{IP}:{PORT}/", data=body, headers=headers) - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + body = ( + b"\x02\x00\x00\x0b\x00\x01/p\x01G\x00\x12attributes-charset\x00\x05utf-8H\x00\x1b" + b"attributes-natural-language\x00\x02enE\x00\x0bprinter-uri\x00\x15" + b"ipp://127.0.0.1:631/D\x00\x14requested-attributes\x00\x03allD\x00\x00\x00\x12media-col-database\x03" + ) + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Content-Length": f"{len(body)}", + "Host": f"{IP}:{PORT}", + "Connection": "close", + } + requests.post(f"http://{IP}:{PORT}/", data=body, headers=headers) logs = load_logs_from_file(server_logs) @@ -54,7 +50,8 @@ def test_ipp_server(server_logs): assert query["action"] == "query" assert query["data"] == { "request": ( - "VERSION 2.0|REQUEST 0x12f70|OPERATION Get-Printer-Attributes|GROUP operation-attributes-tag|" - "ATTR attributes-charset utf-8|ATTR attributes-natural-language en|ATTR printer-uri ipp://127.0.0.1:631/D" + "VERSION 2.0|REQUEST 0x12f70|OPERATION Get-Printer-Attributes|GROUP " + "operation-attributes-tag|ATTR attributes-charset utf-8|ATTR " + "attributes-natural-language en|ATTR printer-uri ipp://127.0.0.1:631/D" ) } diff --git a/tests/test_irc_server.py b/tests/test_irc_server.py index c7afa8f..950761f 100644 --- a/tests/test_irc_server.py +++ b/tests/test_irc_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest from honeypots import QIRCServer @@ -11,6 +9,7 @@ IP, load_logs_from_file, PASSWORD, + wait_for_server, ) PORT = "56667" @@ -29,14 +28,10 @@ indirect=True, ) def test_irc_server(server_logs): - sleep(1) # give the server some time to start - - with connect_to(IP, PORT) as connection: + with wait_for_server(PORT), connect_to(IP, PORT) as connection: connection.setblocking(False) connection.send(f"PASS {PASSWORD}\n".encode()) - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_ldap_server.py b/tests/test_ldap_server.py index a83f47e..3a4126b 100644 --- a/tests/test_ldap_server.py +++ b/tests/test_ldap_server.py @@ -1,7 +1,6 @@ from __future__ import annotations from contextlib import suppress -from time import sleep import pytest from ldap3 import ALL, Connection, Server @@ -15,6 +14,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "50389" @@ -26,9 +26,7 @@ indirect=True, ) def test_ldap_server(server_logs): - sleep(1) # give the server some time to start - - with suppress(LDAPInsufficientAccessRightsResult): + with wait_for_server(PORT), suppress(LDAPInsufficientAccessRightsResult): connection = Connection( Server(IP, port=int(PORT), get_info=ALL), authentication="SIMPLE", @@ -42,8 +40,6 @@ def test_ldap_server(server_logs): connection.open() connection.bind() - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_memcache_server.py b/tests/test_memcache_server.py index 688a850..8cb409f 100644 --- a/tests/test_memcache_server.py +++ b/tests/test_memcache_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest from honeypots import QMemcacheServer @@ -10,6 +8,7 @@ connect_to, IP, load_logs_from_file, + wait_for_server, ) PORT = "61211" @@ -21,14 +20,10 @@ indirect=True, ) def test_memcache_server(server_logs): - sleep(1) # give the server some time to start - - with connect_to(IP, PORT) as connection: + with wait_for_server(PORT), connect_to(IP, PORT) as connection: connection.send(b"stats\r\n") data, _ = connection.recvfrom(10000) - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_mssql_server.py b/tests/test_mssql_server.py index c1a2a53..15ba222 100644 --- a/tests/test_mssql_server.py +++ b/tests/test_mssql_server.py @@ -1,7 +1,6 @@ from __future__ import annotations from contextlib import suppress -from time import sleep import pymssql import pytest @@ -14,6 +13,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "51433" @@ -25,9 +25,7 @@ indirect=True, ) def test_mssql_server(server_logs): - sleep(1) # give the server some time to start - - with suppress(pymssql.OperationalError): + with wait_for_server(PORT), suppress(pymssql.OperationalError): connection = pymssql.connect( host=IP, port=str(PORT), @@ -37,8 +35,6 @@ def test_mssql_server(server_logs): ) connection.close() - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_mysql_server.py b/tests/test_mysql_server.py index 9736add..bdb3d5f 100644 --- a/tests/test_mysql_server.py +++ b/tests/test_mysql_server.py @@ -1,7 +1,6 @@ from __future__ import annotations from contextlib import suppress -from time import sleep import mysql.connector import pytest @@ -14,6 +13,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "53306" @@ -25,9 +25,7 @@ indirect=True, ) def test_mysql_server(server_logs): - sleep(1) # give the server some time to start - - with suppress(mysql.connector.errors.OperationalError): + with wait_for_server(PORT), suppress(mysql.connector.errors.OperationalError): connection = mysql.connector.connect( user=USERNAME, password=PASSWORD, @@ -38,8 +36,6 @@ def test_mysql_server(server_logs): ) connection.close() - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_ntp_server.py b/tests/test_ntp_server.py index 3fbe003..e84da23 100644 --- a/tests/test_ntp_server.py +++ b/tests/test_ntp_server.py @@ -2,7 +2,7 @@ import math from struct import unpack -from time import sleep, time +from time import time import pytest @@ -11,6 +11,7 @@ connect_to, IP, load_logs_from_file, + wait_for_server, ) PORT = "50123" @@ -22,15 +23,11 @@ indirect=True, ) def test_ntp_server(server_logs): - sleep(1) # give the server some time to start - - with connect_to(IP, PORT, udp=True) as connection: + with wait_for_server(PORT), connect_to(IP, PORT, udp=True) as connection: connection.send(b"\x1b" + 47 * b"\0") data, _ = connection.recvfrom(256) output_time = unpack("!12I", data)[10] - 2208988800 - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_oracle_server.py b/tests/test_oracle_server.py index ab9cdd1..ec2c367 100644 --- a/tests/test_oracle_server.py +++ b/tests/test_oracle_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest from honeypots import QOracleServer @@ -11,6 +9,7 @@ IP, load_logs_from_file, USERNAME, + wait_for_server, ) PORT = "51521" @@ -24,23 +23,21 @@ indirect=True, ) def test_oracle_server(server_logs): - sleep(1) # give the server some time to start - - payload = ( - "\x00\x00\x03\x04\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x01F\xb9\xd9@\x00@\x06\x81\xd6" - "\x7f\x00\x00\x01\x7f\x00\x00\x01\xbf\xce\x06\x13\xacW\xde\xc0Z\xb5\x0cI\x80\x18\x02\x00\xff:\x00\x00" - "\x01\x01\x08\n\x1bdZ^\x1bdZ^\x01\x12\x00\x00\x01\x00\x00\x00\x01>\x01,\x0cA \x00\xff\xff\x7f\x08\x00" - "\x00\x01\x00\x00\xc8\x00J\x00\x00\x14\x00AA\xa7C\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" - "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 \x00\x00 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x01" - f"(DESCRIPTION=(CONNECT_DATA=(SERVICE_NAME={SERVICE})(CID=(PROGRAM={PROGRAM})(HOST=xxxxxxxxxxxxxx)" - f"(USER={USERNAME}))(CONNECTION_ID=xxxxxxxxxxxxxxxxxxxxxxxx))(ADDRESS=(PROTOCOL=tcp)(HOST={IP})(PORT={PORT})))" - ) - with connect_to(IP, PORT) as connection: + with wait_for_server(PORT), connect_to(IP, PORT) as connection: + payload = ( + "\x00\x00\x03\x04\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x01F\xb9\xd9@" + "\x00@\x06\x81\xd6\x7f\x00\x00\x01\x7f\x00\x00\x01\xbf\xce\x06\x13\xacW\xde\xc0Z\xb5" + "\x0cI\x80\x18\x02\x00\xff:\x00\x00\x01\x01\x08\n\x1bdZ^\x1bdZ^\x01\x12\x00\x00\x01" + "\x00\x00\x00\x01>\x01,\x0cA \x00\xff\xff\x7f\x08\x00\x00\x01\x00\x00\xc8\x00J\x00" + "\x00\x14\x00AA\xa7C\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x00 \x00\x00 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x01" + f"(DESCRIPTION=(CONNECT_DATA=(SERVICE_NAME={SERVICE})(CID=(PROGRAM={PROGRAM})" + f"(HOST=xxxxxxxxxxxxxx)(USER={USERNAME}))(CONNECTION_ID=xxxxxxxxxxxxxxxxxxxxxxxx))" + f"(ADDRESS=(PROTOCOL=tcp)(HOST={IP})(PORT={PORT})))" + ) connection.send(payload.encode()) response, _ = connection.recvfrom(10000) - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_pjl_server.py b/tests/test_pjl_server.py index 7940060..45fe00b 100644 --- a/tests/test_pjl_server.py +++ b/tests/test_pjl_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest from honeypots import QPJLServer @@ -10,6 +8,7 @@ connect_to, IP, load_logs_from_file, + wait_for_server, ) PORT = "59100" @@ -28,13 +27,9 @@ indirect=True, ) def test_pjl_server(server_logs): - sleep(1) # give the server some time to start - - with connect_to(IP, PORT) as connection: + with wait_for_server(PORT), connect_to(IP, PORT) as connection: connection.send(b"\x1b%-12345X@PJL prodinfo") - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_pop3_server.py b/tests/test_pop3_server.py index 220a02c..ba78fa9 100644 --- a/tests/test_pop3_server.py +++ b/tests/test_pop3_server.py @@ -2,7 +2,6 @@ from contextlib import suppress from poplib import error_proto, POP3 -from time import sleep import pytest @@ -14,6 +13,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "50110" @@ -32,15 +32,11 @@ indirect=True, ) def test_pop3_server(server_logs): - sleep(1) # give the server some time to start - - with suppress(error_proto): + with wait_for_server(PORT), suppress(error_proto): client = POP3(IP, int(PORT)) client.user(USERNAME) client.pass_(PASSWORD) - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 4 diff --git a/tests/test_postgres_server.py b/tests/test_postgres_server.py index 0e84791..58dfe9b 100644 --- a/tests/test_postgres_server.py +++ b/tests/test_postgres_server.py @@ -1,7 +1,6 @@ from __future__ import annotations from contextlib import suppress -from time import sleep import pytest from psycopg2 import connect, OperationalError @@ -14,6 +13,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "55432" @@ -25,12 +25,8 @@ indirect=True, ) def test_postgres_server(server_logs): - sleep(1) # give the server some time to start - - with suppress(OperationalError): - db = connect(host=IP, port=PORT, user=USERNAME, password=PASSWORD) - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT), suppress(OperationalError): + connect(host=IP, port=PORT, user=USERNAME, password=PASSWORD) logs = load_logs_from_file(server_logs) diff --git a/tests/test_rdp_server.py b/tests/test_rdp_server.py index 774c878..60b9cd2 100644 --- a/tests/test_rdp_server.py +++ b/tests/test_rdp_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest from honeypots import QRDPServer @@ -10,6 +8,7 @@ connect_to, IP, load_logs_from_file, + wait_for_server, ) PORT = "53389" @@ -21,16 +20,13 @@ indirect=True, ) def test_rdp_server(server_logs): - sleep(1) # give the server some time to start - - with connect_to(IP, PORT) as connection: + with wait_for_server(PORT), connect_to(IP, PORT) as connection: connection.send(b"test") connection.send( - b"\x03\x00\x00*%\xe0\x00\x00\x00\x00\x00Cookie: mstshash=foobar\r\n\x01\x00\x08\x00\x03\x00\x00" + b"\x03\x00\x00*%\xe0\x00\x00\x00\x00\x00Cookie: " + b"mstshash=foobar\r\n\x01\x00\x08\x00\x03\x00\x00" ) - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_redis_server.py b/tests/test_redis_server.py index 4ab3294..d265baf 100644 --- a/tests/test_redis_server.py +++ b/tests/test_redis_server.py @@ -1,7 +1,6 @@ from __future__ import annotations from contextlib import suppress -from time import sleep import pytest from redis import AuthenticationError, StrictRedis @@ -14,6 +13,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "56379" @@ -25,13 +25,11 @@ indirect=True, ) def test_redis_server(server_logs): - with suppress(AuthenticationError): + with wait_for_server(PORT), suppress(AuthenticationError): redis = StrictRedis.from_url(f"redis://{USERNAME}:{PASSWORD}@{IP}:{PORT}/1") for _ in redis.scan_iter("user:*"): pass - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_sip_server.py b/tests/test_sip_server.py index 111cb6d..4aa4d78 100644 --- a/tests/test_sip_server.py +++ b/tests/test_sip_server.py @@ -1,11 +1,9 @@ from __future__ import annotations -from time import sleep - import pytest from honeypots import QSIPServer -from .utils import connect_to, IP, load_logs_from_file +from .utils import connect_to, IP, load_logs_from_file, wait_for_server PORT = "55060" EXPECTED_KEYS = ("action", "server", "src_ip", "src_port", "timestamp") @@ -21,9 +19,7 @@ indirect=True, ) def test_sip_server(server_logs): - sleep(1) # give the server some time to start - - with connect_to(IP, PORT, udp=True) as connection: + with wait_for_server(PORT), connect_to(IP, PORT, udp=True) as connection: payload = ( "INVITE sip:user_1@test.test SIP/2.0\r\n" f"To: {TO}\r\n" @@ -38,8 +34,6 @@ def test_sip_server(server_logs): ) connection.send(payload.encode()) - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_smb_server.py b/tests/test_smb_server.py index 883871e..17a2711 100644 --- a/tests/test_smb_server.py +++ b/tests/test_smb_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from time import sleep - import pytest from impacket.smbconnection import SMBConnection @@ -12,6 +10,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "50445" @@ -23,13 +22,10 @@ indirect=True, ) def test_smb_server(server_logs): - sleep(5) # give the server some time to start - - smb_client = SMBConnection(IP, IP, sess_port=PORT) - smb_client.login(USERNAME, PASSWORD) - smb_client.close() - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + smb_client = SMBConnection(IP, IP, sess_port=PORT) + smb_client.login(USERNAME, PASSWORD) + smb_client.close() logs = load_logs_from_file(server_logs) diff --git a/tests/test_smtp_server.py b/tests/test_smtp_server.py index 546b30f..e1d3ac0 100644 --- a/tests/test_smtp_server.py +++ b/tests/test_smtp_server.py @@ -2,7 +2,6 @@ from base64 import b64decode from smtplib import SMTP -from time import sleep import pytest @@ -14,6 +13,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "50025" @@ -39,15 +39,12 @@ indirect=True, ) def test_smtp_server(server_logs): - sleep(1) # give server time to start - - client = SMTP(IP, int(PORT)) - client.ehlo() - client.login(USERNAME, PASSWORD) - client.sendmail("fromtest", "totest", "Nothing") - client.quit() - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + client = SMTP(IP, int(PORT)) + client.ehlo() + client.login(USERNAME, PASSWORD) + client.sendmail("fromtest", "totest", "Nothing") + client.quit() logs = load_logs_from_file(server_logs) @@ -61,4 +58,4 @@ def test_smtp_server(server_logs): assert b64decode(auth["data"]["data"]).decode() == f"\x00{USERNAME}\x00{PASSWORD}" for entry, expected in zip(additional, EXPECTED_DATA): - assert entry + assert entry["data"] == expected diff --git a/tests/test_snmp_server.py b/tests/test_snmp_server.py index 72af237..528d54f 100644 --- a/tests/test_snmp_server.py +++ b/tests/test_snmp_server.py @@ -16,6 +16,7 @@ assert_connect_is_logged, IP, load_logs_from_file, + wait_for_server, ) PORT = "50161" @@ -27,14 +28,15 @@ indirect=True, ) def test_snmp_server(server_logs): - g = getCmd( - SnmpEngine(), - CommunityData("public"), - UdpTransportTarget((IP, int(PORT))), - ContextData(), - ObjectType(ObjectIdentity("1.3.6.1.4.1.9.9.618.1.4.1.0")), - ) - next(g) + with wait_for_server(PORT): + g = getCmd( + SnmpEngine(), + CommunityData("public"), + UdpTransportTarget((IP, int(PORT))), + ContextData(), + ObjectType(ObjectIdentity("1.3.6.1.4.1.9.9.618.1.4.1.0")), + ) + next(g) logs = load_logs_from_file(server_logs) diff --git a/tests/test_socks5_server.py b/tests/test_socks5_server.py index b8de9a9..2dedf9a 100644 --- a/tests/test_socks5_server.py +++ b/tests/test_socks5_server.py @@ -1,7 +1,6 @@ from __future__ import annotations from contextlib import suppress -from time import sleep import pytest import requests @@ -14,6 +13,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "51080" @@ -25,14 +25,12 @@ indirect=True, ) def test_socks5_server(server_logs): - with suppress(requests.exceptions.ConnectionError): + with wait_for_server(PORT), suppress(requests.exceptions.ConnectionError): requests.get( "http://127.0.0.1/", proxies={"http": f"socks5://{USERNAME}:{PASSWORD}@{IP}:{PORT}"}, ) - sleep(1) # give the server process some time to write logs - logs = load_logs_from_file(server_logs) assert len(logs) == 2 diff --git a/tests/test_ssh_server.py b/tests/test_ssh_server.py index 5f32cfc..98b3162 100644 --- a/tests/test_ssh_server.py +++ b/tests/test_ssh_server.py @@ -1,12 +1,17 @@ from __future__ import annotations -from time import sleep - import pytest from paramiko import AutoAddPolicy, SSHClient from honeypots import QSSHServer -from .utils import assert_connect_is_logged, IP, load_logs_from_file, PASSWORD, USERNAME +from .utils import ( + assert_connect_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, + wait_for_server, +) PORT = 50022 SERVER_CONFIG = { @@ -24,14 +29,11 @@ indirect=True, ) def test_ssh_server(server_logs): - sleep(1) # give the server some time to start - - ssh = SSHClient() - ssh.set_missing_host_key_policy(AutoAddPolicy()) - ssh.connect(IP, port=PORT, username=USERNAME, password=PASSWORD) - ssh.close() - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + ssh = SSHClient() + ssh.set_missing_host_key_policy(AutoAddPolicy()) + ssh.connect(IP, port=PORT, username=USERNAME, password=PASSWORD) + ssh.close() logs = load_logs_from_file(server_logs) diff --git a/tests/test_telnet_server.py b/tests/test_telnet_server.py index 98073b6..d353fdf 100644 --- a/tests/test_telnet_server.py +++ b/tests/test_telnet_server.py @@ -1,7 +1,6 @@ from __future__ import annotations from telnetlib import Telnet -from time import sleep import pytest @@ -13,6 +12,7 @@ load_logs_from_file, PASSWORD, USERNAME, + wait_for_server, ) PORT = "50023" @@ -24,15 +24,12 @@ indirect=True, ) def test_telnet_server(server_logs): - sleep(1) # give the server some time to start - - telnet_client = Telnet(IP, int(PORT)) - telnet_client.read_until(b"login: ") - telnet_client.write(USERNAME.encode() + b"\n") - telnet_client.read_until(b"Password: ") - telnet_client.write(PASSWORD.encode() + b"\n") - - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + telnet_client = Telnet(IP, int(PORT)) + telnet_client.read_until(b"login: ") + telnet_client.write(USERNAME.encode() + b"\n") + telnet_client.read_until(b"Password: ") + telnet_client.write(PASSWORD.encode() + b"\n") logs = load_logs_from_file(server_logs) diff --git a/tests/test_vnc_server.py b/tests/test_vnc_server.py index 834f40f..632b6dc 100644 --- a/tests/test_vnc_server.py +++ b/tests/test_vnc_server.py @@ -1,13 +1,12 @@ from __future__ import annotations from multiprocessing import Process -from time import sleep import pytest from vncdotool import api from honeypots import QVNCServer -from .utils import assert_connect_is_logged, IP, load_logs_from_file, PASSWORD +from .utils import assert_connect_is_logged, IP, load_logs_from_file, PASSWORD, wait_for_server PORT = "55900" @@ -25,9 +24,9 @@ def _connect_to_vnc(): def test_vnc_server(server_logs): # This VNC API creates a blocking daemon thread that can't be trivially stopped, # so we just run it in a process and terminate that instead - process = Process(target=_connect_to_vnc) - process.start() - sleep(1) # give the server process some time to write logs + with wait_for_server(PORT): + process = Process(target=_connect_to_vnc) + process.start() process.terminate() process.join(timeout=5) diff --git a/tests/utils.py b/tests/utils.py index a273a82..6d453ae 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,8 +2,14 @@ import json from contextlib import contextmanager -from pathlib import Path from socket import AF_INET, IPPROTO_UDP, SOCK_DGRAM, SOCK_STREAM, socket +from time import sleep, time +from typing import TYPE_CHECKING + +import psutil + +if TYPE_CHECKING: + from pathlib import Path IP = "127.0.0.1" USERNAME = "testing" @@ -12,7 +18,7 @@ def load_logs_from_file(log_folder: Path) -> list[dict]: - log_files = [f for f in log_folder.iterdir()] + log_files = list(log_folder.iterdir()) assert len(log_files) == 1 log_file = log_files[0] logs = [] @@ -53,3 +59,24 @@ def assert_login_is_logged(login: dict[str, str]): assert login["username"] == USERNAME assert login["password"] == PASSWORD assert login["status"] == "success" + + +@contextmanager +def wait_for_server(port: str | int): + _wait_for_service(int(port)) + yield + sleep(0.5) # give the server process some time to write logs + + +def _wait_for_service(port: int, interval: float = 0.1, timeout: int = 5.0): + start_time = time() + while True: + if _service_runs(port): + return + sleep(interval) + if time() - start_time > timeout: + raise TimeoutError() + + +def _service_runs(port: int) -> bool: + return any(service.laddr.port == port for service in psutil.net_connections()) From 8a0bfbb9c22d36eb991ea27ac49cd28bf413fcbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Tue, 23 Jan 2024 13:54:46 +0100 Subject: [PATCH 5/5] empty options bug fix --- honeypots/http_proxy_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/honeypots/http_proxy_server.py b/honeypots/http_proxy_server.py index 7feb9a2..66c63f2 100644 --- a/honeypots/http_proxy_server.py +++ b/honeypots/http_proxy_server.py @@ -154,7 +154,7 @@ def run_server(self, process=False, auto=False) -> bool | None: self.process = Popen( split( f"python3 {Path(__file__)} --custom --ip {self.ip} --port {self.port} " - f"--options {self.options} --config {self.config} --uuid {self.uuid}" + f"--options '{self.options}' --config '{self.config}' --uuid {self.uuid}" ) ) if self.process.poll() is None and check_if_server_is_running(self.uuid):