From 9d2097516d8f5f0da3c908b04e8e8f9bd02bf6ab Mon Sep 17 00:00:00 2001 From: alya Date: Wed, 7 Aug 2024 14:36:23 +0300 Subject: [PATCH] ground_truth: print the line + the line number when an exception occurs --- parsers/ground_truth.py | 45 +++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/parsers/ground_truth.py b/parsers/ground_truth.py index 0c1071f..e04de5c 100644 --- a/parsers/ground_truth.py +++ b/parsers/ground_truth.py @@ -2,7 +2,12 @@ import traceback import utils.timestamp_handler -from typing import Tuple, Dict, List +from typing import ( + Tuple, + Dict, + List, + Optional, + ) from re import findall from parsers.config import ConfigurationParser from utils.timewindow_handler import TimewindowHandler @@ -123,21 +128,28 @@ def handle_icmp(self, flow: dict) -> dict: return flow - def get_flow(self, line): + def get_flow(self, line) -> Optional[dict]: """ given a tab or json line, extracts the src and dst addr, sport and proto from the line :param line: is a str if the type of given file is tab separated, or a dict if it's json - :return: dict with {'saddr', 'sport':.. , 'daddr', 'proto'} + :return: dict with {'saddr', 'sport':.. , 'daddr', 'proto'} or + None when there's a problem extracting flow """ + if len(line) < 5: + # invalid line + return + if self.zeek_file_type == 'json': flow: dict = self.extract_json_fields(line) elif self.zeek_file_type == 'tab-separated': flow: dict = self.extract_tab_fields(line) - + else: + return + if not flow: - return False + return flow = self.handle_icmp(flow) return flow @@ -192,17 +204,18 @@ def handle_zeek_json(self, line:str) -> Tuple[str,str,str,str]: return label, aid, line['ts'], line['id.orig_h'] - def handle_getting_aid(self, line: list): + def handle_getting_aid(self, line: list) -> Optional[str]: # first extract fields if flow := self.get_flow(line): # we managed to extract the fields needed to calc the community id return self.hash.get_aid(flow) - return False + return - def handle_zeek_tabs(self, line:str) -> Tuple[str,str,str,str]: + def handle_zeek_tabs(self, line:str) -> Optional[Tuple[str,str,str,str]]: """ :param line: tab separated line as read from the zeek file - :return: returns a tuple of label, aid ts and srcip + :return: returns a tuple of label, aid ts and srcip or None if + unable to extract the flow """ label = self.extract_label_from_line(line) self.update_labels_ctr(label) @@ -216,11 +229,11 @@ def handle_zeek_tabs(self, line:str) -> Tuple[str,str,str,str]: aid = self.handle_getting_aid(line) if not aid: - return False + return return label, aid, line[0], line[2] - def extract_fields(self, line: str) -> dict: + def extract_fields(self, line: str) -> Optional[dict]: """ extracts the label and community id from the given line uses zeek_file_type to extract fields based on the type of the given zeek dir @@ -241,8 +254,8 @@ def extract_fields(self, line: str) -> dict: 'srcip': flow[3], } except (IndexError, TypeError): - # one of the above 2 methods returned an invalid line! - return False + # one of the above 2 methods failed to parse the given line + return def register_timewindow(self, ts) -> dict: @@ -360,13 +373,19 @@ def parse_file(self, filename: str): fullpath = self.get_full_path(filename) self.total_flows_read = 0 gt_file = open(fullpath) + line_number = 0 while line := gt_file.readline(): + line_number += 1 # skip comments if line.startswith('#'): continue flow = self.extract_fields(line) if not flow: + self.log(f"Problem extracting flow " + f"from line number {line_number}: ", + line, + error=True) continue tw_registration_stats: dict = self.register_timewindow(