# module.py (from a fork of LukasForst/fides)
import json
import sys
from dataclasses import asdict
from multiprocessing import Process
from fides.messaging.message_handler import MessageHandler
from fides.messaging.network_bridge import NetworkBridge
from fides.model.configuration import load_configuration
from fides.model.threat_intelligence import SlipsThreatIntelligence
from fides.protocols.alert import AlertProtocol
from fides.protocols.initial_trusl import InitialTrustProtocol
from fides.protocols.opinion import OpinionAggregator
from fides.protocols.peer_list import PeerListUpdateProtocol
from fides.protocols.recommendation import RecommendationProtocol
from fides.protocols.threat_intelligence import ThreatIntelligenceProtocol
from fides.utils.logger import LoggerPrintCallbacks, Logger
from slips.messaging.queue import RedisQueue, RedisSimplexQueue
from slips.originals.abstracts import Module
from slips.originals.database import __database__
from slips.persistance.threat_intelligence import SlipsThreatIntelligenceDatabase
from slips.persistance.trust import SlipsTrustDatabase
# Module-level logger used by every method of the class defined in this file.
logger = Logger("SlipsFidesModule")
class SlipsFidesModule(Module, Process):
    """Slips module that runs the Fides trust model in its own process.

    The module builds the trust model protocols on top of the Slips Redis
    connection, listens for messages from the network layer through a
    ``NetworkBridge`` and, in :meth:`run`, consumes JSON messages that Slips
    publishes for the trust model.

    Messages accepted from Slips are JSON objects of one of these shapes::

        {"type": "alert", "target": ..., "confidence": ..., "score": ...}
        {"type": "intelligence_request", "target": ...}

    The plain string ``stop_process`` asks the module to shut down.
    """

    # Name: short name of the module. Do not use spaces
    name = 'GlobalP2P'
    description = 'Global p2p Threat Intelligence Sharing Module'
    authors = ['Lukas Forst', 'Martin Repa']

    def __init__(self, output_queue, slips_conf):
        """Connect to the Slips database, set up logging and load the trust model configuration.

        :param output_queue: queue used for printing; formatted log lines are ``put`` on it
        :param slips_conf: Slips configuration; it is passed to the database and
            must expose ``trust_model_path`` (and ``interaction_evaluation_strategy``,
            which is read later when the trust model is set up)
        """
        Process.__init__(self)
        self.__output = output_queue
        # TODO: [S+] add path to trust model configuration yaml to the slips conf
        self.__slips_config = slips_conf

        # connect to slips database
        __database__.start(slips_conf)

        # now setup logging - every log line is routed to the output queue
        LoggerPrintCallbacks.clear()
        LoggerPrintCallbacks.append(self.__format_and_print)

        # load trust model configuration
        self.__trust_model_config = load_configuration(self.__slips_config.trust_model_path)

        # prepare variables for global protocols;
        # they are only declared here and bound later in __setup_trust_model
        self.__bridge: NetworkBridge
        self.__intelligence: ThreatIntelligenceProtocol
        self.__alerts: AlertProtocol
        self.__slips_fides: RedisQueue

    def __setup_trust_model(self):
        """Create databases, queues and protocols, then start listening on the network bridge."""
        r = __database__.r

        # TODO: [S] launch network layer binary if necessary

        # create database wrappers for Slips using Redis
        trust_db = SlipsTrustDatabase(self.__trust_model_config, r)
        ti_db = SlipsThreatIntelligenceDatabase(self.__trust_model_config, r)

        # create queues
        # TODO: [S] check if we need to use duplex or simplex queue for communication with network module
        network_fides_queue = RedisSimplexQueue(r, send_channel='fides2network', received_channel='network2fides')
        slips_fides_queue = RedisSimplexQueue(r, send_channel='fides2slips', received_channel='slips2fides')

        bridge = NetworkBridge(network_fides_queue)

        recommendations = RecommendationProtocol(self.__trust_model_config, trust_db, bridge)
        trust = InitialTrustProtocol(trust_db, self.__trust_model_config, recommendations)
        peer_list = PeerListUpdateProtocol(trust_db, bridge, recommendations, trust)
        opinion = OpinionAggregator(self.__trust_model_config, ti_db, self.__trust_model_config.ti_aggregation_strategy)

        # NOTE(review): the interaction evaluation strategy is read from the Slips config
        # while the other trust model settings come from the trust model config -
        # confirm that this is intended.
        intelligence = ThreatIntelligenceProtocol(trust_db, ti_db, bridge, self.__trust_model_config, opinion, trust,
                                                  self.__slips_config.interaction_evaluation_strategy,
                                                  self.__network_opinion_callback)
        alert = AlertProtocol(trust_db, bridge, trust, self.__trust_model_config, opinion,
                              self.__network_opinion_callback)

        # TODO: [S+] add on_unknown and on_error handlers if necessary
        message_handler = MessageHandler(
            on_peer_list_update=peer_list.handle_peer_list_updated,
            on_recommendation_request=recommendations.handle_recommendation_request,
            on_recommendation_response=recommendations.handle_recommendation_response,
            on_alert=alert.handle_alert,
            on_intelligence_request=intelligence.handle_intelligence_request,
            on_intelligence_response=intelligence.handle_intelligence_response,
            on_unknown=None,
            on_error=None
        )

        # bind local vars
        self.__bridge = bridge
        self.__intelligence = intelligence
        self.__alerts = alert
        self.__slips_fides = slips_fides_queue

        # and finally execute listener
        # (block=False - presumably returns immediately so run() can enter its own loop; TODO confirm)
        self.__bridge.listen(message_handler, block=False)

    def __network_opinion_callback(self, ti: SlipsThreatIntelligence):
        """This is executed every time when trust model was able to create an aggregated network opinion.

        The intelligence is logged and sent to Slips as a JSON serialised dataclass.
        """
        logger.info(f'Callback: Target: {ti.target}, Score: {ti.score}, Confidence: {ti.confidence}.')
        # TODO: [S+] document that we're sending this type
        self.__slips_fides.send(json.dumps(asdict(ti)))

    def __format_and_print(self, level: str, msg: str):
        """Logger callback: put the formatted log line on the output queue."""
        # TODO: [S+] determine correct level for trust model log levels
        # the "33|" prefix is presumably the Slips verbosity/debug level - TODO confirm
        self.__output.put(f"33|{self.name}|{level} {msg}")

    def __handle_slips_message(self, message):
        """Decode one JSON message received from Slips and dispatch it to the matching protocol.

        Messages that are not valid JSON, are not JSON objects, have an unknown
        ``type`` or miss a required field are logged and ignored, so that a single
        malformed message does not stop the whole module.

        :param message: message from the queue whose ``data`` entry is a string
        """
        raw = message['data']
        try:
            data = json.loads(raw)
        except json.JSONDecodeError as ex:
            logger.warn(f'Ignoring message that is not valid JSON: {ex}.', message)
            return

        # valid JSON does not have to be an object (e.g. a bare number or list)
        message_type = data.get('type') if isinstance(data, dict) else None

        # TODO: [S+] document that we need this structure
        # collect the arguments first, so only fields missing in the message are
        # reported here and errors raised inside the protocols still reach run()
        try:
            if message_type == 'alert':
                handler = self.__alerts.dispatch_alert
                arguments = {key: data[key] for key in ('target', 'confidence', 'score')}
            elif message_type == 'intelligence_request':
                handler = self.__intelligence.request_data
                arguments = {'target': data['target']}
            else:
                logger.warn(f"Unhandled message! {raw}", message)
                return
        except KeyError as ex:
            logger.warn(f'Ignoring {message_type} message without required field {ex}.', message)
            return

        handler(**arguments)

    def run(self):
        """Set up the trust model and process messages coming from Slips until asked to stop.

        :return: ``True`` when the module stops, either after receiving
            ``stop_process`` or after an unexpected error was logged
        """
        # as a first thing we need to set up all dependencies and bind listeners
        self.__setup_trust_model()

        # main loop for handling data coming from Slips
        while True:
            try:
                message = self.__slips_fides.get_message(timeout_seconds=0.1)
                # if there's no string data message we can continue in waiting
                if not message \
                        or not message['data'] \
                        or not isinstance(message['data'], str):
                    continue

                # handle case when the Slips decide to stop the process
                if message['data'] == 'stop_process':
                    # Confirm that the module is done processing
                    __database__.publish('finished_modules', self.name)
                    return True

                self.__handle_slips_message(message)
            except KeyboardInterrupt:
                # On KeyboardInterrupt, slips.py sends a stop_process msg to all modules, so continue to receive it
                continue
            except Exception as ex:
                # anything unexpected is logged and terminates the module
                exception_line = sys.exc_info()[2].tb_lineno
                logger.error(f'Problem on the run() line {exception_line}, {ex}.')
                return True