Skip to content

Commit

Permalink
feat: add colorlog
Browse files Browse the repository at this point in the history
  • Loading branch information
SlaviXG committed Jun 10, 2024
1 parent 0b4e1ea commit f4b99ae
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 21 deletions.
33 changes: 33 additions & 0 deletions color_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from colorama import Fore, Style, init

# Initialize colorama
init(autoreset=True)

# Variable to control color logging
use_color = True


def enable_color_logging(enable: bool):
global use_color
use_color = enable


def log_info(message: str):
if use_color:
print(Fore.GREEN + message)
else:
print(message)


def log_warning(message: str):
if use_color:
print(Fore.YELLOW + message)
else:
print(message)


def log_error(message: str):
if use_color:
print(Fore.RED + message)
else:
print(message)
1 change: 1 addition & 0 deletions config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ registration_timeout = 5
pipeline_mode = True
realtime_mode = True
jsonify = True
colorlog = True
pipeline1 = sudo cpufreq-set -r -f 1800000; stress-ng --cpu 0 --timeout 60s --metrics-brief
pipeline2 = echo "Pipeline 2 command 1"; echo "Pipeline 2 command 2"
28 changes: 17 additions & 11 deletions operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import configparser
import os
import json
from threading import Lock, Thread
import time
from threading import Lock
import color_log


class Operator:
Expand All @@ -19,6 +20,7 @@ def __init__(self,
pipeline_mode: bool,
realtime_mode: bool,
jsonify: bool,
colorlog: bool,
*args, **kwargs):
super().__init__(*args, **kwargs)
self._broker = broker
Expand All @@ -32,14 +34,16 @@ def __init__(self,
self._pipeline_mode = pipeline_mode
self._realtime_mode = realtime_mode
self._jsonify = jsonify
self._colorlog = colorlog
color_log.enable_color_logging(self._colorlog)
self._clients = set()
self._client = mqtt.Client()
self._client.on_connect = self.on_connect
self._client.on_message = self.on_message
self._lock = Lock()

def on_connect(self, client, userdata, flags, rc):
print(f"Connected with result code {rc}")
color_log.log_info(f"Connected with result code {rc}")
self._client.subscribe(self._registration_topic)
self._client.subscribe(self._response_topic)

Expand All @@ -51,17 +55,17 @@ def on_message(self, client, userdata, msg):
with self._lock:
if payload not in self._clients:
self._clients.add(payload)
print(f"Registered client: {payload}")
color_log.log_info(f"Registered client: {payload}")
self._client.publish(self._ack_topic, payload)
elif topic == self._response_topic:
print(f"Received feedback:\n{payload}")
color_log.log_info(f"Received feedback:\n{payload}")

def run(self):
self._client.connect(self._broker, self._port, keepalive=60)
self._client.loop_start()

last_registration_time = time.time()
print("Waiting for clients to register...")
color_log.log_info("Waiting for clients to register...")
while True:
with self._lock:
if self._clients:
Expand All @@ -70,7 +74,7 @@ def run(self):
else:
last_registration_time = time.time()

print(f"Registered clients: {', '.join(self._clients)}")
color_log.log_info(f"Registered clients: {', '.join(self._clients)}")

if self._pipeline_mode:
self.run_pipelines()
Expand All @@ -84,7 +88,7 @@ def run(self):
self._client.disconnect()

def run_pipelines(self):
print("Running pipelines...")
color_log.log_info("Running pipelines...")
with self._lock:
for pipeline_name, pipeline_commands in self._pipelines.items():
for client_id in self._clients:
Expand All @@ -95,11 +99,11 @@ def run_pipelines(self):
else:
message = f"{client_id}|{command_message}"
self._client.publish(self._command_topic, message)
print(f"Published command to {client_id}: {command_message}")
color_log.log_warning(f"Published command to {client_id}: {command_message}")
time.sleep(1) # Add a delay to ensure commands are processed sequentially

def run_realtime_mode(self):
print("Entering real-time command mode...")
color_log.log_info("Entering real-time command mode...")
while True:
command = input("Enter command to send to all clients (or 'exit' to quit): ")
if command.lower() == 'exit':
Expand All @@ -112,7 +116,7 @@ def run_realtime_mode(self):
else:
message = f"{client_id}|{command_message}"
self._client.publish(self._command_topic, message)
print(f"Published command to {client_id}: {command_message}")
color_log.log_warning(f"Published command to {client_id}: {command_message}")


if __name__ == '__main__':
Expand All @@ -128,6 +132,7 @@ def run_realtime_mode(self):
pipeline_mode = config.getboolean('operator', 'pipeline_mode')
realtime_mode = config.getboolean('operator', 'realtime_mode')
jsonify = config.getboolean('operator', 'jsonify')
colorlog = config.getboolean('operator', 'colorlog')
pipelines = {k: v for k, v in config['operator'].items() if k.startswith('pipeline')}
operator = Operator(broker,
port,
Expand All @@ -139,5 +144,6 @@ def run_realtime_mode(self):
pipelines,
pipeline_mode,
realtime_mode,
jsonify)
jsonify,
colorlog)
operator.run()
25 changes: 15 additions & 10 deletions sut.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from queue import Queue
from threading import Thread, Event
from datetime import datetime
import color_log


class SUT:
Expand All @@ -18,6 +19,7 @@ def __init__(self, client_id: str,
registration_topic: str,
ack_topic: str,
jsonify: bool,
colorlog: bool,
*args, **kwargs):
super().__init__(*args, **kwargs)
self._client_id = client_id
Expand All @@ -28,6 +30,8 @@ def __init__(self, client_id: str,
self._registration_topic = registration_topic
self._ack_topic = ack_topic
self._jsonify = jsonify
self._colorlog = colorlog
color_log.enable_color_logging(self._colorlog)
self._client = mqtt.Client(client_id=client_id)
self._client.on_connect = self.on_connect
self._client.on_message = self.on_message
Expand All @@ -45,17 +49,17 @@ def __init__(self, client_id: str,

def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print(f"Connected successfully to {self._broker}:{self._port}")
color_log.log_info(f"Connected successfully to {self._broker}:{self._port}")
self._client.subscribe(self._command_topic)
self._client.subscribe(self._ack_topic)
else:
print(f"Connection failed with code {rc}")
color_log.log_error(f"Connection failed with code {rc}")

def on_message(self, client, userdata, msg):
message = msg.payload.decode()
if msg.topic == self._ack_topic:
if message == self._client_id:
print(f"Received acknowledgment for {self._client_id}")
color_log.log_info(f"Received acknowledgment for {self._client_id}")
self._ack_received.set()
else:
if self._jsonify:
Expand All @@ -64,27 +68,27 @@ def on_message(self, client, userdata, msg):
msg_client_id = data['client_id']
command = data['command']
except json.JSONDecodeError as e:
print(f"Failed to decode JSON message: {e}")
color_log.log_error(f"Failed to decode JSON message: {e}")
return
else:
msg_client_id, command = message.split('|', 1)

if msg_client_id == self._client_id:
print(f"Received command for {self._client_id}: {command}")
color_log.log_warning(f"Received command for {self._client_id}: {command}")
self._command_queue.put(command)

def _send_registration(self):
while not self._ack_received.is_set():
self._client.publish(self._registration_topic, self._client_id)
print(f"Sent registration for {self._client_id}")
color_log.log_info(f"Sent registration for {self._client_id}")
time.sleep(5) # Wait before resending registration

def _process_commands(self):
while not self._stop_event.is_set():
command = self._command_queue.get()
if command is None:
break
print(f"Executing command: {command}")
color_log.log_warning(f"Executing command: {command}")
try:
start_time = datetime.now()
start_iso = start_time.isoformat()
Expand Down Expand Up @@ -133,11 +137,11 @@ def _process_commands(self):
self._command_queue.task_done()

def run(self):
print(f"Attempting to connect to broker at {self._broker}:{self._port}")
color_log.log_info(f"Attempting to connect to broker at {self._broker}:{self._port}")
try:
self._client.connect(self._broker, self._port, 60)
except Exception as e:
print(f"Connection to broker failed: {e}")
color_log.log_error(f"Connection to broker failed: {e}")
self._client.loop_forever()

def stop(self):
Expand All @@ -157,8 +161,9 @@ def stop(self):
registration_topic = config['mqtt']['registration_topic']
ack_topic = config['mqtt']['ack_topic']
jsonify = config.getboolean('operator', 'jsonify')
colorlog = config.getboolean('operator', 'colorlog')
client_id = os.getenv('CLIENT_ID') or 'client1' # Default to 'client1' if CLIENT_ID not set
sut = SUT(client_id, broker, port, command_topic, response_topic, registration_topic, ack_topic, jsonify)
sut = SUT(client_id, broker, port, command_topic, response_topic, registration_topic, ack_topic, jsonify, colorlog)
try:
sut.run()
except KeyboardInterrupt:
Expand Down

0 comments on commit f4b99ae

Please sign in to comment.