From ef6db39b792f4d57fe6803a13b297c5989012731 Mon Sep 17 00:00:00 2001 From: Martin Dyer Date: Mon, 5 Aug 2024 14:21:09 +0100 Subject: [PATCH 1/3] Add GCN heartbeat --- gtecs/alert/sentinel.py | 61 +++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/gtecs/alert/sentinel.py b/gtecs/alert/sentinel.py index 2be1a06..00117ce 100644 --- a/gtecs/alert/sentinel.py +++ b/gtecs/alert/sentinel.py @@ -204,7 +204,7 @@ def _kafka_listener_thread( # There's also no way to select specific notice types # SCIMMA have said they will be reworking their GCN system at some point... topics = ['gcn.notice'] - # Also subscribe to the heartbeat topic, do we can check if we're still connected + # Also subscribe to the heartbeat topic, so we can check if we're still connected topics += ['sys.heartbeat'] elif broker == 'NASA': broker_url = 'kafka://kafka.gcn.nasa.gov/' @@ -247,6 +247,8 @@ def _kafka_listener_thread( 'gcn.classic.voevent.ICECUBE_ASTROTRACK_GOLD', 'gcn.classic.voevent.ICECUBE_CASCADE', ] + # Also subscribe to the heartbeat topic, so we can check if we're still connected + topics += ['gcn.heartbeat'] else: raise ValueError('Broker must be "SCIMMA" or "NASA"') @@ -271,21 +273,23 @@ def _kafka_listener_thread( # # but it doesn't seem to be implemented in hop-client yet. # start_position = datetime.now() - timedelta(hours=12) + # One of the advantages of the newer brokers is monitoring the heartbeat topic + # to make sure we're connected. + # However, if we backdate with a new group ID we'll get weeks and weeks of + # heartbeat messages, which is very annoying. + # So instead we'll sneaky read the latest heartbeat message right now. + # That gives the starting point for that topic, so when we start the stream + # below it'll only have to handle a few seconds of heartbeat messages + # rather than weeks. if broker == 'SCIMMA': - # One of the advantages of the SCIMMA broker is monitoring the heartbeat topic - # to make sure we're connected. - # However, if we backdate with a new group ID we'll get weeks and weeks of - # heartbeat messages, which is very annoying. - # So instead we'll sneaky read the latest heartbeat message right now. - # That gives the starting point for that topic, so when we start the stream - # below it'll only have to handle a few seconds of heartbeat messages - # rather than weeks. url = broker_url + 'sys.heartbeat' - stream = Stream(auth=auth, start_at=StartPosition.LATEST, until_eos=True) - consumer = stream.open(url, mode='r', group_id=group_id) - for payload, metadata in consumer.read_raw(metadata=True, autocommit=True): - if metadata.topic == 'sys.heartbeat': - break + elif broker == 'NASA': + url = broker_url + 'gcn.heartbeat' + stream = Stream(auth=auth, start_at=StartPosition.LATEST, until_eos=True) + consumer = stream.open(url, mode='r', group_id=group_id) + for payload, metadata in consumer.read_raw(metadata=True, autocommit=True): + if 'heartbeat' in metadata.topic: + break else: self.log.debug('Starting Kafka stream from latest message') start_position = StartPosition.LATEST @@ -307,21 +311,20 @@ def _kafka_listener_thread( if not self.running: break - if broker == 'SCIMMA': - # We can use the system heartbeat to check if we're still connected - heartbeat_timeout = 60 - latest_message_time = 0 - # Because of the sys.heartbeat messages we should be getting a message - # every few seconds, so we can use this timestamp to check if we're - # still connected. 
- if (latest_message_time and - time.time() - latest_message_time > heartbeat_timeout): - raise TimeoutError(f'No heartbeat in {heartbeat_timeout}s') - latest_message_time = time.time() - - if metadata.topic == 'sys.heartbeat': - # No need to process heartbeat messages - continue + # We can use the system heartbeat to check if we're still connected + heartbeat_timeout = 60 + latest_message_time = 0 + # Because of the heartbeat messages we should be getting a message + # every few seconds, so we can use this timestamp to check if we're + # still connected. + time_delta = time.time() - latest_message_time + if latest_message_time > 0 and time_delta > heartbeat_timeout: + raise TimeoutError(f'No heartbeat in {time_delta}s') + latest_message_time = time.time() + + if 'heartbeat' in metadata.topic: + # No need to process heartbeat messages + continue # Create the notice and add it to the queue try: From 7bd0a9ba9b0e1a82a927e3b937e41402b462a7f5 Mon Sep 17 00:00:00 2001 From: Martin Dyer Date: Mon, 5 Aug 2024 15:12:18 +0100 Subject: [PATCH 2/3] Add separate heartbeat thread --- gtecs/alert/sentinel.py | 93 ++++++++++++++++++++++++++++------------- 1 file changed, 63 insertions(+), 30 deletions(-) diff --git a/gtecs/alert/sentinel.py b/gtecs/alert/sentinel.py index 00117ce..9ffa145 100644 --- a/gtecs/alert/sentinel.py +++ b/gtecs/alert/sentinel.py @@ -34,6 +34,7 @@ def __init__(self): # sentinel variables self.running = False + self.latest_message_time = time.time() self.notice_queue = [] self.latest_notice = None self.received_notices = 0 @@ -49,25 +50,34 @@ def run(self, host, port, timeout=5): """Run the sentinel as a Pyro daemon.""" self.running = True - # Start threads - # TODO: Switch between socket & kafka, or even have both (as a backup)? - t1 = threading.Thread( - target=self._kafka_listener_thread, - args=( - params.KAFKA_USER, - params.KAFKA_PASSWORD, - params.KAFKA_BROKER, - params.KAFKA_GROUP_ID, - params.KAFKA_BACKDATE, - ), - ) - # t1 = threading.Thread(target=self._socket_listener_thread) - t1.daemon = True - t1.start() - - t2 = threading.Thread(target=self._handler_thread) - t2.daemon = True - t2.start() + # Start listener and handler threads + listener_mode = 'KAFKA' # TODO: params (or have both?) 
+ if listener_mode == 'SOCKET': + listener_thread = threading.Thread(target=self._socket_listener_thread) + listener_thread.daemon = True + listener_thread.start() + elif listener_mode == 'KAFKA': + listener_thread = threading.Thread( + target=self._kafka_listener_thread, + args=( + params.KAFKA_USER, + params.KAFKA_PASSWORD, + params.KAFKA_BROKER, + params.KAFKA_GROUP_ID, + params.KAFKA_BACKDATE, + ), + ) + listener_thread.daemon = True + listener_thread.start() + # Also start a thread to monitor the latest message time + heartbeat_thread = threading.Thread(target=self._kafka_heartbeat_thread) + heartbeat_thread.daemon = True + heartbeat_thread.start() + else: + raise ValueError('Listener mode must be "SOCKET" or "KAFKA"') + handler_thread = threading.Thread(target=self._handler_thread) + handler_thread.daemon = True + handler_thread.start() # Check the Pyro address is available try: @@ -122,6 +132,7 @@ def _socket_listener_thread(self): # Define basic handler function to create a Notice instance and add it to the queue def _handler(payload, _root): + self.latest_message_time = time.time() notice = Notice.from_payload(payload) self.notice_queue.append(notice) @@ -179,7 +190,6 @@ def _listen(vo_socket, handler): self.log.info('Closed socket connection') self.log.info('Alert listener thread stopped') - return def _kafka_listener_thread( self, @@ -310,17 +320,10 @@ def _kafka_listener_thread( for payload, metadata in consumer.read_raw(metadata=True, autocommit=True): if not self.running: break - - # We can use the system heartbeat to check if we're still connected - heartbeat_timeout = 60 - latest_message_time = 0 # Because of the heartbeat messages we should be getting a message # every few seconds, so we can use this timestamp to check if we're # still connected. - time_delta = time.time() - latest_message_time - if latest_message_time > 0 and time_delta > heartbeat_timeout: - raise TimeoutError(f'No heartbeat in {time_delta}s') - latest_message_time = time.time() + self.latest_message_time = time.time() if 'heartbeat' in metadata.topic: # No need to process heartbeat messages @@ -365,7 +368,38 @@ def _kafka_listener_thread( self.log.info('Closed connection') self.log.info('Alert listener thread stopped') - return + + def _kafka_heartbeat_thread(self, timeout=60): + """Monitor the latest message timestamp to check if we're still connected.""" + self.log.info('Heartbeat thread started') + + timed_out = False + while self.running: + # Because of the heartbeat messages we should be getting a message + # every few seconds, so we can use this timestamp to check if we're + # still connected. + time_delta = time.time() - self.latest_message_time + if time_delta > timeout: + # We haven't received a message in a while. + # We can't kill the listener thread from here, this is just for our own logging. + self.log.warning(f'No new messages for {time_delta:.0f}s') + if not timed_out: + # Only send the Slack message once + # Obviously this will fail if the network is down, + # it's more useful if there's an issue on the broker's end. + send_slack_msg(f'Sentinel reports no new messages for {time_delta:.0f}s') + timed_out = True + timed_out_time = time.time() + else: + if timed_out: + # We've started receiving messages again! 
+ self.log.info('Connection restored') + time_delta = time.time() - timed_out_time + send_slack_msg(f'Sentinel connection restored after {time_delta:.0f}s') + timed_out = False + time.sleep(5) + + self.log.info('Heartbeat thread stopped') def _handler_thread(self): """Monitor the notice queue and handle any new notices.""" @@ -420,7 +454,6 @@ def _handler_thread(self): time.sleep(0.1) self.log.info('Alert handler thread stopped') - return def _fermi_skymap_thread(self, notice, timeout=600): """Listen for the official skymap for Fermi notices.""" From dd9b3f7412da14aaf64c8582398f6ec93be72439 Mon Sep 17 00:00:00 2001 From: Martin Dyer Date: Mon, 5 Aug 2024 15:16:48 +0100 Subject: [PATCH 3/3] Add command to clear queue --- gtecs/alert/sentinel.py | 4 ++++ scripts/sentinel | 19 ++++++++++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/gtecs/alert/sentinel.py b/gtecs/alert/sentinel.py index 9ffa145..79abd2d 100644 --- a/gtecs/alert/sentinel.py +++ b/gtecs/alert/sentinel.py @@ -525,6 +525,10 @@ def get_queue(self): # We could return raw payloads I guess... return [notice.ivorn for notice in self.notice_queue] + def clear_queue(self): + """Clear the current notice queue.""" + self.notice_queue = [] + def run(): """Start the sentinel.""" diff --git a/scripts/sentinel b/scripts/sentinel index 26ec3f2..04cdfc9 100755 --- a/scripts/sentinel +++ b/scripts/sentinel @@ -69,11 +69,19 @@ def query(command, args): print(proxy.ingest_from_file(args[0])) elif command == 'queue': - with Pyro4.Proxy(params.PYRO_URI) as proxy: - queue = proxy.get_queue() - print(f'There are {len(queue)} notices currently in the queue') - for i, ivorn in enumerate(queue): - print(i, ivorn) + if len(args) == 0: + with Pyro4.Proxy(params.PYRO_URI) as proxy: + queue = proxy.get_queue() + print(f'There are {len(queue)} notices currently in the queue') + for i, ivorn in enumerate(queue): + print(i, ivorn) + elif len(args) == 1 and args[0] == 'clear': + with Pyro4.Proxy(params.PYRO_URI) as proxy: + proxy.clear_queue() + print('Queue cleared') + else: + print('ERROR: Invalid arguments for "queue" command') + print('Usage: sentinel queue [clear]') elif command == 'topics': with Pyro4.Proxy(params.PYRO_URI) as proxy: @@ -97,6 +105,7 @@ def print_instructions(): ' shutdown shut down the sentinel', ' ingest [path|ivorn] add the given notice to the queue', ' queue print the notices currently in the queue', + ' queue clear clear all notices currently in the queue', ' topics print all subscribed Kafka topics', ' log [tail args] print sentinel log (alias for tail)', ' help print these instructions',
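Editor's note on the first patch: the offset-priming trick it adds (reading a single heartbeat message before opening the main stream, so a backdated group ID does not have to replay weeks of heartbeats) boils down to the sketch below. This is a minimal illustration, not the sentinel code itself; it assumes `auth`, `broker_url` and `group_id` are set up as in the patch, and relies only on the hop-client calls the patch already uses (`Stream`, `StartPosition`, `read_raw`).

```python
# Sketch: prime the consumer-group offset on the heartbeat topic, so that when
# the main (possibly backdated) stream is opened with the same group ID it only
# has to catch up on a few seconds of heartbeats rather than weeks.
# Assumptions: `auth`, `broker_url` and `group_id` are prepared as in the patch.
from hop import Stream
from hop.io import StartPosition


def prime_heartbeat_offset(auth, broker_url, group_id, heartbeat_topic='gcn.heartbeat'):
    """Read (and commit) one latest heartbeat message for this group ID."""
    stream = Stream(auth=auth, start_at=StartPosition.LATEST, until_eos=True)
    with stream.open(broker_url + heartbeat_topic, mode='r', group_id=group_id) as consumer:
        for _payload, metadata in consumer.read_raw(metadata=True, autocommit=True):
            if 'heartbeat' in metadata.topic:
                # One message is enough to record the current offset for this group.
                break
```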
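The second patch then splits connection monitoring out of the listener: the listener only stamps `self.latest_message_time`, and a separate watchdog thread compares that stamp against a timeout. Stripped of the Pyro and Slack machinery, the pattern is roughly the following sketch; `notify()` is a hypothetical stand-in for `send_slack_msg()` and is not part of the real code.

```python
# Sketch of the listener-stamp / watchdog-check split introduced in patch 2.
# notify() is a hypothetical stand-in for send_slack_msg() in the real sentinel.
import threading
import time


class HeartbeatMonitor:
    def __init__(self, timeout=60):
        self.timeout = timeout
        self.latest_message_time = time.time()
        self.running = True

    def stamp(self):
        """Called by the listener for every message, including heartbeats."""
        self.latest_message_time = time.time()

    def watchdog(self, notify=print):
        """Warn once when messages stop arriving, and once when they resume."""
        timed_out = False
        timed_out_time = None
        while self.running:
            delta = time.time() - self.latest_message_time
            if delta > self.timeout:
                if not timed_out:
                    notify(f'No new messages for {delta:.0f}s')
                    timed_out = True
                    timed_out_time = time.time()
            elif timed_out:
                notify(f'Connection restored after {time.time() - timed_out_time:.0f}s')
                timed_out = False
            time.sleep(5)


monitor = HeartbeatMonitor()
threading.Thread(target=monitor.watchdog, daemon=True).start()
# ...the Kafka listener thread would call monitor.stamp() on every message...
```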