Skip to content

Commit

Permalink
Port internetnl bit → nibble (#4026)
Browse files Browse the repository at this point in the history
  • Loading branch information
originalsouth authored Jan 29, 2025
1 parent bdd2795 commit ee65372
Show file tree
Hide file tree
Showing 22 changed files with 539 additions and 142 deletions.
15 changes: 0 additions & 15 deletions octopoes/bits/https_availability/bit.py

This file was deleted.

21 changes: 0 additions & 21 deletions octopoes/bits/https_availability/https_availability.py

This file was deleted.

24 changes: 0 additions & 24 deletions octopoes/bits/internetnl/bit.py

This file was deleted.

50 changes: 0 additions & 50 deletions octopoes/bits/internetnl/internetnl.py

This file was deleted.

21 changes: 19 additions & 2 deletions octopoes/nibbles/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections.abc import Callable, Iterable
from pathlib import Path
from types import MethodType, ModuleType
from typing import Any
from typing import Any, get_origin

import structlog
from pydantic import BaseModel
Expand All @@ -19,9 +19,10 @@


class NibbleParameter(BaseModel):
object_type: type[Any]
object_type: Any
parser: str = "[]"
optional: bool = False
additional: set[type[OOI]] = set()

def __eq__(self, other):
if isinstance(other, NibbleParameter):
Expand All @@ -31,12 +32,24 @@ def __eq__(self, other):
else:
return False

@property
def triggers(self) -> set[type[OOI]]:
if (
isinstance(self.object_type, type)
and get_origin(self.object_type) is None
and issubclass(self.object_type, OOI)
):
return {self.object_type} | self.additional
else:
return self.additional


class NibbleDefinition(BaseModel):
id: str
signature: list[NibbleParameter]
query: str | Callable[[list[Reference | None]], str] | None = None
enabled: bool = True
additional: set[type[OOI]] = set()
_payload: MethodType | None = None
_checksum: str | None = None

Expand All @@ -53,6 +66,10 @@ def __hash__(self):
def _ini(self) -> dict[str, Any]:
return {"id": self.id, "enabled": self.enabled, "checksum": self._checksum}

@property
def triggers(self) -> set[type[OOI]]:
return set.union(*[sgn.triggers for sgn in self.signature]) | self.additional


def get_nibble_definitions() -> dict[str, NibbleDefinition]:
nibble_definitions = {}
Expand Down
2 changes: 1 addition & 1 deletion octopoes/nibbles/disallowed_csp_hostnames/nibble.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def query(targets: list[Reference | None]) -> str:


NIBBLE = NibbleDefinition(
id="disallowed-csp-hostnames",
id="disallowed_csp_hostnames",
signature=[
NibbleParameter(object_type=HTTPHeaderHostname, parser="[*][?object_type == 'HTTPHeaderHostname'][]"),
NibbleParameter(object_type=Config, parser="[*][?object_type == 'Config'][]", optional=True),
Expand Down
19 changes: 19 additions & 0 deletions octopoes/nibbles/https_availability/https_availability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from collections.abc import Iterator

from octopoes.models import OOI
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.network import IPAddress, IPPort
from octopoes.models.ooi.web import Website


def nibble(ipaddress: IPAddress, ipport80: IPPort, website: Website, port443s: int) -> Iterator[OOI]:
_ = ipaddress
_ = ipport80
if port443s < 1:
ft = KATFindingType(id="KAT-HTTPS-NOT-AVAILABLE")
yield ft
yield Finding(
ooi=website.reference,
finding_type=ft.reference,
description="HTTP port is open, but HTTPS port is not open",
)
66 changes: 66 additions & 0 deletions octopoes/nibbles/https_availability/nibble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from nibbles.definitions import NibbleDefinition, NibbleParameter
from octopoes.models import Reference
from octopoes.models.ooi.network import IPAddress, IPPort
from octopoes.models.ooi.web import Website


def query(targets: list[Reference | None]) -> str:
def pull(statements: list[str]) -> str:
return f"""
{{
:query {{
:find [(pull ?ipaddress [*]) (pull ?ipport80 [*]) (pull ?website [*]) (- (count ?ipport443) 1)]
:where [
{" ".join(statements)}
]
}}
}}
"""

base_query = [
"""
[?website :Website/ip_service ?ip_service]
[?ipservice :IPService/ip_port ?ipport80]
[?ipport80 :IPPort/port 80]
[?ipport80 :IPPort/address ?ipaddress]
(or
(and [?ipport443 :IPPort/address ?ipaddress][?ipport443 :IPPort/port 443])
[(identity nil) ?ipport443]
)
"""
]

ref_queries = [
f'[?ipaddress :IPAddress/primary_key "{str(targets[0])}"]',
f'[?ipport80 :IPPort/primary_key "{str(targets[1])}"]',
f'[?website :Website/primary_key "{str(targets[2])}"]',
]

sgn = "".join(str(int(isinstance(target, Reference))) for target in targets)
if sgn == "1000":
return pull(ref_queries[0:1] + base_query)
elif sgn == "0100":
if int(str(targets[1]).split("|")[-1]) == 80:
return pull(ref_queries[1:2] + base_query)
else:
return pull(base_query)
elif sgn == "0010":
return pull(ref_queries[2:3] + base_query)
elif sgn == "1110":
return pull(ref_queries + base_query)
else:
return pull(base_query)


NIBBLE = NibbleDefinition(
id="https_availability",
signature=[
NibbleParameter(
object_type=IPAddress, parser="[*][?object_type == 'IPAddressV6' || object_type == 'IPAddressV4'][]"
),
NibbleParameter(object_type=IPPort, parser="[*][?object_type == 'IPPort'][]"),
NibbleParameter(object_type=Website, parser="[*][?object_type == 'Website'][]"),
NibbleParameter(object_type=int, parser="[*][-1][]"),
],
query=query,
)
File renamed without changes.
18 changes: 18 additions & 0 deletions octopoes/nibbles/internetnl/internetnl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from collections.abc import Iterator

from octopoes.models import OOI
from octopoes.models.ooi.dns.zone import Hostname
from octopoes.models.ooi.findings import Finding, KATFindingType


def nibble(hostname: Hostname, findings: list[Finding]) -> Iterator[OOI]:
result = "\n".join([str(finding.description) for finding in findings])

if result:
ft = KATFindingType(id="KAT-INTERNETNL")
yield ft
yield Finding(
finding_type=ft.reference,
ooi=hostname.reference,
description=f"This hostname has at least one website with the following finding(s): {result}",
)
104 changes: 104 additions & 0 deletions octopoes/nibbles/internetnl/nibble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from nibbles.definitions import NibbleDefinition, NibbleParameter
from octopoes.models import Reference
from octopoes.models.ooi.dns.zone import Hostname, Network
from octopoes.models.ooi.findings import Finding

finding_types = [
"KAT-WEBSERVER-NO-IPV6",
"KAT-NAMESERVER-NO-TWO-IPV6",
"KAT-NO-DNSSEC",
"KAT-INVALID-DNSSEC",
"KAT-NO-HSTS",
"KAT-NO-CSP",
"KAT-NO-X-FRAME-OPTIONS",
"KAT-NO-X-CONTENT-TYPE-OPTIONS",
"KAT-CSP-VULNERABILITIES",
"KAT-HSTS-VULNERABILITIES",
"KAT-NO-CERTIFICATE",
"KAT-HTTPS-NOT-AVAILABLE",
"KAT-SSL-CERT-HOSTNAME-MISMATCH",
"KAT-HTTPS-REDIRECT",
]


def or_finding_types() -> str:
clauses = "".join([f'[?finding :Finding/finding_type "{ft}"]' for ft in finding_types])
return f"(or {clauses})"


def query(targets: list[Reference | None]) -> str:
def pull(statements: list[str]) -> str:
return f"""
{{
:query {{
:find [
(pull ?hostname [*])
(pull ?website [*])
(pull ?finding [*])
]
:where [
{" ".join(statements)}
]
}}
}}
"""

base_query = [
"""
[?hostname :object_type "Hostname"]
[?website :Website/hostname ?hostname]
(or-join [?finding ?hostname ?website]
[?finding :Finding/ooi ?hostname]
(and
[?hostnamehttpurl :HostnameHTTPURL/netloc ?hostname]
[?finding :Finding/ooi ?hostnamehttpurl]
)
[?finding :Finding/ooi ?website]
(and
[?resource :HTTPResource/website ?website]
[?finding :Finding/ooi ?resource]
)
(and
[?header :HTTPHeader/resource ?resource]
[?resource :HTTPResource/website ?website]
[?finding :Finding/ooi ?header]
)
)
"""
]

null_query = '{:query {:find [(pull ?var [])] :where [[?var :object_type ""]]}}'
sgn = "".join(str(int(isinstance(target, Reference))) for target in targets)
ref_query = ["[?hostname :Hostname/primary_key]"]
if sgn.startswith("1"):
ref_query = [f'[?hostname :Hostname/primary_key "{str(targets[0])}"]']
elif sgn.endswith("1"):
ref = str(targets[1]).split("|")
if ref[-1] == "KAT-INTERNETNL":
return null_query
else:
tokens = ref[1:-1]
target_reference = Reference.from_str("|".join(tokens))
if tokens[0] == "Hostname":
hostname = target_reference.tokenized
elif tokens[0] in {"HostnameHTTPURL", "Website"}:
hostname = target_reference.tokenized.hostname
elif tokens[0] == "HTTPResource":
hostname = target_reference.tokenized.website.hostname
elif tokens[0] == "HTTPHeader":
hostname = target_reference.tokenized.resource.website.hostname
else:
return null_query
hostname_pk = Hostname(name=hostname.name, network=Network(name=hostname.network.name).reference).reference
ref_query = [f'[?hostname :Hostname/primary_key "{str(hostname_pk)}"]']
return pull(ref_query + base_query)


NIBBLE = NibbleDefinition(
id="internet_nl",
signature=[
NibbleParameter(object_type=Hostname, parser="[*][?object_type == 'Hostname'][]"),
NibbleParameter(object_type=list[Finding], parser="[[*][?object_type == 'Finding'][]]", additional={Finding}),
],
query=query,
)
Loading

0 comments on commit ee65372

Please sign in to comment.