Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use prefix instead of different ports in redis database - incomplete #309

Open
wants to merge 22 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2f486fe
Modified redis manager to use prefix as id instead of port
A-atmos Apr 14, 2023
713d698
Modified to use id as prefix instead of port
A-atmos Apr 14, 2023
ff351d6
Changed database to publish and store keys with prefix
A-atmos Apr 14, 2023
7ce3260
Changed modules to use prefix instead of port
A-atmos Apr 14, 2023
9cd2573
Remove keys from the default database instead of killing the server
A-atmos Apr 14, 2023
b5ae49d
Argument to use custom prefix instead of self generated
A-atmos Apr 14, 2023
885f5f9
Set prefix on database before processing
A-atmos Apr 14, 2023
2e8a949
Modified database tests to use prefix
A-atmos Apr 14, 2023
52d199e
Modified redis manager to use prefix as id instead of port
A-atmos Apr 14, 2023
0c71d95
Modified to use id as prefix instead of port
A-atmos Apr 14, 2023
9702263
Changed database to publish and store keys with prefix
A-atmos Apr 14, 2023
0802642
Changed modules to use prefix instead of port
A-atmos Apr 14, 2023
e416731
Remove keys from the default database instead of killing the server
A-atmos Apr 14, 2023
a377c97
Argument to use custom prefix instead of self generated
A-atmos Apr 14, 2023
8e890d2
Set prefix on database before processing
A-atmos Apr 14, 2023
c48096c
Modified database tests to use prefix
A-atmos Apr 14, 2023
32a066f
Merge remote-tracking branch 'A-atmos/enhance-redis-database' into a-…
AlyaGomaa Apr 14, 2023
6f059e0
Changed the is_msg_intended_for
A-atmos Apr 18, 2023
34ca085
Refactored modules according to the changes made
A-atmos Apr 18, 2023
ad9555b
Merge branch 'develop' into enhance-redis-database
A-atmos Apr 18, 2023
7677d7a
Resolved error due to merge conflict
A-atmos Apr 18, 2023
f4361a5
Resolved errors in ARP
A-atmos Apr 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def check_given_flags(self):

# kill all open unused redis servers if the parameter was included
if self.main.args.killall:
self.main.redis_man.close_open_redis_servers()
self.main.redis_man.close_open_redis_id()
self.main.terminate_slips()

if self.main.args.version:
Expand Down Expand Up @@ -147,7 +147,7 @@ def clear_redis_cache(self):
self.main.redis_man.clear_redis_cache_database()
self.main.input_information = ''
self.main.zeek_folder = ''
self.main.log_redis_server_PID(6379, self.main.redis_man.get_pid_of_redis_server(6379))
# self.main.log_redis_server_PID(6379, self.main.redis_man.get_pid_of_redis_server(6379))
self.main.terminate_slips()

def check_output_redirection(self) -> tuple:
Expand Down
4 changes: 2 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
import os, sys, inspect
from multiprocessing import Queue

import uuid

# add parent dir to path for imports to work
current_dir = os.path.dirname(
Expand Down Expand Up @@ -47,7 +47,7 @@ def profilerQueue():
@pytest.fixture
def database(outputQueue):
from slips_files.core.database.database import __database__
__database__.start(1234)
__database__.start(str(uuid.uuid4()))
__database__.outputqueue = outputQueue
__database__.print = do_nothing
return __database__
2 changes: 1 addition & 1 deletion modules/CESNET/CESNET.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def run(self):

# in case of an interface or a file, push every time we get an alert
if (
utils.is_msg_intended_for(message, 'export_evidence')
__database__.is_msg_intended_for(message, 'export_evidence')
and self.send_to_warden
):
evidence = json.loads(message['data'])
Expand Down
2 changes: 1 addition & 1 deletion modules/RiskIQ/RiskIQ.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_ip'):
if __database__.is_msg_intended_for(message, 'new_ip'):
ip = message['data']
if utils.is_ignored_ip(ip):
continue
Expand Down
11 changes: 7 additions & 4 deletions modules/arp/arp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ class Module(Module, multiprocessing.Process):
description = 'Detect arp attacks'
authors = ['Alya Gomaa']

def __init__(self, outputqueue, redis_port):
def __init__(self, outputqueue, prefix:str, redis_port='6379'):
multiprocessing.Process.__init__(self)
# All the printing output should be sent to the outputqueue.
# The outputqueue is connected to another process called OutputProcess
self.outputqueue = outputqueue
# Start the DB
__database__.start(redis_port)
__database__.start(prefix, redis_port)
self.c1 = __database__.subscribe('new_arp')
self.c2 = __database__.subscribe('tw_closed')
self.read_configuration()
Expand Down Expand Up @@ -397,17 +397,20 @@ def run(self):
self.arp_ts = time.time()

message = __database__.get_message(self.c1)

if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_arp'):

if __database__.is_msg_intended_for(message, 'new_arp'):
flow_details = json.loads(message['data'])
profileid = flow_details['profileid']
twid = flow_details['twid']
# this is the actual arp flow
flow: dict = flow_details['flow']
ts = flow['starttime']

daddr = flow['daddr']
saddr = flow['saddr']
dst_mac = flow['dmac']
Expand Down Expand Up @@ -456,7 +459,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'tw_closed'):
if __database__.is_msg_intended_for(message, 'tw_closed'):
profileid_tw = message['data']
# when a tw is closed, this means that it's too old so we don't check for arp scan in this time
# range anymore
Expand Down
2 changes: 1 addition & 1 deletion modules/blocking/blocking.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def run(self):
self.shutdown_gracefully()
return True
# There's an IP that needs to be blocked
if utils.is_msg_intended_for(message, 'new_blocking'):
if __database__.is_msg_intended_for(message, 'new_blocking'):
# message['data'] in the new_blocking channel is a dictionary that contains
# the ip and the blocking options
# Example of the data dictionary to block or unblock an ip:
Expand Down
2 changes: 1 addition & 1 deletion modules/exporting_alerts/exporting_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(msg, 'export_evidence'):
if __database__.is_msg_intended_for(msg, 'export_evidence'):
evidence = json.loads(msg['data'])
description = evidence['description']
if 'slack' in self.export_to and hasattr(self, 'BOT_TOKEN'):
Expand Down
31 changes: 20 additions & 11 deletions modules/flowalerts/flowalerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -1733,7 +1733,7 @@ def run(self):
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True
if utils.is_msg_intended_for(message, 'new_flow'):
if __database__.is_msg_intended_for(message, 'new_flow'):
new_flow = json.loads(message['data'])
profileid = new_flow['profileid']
twid = new_flow['twid']
Expand Down Expand Up @@ -1885,7 +1885,7 @@ def run(self):
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True
if utils.is_msg_intended_for(message, 'new_ssh'):
if __database__.is_msg_intended_for(message, 'new_ssh'):
data = message['data']
data = json.loads(data)
profileid = data['profileid']
Expand Down Expand Up @@ -1922,7 +1922,7 @@ def run(self):
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True
if utils.is_msg_intended_for(message, 'new_notice'):
if __database__.is_msg_intended_for(message, 'new_notice'):
data = message['data']
# Convert from json to dict
data = json.loads(data)
Expand Down Expand Up @@ -1978,7 +1978,7 @@ def run(self):
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True
if utils.is_msg_intended_for(message, 'new_ssl'):
if __database__.is_msg_intended_for(message, 'new_ssl'):
# Check for self signed certificates in new_ssl channel (ssl.log)
data = message['data']
# Convert from json to dict
Expand Down Expand Up @@ -2037,11 +2037,17 @@ def run(self):


message = __database__.get_message(self.c5)
# if message and 'stop_process' in message['data']:

if message and message['data'] == 'stop_process':
with open('/home/ac/Desktop/workspace/message.txt', 'w') as file:
file.write(message['data'])
file.close()

self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'tw_closed'):
if __database__.is_msg_intended_for(message, 'tw_closed'):
profileid_tw = message['data'].split('_')
profileid, twid = f'{profileid_tw[0]}_{profileid_tw[1]}', profileid_tw[-1]
self.detect_data_upload_in_twid(profileid, twid)
Expand All @@ -2051,7 +2057,7 @@ def run(self):
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True
if utils.is_msg_intended_for(message, 'new_dns_flow'):
if __database__.is_msg_intended_for(message, 'new_dns_flow'):
data = json.loads(message['data'])
profileid = data['profileid']
twid = data['twid']
Expand Down Expand Up @@ -2094,7 +2100,7 @@ def run(self):
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True
if utils.is_msg_intended_for(message, 'new_downloaded_file'):
if __database__.is_msg_intended_for(message, 'new_downloaded_file'):
ssl_info = json.loads(message['data'])
self.check_malicious_ssl(ssl_info)

Expand All @@ -2103,12 +2109,14 @@ def run(self):
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True
if utils.is_msg_intended_for(message, 'new_smtp'):

if __database__.is_msg_intended_for(message, 'new_smtp'):
smtp_info = json.loads(message['data'])
profileid = smtp_info['profileid']
twid = smtp_info['twid']
flow: dict = smtp_info['flow']


self.check_smtp_bruteforce(
profileid,
twid,
Expand All @@ -2120,7 +2128,8 @@ def run(self):
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True
if utils.is_msg_intended_for(message, 'new_software'):

if __database__.is_msg_intended_for(message, 'new_software'):
msg = json.loads(message['data'])
flow:dict = msg['sw_flow']
twid = msg['twid']
Expand All @@ -2140,7 +2149,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_weird'):
if __database__.is_msg_intended_for(message, 'new_weird'):
msg = json.loads(message['data'])
self.check_weird_http_method(msg)

Expand All @@ -2149,7 +2158,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_tunnel'):
if __database__.is_msg_intended_for(message, 'new_tunnel'):
msg = json.loads(message['data'])
self.check_GRE_tunnel(msg)
except KeyboardInterrupt:
Expand Down
3 changes: 1 addition & 2 deletions modules/flowmldetection/flowmldetection.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_flow'):
if __database__.is_msg_intended_for(message, 'new_flow'):
data = message['data']
# Convert from json to dict
data = json.loads(data)
Expand Down Expand Up @@ -441,7 +441,6 @@ def run(self):
# Predict
pred = self.detect()
label = self.flow_dict['label']

# Report
if (
label
Expand Down
2 changes: 1 addition & 1 deletion modules/http_analyzer/http_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_http'):
if __database__.is_msg_intended_for(message, 'new_http'):
message = json.loads(message['data'])
profileid = message['profileid']
twid = message['twid']
Expand Down
6 changes: 3 additions & 3 deletions modules/ip_info/ip_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ def run(self):
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True
if utils.is_msg_intended_for(message, 'new_MAC'):
if __database__.is_msg_intended_for(message, 'new_MAC'):
data = json.loads(message['data'])
mac_addr = data['MAC']
host_name = data.get('host_name', False)
Expand All @@ -509,7 +509,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_dns_flow'):
if __database__.is_msg_intended_for(message, 'new_dns_flow'):
data = message['data']
data = json.loads(data)
# profileid = data['profileid']
Expand All @@ -528,7 +528,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_ip'):
if __database__.is_msg_intended_for(message, 'new_ip'):
# Get the IP from the message
ip = message['data']
try:
Expand Down
6 changes: 3 additions & 3 deletions modules/network_discovery/network_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'tw_modified'):
if __database__.is_msg_intended_for(message, 'tw_modified'):
# Get the profileid and twid
profileid = message['data'].split(':')[0]
twid = message['data'].split(':')[1]
Expand Down Expand Up @@ -756,7 +756,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_notice'):
if __database__.is_msg_intended_for(message, 'new_notice'):
data = message['data']
if type(data) != str:
continue
Expand All @@ -781,7 +781,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_dhcp'):
if __database__.is_msg_intended_for(message, 'new_dhcp'):
flow = json.loads(message['data'])
self.check_dhcp_scan(flow)

Expand Down
6 changes: 3 additions & 3 deletions modules/p2ptrust/p2ptrust.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,23 +635,23 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'report_to_peers'):
if __database__.is_msg_intended_for(message, 'report_to_peers'):
self.new_evidence_callback(message)

message = __database__.get_message(self.c2)
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, self.p2p_data_request_channel):
if __database__.is_msg_intended_for(message, self.p2p_data_request_channel):
self.data_request_callback(message)

message = __database__.get_message(self.c3)
if message and message['data'] == 'stop_process':
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, self.gopy_channel):
if __database__.is_msg_intended_for(message, self.gopy_channel):
self.gopy_callback(message)

ret_code = self.pigeon.poll()
Expand Down
2 changes: 1 addition & 1 deletion modules/rnn-cc-detection/rnn-cc-detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def run(self, model_file='modules/rnn-cc-detection/rnn_model.h5'):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_letters'):
if __database__.is_msg_intended_for(message, 'new_letters'):
data = message['data']
data = json.loads(data)
pre_behavioral_model = data['new_symbol']
Expand Down
4 changes: 2 additions & 2 deletions modules/threat_intelligence/threat_intelligence.py
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ def run(self):
self.should_shutdown = True

# The channel now can receive an IP address or a domain name
if utils.is_msg_intended_for(
if __database__.is_msg_intended_for(
message, 'give_threat_intelligence'
):
# Data is sent in the channel as a json dict so we need to deserialize it first
Expand Down Expand Up @@ -1013,7 +1013,7 @@ def run(self):
if message and message['data'] == 'stop_process':
self.should_shutdown = True

if utils.is_msg_intended_for(message, 'new_downloaded_file'):
if __database__.is_msg_intended_for(message, 'new_downloaded_file'):
file_info = json.loads(message['data'])
if file_info['type'] == 'zeek':
self.is_malicious_hash(file_info)
Expand Down
2 changes: 1 addition & 1 deletion modules/timeline/timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def run(self):
self.shutdown_gracefully()
return True

if utils.is_msg_intended_for(message, 'new_flow'):
if __database__.is_msg_intended_for(message, 'new_flow'):
mdata = message['data']
# Convert from json to dict
mdata = json.loads(mdata)
Expand Down
2 changes: 1 addition & 1 deletion modules/update_manager/update_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def run(self):
try:
message = __database__.get_message(self.c1)
# Check that the message is for you. Probably unnecessary...
if message and message['data'] == 'stop_process':
if message and ('stop_process' in message['data']):
self.shutdown_gracefully()
return True

Expand Down
Loading