Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port internetnl bit → nibble #4026

Merged
merged 23 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions octopoes/bits/https_availability/bit.py

This file was deleted.

21 changes: 0 additions & 21 deletions octopoes/bits/https_availability/https_availability.py

This file was deleted.

24 changes: 0 additions & 24 deletions octopoes/bits/internetnl/bit.py

This file was deleted.

50 changes: 0 additions & 50 deletions octopoes/bits/internetnl/internetnl.py

This file was deleted.

21 changes: 19 additions & 2 deletions octopoes/nibbles/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections.abc import Callable, Iterable
from pathlib import Path
from types import MethodType, ModuleType
from typing import Any
from typing import Any, get_origin

import structlog
from pydantic import BaseModel
Expand All @@ -19,9 +19,10 @@


class NibbleParameter(BaseModel):
object_type: type[Any]
object_type: Any
parser: str = "[]"
optional: bool = False
additional: set[type[OOI]] = set()

def __eq__(self, other):
if isinstance(other, NibbleParameter):
Expand All @@ -31,12 +32,24 @@ def __eq__(self, other):
else:
return False

@property
def triggers(self) -> set[type[OOI]]:
if (
isinstance(self.object_type, type)
and get_origin(self.object_type) is None
and issubclass(self.object_type, OOI)
):
return {self.object_type} | self.additional
else:
return self.additional


class NibbleDefinition(BaseModel):
id: str
signature: list[NibbleParameter]
query: str | Callable[[list[Reference | None]], str] | None = None
enabled: bool = True
additional: set[type[OOI]] = set()
_payload: MethodType | None = None
_checksum: str | None = None

Expand All @@ -53,6 +66,10 @@ def __hash__(self):
def _ini(self) -> dict[str, Any]:
return {"id": self.id, "enabled": self.enabled, "checksum": self._checksum}

@property
def triggers(self) -> set[type[OOI]]:
return set.union(*[sgn.triggers for sgn in self.signature]) | self.additional


def get_nibble_definitions() -> dict[str, NibbleDefinition]:
nibble_definitions = {}
Expand Down
2 changes: 1 addition & 1 deletion octopoes/nibbles/disallowed_csp_hostnames/nibble.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def query(targets: list[Reference | None]) -> str:


NIBBLE = NibbleDefinition(
id="disallowed-csp-hostnames",
id="disallowed_csp_hostnames",
signature=[
NibbleParameter(object_type=HTTPHeaderHostname, parser="[*][?object_type == 'HTTPHeaderHostname'][]"),
NibbleParameter(object_type=Config, parser="[*][?object_type == 'Config'][]", optional=True),
Expand Down
19 changes: 19 additions & 0 deletions octopoes/nibbles/https_availability/https_availability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from collections.abc import Iterator

from octopoes.models import OOI
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.network import IPAddress, IPPort
from octopoes.models.ooi.web import Website


def nibble(ipaddress: IPAddress, ipport80: IPPort, website: Website, port443s: int) -> Iterator[OOI]:
_ = ipaddress
_ = ipport80
if port443s < 1:
ft = KATFindingType(id="KAT-HTTPS-NOT-AVAILABLE")
yield ft
yield Finding(
ooi=website.reference,
finding_type=ft.reference,
description="HTTP port is open, but HTTPS port is not open",
)
66 changes: 66 additions & 0 deletions octopoes/nibbles/https_availability/nibble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from nibbles.definitions import NibbleDefinition, NibbleParameter
from octopoes.models import Reference
from octopoes.models.ooi.network import IPAddress, IPPort
from octopoes.models.ooi.web import Website


def query(targets: list[Reference | None]) -> str:
def pull(statements: list[str]) -> str:
return f"""
{{
:query {{
:find [(pull ?ipaddress [*]) (pull ?ipport80 [*]) (pull ?website [*]) (- (count ?ipport443) 1)]
:where [
{" ".join(statements)}
]
}}
}}
"""

base_query = [
"""
[?website :Website/ip_service ?ip_service]
[?ipservice :IPService/ip_port ?ipport80]
[?ipport80 :IPPort/port 80]
[?ipport80 :IPPort/address ?ipaddress]
(or
(and [?ipport443 :IPPort/address ?ipaddress][?ipport443 :IPPort/port 443])
[(identity nil) ?ipport443]
)
"""
]

ref_queries = [
f'[?ipaddress :IPAddress/primary_key "{str(targets[0])}"]',
f'[?ipport80 :IPPort/primary_key "{str(targets[1])}"]',
f'[?website :Website/primary_key "{str(targets[2])}"]',
]

sgn = "".join(str(int(isinstance(target, Reference))) for target in targets)
if sgn == "1000":
return pull(ref_queries[0:1] + base_query)
elif sgn == "0100":
if int(str(targets[1]).split("|")[-1]) == 80:
return pull(ref_queries[1:2] + base_query)
else:
return pull(base_query)
elif sgn == "0010":
return pull(ref_queries[2:3] + base_query)
elif sgn == "1110":
return pull(ref_queries + base_query)
else:
return pull(base_query)


NIBBLE = NibbleDefinition(
id="https_availability",
signature=[
NibbleParameter(
object_type=IPAddress, parser="[*][?object_type == 'IPAddressV6' || object_type == 'IPAddressV4'][]"
),
NibbleParameter(object_type=IPPort, parser="[*][?object_type == 'IPPort'][]"),
NibbleParameter(object_type=Website, parser="[*][?object_type == 'Website'][]"),
NibbleParameter(object_type=int, parser="[*][-1][]"),
],
query=query,
)
18 changes: 18 additions & 0 deletions octopoes/nibbles/internetnl/internetnl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from collections.abc import Iterator

from octopoes.models import OOI
from octopoes.models.ooi.dns.zone import Hostname
from octopoes.models.ooi.findings import Finding, KATFindingType


def nibble(hostname: Hostname, findings: list[Finding]) -> Iterator[OOI]:
result = "\n".join([str(finding.description) for finding in findings])

if result:
ft = KATFindingType(id="KAT-INTERNETNL")
yield ft
yield Finding(
finding_type=ft.reference,
ooi=hostname.reference,
description=f"This hostname has at least one website with the following finding(s): {result}",
)
104 changes: 104 additions & 0 deletions octopoes/nibbles/internetnl/nibble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from nibbles.definitions import NibbleDefinition, NibbleParameter
from octopoes.models import Reference
from octopoes.models.ooi.dns.zone import Hostname, Network
from octopoes.models.ooi.findings import Finding

finding_types = [
"KAT-WEBSERVER-NO-IPV6",
"KAT-NAMESERVER-NO-TWO-IPV6",
"KAT-NO-DNSSEC",
"KAT-INVALID-DNSSEC",
"KAT-NO-HSTS",
"KAT-NO-CSP",
"KAT-NO-X-FRAME-OPTIONS",
"KAT-NO-X-CONTENT-TYPE-OPTIONS",
"KAT-CSP-VULNERABILITIES",
"KAT-HSTS-VULNERABILITIES",
"KAT-NO-CERTIFICATE",
"KAT-HTTPS-NOT-AVAILABLE",
"KAT-SSL-CERT-HOSTNAME-MISMATCH",
"KAT-HTTPS-REDIRECT",
]


def or_finding_types() -> str:
clauses = "".join([f'[?finding :Finding/finding_type "{ft}"]' for ft in finding_types])
return f"(or {clauses})"


def query(targets: list[Reference | None]) -> str:
def pull(statements: list[str]) -> str:
return f"""
{{
:query {{
:find [
(pull ?hostname [*])
(pull ?website [*])
(pull ?finding [*])
]
:where [
{" ".join(statements)}
]
}}
}}
"""

base_query = [
"""
[?hostname :object_type "Hostname"]
[?website :Website/hostname ?hostname]
(or-join [?finding ?hostname ?website]
[?finding :Finding/ooi ?hostname]
(and
[?hostnamehttpurl :HostnameHTTPURL/netloc ?hostname]
[?finding :Finding/ooi ?hostnamehttpurl]
)
[?finding :Finding/ooi ?website]
(and
[?resource :HTTPResource/website ?website]
[?finding :Finding/ooi ?resource]
)
(and
[?header :HTTPHeader/resource ?resource]
[?resource :HTTPResource/website ?website]
[?finding :Finding/ooi ?header]
)
)
"""
]

null_query = '{:query {:find [(pull ?var [])] :where [[?var :object_type ""]]}}'
sgn = "".join(str(int(isinstance(target, Reference))) for target in targets)
ref_query = ["[?hostname :Hostname/primary_key]"]
if sgn.startswith("1"):
ref_query = [f'[?hostname :Hostname/primary_key "{str(targets[0])}"]']
elif sgn.endswith("1"):
ref = str(targets[1]).split("|")
if ref[-1] == "KAT-INTERNETNL":
return null_query
else:
tokens = ref[1:-1]
target_reference = Reference.from_str("|".join(tokens))
if tokens[0] == "Hostname":
hostname = target_reference.tokenized
elif tokens[0] in {"HostnameHTTPURL", "Website"}:
hostname = target_reference.tokenized.hostname
elif tokens[0] == "HTTPResource":
hostname = target_reference.tokenized.website.hostname
elif tokens[0] == "HTTPHeader":
hostname = target_reference.tokenized.resource.website.hostname
else:
return null_query
hostname_pk = Hostname(name=hostname.name, network=Network(name=hostname.network.name).reference).reference
ref_query = [f'[?hostname :Hostname/primary_key "{str(hostname_pk)}"]']
return pull(ref_query + base_query)


NIBBLE = NibbleDefinition(
id="internet_nl",
signature=[
NibbleParameter(object_type=Hostname, parser="[*][?object_type == 'Hostname'][]"),
NibbleParameter(object_type=list[Finding], parser="[[*][?object_type == 'Finding'][]]", additional={Finding}),
],
query=query,
)
Loading
Loading