Skip to content

Commit

Permalink
Merge pull request #51 from fkie-cad/refactoring
Browse files Browse the repository at this point in the history
Reformat all files with ruff
  • Loading branch information
giga-a authored Jan 24, 2024
2 parents 73f4c3a + 056a4ee commit 13041a3
Show file tree
Hide file tree
Showing 37 changed files with 5,488 additions and 1,632 deletions.
61 changes: 60 additions & 1 deletion honeypots/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@
from .dns_server import QDNSServer
from .elastic_server import QElasticServer
from .ftp_server import QFTPServer
from .helper import (
check_privileges,
clean_all,
close_port_wrapper,
disable_logger,
get_free_port,
get_running_servers,
kill_server_wrapper,
kill_servers,
postgres_class,
server_arguments,
set_local_vars,
setup_logger,
)
from .http_proxy_server import QHTTPProxyServer
from .http_server import QHTTPServer
from .https_server import QHTTPSServer
Expand All @@ -30,4 +44,49 @@
from .ssh_server import QSSHServer
from .telnet_server import QTelnetServer
from .vnc_server import QVNCServer
from .helper import server_arguments, clean_all, kill_servers, get_free_port, close_port_wrapper, kill_server_wrapper, setup_logger, disable_logger, postgres_class, get_running_servers, set_local_vars, check_privileges

__all__ = [
"QBSniffer",
"QDHCPServer",
"QDNSServer",
"QElasticServer",
"QFTPServer",
"QHTTPProxyServer",
"QHTTPSServer",
"QHTTPServer",
"QIMAPServer",
"QIPPServer",
"QIRCServer",
"QLDAPServer",
"QMSSQLServer",
"QMemcacheServer",
"QMysqlServer",
"QNTPServer",
"QOracleServer",
"QPJLServer",
"QPOP3Server",
"QPostgresServer",
"QRDPServer",
"QRedisServer",
"QSIPServer",
"QSMBServer",
"QSMTPServer",
"QSNMPServer",
"QSOCKS5Server",
"QSSHServer",
"QTelnetServer",
"QVNCServer",
"check_privileges",
"clean_all",
"close_port_wrapper",
"disable_logger",
"get_free_port",
"get_running_servers",
"kill_server_wrapper",
"kill_servers",
"main_logic",
"postgres_class",
"server_arguments",
"set_local_vars",
"setup_logger",
]
350 changes: 233 additions & 117 deletions honeypots/__main__.py

Large diffs are not rendered by default.

155 changes: 116 additions & 39 deletions honeypots/dhcp_server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
'''
"""
// -------------------------------------------------------------
// author Giga
// project qeeqbox/honeypots
Expand All @@ -8,10 +8,11 @@
// -------------------------------------------------------------
// contributors list qeeqbox/honeypots/graphs/contributors
// -------------------------------------------------------------
'''
"""
from warnings import filterwarnings
filterwarnings(action='ignore', module='.*OpenSSL.*')
filterwarnings(action='ignore', module='.*socket.*')

filterwarnings(action="ignore", module=".*OpenSSL.*")
filterwarnings(action="ignore", module=".*socket.*")

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
Expand All @@ -20,66 +21,101 @@
from socket import inet_aton
from subprocess import Popen
from os import path, getenv
from honeypots.helper import close_port_wrapper, get_free_port, kill_server_wrapper, server_arguments, setup_logger, disable_logger, set_local_vars, check_if_server_is_running
from honeypots.helper import (
close_port_wrapper,
get_free_port,
kill_server_wrapper,
server_arguments,
setup_logger,
disable_logger,
set_local_vars,
check_if_server_is_running,
)
from uuid import uuid4


class QDHCPServer():
class QDHCPServer:
def __init__(self, **kwargs):
self.auto_disabled = None
self.process = None
self.uuid = 'honeypotslogger' + '_' + __class__.__name__ + '_' + str(uuid4())[:8]
self.config = kwargs.get('config', '')
self.uuid = "honeypotslogger" + "_" + __class__.__name__ + "_" + str(uuid4())[:8]
self.config = kwargs.get("config", "")
if self.config:
self.logs = setup_logger(__class__.__name__, self.uuid, self.config)
set_local_vars(self, self.config)
else:
self.logs = setup_logger(__class__.__name__, self.uuid, None)
self.ip = kwargs.get('ip', None) or (hasattr(self, 'ip') and self.ip) or '0.0.0.0'
self.port = (kwargs.get('port', None) and int(kwargs.get('port', None))) or (hasattr(self, 'port') and self.port) or 67
self.options = kwargs.get('options', '') or (hasattr(self, 'options') and self.options) or getenv('HONEYPOTS_OPTIONS', '') or ''
self.ip = kwargs.get("ip", None) or (hasattr(self, "ip") and self.ip) or "0.0.0.0"
self.port = (
(kwargs.get("port", None) and int(kwargs.get("port", None)))
or (hasattr(self, "port") and self.port)
or 67
)
self.options = (
kwargs.get("options", "")
or (hasattr(self, "options") and self.options)
or getenv("HONEYPOTS_OPTIONS", "")
or ""
)
disable_logger(1, tlog)

def dhcp_server_main(self):
_q_s = self

class CustomDatagramProtocolProtocol(DatagramProtocol):

def check_bytes(self, string):
if isinstance(string, bytes):
return string.decode()
else:
return str(string)

def payload(self, value, message):
op, htype, hlen, hops, xid, secs, flags, ciaddr, yiaddr, siaddr, giaddr, chaddr = unpack('1s1s1s1s4s2s2s4s4s4s4s16s', message[:44])
#op, htype, hlen, hops, xid, secs, flags, ciaddr
response = b'\x02\x01\x06\x00' + xid + b'\x00\x00\x00\x00\x00\x00\x00\x00'
#yiaddr, siaddr, giaddr, chaddr
response += inet_aton(_q_s.dhcp_ip_lease) + inet_aton(_q_s.dhcp_ip) + inet_aton('0.0.0.0') + chaddr
#sname, file, magic
response += b'\x00' * 64 + b'\x00' * 128 + b'\x63\x82\x53\x63'
(
op,
htype,
hlen,
hops,
xid,
secs,
flags,
ciaddr,
yiaddr,
siaddr,
giaddr,
chaddr,
) = unpack("1s1s1s1s4s2s2s4s4s4s4s16s", message[:44])
# op, htype, hlen, hops, xid, secs, flags, ciaddr
response = b"\x02\x01\x06\x00" + xid + b"\x00\x00\x00\x00\x00\x00\x00\x00"
# yiaddr, siaddr, giaddr, chaddr
response += (
inet_aton(_q_s.dhcp_ip_lease)
+ inet_aton(_q_s.dhcp_ip)
+ inet_aton("0.0.0.0")
+ chaddr
)
# sname, file, magic
response += b"\x00" * 64 + b"\x00" * 128 + b"\x63\x82\x53\x63"
# options
response += bytes([53, 1, value])
response += bytes([54, 4]) + inet_aton(_q_s.dhcp_ip)
response += bytes([1, 4]) + inet_aton(_q_s.subnet_mask)
response += bytes([3, 4]) + inet_aton(_q_s.router)
response += bytes([6, 4]) + inet_aton(_q_s.dns_server)
response += bytes([51, 4]) + b'\x00\x00\xa8\xc0' # lease
response += b'\xff'
response += bytes([51, 4]) + b"\x00\x00\xa8\xc0" # lease
response += b"\xff"
return response

def parse_options(self, raw):
options = {}
tag_name = None
tag_size = None
tag = ''
tag = ""
for idx, b in enumerate(raw):
if tag_name is None:
tag_name = b
elif tag_name is not None and tag_size is None:
tag_size = b
tag = ''
tag = ""
else:
if tag_size:
tag_size -= 1
Expand All @@ -88,24 +124,37 @@ def parse_options(self, raw):
options.update({self.check_bytes(tag_name): self.check_bytes(tag)})
tag_name = None
tag_size = None
tag = ''
tag = ""
return options

def datagramReceived(self, data, addr):
try:
mac_address = unpack('!28x6s', data[:34])[0].hex(':')
mac_address = unpack("!28x6s", data[:34])[0].hex(":")
except StructError:
mac_address = "None"
data = self.parse_options(data[240:])
data.update({'mac_address': mac_address})
_q_s.logs.info({'server': 'dhcp_server', 'action': 'query', 'status': 'success', 'src_ip': addr[0], 'src_port': addr[1], 'dest_ip': _q_s.ip, 'dest_port': _q_s.port, 'data': data})
data.update({"mac_address": mac_address})
_q_s.logs.info(
{
"server": "dhcp_server",
"action": "query",
"status": "success",
"src_ip": addr[0],
"src_port": addr[1],
"dest_ip": _q_s.ip,
"dest_port": _q_s.port,
"data": data,
}
)
self.transport.loseConnection()

reactor.listenUDP(port=self.port, protocol=CustomDatagramProtocolProtocol(), interface=self.ip)
reactor.listenUDP(
port=self.port, protocol=CustomDatagramProtocolProtocol(), interface=self.ip
)
reactor.run()

def run_server(self, process=False, auto=False):
status = 'error'
status = "error"
run = False
if process:
if auto and not self.auto_disabled:
Expand All @@ -117,13 +166,39 @@ def run_server(self, process=False, auto=False):
run = True

if run:
self.process = Popen(['python3', path.realpath(__file__), '--custom', '--ip', str(self.ip), '--port', str(self.port), '--options', str(self.options), '--config', str(self.config), '--uuid', str(self.uuid)])
self.process = Popen(
[
"python3",
path.realpath(__file__),
"--custom",
"--ip",
str(self.ip),
"--port",
str(self.port),
"--options",
str(self.options),
"--config",
str(self.config),
"--uuid",
str(self.uuid),
]
)
if self.process.poll() is None and check_if_server_is_running(self.uuid):
status = 'success'

self.logs.info({'server': 'dhcp_server', 'action': 'process', 'status': status, 'src_ip': self.ip, 'src_port': self.port, 'dest_ip': self.ip, 'dest_port': self.port})

if status == 'success':
status = "success"

self.logs.info(
{
"server": "dhcp_server",
"action": "process",
"status": status,
"src_ip": self.ip,
"src_port": self.port,
"dest_ip": self.ip,
"dest_port": self.port,
}
)

if status == "success":
return True
else:
self.kill_server()
Expand All @@ -132,19 +207,21 @@ def run_server(self, process=False, auto=False):
self.dhcp_server_main()

def close_port(self):
ret = close_port_wrapper('dhcp_server', self.ip, self.port, self.logs)
ret = close_port_wrapper("dhcp_server", self.ip, self.port, self.logs)
return ret

def kill_server(self):
ret = kill_server_wrapper('dhcp_server', self.uuid, self.process)
ret = kill_server_wrapper("dhcp_server", self.uuid, self.process)
return ret

def test_server(self, ip=None, port=None):
pass


if __name__ == '__main__':
if __name__ == "__main__":
parsed = server_arguments()
if parsed.docker or parsed.aws or parsed.custom:
qdhcpserver = QDHCPServer(ip=parsed.ip, port=parsed.port, options=parsed.options, config=parsed.config)
qdhcpserver = QDHCPServer(
ip=parsed.ip, port=parsed.port, options=parsed.options, config=parsed.config
)
qdhcpserver.run_server()
Loading

0 comments on commit 13041a3

Please sign in to comment.