diff --git a/honeypots/__init__.py b/honeypots/__init__.py index be50362..a9c1572 100644 --- a/honeypots/__init__.py +++ b/honeypots/__init__.py @@ -28,7 +28,7 @@ from .pjl_server import QPJLServer from .pop3_server import QPOP3Server from .postgres_server import QPostgresServer -from .qbsniffer import QBSniffer +from .sniffer import QSniffer from .rdp_server import QRDPServer from .redis_server import QRedisServer from .sip_server import QSIPServer @@ -41,7 +41,7 @@ from .vnc_server import QVNCServer __all__ = [ - "QBSniffer", + "QSniffer", "QDHCPServer", "QDNSServer", "QElasticServer", diff --git a/honeypots/__main__.py b/honeypots/__main__.py index b873504..7586be4 100755 --- a/honeypots/__main__.py +++ b/honeypots/__main__.py @@ -12,7 +12,6 @@ from pathlib import Path from signal import alarm, SIGALRM, SIGINT, signal, SIGTERM, SIGTSTP from subprocess import Popen -from sys import stdout from time import sleep from typing import Any from uuid import uuid4 @@ -21,7 +20,7 @@ from psutil import net_io_counters, Process from honeypots import ( - QBSniffer, + QSniffer, QDHCPServer, QDNSServer, QElasticServer, @@ -155,7 +154,7 @@ class HoneypotsManager: def __init__(self, options: Namespace, server_args: dict[str, str | int]): self.options = options self.server_args = server_args - self.config_data = self._load_config() if self.options.config else None + self.config_data = self._load_config() if self.options.config else {} self.auto = options.auto if geteuid() != 0 else False self.honeypots: list[tuple[Any, str, bool]] = [] @@ -171,9 +170,11 @@ def main(self): print(service) elif self.options.kill: clean_all() - elif self.options.chameleon and self.config_data is not None: + elif self.options.chameleon and self.config_data: self._start_chameleon_mode() elif self.options.setup: + if self.options.sniffer: + self._set_up_sniffer() self._set_up_honeypots() def _load_config(self): @@ -288,19 +289,6 @@ def _start_chameleon_mode(self): # noqa: C901,PLR0912 logger.error("logging must be configured with db_sqlite or db_postgres") sys.exit(1) - sniffer_filter = self.config_data.get("sniffer_filter") - sniffer_interface = self.config_data.get("sniffer_interface") - if not (sniffer_filter and sniffer_interface): - return - - if not self.options.test and self.options.sniffer: - _check_interfaces(sniffer_interface) - if self.options.iptables: - _fix_ip_tables() - logger.info("[x] Wait for 10 seconds...") - stdout.flush() - sleep(2) - if self.options.config != "": logger.warning( "[x] Config.json file overrides --ip, --port, --username and --password" @@ -330,7 +318,7 @@ def _start_chameleon_mode(self): # noqa: C901,PLR0912 sys.exit(1) if self.options.sniffer: - self._start_sniffer(sniffer_filter, sniffer_interface) + self._set_up_sniffer() if not self.options.test: logger.info("[x] Everything looks good!") @@ -347,15 +335,30 @@ def _setup_logging(self) -> logging.Logger: drop = True return setup_logger("main", uuid, self.options.config, drop) + def _set_up_sniffer(self): + sniffer_filter = self.config_data.get("sniffer_filter") + sniffer_interface = self.config_data.get("sniffer_interface") + if not sniffer_interface: + logger.error('If sniffer is enabled, "sniffer_interface" must be set in the config') + sys.exit(1) + if not self.options.test and self.options.sniffer: + _check_interfaces(sniffer_interface) + if self.options.iptables: + _fix_ip_tables() + logger.info("[x] Wait for iptables update...") + sleep(2) + self._start_sniffer(sniffer_filter, sniffer_interface) + def _start_sniffer(self, sniffer_filter, sniffer_interface): logger.info("[x] Starting sniffer") - sniffer = QBSniffer( + sniffer = QSniffer( filter_=sniffer_filter, interface=sniffer_interface, config=self.options.config, ) sniffer.run_sniffer(process=True) - self.honeypots.append((sniffer, "sniffer", True)) + sleep(0.1) + self.honeypots.append((sniffer, "sniffer", sniffer.server_is_alive())) def _stats_loop(self, logs): while True: diff --git a/honeypots/base_server.py b/honeypots/base_server.py index 5ce54b1..9a2604a 100644 --- a/honeypots/base_server.py +++ b/honeypots/base_server.py @@ -88,6 +88,9 @@ def kill_server(self): except TimeoutError: self._server_process.kill() + def server_is_alive(self) -> bool: + return self._server_process and self._server_process.is_alive() + @abstractmethod def server_main(self): pass diff --git a/honeypots/dhcp_server.py b/honeypots/dhcp_server.py index d247286..f2de1df 100644 --- a/honeypots/dhcp_server.py +++ b/honeypots/dhcp_server.py @@ -10,8 +10,8 @@ // ------------------------------------------------------------- """ -from socket import inet_aton import struct +from socket import inet_aton from twisted.internet import reactor from twisted.internet.protocol import DatagramProtocol diff --git a/honeypots/helper.py b/honeypots/helper.py index cea01fd..82320b2 100644 --- a/honeypots/helper.py +++ b/honeypots/helper.py @@ -130,7 +130,7 @@ def _parse_record(record: LogRecord, custom_filter: dict, type_: str) -> LogReco return record -def setup_logger(name: str, temp_name: str, config: str, drop: bool = False): +def setup_logger(name: str, temp_name: str, config: str | None, drop: bool = False): logs = "terminal" logs_location = "" config_data = {} diff --git a/honeypots/qbsniffer.py b/honeypots/sniffer.py similarity index 67% rename from honeypots/qbsniffer.py rename to honeypots/sniffer.py index 14ad1cb..b7deb6a 100644 --- a/honeypots/qbsniffer.py +++ b/honeypots/sniffer.py @@ -12,85 +12,83 @@ from __future__ import annotations +import re from binascii import hexlify from multiprocessing import Process -import re from sys import stdout from typing import Iterable, TYPE_CHECKING -from uuid import uuid4 from netifaces import AF_INET, AF_LINK, ifaddresses from scapy.layers.inet import IP, TCP from scapy.sendrecv import send, sniff -from honeypots.helper import server_arguments, setup_logger - +from honeypots.base_server import BaseServer +from honeypots.helper import server_arguments + +ICMP_CODES = [ + (0, 0, "Echo/Ping reply"), + (3, 0, "Destination network unreachable"), + (3, 1, "Destination host unreachable"), + (3, 2, "Destination protocol unreachable"), + (3, 3, "Destination port unreachable"), + (3, 4, "Fragmentation required"), + (3, 5, "Source route failed"), + (3, 6, "Destination network unknown"), + (3, 7, "Destination host unknown"), + (3, 8, "Source host isolated"), + (3, 9, "Network administratively prohibited"), + (3, 10, "Host administratively prohibited"), + (3, 11, "Network unreachable for TOS"), + (3, 12, "Host unreachable for TOS"), + (3, 13, "Communication administratively prohibited"), + (3, 14, "Host Precedence Violation"), + (3, 15, "Precedence cutoff in effect"), + (4, 0, "Source quench"), + (5, 0, "Redirect Datagram for the Network"), + (5, 1, "Redirect Datagram for the Host"), + (5, 2, "Redirect Datagram for the TOS & network"), + (5, 3, "Redirect Datagram for the TOS & host"), + (8, 0, "Echo/Ping Request"), + (9, 0, "Router advertisement"), + (10, 0, "Router discovery/selection/solicitation"), + (11, 0, "TTL expired in transit"), + (11, 1, "Fragment reassembly time exceeded"), + (12, 0, "Pointer indicates the error"), + (12, 1, "Missing a required option"), + (12, 2, "Bad length"), + (13, 0, "Timestamp"), + (14, 0, "Timestamp Reply"), + (15, 0, "Information Request"), + (16, 0, "Information Reply"), + (17, 0, "Address Mask Request"), + (18, 0, "Address Mask Reply"), + (30, 0, "Information Request"), +] TCP_SYN_FLAG = 0b10 if TYPE_CHECKING: from scapy.packet import Packet -class QBSniffer: - def __init__(self, filter_=None, interface=None, config=""): +class QSniffer(BaseServer): + NAME = "sniffer" + + def __init__(self, filter_=None, interface=None, config="", **kwargs): + super().__init__(config=config, **kwargs) self.current_ip = ifaddresses(interface)[AF_INET][0]["addr"].encode("utf-8") self.current_mac = ifaddresses(interface)[AF_LINK][0]["addr"].encode("utf-8") self.filter = filter_ self.interface = interface self.method = "TCPUDP" - self.ICMP_codes = [ - (0, 0, "Echo/Ping reply"), - (3, 0, "Destination network unreachable"), - (3, 1, "Destination host unreachable"), - (3, 2, "Destination protocol unreachable"), - (3, 3, "Destination port unreachable"), - (3, 4, "Fragmentation required"), - (3, 5, "Source route failed"), - (3, 6, "Destination network unknown"), - (3, 7, "Destination host unknown"), - (3, 8, "Source host isolated"), - (3, 9, "Network administratively prohibited"), - (3, 10, "Host administratively prohibited"), - (3, 11, "Network unreachable for TOS"), - (3, 12, "Host unreachable for TOS"), - (3, 13, "Communication administratively prohibited"), - (3, 14, "Host Precedence Violation"), - (3, 15, "Precedence cutoff in effect"), - (4, 0, "Source quench"), - (5, 0, "Redirect Datagram for the Network"), - (5, 1, "Redirect Datagram for the Host"), - (5, 2, "Redirect Datagram for the TOS & network"), - (5, 3, "Redirect Datagram for the TOS & host"), - (8, 0, "Echo/Ping Request"), - (9, 0, "Router advertisement"), - (10, 0, "Router discovery/selection/solicitation"), - (11, 0, "TTL expired in transit"), - (11, 1, "Fragment reassembly time exceeded"), - (12, 0, "Pointer indicates the error"), - (12, 1, "Missing a required option"), - (12, 2, "Bad length"), - (13, 0, "Timestamp"), - (14, 0, "Timestamp Reply"), - (15, 0, "Information Request"), - (16, 0, "Information Reply"), - (17, 0, "Address Mask Request"), - (18, 0, "Address Mask Reply"), - (30, 0, "Information Request"), - ] self.allowed_ports = [] self.allowed_ips = [] self.common = re.compile(rb"pass|user|login") - self.uuid = f"honeypotslogger_{__class__.__name__}_{str(uuid4())[:8]}" - self.config = config - if config: - self.logs = setup_logger(__class__.__name__, self.uuid, config) - else: - self.logs = setup_logger(__class__.__name__, self.uuid, None) - def find_icmp(self, x1, x2): - for _ in self.ICMP_codes: - if x1 == _[0] and x2 == _[1]: - return _[2] + @staticmethod + def find_icmp(type_, code): + for icmp_type, icmp_code, msg_type in ICMP_CODES: + if type_ == icmp_type and code == icmp_code: + return msg_type return "None" @staticmethod @@ -103,8 +101,11 @@ def get_layers(packet: Packet) -> Iterable[str]: except AttributeError: pass - def scapy_sniffer_main(self): - sniff(filter=self.filter, iface=self.interface, prn=self.capture_logic) + def server_main(self): + try: + sniff(filter=self.filter, iface=self.interface, prn=self.capture_logic) + except PermissionError as error: + self.logger.error(f"Could not start sniffer: {error}") def _get_payloads(self, layers: list[str], packet: Packet): hex_payloads, raw_payloads, _fields = {}, {}, {} @@ -115,7 +116,7 @@ def _get_payloads(self, layers: list[str], packet: Packet): raw_payloads[layer] = _fields[layer]["load"] hex_payloads[layer] = hexlify(_fields[layer]["load"]) if re.search(self.common, raw_payloads[layer]): - self._log( + self.log( { "action": "creds_check", "payload": raw_payloads[layer], @@ -132,7 +133,7 @@ def capture_logic(self, packet: Packet): try: if self.method == "ALL": try: - self._log( + self.log( { "action": "all", "layers": _layers, @@ -155,7 +156,7 @@ def capture_logic(self, packet: Packet): and packet.haslayer("ICMP") and packet["IP"].src != self.current_ip ): - self._log( + self.log( { "action": "icmp", "dest_ip": packet["IP"].src, @@ -180,7 +181,7 @@ def capture_logic(self, packet: Packet): stdout.flush() def _handle_tcp_scan(self, packet: Packet, hex_payloads: dict, raw_payloads: dict): - self._log( + self.log( { "action": "tcpscan", "dest_ip": packet["IP"].src, @@ -204,7 +205,7 @@ def _log_tcp_udp(self, packet: Packet, hex_payloads: dict, raw_payloads: dict): for layer in ["TCP", "UDP"]: if packet.haslayer(layer): try: - self._log( + self.log( { "action": f"{layer.lower()}payload", "dest_ip": packet["IP"].src, @@ -218,9 +219,9 @@ def _log_tcp_udp(self, packet: Packet, hex_payloads: dict, raw_payloads: dict): except Exception as error: self._log_error(error, 3) - def _log(self, log_data: dict): + def log(self, log_data: dict): log_data.update({"ip": self.current_ip, "mac": self.current_mac}) - self.logs.info(["sniffer", log_data]) + self.logs.info([self.NAME, log_data]) def _log_error(self, error: Exception, _id: int): self.logs.error( @@ -232,20 +233,16 @@ def _log_error(self, error: Exception, _id: int): def run_sniffer(self, process=None): if process: - self.process = Process(name="QSniffer_", target=self.scapy_sniffer_main) - self.process.start() + self._server_process = Process(name="QSniffer_", target=self.server_main) + self._server_process.start() else: - self.scapy_sniffer_main() - - def kill_sniffer(self): - self.process.terminate() - self.process.join() + self.server_main() if __name__ == "__main__": parsed = server_arguments() if parsed.docker or parsed.aws or parsed.custom: - qsniffer = QBSniffer( + qsniffer = QSniffer( filter_=parsed.filter, interface=parsed.interface, config=parsed.config ) qsniffer.run_sniffer()