diff --git a/.gitignore b/.gitignore index 090fef4..58df3dc 100644 --- a/.gitignore +++ b/.gitignore @@ -160,3 +160,4 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +/*.db diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..8b27833 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index b6804a9..332d191 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,4 +8,8 @@ "python.testing.pytestEnabled": true, "python.testing.cwd": "${workspaceFolder}/tests", "python.testing.autoTestDiscoverOnSaveEnabled": true, + "flake8.args": [ + "--max-line-length=120 --ignore E402,W503" + ], + "editor.defaultFormatter": "ms-python.black-formatter" } \ No newline at end of file diff --git a/basic_bot.py b/basic_bot.py index 6e60a10..67fb088 100644 --- a/basic_bot.py +++ b/basic_bot.py @@ -1,20 +1,22 @@ +from meshtastic.stream_interface import StreamInterface from message_processor import MessageProcessor from geopy.geocoders import Nominatim import maidenhead as mh from dadjokes import Dadjoke + class BasicBot(MessageProcessor): - def __init__(self): - super().__init__() - self.trap_list = ["ping", "ack", "testing", "pong", "lheard", "sitrep", "joke"] + def __init__(self, interface: StreamInterface): + super(BasicBot, self).__init__(interface) + self.trap_list = ["ping", "ack", "lheard", "sitrep", "joke", "whereami"] pass - - def auto_response(self, message, snr, rssi, hop, message_from_id, location:list[float], node_list:list[str]): - print(f"BasicBot: Got message: {message}") + def auto_response( + self, message, snr, rssi, hop, message_from_id, location: list[float] + ): + bot_response = None message = message.lower().strip() if "ping" in message: - #Check if the user added @foo to the message bot_response = "PONG, " + f"SNR:{snr} RSSI:{rssi} HOP {hop}" if " " in message: bot_response += " and copy: " + message.split(" ")[1] @@ -26,37 +28,83 @@ def auto_response(self, message, snr, rssi, hop, message_from_id, location:list[ elif "lheard" in message or "sitrep" in message: # make a nice list for the user - short_node_list = [] - for x in node_list[:5]: - short_node_list.append(f"{x[0]} SNR:{x[2]}") + if not self.node_list: + return "Error Processing Node List" + + short_node_list = self.get_node_list() - bot_response = "Last 5 nodes heard:\n" + str("\n".join(short_node_list)) + node_list = [] + for x in short_node_list: + node_list.append(f"{x[0]} [SNR:{x[2]}]") + + bot_response = "Last 5 nodes heard:\n" + str("\n".join(node_list)) elif "whereami" in message: bot_response = self.where_am_i(location[0], location[1]) - + elif "joke" in message: bot_response = self.tell_joke() - + return bot_response def tell_joke(self): - # tell a dad joke, does it need an explanationn :) dadjoke = Dadjoke() return dadjoke.joke - + def where_am_i(self, lat=0, lon=0): whereIam = "" if float(lat) == 0 and float(lon) == 0: - return super().NO_DATA_NOGPS - # initialize Nominatim API + return self.NO_DATA_NOGPS + geolocator = Nominatim(user_agent="mesh-bot") - location = geolocator.reverse(lat + ", " + lon) - address = location.raw['address'] - address_components = ['house_number', 'road', 'city', 'state', 'postcode', 'county', 'country'] - whereIam += ' '.join([address.get(component, '') for component in address_components if component in address]) + location = geolocator.reverse(str(lat) + ", " + str(lon)) + address = location.raw["address"] + address_components = [ + "house_number", + "road", + "city", + "state", + "postcode", + "county", + "country", + ] + whereIam += " ".join( + [ + address.get(component, "") + for component in address_components + if component in address + ] + ) grid = mh.to_maiden(float(lat), float(lon)) whereIam += " Grid: " + grid - return whereIam \ No newline at end of file + return whereIam + + def get_node_list(self, limit=5): + + result = [] + + for index, node in enumerate(self.node_list): + # ignore own + if node["num"] == self.myNodeNum: + continue + + node_name = MessageProcessor.get_name_from_number( + self.interface, node["num"] + ) + snr = node.get("snr", 0) + + # issue where lastHeard is not always present + last_heard = node.get("lastHeard", 0) + + # make a list of nodes with last heard time and SNR + item = (node_name, last_heard, snr) + result.append(item) + + if index >= limit: + break + + result.sort(key=lambda x: x[1], reverse=True) + + return result diff --git a/db_operations.py b/db_operations.py new file mode 100644 index 0000000..42f44b5 --- /dev/null +++ b/db_operations.py @@ -0,0 +1,94 @@ +import sqlite3 +import threading +from datetime import datetime + +thread_local = threading.local() + + +def get_db_connection(): + if not hasattr(thread_local, "connection"): + thread_local.connection = sqlite3.connect(database="messages.db", check_same_thread=False) + return thread_local.connection + + +def initialize_database(): + conn = get_db_connection() + c = conn.cursor() + + c.execute( + """CREATE TABLE IF NOT EXISTS message ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + message_id INTEGER NOT NULL, + sender TEXT NOT NULL, + sender_short_name TEXT NOT NULL, + sender_long_name TEXT NOT NULL, + reply_id INTEGER NOT NULL, + channel INTEGER NOT NULL, + date TEXT NOT NULL, + content TEXT NOT NULL + );""" + ) + c.execute( + """CREATE TABLE IF NOT EXISTS channels ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + url TEXT NOT NULL + );""" + ) + conn.commit() + print("Database schema initialized.") + + +def add_channel(name, url): + conn = get_db_connection() + c = conn.cursor() + c.execute("INSERT INTO channels (name, url) VALUES (?, ?)", (name, url)) + conn.commit() + + +def get_channels(): + conn = get_db_connection() + c = conn.cursor() + c.execute("SELECT name, url FROM channels") + return c.fetchall() + + +def add_message( + message_id, + sender_id, + sender_short_name, + sender_long_name, + reply_id, + channel, + content, +): + conn = get_db_connection() + c = conn.cursor() + date = datetime.now().strftime("%Y-%m-%d %H:%M") + + c.execute( + "INSERT INTO message (message_id, sender, sender_short_name, sender_long_name, reply_id, " + "channel, date, content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ( + message_id, + sender_id, + sender_short_name, + sender_long_name, + reply_id, + channel, + date, + content, + ), + ) + return conn.commit() + + +def get_messages(top: int = 5): + conn = get_db_connection() + c = conn.cursor() + c.execute( + "SELECT id, sender_short_name, sender_long_name, date, channel, " + "content FROM message ORDER BY date DESC LIMIT ?", + (top,), + ) + return c.fetchall() diff --git a/log.py b/log.py index 8a34027..649748e 100644 --- a/log.py +++ b/log.py @@ -1,4 +1,5 @@ from datetime import datetime + def log_timestamp(): - return datetime.now().strftime("%Y-%m-%d %H:%M:%S") \ No newline at end of file + return datetime.now().strftime("%Y-%m-%d %H:%M:%S") diff --git a/mesh_bot.py b/mesh_bot.py index 55f4fdf..7d71be7 100644 --- a/mesh_bot.py +++ b/mesh_bot.py @@ -5,8 +5,8 @@ import time from typing import List from pubsub import pub -from datetime import datetime from basic_bot import BasicBot +from db_operations import initialize_database from message_processor import MessageProcessor from serial_mesh import SerialMeshHelper @@ -14,43 +14,54 @@ import meshtastic.tcp_interface import meshtastic.ble_interface +from store_forward_bot import StoreForwardBot from weather_bot import WeatherBot # Uncomment the interface you want to use depending on your device connection -interface = meshtastic.serial_interface.SerialInterface() #serial interface -#interface=meshtastic.tcp_interface.TCPInterface(hostname="10.0.4.36") # IP of your device -#interface=meshtastic.ble_interface.BLEInterface("10:06:1C:49:90:36") # BLE interface - find it using meshtastic --ble-scan +interface = meshtastic.serial_interface.SerialInterface() # serial interface +# interface=meshtastic.tcp_interface.TCPInterface(hostname="10.0.4.36") # IP of your device +# BLE interface - find it using meshtastic --ble-scan +# interface=meshtastic.ble_interface.BLEInterface("10:06:1C:49:90:36") -#interface = None +initialize_database() -myinfo = interface.getMyNodeInfo() -myNodeNum = myinfo['num'] -print(f"System: My Node Number is {myNodeNum}") +bb = BasicBot(interface) +wb = WeatherBot(interface) +sfb = StoreForwardBot(interface) -bb = BasicBot() -wb = WeatherBot() +# node_list = interface.nodes.values() +# print(f"System: Node List {node_list}") -message_processors:List[MessageProcessor] = [bb, wb] +message_processors: List[MessageProcessor] = [bb, wb, sfb] sh = SerialMeshHelper(interface, message_processors) # if not serial or serial.myNodeNum == -1: # print("System: Critical Error script abort. Could not get myNodeNum") # exit() + def exit_handler(signum, frame): print("\nSystem: Closing Autoresponder") interface.close() - exit (0) + exit(0) + + +print("\nMeshtastic Autoresponder MESH Bot CTL+C to exit\n") -print ("\nMeshtastic Autoresponder MESH Bot CTL+C to exit\n") -pub.subscribe(sh.onReceive, 'meshtastic.receive') -print (f"System: Autoresponder Started for device {sh.get_name_from_number(sh.myNodeNum)}") +# subscribe to process messages +pub.subscribe(sh.onReceive, "meshtastic.receive") +# subscrie to store to DB +pub.subscribe(sfb.onReceive, "meshtastic.receive") + +print( + f"System: Autoresponder Started for device {MessageProcessor.get_name_from_number(interface, sh.myNodeNum)}" +) while True: # Catch CTL+C to exit + time.sleep(0.05) signal.signal(signal.SIGINT, exit_handler) + time.sleep(0.05) pass - - # EOF diff --git a/message_processor.py b/message_processor.py index dce706c..18e889b 100644 --- a/message_processor.py +++ b/message_processor.py @@ -1,26 +1,59 @@ import time +from typing import Dict +from meshtastic.stream_interface import StreamInterface + class MessageProcessor: - URL_TIMEOUT = 10 # wait time for URL requests - NO_DATA_NOGPS = "No GPS data available" - ERROR_FETCHING_DATA = "error fetching data" - LATITUDE:float = 48.50 - LONGITUDE:float = -123.0 + URL_TIMEOUT = 10 # wait time for URL requests + NO_DATA_NOGPS = "No GPS data available" + ERROR_FETCHING_DATA = "error fetching data" + LATITUDE: float = 48.50 + LONGITUDE: float = -123.0 + node_list: Dict[str, Dict] = {} + + def __init__(self, interface: StreamInterface): + self.interface = interface + self.trap_list = [] + self.node_list = interface.nodes.values() + # print(f"System: Node List {self.node_list}") + self.myNodeNum = interface.getMyNodeInfo()["num"] + pass - def __init__(self): - self.trap_list = [] - pass - - def auto_response(self, message, snr, rssi, hop, message_from_id, location:list[float], node_list:list[str]): - # wait a 700ms to avoid message collision from lora-ack - time.sleep(0.7) - pass + def auto_response( + self, message, snr, rssi, hop, message_from_id, location: list[float] + ): + # wait a 700ms to avoid message collision from lora-ack + time.sleep(0.7) + pass - def messageTrap(self, msg): + def messageTrap(self, msg): # Check if the message contains a trap word - message_list=msg.split(" ") + message_list = msg.split(" ") for m in message_list: for t in self.trap_list: if t.lower() == m.lower(): return True - return False \ No newline at end of file + return False + + @staticmethod + def get_name_from_number(interface: StreamInterface, number, type="long"): + name = "" + for node in interface.nodes.values(): + if number == node["num"]: + if type == "long": + name = node["user"]["longName"] + return name + elif type == "short": + name = node["user"]["shortName"] + return name + else: + pass + else: + name = str( + MessageProcessor.decimal_to_hex(number) + ) # If long name not found, use the ID as string + return name + + @staticmethod + def decimal_to_hex(decimal_number): + return f"!{decimal_number:08x}" diff --git a/requirements.txt b/requirements.txt index 3deb9d9..a86e3b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,5 +7,7 @@ geopy maidenhead beautifulsoup4 dadjokes +flask +waitress mock pytest \ No newline at end of file diff --git a/serial_mesh.py b/serial_mesh.py index 3b69ac0..3d39c76 100644 --- a/serial_mesh.py +++ b/serial_mesh.py @@ -1,19 +1,13 @@ - import time from typing import List from log import log_timestamp - from meshtastic.stream_interface import StreamInterface -from meshtastic.serial_interface import SerialInterface -from meshtastic.tcp_interface import TCPInterface -import meshtastic.serial_interface -import meshtastic.tcp_interface -import meshtastic.ble_interface - - from message_processor import MessageProcessor -RESPOND_BY_DM_ONLY = True # Set to True to respond messages via DM only (keeps the channel clean) +RESPOND_BY_DM_ONLY = ( + True # Set to True to respond messages via DM only (keeps the channel clean) +) + class SerialMeshHelper: @@ -22,7 +16,9 @@ class SerialMeshHelper: trap_list = [] myNodeNum = -1 - def __init__(self, interface: StreamInterface, message_processors: List[MessageProcessor]): + def __init__( + self, interface: StreamInterface, message_processors: List[MessageProcessor] + ): self.interface = interface self.message_processors = message_processors @@ -32,12 +28,12 @@ def __init__(self, interface: StreamInterface, message_processors: List[MessageP print(f"System: Traps are: {self.trap_list}") self.help_message = "Commands are: " + ", ".join(self.trap_list) - self.MOTD = "Thanks for using MeshBOT! Have a good day!" # Message of the Day + self.MOTD = "Thanks for using MeshBOT! Have a good day!" # Message of the Day try: myinfo = self.interface.getMyNodeInfo() print(f"System: My Node Number is {myinfo}") - self.myNodeNum = myinfo['num'] + self.myNodeNum = myinfo["num"] except Exception as e: print(f"System: Critical Error script abort. {e}") exit() @@ -46,21 +42,25 @@ def get_node_location(self, number) -> list[float]: # Get the location of a node by its number from nodeDB on device latitude = 0 longitude = 0 - position = [0,0] + position = [0, 0] if self.interface.nodes: for node in self.interface.nodes.values(): - if number == node['num']: - if 'position' in node: - latitude = node['position']['latitude'] - longitude = node['position']['longitude'] - print (f"System: location data for {number} is {latitude},{longitude}") + if number == node["num"]: + if "position" in node: + latitude = node["position"]["latitude"] + longitude = node["position"]["longitude"] + print( + f"System: location data for {number} is {latitude},{longitude}" + ) position = [latitude, longitude] return position else: - print (f"{log_timestamp()} System: No location data for {number}") + print( + f"{log_timestamp()} System: No location data for {number}" + ) return position else: - print (f"{log_timestamp()} System: No nodes found") + print(f"{log_timestamp()} System: No nodes found") return position def onReceive(self, packet, interface): @@ -68,43 +68,51 @@ def onReceive(self, packet, interface): message_from_id = 0 snr = 0 rssi = 0 + channel_number = 0 try: - if not 'decoded' in packet or not packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': + if ( + "decoded" not in packet + or not packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP" + ): return # print the packet for debugging - + # print(" -- START of Packet --") # print(packet) # print(" -- END of packet -- \n") - message_bytes = packet['decoded']['payload'] - message_string = message_bytes.decode('utf-8') - message_from_id = packet['from'] - snr = packet['rxSnr'] - rssi = packet['rxRssi'] + message_string = packet["decoded"]["payload"].decode("utf-8") + message_from_id = packet["from"] + + try: + snr = packet["rxSnr"] + rssi = packet["rxRssi"] + except KeyError: + pass if message_string == "SF": - print(f"{log_timestamp()} System: Ignoring SF (Store and Forward msg) From: {self.get_name_from_number(message_from_id)}") + print( + "{log_timestamp()} System: Ignoring SF (Store and Forward msg) From: " + f"{MessageProcessor.get_name_from_number(interface, message_from_id)}" + ) return - if packet.get('channel'): - channel_number = packet['channel'] - else: - channel_number = 0 - + if packet.get("channel"): + channel_number = packet["channel"] + # check if the packet has a hop count flag use it - if packet.get('hopsAway'): - hop_away = packet['hopsAway'] + if packet.get("hopsAway"): + hop_away = packet["hopsAway"] else: # if the packet does not have a hop count try other methods hop_away = 0 - if packet.get('hopLimit'): - hop_limit = packet['hopLimit'] + if packet.get("hopLimit"): + hop_limit = packet["hopLimit"] else: hop_limit = 0 - - if packet.get('hopStart'): - hop_start = packet['hopStart'] + + if packet.get("hopStart"): + hop_start = packet["hopStart"] else: hop_start = 0 @@ -116,19 +124,26 @@ def onReceive(self, packet, interface): hop_count = hop_away else: hop_count = hop_start - hop_limit - #print (f"calculated hop count: {hop_start} - {hop_limit} = {hop_count}") + # print (f"calculated hop count: {hop_start} - {hop_limit} = {hop_count}") hop = f"{hop_count} hops" wasHandled = False - location:list[float] = self.get_node_location(message_from_id) + location: list[float] = self.get_node_location(message_from_id) response = None - + for processor in self.message_processors: if processor.messageTrap(message_string): - response = processor.auto_response(message_string, snr, rssi, hop, message_from_id, location, self.get_node_list()) + response = processor.auto_response( + message_string, + snr, + rssi, + hop, + message_from_id, + location, + ) if response: wasHandled = True break @@ -136,17 +151,26 @@ def onReceive(self, packet, interface): if not response: response = self.help_message + from_name = MessageProcessor.get_name_from_number( + interface, message_from_id + ) # If the packet is a DM (Direct Message) respond to it, otherwise validate its a message for us - print(f"{log_timestamp()} System: Received DM: {message_string} From: {self.get_name_from_number(message_from_id)}") + print( + f"{log_timestamp()} System: Received DM: {message_string} From: {from_name}" + ) print(f"{log_timestamp()} System: To: {packet['to']}") print(f"{log_timestamp()} System: My Node Number is {self.myNodeNum}") - if packet['to'] == self.myNodeNum: - print(f"{log_timestamp()} Received DM: {message_string} on Channel: {channel_number} From: {self.get_name_from_number(message_from_id)}") + if packet["to"] == self.myNodeNum: + print( + f"{log_timestamp()} Received DM: '{message_string}' on Channel: {channel_number} From: {from_name}" + ) # respond with a direct message self.send_message(response, channel_number, message_from_id) else: if wasHandled: - print(f"{log_timestamp()} Received On Channel {channel_number}: {message_string} From: {self.get_name_from_number(message_from_id)}") + print( + f"{log_timestamp()} Received On Channel {channel_number}: '{message_string}' From: {from_name}" + ) if RESPOND_BY_DM_ONLY: # respond to channel message via direct message to keep the channel clean self.send_message(response, channel_number, message_from_id) @@ -154,100 +178,80 @@ def onReceive(self, packet, interface): # or respond to channel message on the channel itself self.send_message(response, channel_number, 0) else: - print(f"{log_timestamp()} System: Ignoring incoming channel {channel_number}: {message_string} From: {self.get_name_from_number(message_from_id)}") - + print( + f"{log_timestamp()} System: Ignoring incoming channel " + f"{channel_number}: '{message_string}' From: {from_name}" + ) + # wait a 700ms to avoid message collision from lora-ack time.sleep(0.7) except KeyError as e: print(f"System: Error processing packet: {e}") - print(packet) # print the packet for debugging + print(packet) # print the packet for debugging print("END of packet \n") return str(e) pass - def get_node_list(self): - node_list = [] - if self.interface.nodes: - for node in self.interface.nodes.values(): - # ignore own - if node['num'] != self.myNodeNum: - node_name = self.get_name_from_number(node['num']) - snr = node.get('snr', 0) - - # issue where lastHeard is not always present - last_heard = node.get('lastHeard', 0) - - # make a list of nodes with last heard time and SNR - item = (node_name, last_heard, snr) - node_list.append(item) - - node_list.sort(key=lambda x: x[1], reverse=True) - - return node_list - - else: - return "Error Processing Node List" - def send_message(self, message, ch, nodeid): + message_list = [] # if message over 160 characters, split it into multiple messages if len(message) > 160: - #message_list = [message[i:i+160] for i in range(0, len(message), 160)] + # message_list = [message[i:i+160] for i in range(0, len(message), 160)] # smarter word split - split_message = message.split() - line = '' - split_len = 160 - message_list = [] - for word in split_message: - if len(line+word) 160: + line = line[:160] + message_list.append(line) - line = word + ' ' - message_list.append(line) # needed add contents of the last 'line' into the list + else: + split_message = message.split(" ") + line = "" + split_len = 160 + message_list = [] + for word in split_message: + if len(line + word) < split_len: + line += word + " " + else: + message_list.append(line) + line = word + " " + message_list.append( + line + ) # needed add contents of the last 'line' into the list for m in message_list: if nodeid == 0: - #Send to channel - print (f"{log_timestamp()} System: Sending Multi-Chunk: {m} To: Channel:{ch}") + # Send to channel + print( + f"{log_timestamp()} System: Sending Multi-Chunk: {m} To: Channel:{ch}" + ) self.interface.sendText(text=m, channelIndex=ch) else: # Send to DM - print (f"{log_timestamp()} System: Sending Multi-Chunk: {m} To: {self.get_name_from_number(nodeid)}") - self.interface.sendText(text=m,channelIndex=ch, destinationId=nodeid) + print( + f"{log_timestamp()} System: Sending Multi-Chunk: {m} To: " + f"{MessageProcessor.get_name_from_number(self.interface, nodeid)}" + ) + self.interface.sendText( + text=m, channelIndex=ch, destinationId=nodeid + ) # # wait a 500ms to avoid message collision except after last message # if message_list.index(m) < len(message_list) - 1: # time.sleep(0.5) - else: # message is less than 160 characters + else: # message is less than 160 characters if nodeid == 0: # Send to channel - print (f"{log_timestamp()} System: Sending: {message} To: Channel:{ch}") + print(f"{log_timestamp()} System: Sending: {message} To: Channel:{ch}") self.interface.sendText(text=message, channelIndex=ch) else: # Send to DM - print (f"{log_timestamp()} System: Sending: {message} To: {self.get_name_from_number(nodeid)}") - self.interface.sendText(text=message, channelIndex=ch, destinationId=nodeid) - - def get_name_from_number(self, number, type='long'): - name = "" - for node in self.interface.nodes.values(): - if number == node['num']: - if type == 'long': - name = node['user']['longName'] - return name - elif type == 'short': - name = node['user']['shortName'] - return name - else: - pass - else: - name = str(self.decimal_to_hex(number)) # If long name not found, use the ID as string - return name - - def decimal_to_hex(self, decimal_number): - return f"!{decimal_number:08x}" - - - - + print( + f"{log_timestamp()} System: Sending: {message} To: " + f"{MessageProcessor.get_name_from_number(self.interface, nodeid)}" + ) + self.interface.sendText( + text=message, channelIndex=ch, destinationId=nodeid + ) diff --git a/store_forward_bot.py b/store_forward_bot.py new file mode 100644 index 0000000..dc26018 --- /dev/null +++ b/store_forward_bot.py @@ -0,0 +1,102 @@ +from meshtastic.stream_interface import StreamInterface +from db_operations import add_message, get_messages +from log import log_timestamp +from message_processor import MessageProcessor + + +class StoreForwardBot(MessageProcessor): + def __init__( + self, interface: StreamInterface + ): # myNodeNum:int, node_list:Dict[str, Dict]): + super(StoreForwardBot, self).__init__(interface) + self.trap_list = ["messages"] + pass + + def auto_response(self, message, snr, rssi, hop, sender_id, location: list[float]): + + message = message.lower().strip() + if "messages" in message: + # Check if the user added @foo to the message + messages = get_messages(5) + print(f"{log_timestamp()}SFBot: Messages {messages}") + # format messages to be more readable + messages_nice = [] + print(f"{log_timestamp()}SFBot: Messages {messages}") + for m in messages: + messages_nice.append(f"[{m[3]}] {m[2]}: {m[5]}") + if messages: + bot_response = "Last 5 messages:\n" + str("\n".join(messages_nice)) + else: + bot_response = "No messages found" + + return bot_response + + def onReceive(self, packet, interface): + sender_id = 0 + channel_number = 0 + shortName = "Unknown" + longName = "Unknown" + reply_id = 0 + + try: + if ( + "decoded" not in packet + or not packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP" + ): + return + + # print(f"SFBot: START packet:") + # print(packet) # print the packet for debugging + # print("SFBot: END of packet \n") + + message_string = packet["decoded"]["payload"].decode("utf-8") + + if packet["decoded"].get("replyId"): + reply_id = packet["decoded"]["replyId"] + + message_id = packet["id"] + + if packet.get("channel"): + channel_number = packet["channel"] + + sender_id = packet["from"] + + for node in interface.nodes.values(): + if sender_id == node["num"]: + longName = node["user"]["longName"] + shortName = node["user"]["shortName"] + + print( + f"{log_timestamp()}SFBot: Message #{message_id} received from {longName}:{shortName} " + f"as reply to #{reply_id} on channel {channel_number}: {message_string}" + ) + + if packet["to"] == self.myNodeNum: + print( + f"{log_timestamp()}SFBot: Direct Message received for me, ignoring: {message_string}" + ) + return + + if channel_number != 0: + print( + f"{log_timestamp()}SFBot: ingoring channel other then 0: {message_string}" + ) + return + + # add to database + add_message( + message_id, + sender_id, + shortName, + longName, + reply_id, + channel_number, + message_string, + ) + print(f"{log_timestamp()}SFBot: Message added to database") + + except KeyError as e: + print(f"SFBot: Error processing packet: {e}") + print(packet) # print the packet for debugging + print("END of packet \n") + return str(e) diff --git a/tests/test_basic_bot.py b/tests/test_basic_bot.py index 12f69a2..01b3e6e 100644 --- a/tests/test_basic_bot.py +++ b/tests/test_basic_bot.py @@ -1,18 +1,124 @@ import os import sys -import pytest +from unittest.mock import patch, MagicMock sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from basic_bot import BasicBot -def test_where_am_i_with_valid_coordinates(): - # Test with invalid coordinates - bot = BasicBot() + +@patch("meshtastic.serial_interface.SerialInterface") +def test_where_am_i_with_valid_coordinates(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + + bot = BasicBot(mock_interface) lat = "37.7749" lon = "-122.4194" + location = bot.where_am_i(lat, lon) print(f"location: {location}") - assert location == "South Van Ness Avenue San Francisco California 94103 United States Grid: CM87ss" + assert ( + location + == "South Van Ness Avenue San Francisco California 94103 United States Grid: CM87ss" + ) assert location != bot.NO_DATA_NOGPS assert location != bot.ERROR_FETCHING_DATA + + +@patch("meshtastic.serial_interface.SerialInterface") +def test_auto_response_ping(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + bot = BasicBot(mock_interface) + message = "ping" + snr = 10 + rssi = -80 + hop = 2 + message_from_id = "123" + location = [0, 0] + + response = bot.auto_response(message, snr, rssi, hop, message_from_id, location) + + assert response == "PONG, SNR:10 RSSI:-80 HOP 2" + + +@patch("meshtastic.serial_interface.SerialInterface") +def test_auto_response_ack(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + + bot = BasicBot(mock_interface) + message = "ack" + snr = 8 + rssi = -90 + hop = 1 + message_from_id = "456" + location = [0, 0] + + response = bot.auto_response(message, snr, rssi, hop, message_from_id, location) + + assert response == "ACK-ACK! SNR:8 RSSI:-90 HOP 1" + + +@patch("meshtastic.serial_interface.SerialInterface") +def test_auto_response_lheard(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + + bot = BasicBot(mock_interface) + message = "lheard" + snr = 5 + rssi = -70 + hop = 3 + message_from_id = "789" + location = [0, 0] + + response = bot.auto_response(message, snr, rssi, hop, message_from_id, location) + + assert response == "Last 5 nodes heard:\n" + + +@patch("meshtastic.serial_interface.SerialInterface") +def test_auto_response_whereami(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + + bot = BasicBot(mock_interface) + message = "whereami" + snr = 0 + rssi = 0 + hop = 0 + message_from_id = "987" + location = [37.7749, -122.4194] + + response = bot.auto_response(message, snr, rssi, hop, message_from_id, location) + + assert ( + response + == "South Van Ness Avenue San Francisco California 94103 United States Grid: CM87ss" + ) + + +@patch("meshtastic.serial_interface.SerialInterface") +def test_auto_response_joke(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + + bot = BasicBot(mock_interface) + message = "joke" + snr = 0 + rssi = 0 + hop = 0 + message_from_id = "654" + location = [0, 0] + + response = bot.auto_response(message, snr, rssi, hop, message_from_id, location) + + assert response is not None diff --git a/tests/test_storage_forward_bot.py b/tests/test_storage_forward_bot.py new file mode 100644 index 0000000..535825c --- /dev/null +++ b/tests/test_storage_forward_bot.py @@ -0,0 +1,60 @@ +import os +import sys +import pytest +from unittest.mock import patch, MagicMock + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from store_forward_bot import StoreForwardBot + + +@pytest.fixture +@patch("meshtastic.serial_interface.SerialInterface") +def bot(mock_interface): + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + return StoreForwardBot(mock_interface) + + +@pytest.fixture +def valid_packet(): + return { + "decoded": { + "portnum": "TEXT_MESSAGE_APP", + "payload": b"Test message", + "replyId": 1, + }, + "id": 123, + "from": 456, + "to": 2, + "channel": 0, + } + + +@patch("store_forward_bot.add_message") +@patch("store_forward_bot.log_timestamp", return_value="[Timestamp] ") +def test_onReceive_valid_packet( + mock_log_timestamp, mock_add_message, bot, valid_packet +): + bot.onReceive(valid_packet, MagicMock()) + mock_add_message.assert_called_once_with( + 123, 456, "Unknown", "Unknown", 1, 0, "Test message" + ) + + +@patch("store_forward_bot.add_message") +@patch( + "meshtastic.serial_interface.SerialInterface.getMyNodeInfo", return_value={"num": 1} +) +def test_onReceive_packet_not_for_bot(mock_add_message, bot, valid_packet): + packet_not_for_bot = valid_packet.copy() + # packet_not_for_bot["from"] = mock # to myself so DM< + bot.onReceive(packet_not_for_bot, MagicMock()) + mock_add_message.assert_not_called() + + +@patch("store_forward_bot.add_message") +def test_onReceive_invalid_portnum(mock_add_message, bot, valid_packet): + invalid_packet = valid_packet.copy() + invalid_packet["decoded"]["portnum"] = "INVALID_PORT" + bot.onReceive(invalid_packet, MagicMock()) + mock_add_message.assert_not_called() diff --git a/tests/test_weather_bot.py b/tests/test_weather_bot.py index ce46ec1..c67a98e 100644 --- a/tests/test_weather_bot.py +++ b/tests/test_weather_bot.py @@ -1,13 +1,17 @@ import os import sys -import pytest +from unittest.mock import patch, MagicMock sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from weather_bot import WeatherBot -from weather_bot import * -def test_get_weather_with_valid_coordinates(): - bot = WeatherBot() +@patch("meshtastic.serial_interface.SerialInterface") +def test_get_weather_with_valid_coordinates(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + bot = WeatherBot(mock_interface) # Test with valid coordinates lat = "37.7749" lon = "-122.4194" @@ -35,18 +39,28 @@ def test_get_weather_with_valid_coordinates(): assert "showers" not in weather assert "thunderstorms" not in weather -def test_get_weather_with_invalid_coordinates(): + +@patch("meshtastic.serial_interface.SerialInterface") +def test_get_weather_with_invalid_coordinates(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface # Test with invalid coordinates - bot = WeatherBot() + bot = WeatherBot(mock_interface) lat = 0 lon = 0 weather = bot.get_weather(lat, lon) print(f"weather: {weather}") assert weather == bot.NO_DATA_NOGPS -def test_get_tide_with_valid_coordinates(): + +@patch("meshtastic.serial_interface.SerialInterface") +def test_get_tide_with_valid_coordinates(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface # Test with valid coordinates - bot = WeatherBot() + bot = WeatherBot(mock_interface) lat = "37.7749" lon = "-122.4194" tide = bot.get_tide(lat, lon) @@ -54,8 +68,13 @@ def test_get_tide_with_valid_coordinates(): assert tide != bot.NO_DATA_NOGPS assert tide != bot.ERROR_FETCHING_DATA -def test_replace_weather(): - bot = WeatherBot() + +@patch("meshtastic.serial_interface.SerialInterface") +def test_replace_weather(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + bot = WeatherBot(mock_interface) # Test replacing days of the week assert bot.replace_weather("Monday") == "Mon " assert bot.replace_weather("Tuesday") == "Tue " @@ -93,9 +112,14 @@ def test_replace_weather(): assert bot.replace_weather("showers") == "shwrs" assert bot.replace_weather("thunderstorms") == "t-storms" -def test_get_moon_with_valid_coordinates(): + +@patch("meshtastic.serial_interface.SerialInterface") +def test_get_moon_with_valid_coordinates(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface # Test with valid coordinates - bot = WeatherBot() + bot = WeatherBot(mock_interface) lat = "37.7749" lon = "-122.4194" moon = bot.get_moon(lat, lon) @@ -108,9 +132,14 @@ def test_get_moon_with_valid_coordinates(): assert "Full Moon" in moon assert "New Moon" in moon -def test_solar_with_valid_coordinates(): + +@patch("meshtastic.serial_interface.SerialInterface") +def test_solar_with_valid_coordinates(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface # Test with valid coordinates - bot = WeatherBot() + bot = WeatherBot(mock_interface) solar = bot.solar_conditions() print(f"solar: {solar}") assert solar != bot.NO_DATA_NOGPS @@ -122,16 +151,27 @@ def test_solar_with_valid_coordinates(): assert "X-Ray Flux" in solar assert "Signal Noise" in solar -def test_drap_xray_conditions(): - bot = WeatherBot() + +@patch("meshtastic.serial_interface.SerialInterface") +def test_drap_xray_conditions(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + + bot = WeatherBot(mock_interface) xray = bot.drap_xray_conditions() print(f"xray: {xray}") assert xray != bot.NO_DATA_NOGPS assert xray != bot.ERROR_FETCHING_DATA assert "X-ray" in xray -def test_get_hf_band_conditions(): - bot = WeatherBot() + +@patch("meshtastic.serial_interface.SerialInterface") +def test_get_hf_band_conditions(mock_interface): + # Mock the SerialInterface to avoid actual serial communication + mock_interface = MagicMock() + mock_interface.return_value = mock_interface + bot = WeatherBot(mock_interface) hf = bot.hf_band_conditions() print(f"hf: {hf}") assert hf != bot.NO_DATA_NOGPS diff --git a/weather_bot.py b/weather_bot.py index e417a1e..6390563 100644 --- a/weather_bot.py +++ b/weather_bot.py @@ -5,21 +5,25 @@ import bs4 as bs from datetime import datetime from datetime import timedelta - from log import log_timestamp from message_processor import MessageProcessor +from meshtastic.stream_interface import StreamInterface class WeatherBot(MessageProcessor): - - DAYS_OF_WEATHER = 2 # weather forecast days, the first two rows are today and tonight - def __init__(self): - super().__init__() + DAYS_OF_WEATHER = ( + 2 # weather forecast days, the first two rows are today and tonight + ) + + def __init__(self, interface: StreamInterface): + super(WeatherBot, self).__init__(interface) self.trap_list = ["sun", "solar", "hfcond", "tide", "moon", "wxc", "wx"] pass - - def auto_response(self, message, snr, rssi, hop, message_from_id, location:list[float], node_list:list[str]): + + def auto_response( + self, message, snr, rssi, hop, message_from_id, location: list[float] + ): print(f"WeatherBot: Got message: {message}") message = message.lower().strip() @@ -35,34 +39,44 @@ def auto_response(self, message, snr, rssi, hop, message_from_id, location:list[ elif "moon" in message: bot_response = self.get_moon(str(location[0]), str(location[1])) elif "wxc" in message: - bot_response = self.get_weather(str(location[0]), str(location[1]),1) + bot_response = self.get_weather(str(location[0]), str(location[1]), 1) elif "wx" in message: bot_response = self.get_weather(str(location[0]), str(location[1])) - + return bot_response def hf_band_conditions(self): # ham radio HF band conditions hf_cond = "" - band_cond = requests.get("https://www.hamqsl.com/solarxml.php", timeout=super().URL_TIMEOUT) - print (f"{log_timestamp()} System: {band_cond}") + band_cond = requests.get( + "https://www.hamqsl.com/solarxml.php", timeout=self.URL_TIMEOUT + ) + print(f"{log_timestamp()} System: {band_cond}") - if(band_cond.ok): + if band_cond.ok: solarxml = xml.dom.minidom.parseString(band_cond.text) for i in solarxml.getElementsByTagName("band"): - hf_cond += i.getAttribute("time")[0]+i.getAttribute("name") +"="+str(i.childNodes[0].data)+"\n" + hf_cond += ( + i.getAttribute("time")[0] + + i.getAttribute("name") + + "=" + + str(i.childNodes[0].data) + + "\n" + ) else: hf_cond += "error fetching" - hf_cond = hf_cond[:-1] # remove the last newline + hf_cond = hf_cond[:-1] # remove the last newline return hf_cond def solar_conditions(self): # radio related solar conditions from hamsql.com solar_cond = "" - solar_cond = requests.get("https://www.hamqsl.com/solarxml.php", timeout=super().URL_TIMEOUT) - print (f"{log_timestamp()} System: {solar_cond}") + solar_cond = requests.get( + "https://www.hamqsl.com/solarxml.php", timeout=self.URL_TIMEOUT + ) + print(f"{log_timestamp()} System: {solar_cond}") - if(solar_cond.ok): + if solar_cond.ok: solar_xml = xml.dom.minidom.parseString(solar_cond.text) for i in solar_xml.getElementsByTagName("solardata"): solar_a_index = i.getElementsByTagName("aindex")[0].childNodes[0].data @@ -70,8 +84,23 @@ def solar_conditions(self): solar_xray = i.getElementsByTagName("xray")[0].childNodes[0].data solar_flux = i.getElementsByTagName("solarflux")[0].childNodes[0].data sunspots = i.getElementsByTagName("sunspots")[0].childNodes[0].data - signalnoise = i.getElementsByTagName("signalnoise")[0].childNodes[0].data - solar_cond = "A-Index: " + solar_a_index + "\nK-Index: " + solar_k_index + "\nSunspots: " + sunspots + "\nX-Ray Flux: " + solar_xray + "\nSolar Flux: " + solar_flux + "\nSignal Noise: " + signalnoise + signalnoise = ( + i.getElementsByTagName("signalnoise")[0].childNodes[0].data + ) + solar_cond = ( + "A-Index: " + + solar_a_index + + "\nK-Index: " + + solar_k_index + + "\nSunspots: " + + sunspots + + "\nX-Ray Flux: " + + solar_xray + + "\nSolar Flux: " + + solar_flux + + "\nSignal Noise: " + + signalnoise + ) else: solar_cond += "error fetching" return solar_cond @@ -79,12 +108,15 @@ def solar_conditions(self): def drap_xray_conditions(self): # DRAP X-ray flux conditions, from NOAA direct drap_cond = "" - drap_cond = requests.get("https://services.swpc.noaa.gov/text/drap_global_frequencies.txt", timeout=super().URL_TIMEOUT) - print (f"{log_timestamp()} System: {drap_cond}") - - if(drap_cond.ok): - drap_list = drap_cond.text.split('\n') - x_filter = '# X-RAY Message :' + drap_cond = requests.get( + "https://services.swpc.noaa.gov/text/drap_global_frequencies.txt", + timeout=self.URL_TIMEOUT, + ) + print(f"{log_timestamp()} System: {drap_cond}") + + if drap_cond.ok: + drap_list = drap_cond.text.split("\n") + x_filter = "# X-RAY Message :" for line in drap_list: if x_filter in line: xray_flux = line.split(": ")[1] @@ -101,25 +133,27 @@ def get_sun(self, lat=0, lon=0): obs.lat = str(lat) obs.lon = str(lon) else: - obs.lat = str(super().LATITUDE) - obs.lon = str(super().LONGITUDE) + obs.lat = str(self.LATITUDE) + obs.lon = str(self.LONGITUDE) sun.compute(obs) sun_table = {} - sun_table['azimuth'] = sun.az - sun_table['altitude'] = sun.alt + sun_table["azimuth"] = sun.az + sun_table["altitude"] = sun.alt # get the next rise and set times local_sunrise = ephem.localtime(obs.next_rising(sun)) local_sunset = ephem.localtime(obs.next_setting(sun)) - sun_table['rise_time'] = local_sunrise.strftime('%a %d %I:%M') - sun_table['set_time'] = local_sunset.strftime('%a %d %I:%M') + sun_table["rise_time"] = local_sunrise.strftime("%a %d %I:%M") + sun_table["set_time"] = local_sunset.strftime("%a %d %I:%M") # if sunset is before sunrise, then it's tomorrow if local_sunset < local_sunrise: local_sunset = ephem.localtime(obs.next_setting(sun)) + timedelta(1) - sun_table['set_time'] = local_sunset.strftime('%a %d %I:%M') - sun_data = "Sun Rise: " + sun_table['rise_time'] + "\nSet: " + sun_table['set_time'] - + sun_table["set_time"] = local_sunset.strftime("%a %d %I:%M") + sun_data = ( + "Sun Rise: " + sun_table["rise_time"] + "\nSet: " + sun_table["set_time"] + ) + return sun_data def get_moon(self, lat=0, lon=0): @@ -131,76 +165,106 @@ def get_moon(self, lat=0, lon=0): obs.lat = str(lat) obs.lon = str(lon) else: - obs.lat = str(super().LATITUDE) - obs.lon = str(super().LONGITUDE) - + obs.lat = str(self.LATITUDE) + obs.lon = str(self.LONGITUDE) + obs.date = datetime.now() moon.compute(obs) moon_table = {} - moon_phase = ['New Moon', 'Waxing Crescent', 'First Quarter', 'Waxing Gibbous', 'Full Moon', 'Waning Gibbous', 'Last Quarter', 'Waning Crescent'][round(moon.phase / (2 * ephem.pi) * 8) % 8] - moon_table['phase'] = moon_phase - moon_table['illumination'] = moon.phase - moon_table['azimuth'] = moon.az - moon_table['altitude'] = moon.alt + moon_phase = [ + "New Moon", + "Waxing Crescent", + "First Quarter", + "Waxing Gibbous", + "Full Moon", + "Waning Gibbous", + "Last Quarter", + "Waning Crescent", + ][round(moon.phase / (2 * ephem.pi) * 8) % 8] + moon_table["phase"] = moon_phase + moon_table["illumination"] = moon.phase + moon_table["azimuth"] = moon.az + moon_table["altitude"] = moon.alt local_moonrise = ephem.localtime(obs.next_rising(moon)) local_moonset = ephem.localtime(obs.next_setting(moon)) - moon_table['rise_time'] = local_moonrise.strftime('%a %d %I:%M%p') - moon_table['set_time'] = local_moonset.strftime('%a %d %I:%M%p') + moon_table["rise_time"] = local_moonrise.strftime("%a %d %I:%M%p") + moon_table["set_time"] = local_moonset.strftime("%a %d %I:%M%p") local_next_full_moon = ephem.localtime(ephem.next_full_moon((obs.date))) local_next_new_moon = ephem.localtime(ephem.next_new_moon((obs.date))) - moon_table['next_full_moon'] = local_next_full_moon.strftime('%a %b %d %I:%M%p') - moon_table['next_new_moon'] = local_next_new_moon.strftime('%a %b %d %I:%M%p') + moon_table["next_full_moon"] = local_next_full_moon.strftime("%a %b %d %I:%M%p") + moon_table["next_new_moon"] = local_next_new_moon.strftime("%a %b %d %I:%M%p") + + moon_data = ( + "Moon Rise:" + + moon_table["rise_time"] + + "\nSet:" + + moon_table["set_time"] + + "\nPhase:" + + moon_table["phase"] + + " @:" + + str("{0:.2f}".format(moon_table["illumination"])) + + "%" + + "\nFull Moon:" + + moon_table["next_full_moon"] + + "\nNew Moon:" + + moon_table["next_new_moon"] + ) - moon_data = "Moon Rise:" + moon_table['rise_time'] + "\nSet:" + moon_table['set_time'] + \ - "\nPhase:" + moon_table['phase'] + " @:" + str('{0:.2f}'.format(moon_table['illumination'])) + "%" \ - + "\nFull Moon:" + moon_table['next_full_moon'] + "\nNew Moon:" + moon_table['next_new_moon'] - return moon_data def get_tide(self, lat=0, lon=0): station_id = "" if float(lat) == 0 and float(lon) == 0: - return super().NO_DATA_NOGPS - station_lookup_url = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/tidepredstations.json?lat=" + str(lat) + "&lon=" + str(lon) + "&radius=50" - print (f"{log_timestamp()} System: {station_lookup_url}") + return self.NO_DATA_NOGPS + station_lookup_url = ( + "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/tidepredstations.json?lat=" + + str(lat) + + "&lon=" + + str(lon) + + "&radius=50" + ) + print(f"{log_timestamp()} System: {station_lookup_url}") try: - station_data = requests.get(station_lookup_url, timeout=super().URL_TIMEOUT) + station_data = requests.get(station_lookup_url, timeout=self.URL_TIMEOUT) if station_data.ok: station_json = station_data.json() else: - return super().ERROR_FETCHING_DATA + return self.ERROR_FETCHING_DATA except (requests.exceptions.RequestException, json.JSONDecodeError): - return super().ERROR_FETCHING_DATA + return self.ERROR_FETCHING_DATA if station_id is None: - return "no tide station found" - - station_id = station_json['stationList'][0]['stationId'] + return "no tide station found" + + station_id = station_json["stationList"][0]["stationId"] - station_url = "https://tidesandcurrents.noaa.gov/noaatidepredictions.html?id=" + station_id - print (f"{log_timestamp()} System: {station_url}") + station_url = ( + "https://tidesandcurrents.noaa.gov/noaatidepredictions.html?id=" + + station_id + ) + print(f"{log_timestamp()} System: {station_url}") try: - station_data = requests.get(station_url, timeout=super().URL_TIMEOUT) + station_data = requests.get(station_url, timeout=self.URL_TIMEOUT) if not station_data.ok: - return super().ERROR_FETCHING_DATA - except (requests.exceptions.RequestException): - return super().ERROR_FETCHING_DATA - + return self.ERROR_FETCHING_DATA + except requests.exceptions.RequestException: + return self.ERROR_FETCHING_DATA + # extract table class="table table-condensed" - soup = bs.BeautifulSoup(station_data.text, 'html.parser') - table = soup.find('table', class_='table table-condensed') + soup = bs.BeautifulSoup(station_data.text, "html.parser") + table = soup.find("table", class_="table table-condensed") # extract rows - rows = table.find_all('tr') + rows = table.find_all("tr") # extract data from rows tide_data = [] for row in rows: row_text = "" - cols = row.find_all('td') + cols = row.find_all("td") for col in cols: row_text += col.text + " " tide_data.append(row_text) @@ -211,43 +275,48 @@ def get_tide(self, lat=0, lon=0): # trim off last newline tide_string = tide_string[:-1] return tide_string - + def get_weather(self, lat=0, lon=0, unit=0): weather = "" if float(lat) == 0 and float(lon) == 0: - return super().NO_DATA_NOGPS - - weather_url = "https://forecast.weather.gov/MapClick.php?FcstType=text&lat=" + str(lat) + "&lon=" + str(lon) + return self.NO_DATA_NOGPS + + weather_url = ( + "https://forecast.weather.gov/MapClick.php?FcstType=text&lat=" + + str(lat) + + "&lon=" + + str(lon) + ) if unit == 1: weather_url += "&unit=1" - print (f"{log_timestamp()} System: {weather_url}") - + print(f"{log_timestamp()} System: {weather_url}") + try: - weather_data = requests.get(weather_url, timeout=super().URL_TIMEOUT) + weather_data = requests.get(weather_url, timeout=self.URL_TIMEOUT) if not weather_data.ok: - return super().ERROR_FETCHING_DATA - except (requests.exceptions.RequestException): - return super().ERROR_FETCHING_DATA - + return self.ERROR_FETCHING_DATA + except requests.exceptions.RequestException: + return self.ERROR_FETCHING_DATA - soup = bs.BeautifulSoup(weather_data.text, 'html.parser') - table = soup.find('div', id="detailed-forecast-body") + soup = bs.BeautifulSoup(weather_data.text, "html.parser") + table = soup.find("div", id="detailed-forecast-body") if table is None: return "no weather data found on NOAA for your location" else: # get rows - rows = table.find_all('div', class_="row") - + rows = table.find_all("div", class_="row") + # extract data from rows - for row in rows: + for index, row in enumerate(rows): # shrink the text line = self.replace_weather(row.text) + weather += line + "\n" # only grab a few days of weather - if len(weather.split("\n")) < self.DAYS_OF_WEATHER: - weather += line + "\n" + if index >= self.DAYS_OF_WEATHER - 1: + break # trim off last newline weather = weather[:-1] @@ -261,6 +330,7 @@ def replace_weather(self, row): "Thursday": "Thu ", "Friday": "Fri ", "Saturday": "Sat ", + "Sunday": "Sun ", "Today": "Today ", "Tonight": "Tonight ", "Tomorrow": "Tomorrow ", @@ -283,11 +353,11 @@ def replace_weather(self, row): "West": "W", "precipitation": "precip", "showers": "shwrs", - "thunderstorms": "t-storms" + "thunderstorms": "t-storms", } line = row for key, value in replacements.items(): line = line.replace(key, value) - + return line diff --git a/web.py b/web.py new file mode 100644 index 0000000..7131915 --- /dev/null +++ b/web.py @@ -0,0 +1,48 @@ +from flask import Flask, render_template_string +from db_operations import get_messages # Assuming this function is in db_operations.py + +app = Flask(__name__) + +HTML_TEMPLATE = """ + + + + Messages + + +

Messages

+ + + +""" + +@app.route('/') +def show_messages(): + messages = get_messages(100) + return render_template_string(HTML_TEMPLATE, messages=messages) + +@app.errorhandler(404) +@app.route('/404') +def page_not_found(e): + return render_template_string('Page Not Found'), 404 + +@app.errorhandler(500) +@app.route('/500') +def error_page(e): + return render_template_string(f'Error:
{e}'), 404 + +def create_app(): + return app + +if __name__ == '__main__': + #app.run(debug=True) + from waitress import serve + serve(app, host="0.0.0.0", port=8080) + +# run waitress-serve --port=8080 --call web:create_app to start the web server \ No newline at end of file