Refactored config path retrieval, removed deprecated files, and impro… #211
Conversation
…ved readability

- Reworked how Get.py gets the path of config.ini (for Flag.py): it now resolves the path with proper logic instead of depending on the execution script's path.
- Removed init files for IDEA.
- Removed the deprecated feature _generate_data.py.
- Minor readability updates to FileManagement.py and config.ini.
- Allowed _test_gpu_acceleration.py to be imported via the function check_gpu(); this is experimental and still planned for removal.

Signed-off-by: Shahm Najeeb <[email protected]>
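For context, the usual way to make such a lookup independent of the caller's working directory is to resolve it against the module's own file. A minimal sketch of that idea (assuming config.ini sits next to the module, which may not match the repo's exact layout):

```python
from pathlib import Path

# Resolve config.ini relative to this module's file, not the process's
# current working directory, so imports from anywhere still find it.
CONFIG_PATH = Path(__file__).resolve().parent / "config.ini"

if not CONFIG_PATH.exists():
    raise FileNotFoundError(f"config.ini not found at {CONFIG_PATH}")
```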
Implemented a function `get_network_info()` that collects various network statistics using the psutil library.

- Network data is saved to files, including interface stats, connections, interfaces' IP addresses, and bandwidth usage.
- Executed external network commands (e.g., `ipconfig`) and saved the output.
- Added process information for network connections with corresponding process names and PIDs.
- Included logging for debugging and error handling to track the process flow.
- Used a custom save function to store collected data in the "network_data" directory.

Delayed the dump_memory.py TODO to v3.4.1.

Signed-off-by: Shahm Najeeb <[email protected]>
Implemented basic caching to speed it up.

Signed-off-by: Shahm Najeeb <[email protected]>
Signed-off-by: Shahm Najeeb <[email protected]>
…a reminder for me :>
Walkthrough

This update refactors various modules across the codebase. Many functions now return values instead of printing directly, and deprecated code has been removed. A unified logging approach is applied by replacing explicit `Log` instantiations with a shared module-level logger.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant U as User
participant NI as NetworkInfo
participant FS as Internal Fetchers
U->>NI: Call get()
NI->>FS: __fetch_network_io_stats()
NI->>FS: __fetch_network_connections()
NI->>FS: __fetch_network_interface_addresses()
NI->>FS: __measure_network_bandwidth_usage()
NI->>FS: __fetch_hostname_and_ip()
    NI->>U: Return collected network data
```

```mermaid
sequenceDiagram
participant U as User
participant VS as VulnScan
participant SDS as SensitiveDataScanner
participant Exec as ThreadPoolExecutor
U->>VS: scan_directory(scan_paths)
VS->>SDS: load_model() & load_vectorizer()
SDS-->>VS: Model & vectorizer cached
VS->>Exec: Submit scan_file tasks
Exec->>SDS: is_sensitive(file_content)
SDS-->>Exec: Return sensitivity result
Exec-->>VS: Aggregate scan results
    VS->>U: Return aggregated scan outcomes
```
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.
Actionable comments posted: 1
🧹 Nitpick comments (10)
CODE/logicytics/Get.py (1)
57-68: Don't bail too fast with `exit(1)`

Right now, if `config.ini` isn't found, we just print a message and quit. That works, but it might be nicer to raise an exception or use logging so upstream code can handle it.

CODE/vulnscan.py (3)
39-70: Awesome model-loading with a fallback

Your code checks for `.pkl`, `.safetensors`, and `.pth`. The caching logic is super helpful. Just watch out for possible version mismatches in the future, but for now, it's all good.
120-132: Thread locks FTW

Syncing model loads helps avoid weird race conditions. Sweet design, but maybe add a quick comment in the code on why the locks are needed for people new to concurrency.
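For people new to concurrency, the kind of comment being asked for might look like this; a sketch, where `load_model_cached` is a made-up name and `load_model` stands in for vulnscan.py's loader:

```python
import threading

model_lock = threading.Lock()
model_cache = {}

def load_model_cached(path: str):
    # Why the lock: two scanner threads that miss the cache at the same
    # moment would otherwise both load the model from disk, wasting memory
    # and racing on the cache dict. The lock makes check-then-load atomic.
    with model_lock:
        if path not in model_cache:
            model_cache[path] = load_model(path)  # load_model: vulnscan.py's loader
        return model_cache[path]
```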
233-233: Heads up about memory

Telling users that loading files can be slow or huge is helpful. Maybe also mention possible memory-based exceptions if things get too wild.

CODE/network_psutil.py (3)
13-16: Add error handling to catch file write issues.

What if the disk is full or you don't have write permissions? Let's catch those errors!

```diff
 def __save_data(filename: str, data: str, father_dir_name: str = "network_data"):
     os.makedirs(father_dir_name, exist_ok=True)
-    with open(os.path.join(father_dir_name, filename), "w") as f:
-        f.write(data)
+    try:
+        with open(os.path.join(father_dir_name, filename), "w") as f:
+            f.write(data)
+    except IOError as e:
+        log.error(f"Failed to save {filename}: {e}")
```
78-85: The bandwidth measurement could be more accurate.

Using `sleep(1)` for bandwidth measurement isn't super reliable because:
- It only measures for 1 second (too short!)
- Other processes might affect the timing
- Network spikes could give weird results

```diff
 log.debug("Measuring network bandwidth usage...")
-net1 = psutil.net_io_counters()
-time.sleep(1)
-net2 = psutil.net_io_counters()
-bandwidth_data = f"Upload Speed: {(net2.bytes_sent - net1.bytes_sent) / 1024} KB/s\n"
-bandwidth_data += f"Download Speed: {(net2.bytes_recv - net1.bytes_recv) / 1024} KB/s\n"
+# Take multiple samples over 5 seconds
+samples = []
+for _ in range(5):
+    net1 = psutil.net_io_counters()
+    time.sleep(1)
+    net2 = psutil.net_io_counters()
+    samples.append({
+        'up': (net2.bytes_sent - net1.bytes_sent) / 1024,
+        'down': (net2.bytes_recv - net1.bytes_recv) / 1024
+    })
+
+# Calculate average speeds
+avg_up = sum(s['up'] for s in samples) / len(samples)
+avg_down = sum(s['down'] for s in samples) / len(samples)
+bandwidth_data = f"Average Upload Speed: {avg_up:.2f} KB/s\n"
+bandwidth_data += f"Average Download Speed: {avg_down:.2f} KB/s\n"
```
87-91: Add IPv6 support and error handling for DNS resolution.

The current code only gets IPv4 addresses and might break if DNS resolution fails.

```diff
-hostname = socket.gethostname()
-ip_address = socket.gethostbyname(hostname)
-ip_config_data = f"Hostname: {hostname}\nIP Address: {ip_address}\n"
+try:
+    hostname = socket.gethostname()
+    # Get both IPv4 and IPv6 addresses
+    ip_addresses = []
+    for res in socket.getaddrinfo(hostname, None):
+        ip = res[4][0]
+        if ip not in ip_addresses:  # Avoid duplicates
+            ip_addresses.append(ip)
+
+    ip_config_data = f"Hostname: {hostname}\n"
+    ip_config_data += "IP Addresses:\n"
+    for ip in ip_addresses:
+        ip_config_data += f"  - {ip}\n"
+except socket.gaierror as e:
+    log.error(f"Failed to resolve hostname: {e}")
+    ip_config_data = f"Hostname: {hostname}\nFailed to resolve IP addresses\n"
```

CODE/logicytics/FileManagement.py (2)
130-147: Make error handling more specific.

You're catching all exceptions with a generic handler. Let's be more specific about what can go wrong!

```diff
-    except Exception as e:
-        return f"Error: {e}"
+    except PermissionError as e:
+        return f"Permission denied: {e}"
+    except FileNotFoundError as e:
+        return f"File not found: {e}"
+    except Exception as e:
+        return f"Unexpected error: {e}"
```
206-207: Use a more descriptive variable name than 'time'.

'time' is a bit vague. Let's make it clearer what this represents!

```diff
-time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
-filename = f"Logicytics_{name}_{flag}_{time}"
+timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
+filename = f"Logicytics_{name}_{flag}_{timestamp}"
```

CODE/config.ini (1)
22-22: That's one looong line! Let's make it easier to read.

The files list is getting super long. It would be easier to read if we split it into multiple lines!

```diff
-files = "bluetooth_details.py, bluetooth_logger.py, browser_miner.ps1, cmd_commands.py, config.ini, dir_list.py, dump_memory.py, event_log.py, Logicytics.py, log_miner.py, media_backup.py, netadapter.ps1, network_psutil.py, packet_sniffer.py, property_scraper.ps1, registry.py, sensitive_data_miner.py, ssh_miner.py, sys_internal.py, tasklist.py, tree.ps1, vulnscan.py, wifi_stealer.py, window_feature_miner.ps1, wmic.py, _debug.py, _dev.py, logicytics\Checks.py, logicytics\Execute.py, logicytics\FileManagement.py, logicytics\Flag.py, logicytics\Get.py, logicytics\Logger.py, logicytics\User_History.json.gz, logicytics\__init__.py, SysInternal_Suite\.sys.ignore, SysInternal_Suite\SysInternal_Suite.zip, VulnScan\Model SenseMini .3n3.pth, VulnScan\README.md, VulnScan\Vectorizer .3n3.pkl, VulnScan\tools\_study_network.py, VulnScan\tools\_test_gpu_acceleration.py, VulnScan\tools\_vectorizer.py, VulnScan\v3\_generate_data.py, VulnScan\v3\_train.py"
+files = """
+    bluetooth_details.py,
+    bluetooth_logger.py,
+    browser_miner.ps1,
+    cmd_commands.py,
+    config.ini,
+    # ... (continue with one file per line)
+    VulnScan\v3\_train.py
+"""
```
🛑 Comments failed to post (1)
CODE/network_psutil.py (1)
9-10: ⚠️ Potential issue

Hey! Move the log initialization outside the `if __name__ == "__main__"` block.

The log initialization should be at the module level since it's used by the `get_network_info` function. Right now, it'll raise a NameError if someone imports and uses the function.

```diff
-if __name__ == "__main__":
-    log = Log({"log_level": DEBUG})
+log = Log({"log_level": DEBUG})
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
log = Log({"log_level": DEBUG})
```
…gleton pattern across multiple scripts

- Minor changes in some scripts to make them simpler and more DRY.
- Made log live in __init__.py rather than having to be imported explicitly; this simplifies development while still letting more experienced users set their own log (the variable must be named differently for dev purposes). A __new__ check ensures the log variable is only initialized once, and the reset method clears previous configurations.
- Deleted a path that is not needed in browser_miner.ps1.
- Logger.py now sets bold_red rather than red for critical errors.
- Logic fix: _debug.py now only shows "All required files are present." if there are no extra/missing files.

Signed-off-by: Shahm Najeeb <[email protected]>
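For readers unfamiliar with the pattern, a `__new__`-based singleton with a reset method generally looks like this; a minimal sketch, not the repo's actual Logger.py code:

```python
class Log:
    _instance = None

    def __new__(cls, *args, **kwargs):
        # The first call builds the one shared instance; later constructor
        # calls hand back the same object instead of re-initializing it.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def reset(self) -> None:
        # Forget previous configuration so the shared logger can be
        # reconfigured cleanly.
        self.__dict__.clear()
```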
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.
Actionable comments posted: 3
🔭 Outside diff range comments (7)
CODE/registry.py (1)
23-23: ⚠️ Potential issue

Oops! There's a bug in the error handler! 🐛

The `result` variable is used in the error message on line 23, but if `subprocess.run()` fails, `result` won't exist yet. This could cause another error to pop up and hide the real problem!

Here's how to fix it:

```diff
 except subprocess.CalledProcessError as e:
-    log.error(f"Failed to back up the registry: {e}. More details: {result.stderr}")
+    log.error(f"Failed to back up the registry: {e}. More details: {e.stderr}")
```

CODE/ssh_miner.py (1)
37-41: 🛠️ Refactor suggestion

Hey, we should probably add some security checks here!

When dealing with SSH keys (which are super sensitive!), we should:
- Check if the destination directory has safe permissions
- Set restrictive permissions on the backup files
- Maybe encrypt the backups?

Here's how we can make it safer:

```diff
 try:
+    # Check destination directory permissions
+    if os.path.exists(destination_dir):
+        dir_stat = os.stat(destination_dir)
+        if dir_stat.st_mode & 0o077:  # Check if others have any access
+            log.warning("Destination directory has unsafe permissions")
+
     shutil.copytree(source_dir, destination_dir)
+    # Set restrictive permissions on backup
+    os.chmod(destination_dir, 0o700)  # Only owner can access
     log.info("SSH keys and configuration backed up successfully.")
```

CODE/log_miner.py (1)
31-31: ⚠️ Potential issue

Watch out! This could be dangerous! 🚨

The way we're building the PowerShell command could let bad stuff happen if someone messes with the log_type or backup_file variables.

Let's make it safer:

```diff
-    cmd = f'Get-EventLog -LogName "{log_type}" | Export-Csv -Path "{backup_file}" -NoTypeInformation'
+    # Validate log_type
+    valid_log_types = ["System", "Application", "Security"]
+    if log_type not in valid_log_types:
+        raise ValueError(f"Invalid log type. Must be one of {valid_log_types}")
+
+    # Validate backup_file path
+    if not backup_file.endswith('.csv') or '/' in backup_file or '\\' in backup_file:
+        raise ValueError("Invalid backup file name")
+
+    cmd = f'Get-EventLog -LogName "{log_type}" | Export-Csv -Path "{backup_file}" -NoTypeInformation'
```

CODE/sys_internal.py (1)
32-37: ⚠️ Potential issue

Hold up! We need to be more careful with these commands! 🛡️

Running executables without checking if they exist or validating their paths could be risky.

Here's how to make it safer:

```diff
+    # Validate executable path
+    exe_path = os.path.join('SysInternal_Suite', executable)
+    if not os.path.isfile(exe_path):
+        raise FileNotFoundError(f"Executable not found: {exe_path}")
+
+    # Validate executable file permissions
+    if os.access(exe_path, os.X_OK):
-    command = f"{os.path.join('SysInternal_Suite', executable)}"
+        command = f"{exe_path}"
+    else:
+        raise PermissionError(f"No execute permission for: {exe_path}")

     # Execute the command and capture the output
     result = subprocess.run(
-        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+        shell=False  # Safer execution without shell
     )
```

CODE/vulnscan.py (1)
389-398: 🛠️ Refactor suggestion

Let's fix that FIXME! 🛠️

The finally block could use some love to make sure we clean up properly.

Here's a better way to handle cleanup:

```diff
 if __name__ == "__main__":
     try:
         packet_sniffer()
     except Exception as e:
         log.error(e)
     finally:
-        if G:
+        # Clean up resources
+        try:
+            if G:
+                plt.close('all')  # Close all figures
+            model_cache.clear()  # Clear model cache
+            vectorizer_cache.clear()  # Clear vectorizer cache
+        except Exception as e:
+            log.error(f"Error during cleanup: {e}")
-            plt.close()
```

CODE/packet_sniffer.py (1)
389-398: 🛠️ Refactor suggestion

Another finally block to fix! 🔧

Just like in vulnscan.py, we should make the cleanup more robust.

Here's how to make it better:

```diff
 if __name__ == "__main__":
     try:
         packet_sniffer()
     except Exception as e:
         log.error(e)
     finally:
-        if G:
+        try:
+            if G:
+                plt.close('all')  # Close all figures
+            # Clean up any open network resources
+            conf.verb = 0  # Reset verbosity
+        except Exception as e:
+            log.error(f"Error during cleanup: {e}")
-            plt.close()
```

CODE/Logicytics.py (1)
396-399: ⚠️ Potential issue

Let's make those system commands safer! 🛡️

Using `subprocess.call` with a string command can be risky, even with `shell=False`. We should split the command into a list.

Here's the safer way:

```diff
 if SUB_ACTION == "shutdown":
-    subprocess.call("shutdown /s /t 3", shell=False)
+    subprocess.call(["shutdown", "/s", "/t", "3"], shell=False)
 elif SUB_ACTION == "reboot":
-    subprocess.call("shutdown /r /t 3", shell=False)
+    subprocess.call(["shutdown", "/r", "/t", "3"], shell=False)
```
🧹 Nitpick comments (13)
CODE/logicytics/__init__.py (1)
19-19: Hey! Let's make this line super simple! 🎯

Instead of writing it the long way with `True if ... else False`, you can just compare directly! It's like saying "yes" or "no" instead of "if it's yes then yes else no" 😄

Here's the shorter way to write it:

```diff
-__show_trace = True if DEBUG == "DEBUG" else False
+__show_trace = DEBUG == "DEBUG"
```

🧰 Tools
🪛 Ruff (0.8.2)
19-19: Remove unnecessary `True if ... else False` (SIM210)
CODE/cmd_commands.py (1)
20-21: Let's make the file handling safer! 🔒

Using `with` would be better here to make sure the file gets closed properly, even if something goes wrong.

Here's a safer way to write to the file:

```diff
 output = Execute.command(commands)
-open(file, "w", encoding=encoding).write(output)
+with open(file, "w", encoding=encoding) as f:
+    f.write(output)
```

🧰 Tools
🪛 Ruff (0.8.2)
21-21: Use a context manager for opening files (SIM115)
CODE/wmic.py (2)
19-20: Let's handle the WMIC.html file more safely! 🔒

Just like before, using `with` would be better here to make sure the file gets closed properly.

Here's how to fix it:

```diff
 data = Execute.command("wmic BIOS get Manufacturer,Name,Version /format:htable")
-open("WMIC.html", "w").write(data)
+with open("WMIC.html", "w") as f:
+    f.write(data)
```

🧰 Tools
🪛 Ruff (0.8.2)
20-20: Use a context manager for opening files (SIM115)
28-33: Make the loop more Pythonic! 🐍

Instead of using `range(len())`, we can use `enumerate()` to make the code cleaner and more readable.

Here's a better way to write the loop:

```diff
 with open("wmic_output.txt", "w") as file:
-    for i in range(len(wmic_commands)):
-        log.info(f"Executing Command Number {i + 1}: {wmic_commands[i]}")
-        output = Execute.command(wmic_commands[i])
-        file.write("-" * 190)
-        file.write(f"Command {i + 1}: {wmic_commands[i]}\n")
-        file.write(output)
+    for i, command in enumerate(wmic_commands, 1):
+        log.info(f"Executing Command Number {i}: {command}")
+        output = Execute.command(command)
+        file.write("-" * 190)
+        file.write(f"Command {i}: {command}\n")
+        file.write(output)
```

CODE/dir_list.py (1)
57-61: Let's make this run faster and safer! 🚀

The thread pool setup could be smarter and we should add some safety checks.

Here's how we can improve it:

```diff
+    # Calculate optimal thread count based on system and workload
+    optimal_workers = min(32, (os.cpu_count() or 1) * 2)
+
+    # Validate directory paths
+    if not os.path.exists(base_directory):
+        raise ValueError(f"Base directory does not exist: {base_directory}")
+
-    with ThreadPoolExecutor(max_workers=min(32, os.cpu_count() * 4)) as executor:
+    with ThreadPoolExecutor(max_workers=optimal_workers) as executor:
         subdirectories = [os.path.join(base_directory, d) for d in os.listdir(base_directory)
                           if os.path.isdir(os.path.join(base_directory, d))]
+        # Add a limit to prevent overwhelming the system
+        if len(subdirectories) > 1000:
+            log.warning(f"Large directory count ({len(subdirectories)}), this might take a while")
         futures = [executor.submit(run_command_threaded, subdir, file, message, encoding)
                    for subdir in subdirectories]
```

CODE/network_psutil.py (1)
16-16: Hey, let's tackle that TODO! 🛠️

Breaking up this big function would make it easier to understand and maintain.

Want me to help split this into smaller helper functions? I can show you how to break it down!

🧰 Tools
🪛 GitHub Check: CodeFactor
[notice] 16-16: CODE/network_psutil.py#L16 Unresolved comment '# TODO: Split me up into helper functions'. (C100)

CODE/bluetooth_details.py (1)
52-56: Hey! Let's make this PowerShell command easier to read! 🔧

The command string is a bit squished together. We can make it more readable by breaking it into smaller pieces.

Here's a cleaner way to write it:

```diff
-    command = (
-        "Get-PnpDevice | Where-Object { $_.FriendlyName -like '*Bluetooth*' } | "
-        "Select-Object FriendlyName, DeviceID, Description, Manufacturer, Status, PnpDeviceID | "
-        "ConvertTo-Json -Depth 3"
-    )
+    command = [
+        "Get-PnpDevice",
+        "| Where-Object { $_.FriendlyName -like '*Bluetooth*' }",
+        "| Select-Object FriendlyName, DeviceID, Description, Manufacturer, Status, PnpDeviceID",
+        "| ConvertTo-Json -Depth 3"
+    ]
+    command = " ".join(command)
```

CODE/sensitive_data_miner.py (1)
87-89: Let's make the file size limit adjustable! 🔧

Having a fixed 10MB limit might not work for everyone. It would be better to make this configurable!

Here's how we can make it better:

```diff
+# At the top of the file, after imports
+DEFAULT_MAX_FILE_SIZE = 10_000_000  # 10MB in bytes
+
+@classmethod
+def set_max_file_size(cls, size_in_mb: int):
+    """Set the maximum file size limit in megabytes."""
+    cls.max_file_size = size_in_mb * 1_000_000
+
 @staticmethod
 def __copy_file(src_file_path: Path, dst_file_path: Path):
-    if src_file_path.stat().st_size > 10_000_000:  # 10MB limit
+    if src_file_path.stat().st_size > getattr(Mine, 'max_file_size', DEFAULT_MAX_FILE_SIZE):
         log.warning("File exceeds size limit")
         return
```

CODE/bluetooth_logger.py (1)
104-136: Let's make our error messages more helpful! 🎯

When something goes wrong with getting Bluetooth devices, it would be nice to know exactly what happened.

Here's how we can make the error messages more helpful:

```diff
 devices = parse_output(
     output,
     regex=r"^(?P<Name>.+?)\s+(?P<DeviceID>.+)$",
     group_names=["Name", "DeviceID"]
 )
+if not devices:
+    log.warning("No Bluetooth devices found. Make sure Bluetooth is turned on and devices are paired.")

 # Extract MAC addresses
 for device in devices:
     mac_match = re.search(r"BLUETOOTHDEVICE_(?P<MAC>[A-F0-9]{12})", device["DeviceID"], re.IGNORECASE)
-    device["MAC"] = mac_match.group("MAC") if mac_match else "Address Not Found"
+    if not mac_match:
+        log.warning(f"Could not extract MAC address from device ID: {device['DeviceID']}")
+        device["MAC"] = "Address Not Found"
+    else:
+        device["MAC"] = mac_match.group("MAC")
```

CODE/vulnscan.py (2)
20-30: Hey! Let's talk about memory management! 🧠

The caching of models and vectorizers is super cool for speed, but these ML models can be pretty chunky! You might wanna add a warning about memory usage or maybe even a cache size limit.

Consider adding a max cache size like this:

```diff
 # Caching dictionaries for model and vectorizer
 model_cache = {}
 vectorizer_cache = {}
+MAX_CACHE_SIZE = 5  # Maximum number of models/vectorizers to keep in cache
```
33-66: Let's make that cache even smarter! 🚀

The caching looks great, but what happens when we load too many models? We should probably kick out the oldest ones when we hit our limit!

Here's a cool way to do it:

```diff
 def load_model(model_path_to_load: str) -> safe_open | torch.nn.Module:
     # Check cache first
     if model_path_to_load in model_cache:
         log.info(f"Using cached model from {model_path_to_load}")
         return model_cache[model_path_to_load]
+    # Evict oldest model if cache is full
+    if len(model_cache) >= MAX_CACHE_SIZE:
+        oldest_model = next(iter(model_cache))
+        del model_cache[oldest_model]
+        log.info(f"Evicted {oldest_model} from cache")

     # Load model if not cached
     if model_path_to_load.endswith('.pkl'):
```

CODE/_debug.py (1)
13-17: Let's make those log paths more flexible! 📁

The log path is hardcoded which might cause issues if someone runs this from a different directory.

Try this instead:

```diff
+import pathlib
+
+# Get the base directory relative to this file
+BASE_DIR = pathlib.Path(__file__).parent.parent
+LOG_PATH = BASE_DIR / "ACCESS" / "LOGS" / "DEBUG" / "DEBUG.log"
+
 log = Log(
     {"log_level": DEBUG,
-     "filename": "../ACCESS/LOGS/DEBUG/DEBUG.log",
+     "filename": str(LOG_PATH),
      "truncate_message": False}
 )
```

CODE/Logicytics.py (1)
13-17: Why rename log to log_main? 🤔

The renaming from `log` to `log_main` seems unnecessary since we're already importing it directly from logicytics. It might just make things more confusing.

Let's keep it simple:

```diff
-log_main = Log(
+log = Log(
     {"log_level": DEBUG,
      "filename": "../ACCESS/LOGS/DEBUG/DEBUG.log",
      "truncate_message": False}
 )
```
🛑 Comments failed to post (3)
CODE/logicytics/__init__.py (1)
88-90: 🛠️ Refactor suggestion

Hey! Let's make this code a bit safer! 🛡️

Two things to think about:
- We're calling `Get.config_data()` twice (check line 17), which is like asking the same question twice! 🤔
- The `mkdir()` call could fail if something goes wrong, and we wouldn't know about it!

Here's how we can make it better:

```diff
-FileManagement.mkdir()
-DEBUG, *_ = Get.config_data()
+try:
+    FileManagement.mkdir()
+except Exception as e:
+    print(f"Oops! Couldn't create directory: {e}")
+    raise
 log = Log({"log_level": DEBUG})  # Use DEBUG from line 17
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
try:
    FileManagement.mkdir()
except Exception as e:
    print(f"Oops! Couldn't create directory: {e}")
    raise
log = Log({"log_level": DEBUG})  # Use DEBUG from line 17
```
CODE/network_psutil.py (1)
18-89: 🛠️ Refactor suggestion
This function is doing way too much! 🎭
The `get_network_info` function is like trying to juggle too many things at once! It's handling:
- Network stats
- Connections
- Interface info
- Bandwidth
- Hostname stuff
Let's split it into smaller, focused functions like:
collect_network_stats()
collect_connections()
measure_bandwidth()
get_host_info()
Want me to show you how to break this down into smaller, easier-to-manage pieces?
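For a rough idea of the shape, here's a sketch using the helper names suggested above; the bodies are simplified illustrations, not the module's actual logic:

```python
import socket
import psutil

class NetworkInfo:
    """Sketch: each helper does one job; get() just orchestrates them."""

    def collect_network_stats(self) -> str:
        io = psutil.net_io_counters()
        return f"bytes_sent={io.bytes_sent}, bytes_recv={io.bytes_recv}"

    def collect_connections(self) -> str:
        conns = psutil.net_connections(kind="inet")
        return "\n".join(f"{c.laddr} -> {c.raddr} (pid={c.pid})" for c in conns)

    def get_host_info(self) -> str:
        host = socket.gethostname()
        return f"{host} -> {socket.gethostbyname(host)}"

    def get(self) -> dict:
        # Orchestrate the focused helpers; saving/logging is left out here.
        return {
            "stats": self.collect_network_stats(),
            "connections": self.collect_connections(),
            "host": self.get_host_info(),
        }
```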
CODE/_dev.py (1)
124-126: ⚠️ Potential issue

Hold up! We should check if the version input is valid! 🛡️

Right now, any input could be accepted as a version number. That could cause problems later!

Let's add some validation:

```diff
-    _update_ini_file("config.ini", input(f"Enter the new version of the project (Old version is {VERSION}): "),
-                     "version")
+    while True:
+        version = input(f"Enter the new version of the project (Old version is {VERSION}): ")
+        if re.match(r'^\d+\.\d+\.\d+$', version):
+            _update_ini_file("config.ini", version, "version")
+            break
+        else:
+            print("Please enter a valid version number (e.g., 1.2.3)")
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
    _update_ini_file("config.ini", files, "files")
    while True:
        version = input(f"Enter the new version of the project (Old version is {VERSION}): ")
        if re.match(r'^\d+\.\d+\.\d+$', version):
            _update_ini_file("config.ini", version, "version")
            break
        else:
            print("Please enter a valid version number (e.g., 1.2.3)")
```
…improved logging and error handling

- Improved _MOD_SKELETON.py by replacing its outdated code.
- network_psutil.py: Added error handling to catch file write issues. Made the bandwidth measurement more accurate. Added IPv6 support and error handling for DNS resolution. Split the mega-function into many protected methods in a new class.

Signed-off-by: Shahm Najeeb <[email protected]>
…proved error handling

- registry.py: Fixed a potential bug where result may not be defined when an exception occurs, so the error log would raise an unrelated exception. Also improved a log message.
- packet_sniffer.py: Indented the error message for clearer cmd output; also fixed potential bugs where cleanup may have errors.
- vulnscan.py: Improved error exception catching and cleanup; also placed the main code in a new function and allowed importing this way (custom paths).
- __init__.py: Simplified __show_trace logic.
- cmd_commands.py & wmic.py: Made file writing safer using with.
- wmic.py: Made the for loop use a more Pythonic approach (enumerate).
- _debug.py: Made the path of the log dynamic from execution.
- Logicytics.py: Renamed the log var back to log.

Signed-off-by: Shahm Najeeb <[email protected]>
Made sure that version submissions follow a strict regex pattern.

Signed-off-by: Shahm Najeeb <[email protected]>
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.
Actionable comments posted: 3
🔭 Outside diff range comments (1)
CODE/Logicytics.py (1)
34-35: 🛠️ Refactor suggestion

Let's make the file paths safer! 🔒

Using string concatenation for file paths isn't the best way. Let's use `os.path.join` instead to make it more secure and work on all operating systems.

Here's how to fix it:

```diff
-    if os.path.exists(f"../ACCESS/BACKUP/{name}.zip"):
-        os.remove(f"../ACCESS/BACKUP/{name}.zip")
+    backup_path = os.path.join("..", "ACCESS", "BACKUP")
+    backup_file = os.path.join(backup_path, f"{name}.zip")
+    if os.path.exists(backup_file):
+        os.remove(backup_file)
-    shutil.move(f"{name}.zip", "../ACCESS/BACKUP")
+    shutil.move(f"{name}.zip", backup_path)
```

Also applies to: 45-45
♻️ Duplicate comments (1)
CODE/packet_sniffer.py (1)
24-24: 🛠️ Refactor suggestion

Hey! Let's turn this into a class! 🎯

The TODO comment suggests converting this to a class, which would be super helpful! Right now, we're using global variables like `packet_data` and `G` which isn't the best way to do things.

Want me to show you how to convert this into a cool class structure? It would make the code way more organized and easier to test! Just let me know if you'd like to see an example. 😊
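If it helps, here's roughly what that class structure could look like; a minimal sketch where the scapy/networkx usage is illustrative, not the file's actual code:

```python
import networkx as nx
from scapy.all import sniff
from scapy.layers.inet import IP

class Sniff:
    def __init__(self):
        # Former globals become per-instance state, so tests can create
        # a fresh sniffer without touching module-level state.
        self.packet_data = []   # captured packet summaries
        self.G = nx.Graph()     # src -> dst connection graph

    def _log_packet(self, packet):
        if IP in packet:
            self.packet_data.append(packet.summary())
            self.G.add_edge(packet[IP].src, packet[IP].dst)

    def packets(self, interface=None, count=10, timeout=10):
        # scapy calls _log_packet once per captured packet
        sniff(iface=interface, prn=self._log_packet, count=count, timeout=timeout)
```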
🧹 Nitpick comments (9)
PLANS.md (1)
16-16: New `sensitive data miner` and `vulnscan` Integration Task Added!

This task is a neat update with version v3.7.0. Just double-check that the merging plan between the two modules is well-documented elsewhere, so everyone understands what needs to be done.

CODE/Logicytics.py (1)
288-361: Let's make this code more organized! 🏗️

The `execute_scripts` function is doing a lot of different things. We could split it into smaller, focused functions:
- One for threaded execution
- One for performance checking
- One for regular execution
This would make the code easier to understand and maintain.
Here's how we could refactor it:
```diff
-def execute_scripts():
+def execute_single_script(script: str) -> tuple[str, Exception | None]:
+    """Execute a single script and handle its errors."""
+    log.debug(f"Executing {script}")
+    try:
+        log.parse_execution(Execute.script(script))
+        log.info(f"{script} executed")
+        return script, None
+    except Exception as err:
+        log.error(f"Error executing {script}: {err}")
+        return script, err
+
+def execute_scripts_threaded(execution_list: list[str]) -> None:
+    """Execute scripts using thread pool."""
+    log.debug("Using threading")
+    with ThreadPoolExecutor() as executor:
+        futures = {executor.submit(execute_single_script, script): script
+                   for script in execution_list}
+        for future in as_completed(futures):
+            script = futures[future]
+            result, error = future.result()
+            if error:
+                log.error(f"Failed to execute {script}")
+            else:
+                log.debug(f"Completed {script}")
+
+def execute_scripts_performance(execution_list: list[str]) -> None:
+    """Execute scripts and measure performance."""
+    execution_times = []
+    for file in range(len(execution_list)):
+        start_time = datetime.now()
+        log.parse_execution(Execute.script(execution_list[file]))
+        end_time = datetime.now()
+        elapsed_time = end_time - start_time
+        execution_times.append((file, elapsed_time))
+        log.info(f"{execution_list[file]} executed in {elapsed_time}")
+
+    table = PrettyTable()
+    table.field_names = ["Script", "Execution Time"]
+    for file, elapsed_time in execution_times:
+        table.add_row([file, elapsed_time])
+
+    with open(
+            f"../ACCESS/LOGS/PERFORMANCE/Performance_Summary_"
+            f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.txt", "w"
+    ) as f:
+        f.write(table.get_string())
+
+    log.info("Performance check complete! Performance log found in ACCESS/LOGS/PERFORMANCE")
+
+def execute_scripts_regular(execution_list: list[str]) -> None:
+    """Execute scripts sequentially."""
+    try:
+        for script in execution_list:
+            log.parse_execution(Execute.script(script))
+            log.info(f"{script} executed")
+    except UnicodeDecodeError as e:
+        log.error(f"Error in code: {e}")
+    except Exception as e:
+        log.error(f"Error in code: {e}")
+
+def execute_scripts():
+    """Execute scripts based on the current action."""
+    log.info("Starting Logicytics...")
+    execution_list = generate_execution_list()
+
+    if ACTION in ("threaded", "depth"):
+        execute_scripts_threaded(execution_list)
+    elif ACTION == "performance_check":
+        execute_scripts_performance(execution_list)
+    else:
+        execute_scripts_regular(execution_list)
```

CODE/vulnscan.py (2)
20-30: Hey! Let's make this code more organized! 🏗️

The global variables and caches could be better organized into a class. This would make the code easier to understand and maintain. Here's how:

```diff
-# Caching dictionaries for model and vectorizer
-model_cache = {}
-vectorizer_cache = {}
-
-# Locks for thread-safe model and vectorizer loading
-model_lock = threading.Lock()
-vectorizer_lock = threading.Lock()
-
-# Global variables to hold loaded model and vectorizer
-model_to_use = None
-vectorizer_to_use = None
+class VulnScanState:
+    def __init__(self):
+        # Caching dictionaries for model and vectorizer
+        self.model_cache: dict[str, torch.nn.Module | safe_open] = {}
+        self.vectorizer_cache: dict[str, TfidfVectorizer] = {}
+
+        # Locks for thread-safe loading
+        self.model_lock = threading.Lock()
+        self.vectorizer_lock = threading.Lock()
+
+        # Currently loaded resources
+        self.model_to_use: torch.nn.Module | safe_open | None = None
+        self.vectorizer_to_use: TfidfVectorizer | None = None
+
+# Global state instance
+state = VulnScanState()
```
33-67: Let's make the model loading even safer! 🔒

The model loading looks good, but we can make it even better with a context manager for thread safety. Check this out:

```diff
+@contextmanager
+def model_loading_context(model_path: str):
+    with model_lock:
+        try:
+            yield
+        finally:
+            log.debug(f"Released lock for {model_path}")

 def load_model(model_path_to_load: str) -> safe_open | torch.nn.Module:
     # Check cache first
-    if model_path_to_load in model_cache:
-        log.info(f"Using cached model from {model_path_to_load}")
-        return model_cache[model_path_to_load]
+    with model_loading_context(model_path_to_load):
+        if model_path_to_load in model_cache:
+            log.info(f"Using cached model from {model_path_to_load}")
+            return model_cache[model_path_to_load]
```

CODE/_dev.py (1)
44-44: Quick fix needed for the f-string! 🔧

The f-string here doesn't have any variables to format.

```diff
-    color_print(f"[x] INI file not found", "red")
+    color_print("[x] INI file not found", "red")
```

🧰 Tools
🪛 Ruff (0.8.2)
44-44: f-string without any placeholders. Remove extraneous `f` prefix (F541)
CODE/packet_sniffer.py (1)
395-400: Good job on resource cleanup! 🧹

Smart move adding proper cleanup for matplotlib figures! But hey, we can make it even better:

Try this cleaner version:

```diff
-    # Clean up resources
-    try:
-        if G:
-            plt.close('all')  # Close all figures
-    except Exception as e:
-        log.error(f"Error during cleanup: {e}")
+    # Clean up resources
+    try:
+        plt.close('all')  # Close all figures
+    except Exception as e:
+        log.error(f"Error during cleanup: {e}")
+    finally:
+        G.clear()  # Clear the graph to free memory
```

The `if G:` check isn't really needed since `plt.close('all')` is safe to call even if no figures exist. Plus, I added `G.clear()` to free up memory properly! 🚀

CODE/network_psutil.py (3)
1-8: Hey! Let's organize these imports better! 🔄

The imports could be grouped better to make it clearer what's coming from where:

```diff
-import os
-import socket
-import time
-
-import psutil
-
-from logicytics import log, Execute
+# Standard library imports
+import os
+import socket
+import time
+
+# Third-party imports
+import psutil
+
+# Local imports
+from logicytics import log, Execute
```
15-31: Yo! The main get() method looks solid, but let's make it even better! 🚀

The error handling is good, but we could make it more informative by:
- Adding return values to know if it succeeded
- Giving more specific error info

```diff
 @log.function
-def get(self):
+def get(self) -> bool:
     """
     Gathers and saves various network-related information by calling multiple internal methods.
+
+    Returns:
+        bool: True if all operations succeeded, False otherwise
     """
     try:
         self.__fetch_network_io_stats()
         self.__fetch_network_connections()
         self.__fetch_network_interface_addresses()
         self.__fetch_network_interface_stats()
         self.__execute_external_network_command()
         self.__fetch_network_connections_with_process_info()
         self.__measure_network_bandwidth_usage()
         self.__fetch_hostname_and_ip()
+        return True
     except Exception as e:
-        log.error(f"Error getting network info: {e}")
+        log.error(f"Error getting network info: {str(e)}, Type: {type(e).__name__}")
+        return False
```
128-148: The bandwidth measurement could be more accurate! 📊

Right now, we're taking measurements every second for 5 seconds. That's cool, but network traffic can be super spiky! We might want to:
- Make the sample size configurable
- Add min/max values
- Handle division by zero just in case

```diff
-def __measure_network_bandwidth_usage(self):
+def __measure_network_bandwidth_usage(self, sample_count: int = 5, interval: float = 1.0):
     """
     Measures and saves the average network bandwidth usage.
+
+    Args:
+        sample_count: Number of samples to take (default: 5)
+        interval: Time between samples in seconds (default: 1.0)
     """
     log.debug("Measuring network bandwidth usage...")
     samples = []
-    for _ in range(5):
+    for _ in range(sample_count):
         net1 = psutil.net_io_counters()
-        time.sleep(1)
+        time.sleep(interval)
         net2 = psutil.net_io_counters()
         samples.append({
             'up': (net2.bytes_sent - net1.bytes_sent) / 1024,
             'down': (net2.bytes_recv - net1.bytes_recv) / 1024
         })
-    avg_up = sum(s['up'] for s in samples) / len(samples)
-    avg_down = sum(s['down'] for s in samples) / len(samples)
+    if samples:
+        avg_up = sum(s['up'] for s in samples) / len(samples)
+        avg_down = sum(s['down'] for s in samples) / len(samples)
+        max_up = max(s['up'] for s in samples)
+        max_down = max(s['down'] for s in samples)
+    else:
+        avg_up = avg_down = max_up = max_down = 0
     bandwidth_data = f"Average Upload Speed: {avg_up:.2f} KB/s\n"
     bandwidth_data += f"Average Download Speed: {avg_down:.2f} KB/s\n"
+    bandwidth_data += f"Peak Upload Speed: {max_up:.2f} KB/s\n"
+    bandwidth_data += f"Peak Download Speed: {max_down:.2f} KB/s\n"
```
🛑 Comments failed to post (3)
CODE/vulnscan.py (2)
234-254: 🛠️ Refactor suggestion
Let's clean up this duplicate code! 🧹
There's duplicate cleanup code in both try-finally blocks. Let's fix that and the unused variables:
```diff
+def cleanup_resources():
+    """Clean up cached resources and global state"""
+    model_cache.clear()
+    vectorizer_cache.clear()
+    global model_to_use, vectorizer_to_use
+    model_to_use = None
+    vectorizer_to_use = None
+    log.info("Scan complete!")

 def vulnscan(paths_to_scan: list[str]):
     try:
         # ... existing code ...
     finally:
-        try:
-            model_cache.clear()
-            vectorizer_cache.clear()
-            model_to_use = None
-            vectorizer_to_use = None
-            log.info("Scan complete!")
-        except Exception as e:
-            log.error(f"Scan cleanup failed: {e}")
+        cleanup_resources()
```

Also applies to: 259-293
69-91: ⚠️ Potential issue

Watch out! We need to handle errors here! ⚠️

The vectorizer loading function needs error handling for file loading failures. Here's how to fix it:

```diff
 def load_vectorizer(vectorizer_path_to_load: str) -> TfidfVectorizer:
     # Check cache first
     if vectorizer_path_to_load in vectorizer_cache:
         log.info(f"Using cached vectorizer from {vectorizer_path_to_load}")
         return vectorizer_cache[vectorizer_path_to_load]

     # Load vectorizer if not cached
-    vectorizer = joblib.load(vectorizer_path_to_load)
+    try:
+        vectorizer = joblib.load(vectorizer_path_to_load)
+    except Exception as e:
+        log.error(f"Failed to load vectorizer from {vectorizer_path_to_load}: {e}")
+        raise
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
def load_vectorizer(vectorizer_path_to_load: str) -> TfidfVectorizer:
    """
    Load the vectorizer from the specified file path with caching.

    Parameters:
        vectorizer_path_to_load (str): Full file path to the vectorizer file to be loaded.

    Returns:
        TfidfVectorizer: Loaded vectorizer object.
    """
    # Check cache first
    if vectorizer_path_to_load in vectorizer_cache:
        log.info(f"Using cached vectorizer from {vectorizer_path_to_load}")
        return vectorizer_cache[vectorizer_path_to_load]

    # Load vectorizer if not cached
    try:
        vectorizer = joblib.load(vectorizer_path_to_load)
    except Exception as e:
        log.error(f"Failed to load vectorizer from {vectorizer_path_to_load}: {e}")
        raise

    # Cache the vectorizer
    vectorizer_cache[vectorizer_path_to_load] = vectorizer
    log.info(f"Loaded and cached vectorizer from {vectorizer_path_to_load}")
    return vectorizer
```
CODE/network_psutil.py (1)
101-109: ⚠️ Potential issue

Watch out! The ipconfig command might not work everywhere! ⚠️

The `ipconfig` command is Windows-specific. On Linux/Mac, it's `ifconfig` or `ip addr`.

```diff
 def __execute_external_network_command(self):
     """
     Executes an external network command and saves the output.
     """
     log.debug("Executing external network command...")
-    result = Execute.command("ipconfig")
+    if os.name == 'nt':  # Windows
+        result = Execute.command("ipconfig")
+    else:  # Linux/Mac
+        result = Execute.command("ip addr")
     self.__save_data("network_command_output.txt", result)
     log.info("Network command output saved.")
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
def __execute_external_network_command(self):
    """
    Executes an external network command and saves the output.
    """
    log.debug("Executing external network command...")
    if os.name == 'nt':  # Windows
        result = Execute.command("ipconfig")
    else:  # Linux/Mac
        result = Execute.command("ip addr")
    self.__save_data("network_command_output.txt", result)
    log.info("Network command output saved.")
```
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.
Actionable comments posted: 4
🔭 Outside diff range comments (1)
CODE/packet_sniffer.py (1)
19-21: ⚠️ Potential issue

Global variables alert! 🚫

Using global variables for `packet_data` and `G` isn't the best practice. They should be instance variables of the `Sniff` class.

Here's how to fix it:

```diff
-# Global configuration
 conf.verb = 0  # Turn off verbosity for clean output
-packet_data = []  # List to store packet information
-G = nx.Graph()  # Initialize a graph
 class Sniff:
+    def __init__(self):
+        self.packet_data = []  # List to store packet information
+        self.G = nx.Graph()  # Initialize a graph
```

Then update all references to use `self.packet_data` and `self.G`.
🧹 Nitpick comments (12)
CODE/_dev.py (3)
11-20: Hey, cool color printing function! 🎨

The `color_print` function is neat, but heads up: it might not work on Windows Command Prompt without special setup. Also, you could make it even cooler by adding more colors like blue and purple!

Here's how you could level it up:

```diff
 def color_print(text, color="reset"):
     colors = {
         "reset": "\033[0m",
         "red": "\033[31m",
         "green": "\033[32m",
         "yellow": "\033[33m",
+        "blue": "\033[34m",
+        "purple": "\033[35m",
+        "cyan": "\033[36m",
     }
+    # Check if we're on Windows
+    import platform
+    if platform.system() == 'Windows':
+        import os
+        os.system('color')
     color_code = colors.get(color.lower(), colors["reset"])
     print(f"{color_code}{text}{colors['reset']}")
```
137-143: Let's make the version input even better! 🎮

The version input validation is great, but what if someone keeps entering the wrong format forever? Maybe add a max number of tries?

Here's a suggestion:

```diff
+    max_attempts = 3
+    attempts = 0
     while True:
+        if attempts >= max_attempts:
+            color_print("[x] Maximum attempts reached. Please run the script again.", "red")
+            return
         version = input(f"\033[36m[?] Enter the new version of the project (Old version is {VERSION}): \033[0m")
         if re.match(r'^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)$', version):
             _update_ini_file("config.ini", version, "version")
             break
         else:
             color_print("[!] Please enter a valid version number (e.g., 1.2.3)", "yellow")
+            attempts += 1
+            color_print(f"[!] {max_attempts - attempts} attempts remaining", "yellow")
```
128-130: Let's keep the color printing style consistent! 🎯

You've got this cool new `color_print` function, but you're not using it here. Let's stick to one way of doing colors!

Here's how to make it match:

```diff
-    print("\n".join([f"\033[92m+ {file}\033[0m" for file in added_files]))  # Green
-    print("\n".join([f"\033[91m- {file}\033[0m" for file in removed_files]))  # Red
-    print("\n".join([f"* {file}" for file in normal_files]))
+    for file in added_files:
+        color_print(f"+ {file}", "green")
+    for file in removed_files:
+        color_print(f"- {file}", "red")
+    for file in normal_files:
+        color_print(f"* {file}")
```

CODE/logicytics/Flag.py (1)
763-766: Avoid repeating the same string operation! 🔄

You're calling `.replace("--", "")` multiple times on the same string. Let's make it DRY (Don't Repeat Yourself)!

Here's a cleaner way to write it:

```diff
-    if matched_flag.replace("--", "") not in history_data['flags_usage']:
-        history_data['flags_usage'][matched_flag.replace("--", "")] = 0
-    history_data['flags_usage'][matched_flag.replace("--", "")] += 1
+    clean_flag = matched_flag.replace("--", "")
+    if clean_flag not in history_data['flags_usage']:
+        history_data['flags_usage'][clean_flag] = 0
+    history_data['flags_usage'][clean_flag] += 1
```

CODE/vulnscan.py (3)
20-25: Hey! These planned features for v3.4.2 look super important! 🚀

The TODOs mention some cool performance improvements (a rough sketch of the batching idea follows after the tool note below):
- Batch file reading
- Async file scanning
- Better model loading and caching
- Improved feature extraction
Let me know if you want help implementing any of these features!
🧰 Tools
🪛 GitHub Check: CodeFactor
[notice] 20-20: CODE/vulnscan.py#L20
Unresolved comment '# TODO: v3.4.2'. (C100)
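For the batching idea above, a rough sketch: vectorize many files in one `transform` call and run a single forward pass, instead of one call per file. This assumes a scikit-learn TfidfVectorizer and a torch model that emits one logit per input, as used elsewhere in vulnscan.py; the function name is made up:

```python
import torch

def predict_batch(model, vectorizer, contents: list[str]) -> list[float]:
    # One transform call for the whole batch is much cheaper than
    # calling it once per file.
    features = vectorizer.transform(contents)
    model.eval()
    with torch.no_grad():
        tensor = torch.tensor(features.toarray(), dtype=torch.float32)
        # One forward pass scores every file in the batch; assumes the
        # model returns a (batch, 1) logit tensor.
        probabilities = torch.sigmoid(model(tensor)).squeeze(dim=1)
    return probabilities.tolist()
```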
26-29: Quick heads up about the v3.4.1 TODOs! 📝

The MAX_FILE_SIZE feature would be super helpful to avoid scanning huge files that might slow things down. Want me to help you add this feature? (A sketch follows after the tool note below.)
🧰 Tools
🪛 GitHub Check: CodeFactor
[notice] 26-26: CODE/vulnscan.py#L26
Unresolved comment '# TODO: v3.4.1'. (C100)
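A minimal sketch of that guard; the 10 MB figure mirrors the limit mentioned elsewhere in this review, and `should_scan` is a made-up helper name:

```python
import os

MAX_FILE_SIZE = 10_000_000  # bytes; skip anything bigger than ~10 MB

def should_scan(file_path: str) -> bool:
    # Cheap stat() check up front so huge files never reach the model.
    try:
        return os.path.getsize(file_path) <= MAX_FILE_SIZE
    except OSError:
        return False  # unreadable or missing files are skipped too
```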
83-104: Your ML prediction code looks solid, but we can make it even faster! 🏃♂️

The code works great, but here's a cool trick to speed things up:
- We can batch process multiple files instead of doing them one by one
- We could cache the device check and model transfer to GPU

Here's how we can make it faster:

```diff
 def is_sensitive(self, file_content: str) -> tuple[bool, float, str]:
+    # Cache device and model transfer at class level
+    if not hasattr(self, '_device'):
+        self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+        if isinstance(self.model, torch.nn.Module):
+            self.model.to(self._device)
+            self.model.eval()  # Set eval mode once
     if isinstance(self.model, torch.nn.Module):
-        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-        self.model.to(device)
         features = self.vectorizer.transform([file_content])
-        self.model.eval()
         with torch.no_grad():
-            features_tensor = torch.tensor(features.toarray(), dtype=torch.float32).to(device)
+            features_tensor = torch.tensor(features.toarray(), dtype=torch.float32).to(self._device)
```

CODE/network_psutil.py (3)
10-30: Nice job with the class structure and error handling! 🎯

The `NetworkInfo` class is well-organized with clear docstrings. However, it could use a cleanup method to handle resource management.

Consider adding a cleanup method:

```diff
+    def cleanup(self):
+        """Clean up any resources used during network info gathering."""
+        try:
+            # Close any open file handles, sockets, etc.
+            pass
+        except Exception as e:
+            log.error(f"Error during cleanup: {e}")
```
136-136: Hey, there's a TODO here! 📝

The TODO comment indicates that config.ini integration is planned for v3.4.1.
Want me to help implement the config.ini integration for the sample count and interval values?
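A sketch of what that config.ini integration could look like; the section name and keys here are made up for illustration:

```python
import configparser

config = configparser.ConfigParser()
config.read("config.ini")

# Fall back to the current hardcoded defaults if the keys are absent.
sample_count = config.getint("Network Info", "sample_count", fallback=5)
interval = config.getfloat("Network Info", "interval", fallback=1.0)
```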
161-180: Your IP address handling is solid, but could be even better! 💪

The `__fetch_hostname_and_ip` method could separate IPv4 and IPv6 addresses for better clarity.

Here's how to improve it:

```diff
 try:
     hostname = socket.gethostname()
-    ip_addresses = []
+    ipv4_addresses = []
+    ipv6_addresses = []
     for res in socket.getaddrinfo(hostname, None):
         ip = res[4][0]
-        if ip not in ip_addresses:
-            ip_addresses.append(ip)
+        if ':' in ip and ip not in ipv6_addresses:
+            ipv6_addresses.append(ip)
+        elif '.' in ip and ip not in ipv4_addresses:
+            ipv4_addresses.append(ip)

     ip_config_data = f"Hostname: {hostname}\n"
-    ip_config_data += "IP Addresses:\n"
-    for ip in ip_addresses:
+    ip_config_data += "IPv4 Addresses:\n"
+    for ip in ipv4_addresses:
+        ip_config_data += f"  - {ip}\n"
+    ip_config_data += "IPv6 Addresses:\n"
+    for ip in ipv6_addresses:
         ip_config_data += f"  - {ip}\n"
```

CODE/packet_sniffer.py (2)
173-238: Let's make this code faster with async! 🏃♂️

The `__start_sniffing` method could benefit from async/await for better performance, especially when handling multiple packets.

Consider this improvement:

```diff
-    def __start_sniffing(self, interface: str, packet_count: int = 10, timeout: int = 10):
+    async def __start_sniffing(self, interface: str, packet_count: int = 10, timeout: int = 10):
         log.info(f"Starting packet capture on interface '{interface}'...")
         packet_counter = 0

-        def packet_callback(packet: IP) -> bool:
+        async def packet_callback(packet: IP) -> bool:
             log.debug(f"Received packet: {packet.summary()}")
             nonlocal packet_counter
             if packet_counter >= packet_count:
                 log.info(f"Captured {packet_count} packets, stopping sniffing.")
                 return True
-            self.__log_packet(packet)
+            await self.__log_packet(packet)
             packet_counter += 1
```
386-393: Make cleanup even safer with context managers! 🔒

The cleanup function could use context managers for better resource management.

Here's a better way:

```diff
-def cleanup():
+class NetworkGraphManager:
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        try:
+            plt.close('all')
+        except Exception as err:
+            log.error(f"Error during cleanup: {err}")
+        finally:
+            G.clear()

 if __name__ == "__main__":
-    try:
-        Sniff().packets()
-    except Exception as e:
-        log.error(e)
-    finally:
-        cleanup()
+    with NetworkGraphManager():
+        try:
+            Sniff().packets()
+        except Exception as e:
+            log.error(e)
```
🛑 Comments failed to post (4)
CODE/vulnscan.py (2)
198-203: 🛠️ Refactor suggestion
These hardcoded paths should be in a config file! 🔧
It would be way cooler to have these paths in a config file so users can easily change them without touching the code!
Let's move these to a config file! Want me to show you how to set that up?
182-185: 🛠️ Refactor suggestion
Watch out! This file writing could slow things down!
⚠️ Opening and closing the file for each sensitive file isn't super efficient. Let's collect them in a list and write them all at once!
Here's a better way to do it:
```diff
+    sensitive_files = []
     for future in as_completed(futures):
         try:
             result, probability, reason = future.result()
             if result:
                 file_path = futures[future]
                 log.debug(
                     f"Sensitive file detected: {file_path} (Confidence: {probability:.2f}). Reason: {reason}")
-                with open("Sensitive_File_Paths.txt", "a") as sensitive_file:
-                    sensitive_file.write(f"{file_path}\n")
+                sensitive_files.append(file_path)
+    # Write all sensitive files at once
+    if sensitive_files:
+        with open("Sensitive_File_Paths.txt", "a") as sensitive_file:
+            sensitive_file.write("\n".join(sensitive_files) + "\n")
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
    sensitive_files = []
    for future in as_completed(futures):
        try:
            result, probability, reason = future.result()
            if result:
                file_path = futures[future]
                log.debug(
                    f"Sensitive file detected: {file_path} (Confidence: {probability:.2f}). Reason: {reason}")
                sensitive_files.append(file_path)
    # Write all sensitive files at once
    if sensitive_files:
        with open("Sensitive_File_Paths.txt", "a") as sensitive_file:
            sensitive_file.write("\n".join(sensitive_files) + "\n")
```
CODE/network_psutil.py (2)
128-159: 🛠️ Refactor suggestion
The sleep() is making your code wait around! ⏰
The `__measure_network_bandwidth_usage` method uses blocking sleep() calls which could make the program unresponsive.

Consider using async/await for non-blocking measurements:

```diff
-    def __measure_network_bandwidth_usage(self, sample_count: int = 5, interval: float = 1.0):
+    async def __measure_network_bandwidth_usage(self, sample_count: int = 5, interval: float = 1.0):
         samples = []
         for _ in range(sample_count):
             net1 = psutil.net_io_counters()
-            time.sleep(interval)
+            await asyncio.sleep(interval)
             net2 = psutil.net_io_counters()
```

Don't forget to add: `import asyncio`
32-46: ⚠️ Potential issue

Watch out for path traversal! 🚨

The `__save_data` method needs to sanitize file paths to prevent directory traversal attacks.

Here's how to make it safer:

```diff
 @staticmethod
 def __save_data(filename: str, data: str, father_dir_name: str = "network_data"):
+    # Sanitize the filename to prevent directory traversal
+    filename = os.path.basename(filename)
+    father_dir_name = os.path.basename(father_dir_name)
     os.makedirs(father_dir_name, exist_ok=True)
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
@staticmethod
def __save_data(filename: str, data: str, father_dir_name: str = "network_data"):
    """
    Saves the given data to a file.

    :param filename: The name of the file to save the data in.
    :param data: The data to be saved.
    :param father_dir_name: The directory to save the file in. Defaults to "network_data".
    """
    # Sanitize the filename to prevent directory traversal
    filename = os.path.basename(filename)
    father_dir_name = os.path.basename(father_dir_name)
    os.makedirs(father_dir_name, exist_ok=True)
    try:
        with open(os.path.join(father_dir_name, filename), "w") as f:
            f.write(data)
    except IOError as e:
        log.error(f"Failed to save {filename}: {e}")
```
- [__init__.py] Added a new ObjectLoadError exception.
- [_dev.py] Improved usage of color_print; also added a max of 10 attempts for inputting a valid version.
- [Flag.py] Fixed a potential bug where replacing "--" could result in incorrect flags (now it's lstrip('-')). Fixed a bug where, even if a flag match was found (using cosine similarity), it would still say "Sorry" and load the ML model to get the flag for you.
- [Logicytics.py] Fixed a bug where MAX_WORKERS was a str rather than an int due to an improper config.get call. Added safe_remove and safe_append to make sure the execution list is proper and doesn't break. Refactored generate_execution_list to be more efficient. The performance flag now logs memory usage! Added a simple KeyboardInterrupt warning.
- [network_psutil.py] Made the measure method async to improve efficiency and remove the time.sleep() bottleneck.
- [packet_sniffer.py] Moved the global variables inside the Sniff class (under __init__).
- [vulnscan.py] Added checks for vectorizer loading to dodge crashes. Improved the IO bottleneck by making the write happen once at the end rather than in every loop iteration.

Signed-off-by: Shahm Najeeb <[email protected]>
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.
Actionable comments posted: 10
🔭 Outside diff range comments (1)
CODE/Logicytics.py (1)
238-262: 🛠️ Refactor suggestion

Let's make sure those backups actually work! 🎯
The backup method doesn't check if the target directory exists before trying to back it up.
Here's a safer version:
```diff
 @staticmethod
 def backup(directory: str, name: str):
+    if not os.path.exists(directory):
+        log.error(f"Directory {directory} does not exist!")
+        return False
+
     # Check if backup exists, delete it if so
     if os.path.exists(f"../ACCESS/BACKUP/{name}.zip"):
         os.remove(f"../ACCESS/BACKUP/{name}.zip")
```

🧰 Tools
🪛 Ruff (0.8.2)
255-255: Loop control variable `dirs` not used within loop body. Rename unused `dirs` to `_dirs` (B007)
🧹 Nitpick comments (14)
CODE/_dev.py (2)
72-84: Let's make the input prompt even cooler! 🎮

The colored prompt is nice, but we could make it even better by moving the color codes to the color_print function.
Here's how:
```diff
-    answer = input(f"\033[36m[?] {question} (y)es or (n)o:- \033[0m")
+    prompt = f"[?] {question} (y)es or (n)o:- "
+    answer = input(color_print(prompt, "cyan"))
```

This way, all our color handling stays in one place! 🎯
139-154: Version input is looking solid! 🎯

Love the regex validation and attempt limit! Just one tiny suggestion to make it even better.
Let's extract that regex pattern into a constant at the top of the file:
```diff
+VERSION_PATTERN = r'^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)$'

 # Later in the code...
-    if re.match(r'^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)$', version):
+    if re.match(VERSION_PATTERN, version):
```

This makes it easier to update if we ever need to change the version format! 🔧
🧰 Tools
🪛 GitHub Check: CodeFactor
[notice] 107-154: CODE/_dev.py#L107-L154
Complex Method

CODE/logicytics/Flag.py (3)
656-656: Let's make those string replacements more consistent! 🔄

You're using `replace('_', '-')` in multiple places to convert flag names. It would be cleaner to create a helper method for this common operation. Here's a suggestion to make it more maintainable:
```diff
+    @staticmethod
+    def __format_flag_name(flag: str) -> str:
+        """Convert internal flag name to CLI format."""
+        return flag.replace('_', '-')

-    print(f"Invalid flag '{user_input}', Did you mean '--{closest_matches[0].replace('_', '-')}'?")
+    print(f"Invalid flag '{user_input}', Did you mean '--{cls.__format_flag_name(closest_matches[0])}'?")

-    f"User input: {user_input_desc}\nMatched flag: {flag_received.replace('_', '-')}\nAccuracy: {accuracy_received:.2f}%\n")
+    f"User input: {user_input_desc}\nMatched flag: {cls.__format_flag_name(flag_received)}\nAccuracy: {accuracy_received:.2f}%\n")

-    print(f"Matched flag: {flag_received.replace('_', '-')} (Accuracy: {accuracy_received:.2f}%)\n")
+    print(f"Matched flag: {cls.__format_flag_name(flag_received)} (Accuracy: {accuracy_received:.2f}%)\n")
```

Also applies to: 667-668, 670-670
159-159: Heads up! There's a TODO comment here 👀

The TODO comment indicates this feature is in beta for v3.6.0. Want me to help track this?
I can help create an issue to track this beta feature. Would you like me to do that?
764-768: Let's clean up this history update logic! 🧹

The repeated string manipulation with `replace("--", "")` could be simplified. Here's a cleaner way to handle it:
```diff
-    if matched_flag.replace("--", "") not in history_data['flags_usage']:
-        history_data['flags_usage'][matched_flag.replace("--", "")] = 0
-    history_data['flags_usage'][matched_flag.replace("--", "")] += 1
+    clean_flag = matched_flag.replace("--", "")
+    if clean_flag not in history_data['flags_usage']:
+        history_data['flags_usage'][clean_flag] = 0
+    history_data['flags_usage'][clean_flag] += 1
```

CODE/Logicytics.py (3)
19-21: Hey! Let's make the config file path more flexible! 🔧

The hardcoded path `"config.ini"` might break if someone runs the script from a different directory. We should use an absolute path based on the script's location. Here's a cooler way to do it:
```diff
-config.read("config.ini")
+config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.ini"))
```
197-213: Let's make the memory units more consistent! 📊

The memory calculations mix different units and could be clearer. Let's create a helper function to handle unit conversions!
Here's a better way:
```diff
+def bytes_to_mb(bytes_value: int) -> float:
+    return bytes_value / (1024 * 1024)

 execution_times = []
 memory_usage = []
 process = psutil.Process()
 for file in range(len(self.execution_list)):
     start_time = datetime.now()
-    start_memory = process.memory_info().rss / 1024 / 1024  # MB
+    start_memory = bytes_to_mb(process.memory_info().rss)
     log.execution(Execute.script(self.execution_list[file]))
     end_time = datetime.now()
-    end_memory = process.memory_info().rss / 1024 / 1024  # MB
+    end_memory = bytes_to_mb(process.memory_info().rss)
```
505-510: We should clean up after ourselves! 🧹

The keyboard interrupt handler warns about leftover files but doesn't try to clean them up.
Would you like me to help create a cleanup function that:
- Removes temporary files
- Closes open file handles
- Restores the original state
This would make the force shutdown much safer!
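For illustration only, a bare-bones version of such a helper might look like this - the temp-file patterns and directory names are assumptions, not taken from the repo:

```python
import glob
import os
import shutil

def cleanup(temp_patterns=("*.tmp",), temp_dirs=("temp",)) -> None:
    """Best-effort removal of leftover artifacts after a forced shutdown."""
    for pattern in temp_patterns:
        for path in glob.glob(pattern):
            try:
                os.remove(path)  # delete stray temp files matching the pattern
            except OSError:
                pass  # never let cleanup itself crash the exit path
    for directory in temp_dirs:
        shutil.rmtree(directory, ignore_errors=True)  # drop whole temp dirs
```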
CODE/packet_sniffer.py (3)
19-25: Yo, nice job on the class setup! 🎮

The `Sniff` class is organized like a pro gamer's setup - clean and ready for action! Just one thing though - you might wanna add some error handling in `__init__` when initializing that graph.

```diff
 def __init__(self):
     conf.verb = 0  # Turn off verbosity for clean output
+    try:
         self.packet_data = []  # List to store packet information
         self.G = nx.Graph()  # Initialize a graph
+    except Exception as err:
+        log.error(f"Failed to initialize Sniff: {err}")
+        raise
```
27-67: This packet logging is lit! 🔥

The `__log_packet` method is handling packets like a boss, but there's a small thing we could make even better. When you're dealing with errors, it might be good to know what kind of packet caused the problem.

```diff
 try:
     if packet.haslayer(IP):
         log.debug(f"Packet captured: {packet.summary()}")
         packet_info = {
             'src_ip': packet[IP].src,
             'dst_ip': packet[IP].dst,
             'protocol': self.__get_protocol_name(packet),
             'src_port': self.__get_port_info(packet, 'sport'),
             'dst_port': self.__get_port_info(packet, 'dport'),
         }
         self.packet_data.append(packet_info)
         self.__print_packet_summary(packet_info)
         self.__add_to_graph(packet_info)
 except Exception as err:
-    log.error(f"Error processing packet: {err}")
+    log.error(f"Error processing packet {packet.summary() if hasattr(packet, 'summary') else 'Unknown'}: {err}")
```
332-401: The main sniffing logic is on point! 🎯

The error handling and interface name correction is smart! Just one suggestion - you might want to add a timeout to the retry loop to avoid hanging if something goes wrong.

```diff
+    from time import time
+    start_time = time()
+    max_retry_time = 30  # seconds
     for attempt in range(2):  # Try original and corrected name
         try:
+            if time() - start_time > max_retry_time:
+                log.error("Retry timeout exceeded")
+                break
             self.__start_sniffing(interface, packet_count, timeout)
             break
```

CODE/vulnscan.py (3)
20-25: Hey! Let's track these cool future improvements! 🚀

These planned improvements for v3.4.2 look awesome! They'll make the scanning way faster with batch processing and async operations. Let's make sure we don't forget about them.
Want me to create GitHub issues to track these improvements? I can help set those up! 🙌
🧰 Tools
🪛 GitHub Check: CodeFactor
[notice] 20-20: CODE/vulnscan.py#L20
Unresolved comment '# TODO: v3.4.2'. (C100)
26-29: Don't forget about this file size limit feature! 📏

Adding a MAX_FILE_SIZE check would be super helpful to avoid scanning huge files that might slow things down.
Want me to create a GitHub issue for this v3.4.1 feature too? I can help set that up! 🙌
🧰 Tools
🪛 GitHub Check: CodeFactor
[notice] 26-26: CODE/vulnscan.py#L26
Unresolved comment '# TODO: v3.4.1'. (C100)
87-108: Let's make this code more memory-friendly! 🧠

The `is_sensitive` method looks solid, but we could make it even better! When dealing with large files, those feature arrays might eat up a lot of memory. Here's a cool way to make it better:
```diff
 def is_sensitive(self, file_content: str) -> tuple[bool, float, str]:
     if isinstance(self.model, torch.nn.Module):
         device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
         self.model.to(device)
-        features = self.vectorizer.transform([file_content])
+        # Use sparse matrices to save memory
+        features = self.vectorizer.transform([file_content]).tocsr()
         self.model.eval()
         with torch.no_grad():
-            features_tensor = torch.tensor(features.toarray(), dtype=torch.float32).to(device)
+            # Convert sparse matrix to tensor more efficiently
+            features_tensor = torch.sparse_coo_tensor(
+                torch.LongTensor([features.nonzero()[0], features.nonzero()[1]]),
+                torch.FloatTensor(features.data),
+                size=features.shape
+            ).to(device)
             prediction = self.model(features_tensor)
             probability = torch.softmax(prediction, dim=1).max().item()
-            top_features = np.argsort(features.toarray()[0])[-5:]
+            # Get top features from sparse matrix directly
+            feature_scores = features.data
+            top_indices = np.argsort(feature_scores)[-5:]
-            reason = ", ".join([self.vectorizer.get_feature_names_out()[i] for i in top_features])
+            reason = ", ".join([self.vectorizer.get_feature_names_out()[i] for i in top_indices])
             return prediction.argmax(dim=1).item() == 1, probability, reason
```
🛑 Comments failed to post (10)
CODE/logicytics/__init__.py (2)
141-142: 🛠️ Refactor suggestion
Hold up! We should handle errors here too!
⚠️ These initialization calls could fail, and we wouldn't know why. Let's wrap them in a try-catch:
```diff
-FileManagement.mkdir()
-log = Log({"log_level": DEBUG})
+try:
+    FileManagement.mkdir()
+    log = Log({"log_level": DEBUG})
+except Exception as e:
+    import sys
+    sys.stderr.write(f"Failed to initialize logging: {e}\n")
+    exit(1)
```
27-68: 🛠️ Refactor suggestion
Hey, we need to make this config loading more robust! 🛠️
The `config_data` function is doing a good job, but it could crash if the config file is messed up. Let's add some error checking! Here's how we can make it better:
```diff
 def config_data() -> tuple[str, str, list[str], bool]:
     config = configparser.ConfigParser()
     path = config_path()
     config.read(path)
+    try:
         log_using_debug = config.getboolean("Settings", "log_using_debug")
         delete_old_logs = config.getboolean("Settings", "delete_old_logs")
         version = config.get("System Settings", "version")
         files = config.get("System Settings", "files").split(", ")
+    except (configparser.NoSectionError, configparser.NoOptionError) as e:
+        print(f"Error reading config file: {e}")
+        exit(1)
     log_using_debug = "DEBUG" if log_using_debug else "INFO"
     return log_using_debug, version, files, delete_old_logs
```

Also, that `config_path` should probably use the logger - but wait, we can't because we're still setting up the logger! 🤔 Maybe we should use `sys.stderr.write` instead:

```diff
-    print("The config.ini file is not found in the expected location.")
+    import sys
+    sys.stderr.write("The config.ini file is not found in the expected location.\n")
```
CODE/logicytics/Flag.py (1)
38-38: 💡 Verification agent
🧩 Analysis chain
Hey! Nice move making the Match class private! 🔒
Renaming `Match` to `_Match` is a solid choice for encapsulation. Just make sure you've updated all the places where this class is used in other files! Let's double-check if we caught all the references:
🏁 Script executed:

```bash
#!/bin/bash
# Search for any remaining references to the old 'Match' class
rg -i "class Match|Match\." --type py
```

Length of output: 745
Heads-Up: Update Outdated Comment on Flag Method
Great job renaming the `Match` class to `_Match` and updating its code references! The search results show that all the actual method calls now use `_Match` correctly. However, there's still an outdated comment in `CODE/logicytics/Flag.py` that mentions "Uses the Match.flag method…" - this should be updated to reference `_Match.flag` to avoid any confusion.

- Action Item: Update the comment in `CODE/logicytics/Flag.py` to reflect the new `_Match` naming.
- Double-Check: Ensure that any documentation or inline comments across the codebase are consistent with this change.
CODE/Logicytics.py (2)
166-176: 🛠️ Refactor suggestion
Watch out for those sneaky thread errors! 🕷️
The threading code could use some extra error handling for when things go wrong in the worker threads.
Here's a safer way to handle thread errors:
```diff
 with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
     futures = {executor.submit(self.__script_handler, script): script for script in self.execution_list}
     for future in as_completed(futures):
         script = futures[future]
-        result, error = future.result()
-        if error:
-            log.error(f"Failed to execute {script}")
-        else:
-            log.debug(f"Completed {script}")
+        try:
+            result, error = future.result()
+            if error:
+                log.error(f"Failed to execute {script}: {error}")
+            else:
+                log.debug(f"Completed {script}")
+        except Exception as e:
+            log.error(f"Thread crashed while executing {script}: {e}")
```
309-313: 🛠️ Refactor suggestion
That Python command might not work everywhere! 🌍
The execute_new_window method assumes 'python' is in the PATH and uses Windows-specific commands.
Here's a more portable solution:
```diff
-    sr_process = subprocess.Popen(["cmd.exe", "/c", "start", "python", sr_script_path])
+    if os.name == 'nt':  # Windows
+        sr_process = subprocess.Popen(["cmd.exe", "/c", "start", sys.executable, sr_script_path])
+    else:  # Unix-like
+        sr_process = subprocess.Popen([sys.executable, sr_script_path])
```

Don't forget to add `import sys` at the top!
CODE/packet_sniffer.py (3)
69-127: 🛠️ Refactor suggestion
Your protocol detection is smooth! 👊
The static methods for protocol and port detection are clean and well-documented. Just one small thing - in `__get_port_info`, you're not actually raising that `ValueError` mentioned in the docstring when an invalid `port_type` is provided.

```diff
 @staticmethod
 def __get_port_info(packet: IP, port_type: str) -> int | None:
+    if port_type not in ('sport', 'dport'):
+        raise ValueError(f"Invalid port_type '{port_type}'. Must be 'sport' or 'dport'.")
     log.debug(f"Port type: {port_type}")
     if packet.haslayer(TCP):
         return packet[TCP].sport if port_type == 'sport' else packet[TCP].dport
     elif packet.haslayer(UDP):
         return packet[UDP].sport if port_type == 'sport' else packet[UDP].dport
     return None
```
396-400: ⚠️ Potential issue

Watch out for this cleanup call! ⚠️

Creating a new `Sniff` instance just for cleanup isn't the best move. You should use the same instance that was used for sniffing.

```diff
 if __name__ == "__main__":
     try:
-        Sniff().packets()
+        sniffer = Sniff()
+        sniffer.packets()
     except Exception as e:
         log.error(e)
     finally:
-        Sniff().cleanup()
+        sniffer.cleanup()
```
239-331: 🛠️ Refactor suggestion
Your visualization game is strong! 📊
The `__visualize_graph` method is super flexible with all those customization options! But there's one thing we should probably check - what if someone passes an invalid `layout_func`?

```diff
 def __visualize_graph(self, node_colors: dict[str, str] | None = None,
                       node_sizes: dict[str, int] | None = None,
                       *,
                       figsize: tuple[int, int] = (12, 8),
                       font_size: int = 10,
                       font_weight: str = "bold",
                       title: str = "Network Connections Graph",
                       output_file: str = "network_connections_graph.png",
                       layout_func: callable = nx.spring_layout):
+    if not callable(layout_func):
+        raise TypeError("layout_func must be a callable")
     pos = layout_func(self.G)
```
CODE/vulnscan.py (2)
193-198: ⚠️ Potential issue

Watch out! This could be dangerous! ⚠️

Opening files with "a" mode without proper path handling could be risky! Someone could trick the program into writing to system files. Here's a safer way to do it:

```diff
-    with open("Sensitive_File_Paths.txt", "a") as sensitive_file:
+    output_path = os.path.join(os.getcwd(), "Sensitive_File_Paths.txt")
+    os.makedirs(os.path.dirname(output_path), exist_ok=True)
+    with open(output_path, "a") as sensitive_file:
```
206-211: 🛠️ Refactor suggestion
Let's make this work everywhere! 🌍
The paths are Windows-only right now. That's not cool for people using Mac or Linux!
Here's how to make it work everywhere:
```diff
 base_paths = [
-    "C:\\Users\\",
-    "C:\\Windows\\Logs",
-    "C:\\Program Files",
-    "C:\\Program Files (x86)"
+    os.path.expanduser("~"),  # Home directory
+    os.path.join(os.path.sep, "var", "log"),  # Linux logs
+    os.path.join(os.path.sep, "Applications") if sys.platform == "darwin"  # Mac apps
+    else os.path.join(os.path.sep, "Program Files")  # Windows apps
 ]
```

Don't forget to add `import sys` at the top! 😉
[__init__.py] Added a library check to make sure its only imported; Removal of config_data to Config.py (Created it as well)
[_dev.py] [color_print()] Added new color cyan and new option to not print but input (also returns values); Changed inputs to color_print(is_input=True); Changed some messages to make them look better; Changed how the logic of getting the file changes work, now its WAY more accurate and better and faster
[Config.py] Added new file, this is for config data gain, [Ref: __init__.py]
[Flag.py, Logicytics.py, packet_sniffer.py] Changed config to the CONFIG global var from __init__.py
[Get.py] Changed list_of_files() logic to now only exclude files starting with '_' and any extensions and files passed down (rather than hardcoded files like Logicytics.py); Refactored the class and function to be more efficient as well as fixed some bugs
[Logicytics.py] Added exclusion to get files to accommodate the new changes [Ref: Get.py]; Mitigated potential bugs in __threaded method with ThreadPoolExecutor, added a try-except; Mitigated potential bugs in backup if the path doesn't exist; Made the subprocess opening for execute_new_window more safe by mitigating a bug if python doesn't exist in PATH, now it takes python PATH from sys.executable
[packet_sniffer.py] Improved error logging messages; Made method __get_port_info() safer by making the port_type optional defaulting to 'sport', also added validation to only allow 'sport' and 'dport'; Added a retry timer in the code, thus script maxes execution at 30sec (based on max_retry_time); Made Sniff() init once only.
[vulnscan.py] Improved Time bottleneck by only reading readable files that aren't part of UNREADABLE_EXTENSIONS list, thus now it will read/parse more than scan, which improves time and reduces issues; Renamed some methods to be protected members; Improved _is_sensitive by using sparse matrices; Fixed bug where if result is empty, it crashes due to append of a variable that doesn't exist; Remade the ThreadPoolExecutor logic for getting the files by improving its speed and efficiency

Signed-off-by: Shahm Najeeb <[email protected]>
Signed-off-by: Shahm Najeeb <[email protected]>
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.
Actionable comments posted: 7
🧹 Nitpick comments (17)
CODE/packet_sniffer.py (4)
18-19: Hey! 🤔 Let's be more specific about those warnings we're ignoring!

Instead of ignoring all warnings, it would be safer to only ignore the specific Wireshark-related warnings. This way, we don't accidentally hide other important warnings that might pop up!

```diff
-warnings.filterwarnings("ignore")
+# Suppress specific Wireshark-related warnings
+warnings.filterwarnings("ignore", category=RuntimeWarning, module="wireshark")
```
380-380: Let's track that TODO! 📝

I see you're planning to make the `max_retry_time` configurable in v3.4.1. Want me to help create an issue to track this enhancement?
399-401: Let's make that error message more helpful! 🎯

When cleanup fails, it would be super helpful to know which resources failed to clean up.

```diff
-    log.error(f"Error during cleanup: {err}")
+    log.error(f"Error while closing matplotlib figures during cleanup: {err}")
```
267-270: Think about security! 🔒

When saving packet data to CSV, we might be storing sensitive network information. Consider:
- Adding file permissions to restrict access
- Implementing data retention policies
- Sanitizing sensitive data before saving
Here's how you could add file permissions:
```diff
 df = pd.DataFrame(self.packet_data)
 df.to_csv(file_path, index=False)
+os.chmod(file_path, 0o600)  # Read/write for owner only
 log.info(f"Packet data saved to '{file_path}'.")
```
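For the data-retention bullet, a purely illustrative pruning helper could delete old capture files; the 7-day window and the `*.csv` pattern are assumptions, not project settings:

```python
import glob
import os
import time

def prune_old_captures(directory: str = ".", pattern: str = "*.csv", max_age_days: int = 7) -> None:
    """Best-effort deletion of capture files older than max_age_days."""
    cutoff = time.time() - max_age_days * 86400  # 86400 seconds per day
    for path in glob.glob(os.path.join(directory, pattern)):
        try:
            if os.path.getmtime(path) < cutoff:
                os.remove(path)
        except OSError:
            pass  # file vanished or is locked; skip it
```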
PLANS.md (3)
10-10: Heads up on Versioning and Wording!

The task reads "Implement TODOs for v3.4.3" but its version is set to v3.4.1, which might be confusing. Also, the static analysis tool mentioned that a hyphen might be missing in "to-dos." Please double-check if you want to change "TODOs" to "to-dos" for consistency.

🧰 Tools
The task reads "Implement TODOs for v3.4.3" but its version is set to v3.4.1, which might be confusing. Also, the static analysis tool mentioned that a hyphen might be missing in “to-dos.” Please double-check if you want to change “TODOs” to “to-dos” for consistency.🧰 Tools
🪛 LanguageTool
[grammar] ~10-~10: It appears that a hyphen is missing in the plural noun “to-dos”?
Context: ...-|------------------------| | Implement TODOs for v3.4.3 ...(TO_DO_HYPHEN)
13-13
: Double-Check the To-Do Wording!
The entry "Implement TODOs for v3.4.2" is clear, but similar to the earlier point, consider if “TODOs” should be revised to “to-dos” as suggested by the grammar hint.🧰 Tools
🪛 LanguageTool
[grammar] ~13-~13: It appears that a hyphen is missing in the plural noun “to-dos”?
Context: ... | ✅ | | Implement TODOs for v3.4.2 ...(TO_DO_HYPHEN)
19-19
: Consider a More Formal Term!
The task "Smushsensitive data miner
withvulnscan
" is creative, but “smush” might be too informal. Maybe consider using “Merge” or “Integrate” to keep the documentation professional. Otherwise, it works well.CODE/_dev.py (2)
12-26
: Cool new color_print function! Let's make it even better! 🎨Hey! The color printing is awesome, but we can make it even cooler:
- Move those color codes to constants at the top
- Make the type hints more specific with
Union[None, str]
orstr | None
Here's how:
+ # At the top of the file + COLORS = { + "reset": "\033[0m", + "red": "\033[31m", + "green": "\033[32m", + "yellow": "\033[33m", + "cyan": "\033[36m", + } - def color_print(text, color="reset", is_input=False) -> None | str: - colors = { - "reset": "\033[0m", - "red": "\033[31m", - "green": "\033[32m", - "yellow": "\033[33m", - "cyan": "\033[36m", - } + def color_print(text: str, color: str = "reset", is_input: bool = False) -> str | None: + color_code = COLORS.get(color.lower(), COLORS["reset"])
144-160: Nice version checking! But let's make it simpler! 🎯

The version validation is great, but we can make it easier to understand:

- The regex is a bit complex - we could use a simpler pattern
- The max attempts logic is solid! 👍

Try this simpler version:

```diff
-    if re.match(r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)$", version):
+    if re.match(r"^\d+\.\d+\.\d+$", version):
```

This still catches invalid versions but is way easier to read! Just remember it won't catch versions like "01.1.1" - but that's probably fine for most cases.
CODE/Logicytics.py (3)
18-19
: Consider encapsulating global variables in a configuration class.Hey! 👋 Instead of using global variables, it would be cooler to wrap these in a config class. This makes it easier to manage and test! Here's how:
-ACTION, SUB_ACTION = None, None -MAX_WORKERS = CONFIG.getint("Settings", "max_workers", fallback=min(32, (os.cpu_count() or 1) + 4)) +class LogicyticsConfig: + def __init__(self, config): + self.action = None + self.sub_action = None + self.max_workers = config.getint( + "Settings", + "max_workers", + fallback=min(32, (os.cpu_count() or 1) + 4) + ) + +config = LogicyticsConfig(CONFIG)
515-515: Let's tackle that TODO about the cleanup function! 🧹

I noticed you want to add a cleanup function in v3.4.2. I can help you implement it! Here's what we could do (a sketch follows the list):
- Track created files during execution
- Clean them up on exit
- Handle both normal and forced exits
Want me to create an issue for this and propose a solution?
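One way the tracking part could be wired up - a sketch under the assumption that scripts funnel file creation through a single helper; `atexit` handlers also run after an unhandled KeyboardInterrupt unwinds the interpreter:

```python
import atexit
import os

_created_files: list[str] = []

def track(path: str) -> str:
    """Record a file we created so it can be removed on interpreter exit."""
    _created_files.append(path)
    return path

@atexit.register
def _cleanup_tracked() -> None:
    for path in _created_files:
        try:
            os.remove(path)
        except OSError:
            pass  # already gone or not removable; stay quiet during shutdown

# Usage: every write site wraps its path, e.g. open(track("scratch.tmp"), "w")
```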
510-516: Make the error message more helpful! 📝

The warning about force shutdown is good, but let's make it more helpful for users:

```diff
 try:
     Logicytics()
 except KeyboardInterrupt:
-    log.warning("Shutting down Logicytics utility with force causes many files to remain where they shouldn't")
-    log.warning("Please don't force shut Logicytics again - As we don't have a cleanup function yet.")
+    log.warning("⚠️ Force shutdown detected! Some temporary files might be left behind.")
+    log.warning("💡 Pro tip: Next time, let the program finish naturally or press Enter to exit safely.")
+    log.debug("Temporary files might be found in: ./temp, ../ACCESS/LOGS")
     # TODO v3.4.2 -> Cleanup function
     exit(0)
```

CODE/vulnscan.py (3)
32-46: Move UNREADABLE_EXTENSIONS to config.ini

Hey! You've got a super detailed list of file extensions to skip, but it's hardcoded. Moving this to `config.ini` would make it easier to update without touching the code. A quick sketch of the idea is below.
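As a rough illustration, here is one way the list could come from the config file - the `[VulnScan Settings]` section and `unreadable_extensions` option are invented names, not existing keys:

```python
import configparser

# config.ini would gain something like:
# [VulnScan Settings]
# unreadable_extensions = .exe, .dll, .png, .zip

config = configparser.ConfigParser()
config.read("config.ini")
UNREADABLE_EXTENSIONS = {
    ext.strip()
    for ext in config.get("VulnScan Settings", "unreadable_extensions",
                          fallback=".exe, .dll").split(",")
}
print(UNREADABLE_EXTENSIONS)
```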
21-31: Let's track these TODOs properly! 📝

These are some cool improvements you're planning! Instead of leaving them as comments, wanna create GitHub issues to track them? I can help set those up!
Would you like me to create GitHub issues for:
- v3.4.2 improvements (batch reading, async scanning, etc.)
- v3.4.1 improvements (MAX_FILE_SIZE, config updates)
🧰 Tools
🪛 GitHub Check: CodeFactor
[notice] 26-26: CODE/vulnscan.py#L26
Unresolved comment '# TODO: v3.4.1'. (C100)
225-230: Use Path for file operations

Instead of using string paths for the output file, let's use `Path` to handle it more safely! Here's a better way:

```diff
-    with open("Sensitive_File_Paths.txt", "a") as sensitive_file:
+    output_path = Path("Sensitive_File_Paths.txt")
+    with output_path.open("a") as sensitive_file:
```

CODE/logicytics/Get.py (1)
8-29: Sweet decorator! But let's make it even cooler! 🎨

The `time_execution` decorator is neat, but we can make it more flexible and useful:

- Instead of just printing, we could use the logger.
- We could add an option to log only if execution time exceeds a threshold.

Here's how we can make it better:

```diff
+from logicytics import log
+
 def time_execution(func):
     """
     A decorator that logs the execution time of a function.

     Usage:
         @time_execution
         def my_function():
             ...

     Or:
         time_execution(my_function, *args, **kwargs)
+
+    Args:
+        func: The function to be decorated
+
+    Returns:
+        A wrapped function that logs its execution time
     """
     @functools.wraps(func)
     def wrapper(*args, **kwargs):
         start_time = time.time()
         result = func(*args, **kwargs)
         end_time = time.time()
-        print(f"Function '{func.__name__}' executed in {end_time - start_time:.6f} seconds")
+        execution_time = end_time - start_time
+        log.debug(f"Function '{func.__name__}' executed in {execution_time:.6f} seconds")
+        if execution_time > 1.0:  # Log warning if execution takes more than 1 second
+            log.warning(f"Function '{func.__name__}' took longer than expected: {execution_time:.6f} seconds")
         return result
     return wrapper
```

CODE/logicytics/__init__.py (1)
25-30: Nice exception class! But let's make it even more helpful! 💡

The `ObjectLoadError` is good, but we can make it more informative for debugging. Here's how we can improve it:

```diff
 class ObjectLoadError(Exception):
     """Raised when an Object fails to load."""
-    def __init__(self, message="Failed to load object"):
+    def __init__(self, message="Failed to load object", object_name=None):
+        """
+        Initialize the exception with a custom message and object details.
+
+        Args:
+            message (str): The error message
+            object_name (str, optional): Name of the object that failed to load
+        """
+        self.object_name = object_name
+        if object_name:
+            message = f"{message} (Object: {object_name})"
         super().__init__(message)
```
🛑 Comments failed to post (7)
CODE/_dev.py (1)
101-105: 🛠️ Refactor suggestion
Let's make those file paths work everywhere! 🌍
The file paths use Windows-style backslashes (`\`) which might not work on Mac or Linux. Let's use `os.path.join` instead! Here's how to fix it:

```diff
 checks = [
-    ("[-] Have you read the required contributing guidelines?", "..\\CONTRIBUTING.md"),
-    ("[-] Have you made files you don't want to be run start with '_'?", "."),
-    ("[-] Have you added the file to CODE dir?", "."),
-    ("[-] Have you added docstrings and comments?", "..\\CONTRIBUTING.md"),
-    ("[-] Is each file containing around 1 main feature?", "..\\CONTRIBUTING.md"),
+    ("[-] Have you read the required contributing guidelines?", os.path.join("..", "CONTRIBUTING.md")),
+    ("[-] Have you made files you don't want to be run start with '_'?", "."),
+    ("[-] Have you added the file to CODE dir?", "."),
+    ("[-] Have you added docstrings and comments?", os.path.join("..", "CONTRIBUTING.md")),
+    ("[-] Is each file containing around 1 main feature?", os.path.join("..", "CONTRIBUTING.md")),
 ]
```
CODE/Logicytics.py (2)
193-235: 🛠️ Refactor suggestion
The performance monitoring could be more accurate.
Hey! 🔍 The current way of measuring memory usage might not be super accurate because:
- It only measures the Python process memory
- It doesn't account for garbage collection
- The memory delta might be affected by other processes
Here's a better way to do it:
```diff
 def __performance(self):
     """Checks performance of each script."""
     if DEBUG.lower() != "debug":
         log.warning("Advised to turn on DEBUG logging!!")
     execution_times = []
     memory_usage = []
-    process = psutil.Process()
+    process = psutil.Process(os.getpid())
     for file in range(len(self.execution_list)):
+        # Force garbage collection before measurements
+        import gc
+        gc.collect()
         start_time = datetime.now()
-        start_memory = process.memory_info().rss / 1024 / 1024  # MB
+        start_memory = process.memory_full_info().uss / 1024 / 1024  # MB
         log.execution(Execute.script(self.execution_list[file]))
         end_time = datetime.now()
-        end_memory = process.memory_info().rss / 1024 / 1024  # MB
+        end_memory = process.memory_full_info().uss / 1024 / 1024  # MB
```
295-318: ⚠️ Potential issue

Watch out! 🚨 There's a potential security risk here.
The way you're running commands could be dangerous if someone messes with the file path. Let's make it safer:
```diff
 @staticmethod
 def execute_new_window(file_path: str):
+    # Whitelist of allowed scripts
+    ALLOWED_SCRIPTS = {
+        "_debug.py",
+        "_dev.py"
+    }
+    if os.path.basename(file_path) not in ALLOWED_SCRIPTS:
+        log.error(f"Unauthorized script execution attempt: {file_path}")
+        return
     sr_current_dir = os.path.dirname(os.path.abspath(__file__))
     sr_script_path = os.path.join(sr_current_dir, file_path)
+    if not os.path.exists(sr_script_path):
+        log.error(f"Script not found: {sr_script_path}")
+        return
     sr_process = subprocess.Popen(["cmd.exe", "/c", "start", sys.executable, sr_script_path])
     sr_process.wait()
     exit(0)
```
CODE/vulnscan.py (2)
239-244: 🛠️ Refactor suggestion
Move paths to config.ini
Those Windows paths are hardcoded! Let's move them to `config.ini` so they're easier to change and work on different systems. A rough sketch of the idea follows.
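Roughly, combining config-driven paths with a platform fallback could look like this sketch (the section and option names are assumptions for illustration):

```python
import configparser
import os
import sys

config = configparser.ConfigParser()
config.read("config.ini")

if config.has_option("VulnScan Settings", "base_paths"):
    # e.g. config.ini could hold: base_paths = C:\Users, C:\Program Files
    base_paths = [p.strip() for p in config.get("VulnScan Settings", "base_paths").split(",")]
elif sys.platform == "win32":
    base_paths = [os.environ.get("USERPROFILE", "C:\\Users"), "C:\\Program Files"]
else:
    base_paths = [os.path.expanduser("~"), "/var/log"]
print(base_paths)
```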
105-134: 🛠️ Refactor suggestion
Add error handling for model predictions
The `_is_sensitive` method could crash if the model prediction fails. Let's wrap the prediction code in try-except blocks! Here's how you could make it safer:

```diff
 def _is_sensitive(self, file_content: str) -> tuple[bool, float, str]:
     """Determines if a file's content is sensitive using the model."""
+    try:
         if isinstance(self.model, torch.nn.Module):
             device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
             self.model.to(device)
             features = self.vectorizer.transform([file_content]).tocsr()
             self.model.eval()
             with torch.no_grad():
                 features_tensor = torch.sparse_coo_tensor(
                     torch.LongTensor([features.nonzero()[0], features.nonzero()[1]]),
                     torch.FloatTensor(features.data),
                     size=features.shape
                 ).to(device)
                 prediction = self.model(features_tensor)
                 probability = torch.softmax(prediction, dim=1).max().item()
                 feature_scores = features.data
                 top_indices = np.argsort(feature_scores)[-5:]
                 reason = ", ".join([self.vectorizer.get_feature_names_out()[i] for i in top_indices])
                 return prediction.argmax(dim=1).item() == 1, probability, reason
         else:
             features = self.vectorizer.transform([file_content])
             prediction = self.model.predict_proba(features)
             probability = prediction.max()
             top_features = np.argsort(features.toarray()[0])[-5:]
             reason = ", ".join([self.vectorizer.get_feature_names_out()[i] for i in top_features])
             return self.model.predict(features)[0] == 1, probability, reason
+    except Exception as e:
+        log.error(f"Model prediction failed: {e}")
+        return False, 0.0, "Model prediction error"
```

Committable suggestion skipped: line range outside the PR's diff.
CODE/logicytics/Config.py (1)
5-44: 🛠️ Refactor suggestion
Yo! Let's make this function more robust and user-friendly! 🚀
The function looks solid, but we can make it even better:
- Instead of exiting on error, we should raise custom exceptions to let the caller handle errors their way.
- We could add some validation for the config values.
Here's how we can level up the code:
```diff
-def __config_data() -> tuple[str, str, list[str], bool, str]:
+def __config_data() -> tuple[str, str, list[str], bool, configparser.ConfigParser]:
     """
     Retrieves configuration data from the 'config.ini' file.

     If the configuration file is not found in any of these locations,
-    the program exits with an error message.
+    raises a ConfigError.

     Returns:
         tuple[str, str, list[str], bool]: A tuple containing:
             - Log level (str): Either "DEBUG" or "INFO"
             - Version (str): System version from configuration
             - Files (list[str]): List of files specified in configuration
             - Delete old logs (bool): Flag indicating whether to delete old log files
-            - CONFIG itself
+            - CONFIG (configparser.ConfigParser): The configuration object

     Raises:
-        SystemExit: If the 'config.ini' file cannot be found in any of the attempted locations
+        ConfigError: If the config file is not found or has invalid values
     """
+    class ConfigError(Exception):
+        """Raised when there are issues with the configuration."""
+        pass

     def _config_path() -> str:
         configs_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.ini")
         if os.path.exists(configs_path):
             return configs_path
-        else:
-            print("The config.ini file is not found in the expected location.")
-            exit(1)
+        raise ConfigError("The config.ini file is not found in the expected location.")

     config = configparser.ConfigParser()
     path = _config_path()
     config.read(path)
+    # Validate required sections and options
+    required_sections = {"Settings", "System Settings"}
+    if not all(section in config for section in required_sections):
+        raise ConfigError(f"Missing required sections: {required_sections - set(config.sections())}")
     log_using_debug = config.getboolean("Settings", "log_using_debug")
     delete_old_logs = config.getboolean("Settings", "delete_old_logs")
     version = config.get("System Settings", "version")
     files = config.get("System Settings", "files").split(", ")
+    # Validate version format (assuming semantic versioning)
+    if not version.count(".") == 2:
+        raise ConfigError(f"Invalid version format: {version}")
     log_using_debug = "DEBUG" if log_using_debug else "INFO"
     return log_using_debug, version, files, delete_old_logs, config
```
CODE/logicytics/__init__.py (1)
5-5: 🛠️ Refactor suggestion
Heads up! You've got some unused imports! 🧹
According to the static analysis, these imports aren't being used:

- `VERSION`
- `CURRENT_FILES`
- `DELETE_LOGS`
- `CONFIG`

Let's clean it up:

```diff
-from logicytics.Config import DEBUG, VERSION, CURRENT_FILES, DELETE_LOGS, CONFIG
+from logicytics.Config import DEBUG
```
🧰 Tools
🪛 Ruff (0.8.2)
5-5: `logicytics.Config.VERSION` imported but unused; consider removing, adding to `__all__`, or using a redundant alias (F401)
5-5: `logicytics.Config.CURRENT_FILES` imported but unused; consider removing, adding to `__all__`, or using a redundant alias (F401)
5-5: `logicytics.Config.DELETE_LOGS` imported but unused; consider removing, adding to `__all__`, or using a redundant alias (F401)
5-5: `logicytics.Config.CONFIG` imported but unused; consider removing, adding to `__all__`, or using a redundant alias (F401)
Actionable comments posted: 0
🧹 Nitpick comments (3)
PLANS.md (3)
10-10: Nitpick on Hyphenation in Task Description

Hey, just a small note: In line 10, the task says "Implement TODOs for v3.4.1". The static analysis suggests a hyphen might help clarity - consider changing it to "Implement to-dos for v3.4.1".

```diff
-| Implement TODOs for v3.4.1 | v3.4.1 | ✅ |
+| Implement to-dos for v3.4.1 | v3.4.1 | ✅ |
```

🧰 Tools
🪛 LanguageTool
[grammar] ~10-~10: It appears that a hyphen is missing in the plural noun “to-dos”?
Context: ...-|------------------------| | Implement TODOs for v3.4.1 ...(TO_DO_HYPHEN)
13-13: Nitpick on Hyphenation in Task Description

For consistency with line 10, it might be clearer to change "Implement TODOs for v3.4.2" to "Implement to-dos for v3.4.2". This tiny tweak helps keep things neat and consistent.

```diff
-| Implement TODOs for v3.4.2 | v3.4.2 | ✅ |
+| Implement to-dos for v3.4.2 | v3.4.2 | ✅ |
```

🧰 Tools
🪛 LanguageTool
[grammar] ~13-~13: It appears that a hyphen is missing in the plural noun “to-dos”?
Context: ... | ✅ | | Implement TODOs for v3.4.2 ...(TO_DO_HYPHEN)
20-20: Consider a More Formal Term!

On line 20, the task "Smush `sensitive data miner` with `vulnscan`" is cool, but the word "Smush" is a bit informal. You might want to swap it out for "Merge" or "Integrate" for extra clarity. Otherwise, it works well.

```diff
-| Smush `sensitive data miner` with `vulnscan` | v3.7.0 | ✅ |
+| Merge `sensitive data miner` with `vulnscan` | v3.7.0 | ✅ |
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
PLANS.md (1 hunks)
🧰 Additional context used
🪛 LanguageTool
PLANS.md
[grammar] ~10-~10: It appears that a hyphen is missing in the plural noun “to-dos”?
Context: ...-|------------------------| | Implement TODOs for v3.4.1 ...
(TO_DO_HYPHEN)
[grammar] ~13-~13: It appears that a hyphen is missing in the plural noun “to-dos”?
Context: ... | ✅ | | Implement TODOs for v3.4.2 ...
(TO_DO_HYPHEN)
🔇 Additional comments (3)
PLANS.md (3)
11-11: Docstrings Task Looks Solid

Line 11's task "Add docstrings to all functions as well as var types" is clear and to the point. No issues here!

12-12: Cleanup Task is Clear

The task on line 12 for implementing cleanup functionality for Logicytics on a KeyboardInterrupt is straightforward and fits right in with our improvements.

14-14: Log Handling Task is on Point

Line 14's "Implement logs for the logicytics lib, rather than prints" is nicely worded and clearly explained.
Also improved __performance() method accuracy Signed-off-by: Shahm Najeeb <[email protected]>
Signed-off-by: Shahm Najeeb <[email protected]>
Code Climate has analyzed commit 54090f0 and detected 10 issues on this pull request. Here's the issue category breakdown:
View more on Code Climate.
This pull request includes several changes aimed at improving code quality, removing deprecated features, and enhancing logging functionality. The most important changes include the removal of deprecated functions, improvements to logging, and modifications to GPU checks.

Codebase simplification:
Logging improvements:
Functionality improvements:
Minor adjustments:
@coderabbitai Generate me a table with ALL changes to be added to the release of this repo, don't leave anything out, use the commit messages, every file changed should have its change beside it
Tip: For best results, initiate chat on the files or code changes.

@DefinetlyNotAI Here's a complete table of all changes for this release:
(⌐■_■) Yo, that's a lot of changes! Let me know if you need any clarification about specific changes. |
Prerequisites
`--dev` flag, if required.

PR Type
update
Description
Reworked how Get.py gets the path of the config.ini (for Flag.py) - Now it gets it perfectly using proper logic and not depending on execution script path,
Removed init files for IDEA
Removed deprecated feature _generate_data.py
Minor Updates for FileManagement.py and config.ini for readability purposes
Allowing _test_gpu_acceleration.py to be imported via the function check_gpu(); this is experimental, though, and is still planned to be removed in the future!
This pull request includes several changes to improve the functionality, remove deprecated code, and update configurations in the `CODE` directory. The most important changes include the addition of a new network utility script, modifications to existing scripts for better type hinting and error handling, and the removal of deprecated functions.

New functionality:
- `CODE/network_psutil.py`: Added a new script to collect and save various network statistics using `psutil`. This includes network interface stats, connections, addresses, and bandwidth usage.

Code improvements:
- `CODE/VulnScan/tools/_test_gpu_acceleration.py`: Modified the `check_gpu` function to return strings instead of printing directly, improving testability and reusability.
- `CODE/logicytics/FileManagement.py`: Updated the `__remove_files` and `and_hash` methods to use type hints and simplified the filename generation logic. [1] [2]

Removal of deprecated code:
- `CODE/VulnScan/v2-deprecated/_generate_data.py`: Removed the entire file, which contained deprecated functions for generating test data.

Configuration updates:
- `CODE/config.ini`: Updated the version to 3.4.0 and added a new file `network_psutil.py` to the list of tracked files.
- `CODE/dump_memory.py`: Updated TODO comments to reflect the new version 3.4.1.

Minor changes:
- `CODE/logicytics/Flag.py`: Renamed the `Match` class to `_Match` and updated references to improve encapsulation. [1] [2]
N/A
Issues Fixed
N/A
Summary by CodeRabbit
New Features
config.ini
file.Refactor
Chores