Skip to content

Commit

Permalink
Add separate heartbeat thread
Browse files Browse the repository at this point in the history
  • Loading branch information
martinjohndyer committed Aug 5, 2024
1 parent ef6db39 commit 7bd0a9b
Showing 1 changed file with 63 additions and 30 deletions.
93 changes: 63 additions & 30 deletions gtecs/alert/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(self):

# sentinel variables
self.running = False
self.latest_message_time = time.time()
self.notice_queue = []
self.latest_notice = None
self.received_notices = 0
Expand All @@ -49,25 +50,34 @@ def run(self, host, port, timeout=5):
"""Run the sentinel as a Pyro daemon."""
self.running = True

# Start threads
# TODO: Switch between socket & kafka, or even have both (as a backup)?
t1 = threading.Thread(
target=self._kafka_listener_thread,
args=(
params.KAFKA_USER,
params.KAFKA_PASSWORD,
params.KAFKA_BROKER,
params.KAFKA_GROUP_ID,
params.KAFKA_BACKDATE,
),
)
# t1 = threading.Thread(target=self._socket_listener_thread)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=self._handler_thread)
t2.daemon = True
t2.start()
# Start listener and handler threads
listener_mode = 'KAFKA' # TODO: params (or have both?)
if listener_mode == 'SOCKET':
listener_thread = threading.Thread(target=self._socket_listener_thread)
listener_thread.daemon = True
listener_thread.start()
elif listener_mode == 'KAFKA':
listener_thread = threading.Thread(
target=self._kafka_listener_thread,
args=(
params.KAFKA_USER,
params.KAFKA_PASSWORD,
params.KAFKA_BROKER,
params.KAFKA_GROUP_ID,
params.KAFKA_BACKDATE,
),
)
listener_thread.daemon = True
listener_thread.start()
# Also start a thread to monitor the latest message time
heartbeat_thread = threading.Thread(target=self._kafka_heartbeat_thread)
heartbeat_thread.daemon = True
heartbeat_thread.start()
else:
raise ValueError('Listener mode must be "SOCKET" or "KAFKA"')
handler_thread = threading.Thread(target=self._handler_thread)
handler_thread.daemon = True
handler_thread.start()

# Check the Pyro address is available
try:
Expand Down Expand Up @@ -122,6 +132,7 @@ def _socket_listener_thread(self):

# Define basic handler function to create a Notice instance and add it to the queue
def _handler(payload, _root):
self.latest_message_time = time.time()
notice = Notice.from_payload(payload)
self.notice_queue.append(notice)

Expand Down Expand Up @@ -179,7 +190,6 @@ def _listen(vo_socket, handler):
self.log.info('Closed socket connection')

self.log.info('Alert listener thread stopped')
return

def _kafka_listener_thread(
self,
Expand Down Expand Up @@ -310,17 +320,10 @@ def _kafka_listener_thread(
for payload, metadata in consumer.read_raw(metadata=True, autocommit=True):
if not self.running:
break

# We can use the system heartbeat to check if we're still connected
heartbeat_timeout = 60
latest_message_time = 0
# Because of the heartbeat messages we should be getting a message
# every few seconds, so we can use this timestamp to check if we're
# still connected.
time_delta = time.time() - latest_message_time
if latest_message_time > 0 and time_delta > heartbeat_timeout:
raise TimeoutError(f'No heartbeat in {time_delta}s')
latest_message_time = time.time()
self.latest_message_time = time.time()

if 'heartbeat' in metadata.topic:
# No need to process heartbeat messages
Expand Down Expand Up @@ -365,7 +368,38 @@ def _kafka_listener_thread(
self.log.info('Closed connection')

self.log.info('Alert listener thread stopped')
return

def _kafka_heartbeat_thread(self, timeout=60):
"""Monitor the latest message timestamp to check if we're still connected."""
self.log.info('Heartbeat thread started')

timed_out = False
while self.running:
# Because of the heartbeat messages we should be getting a message
# every few seconds, so we can use this timestamp to check if we're
# still connected.
time_delta = time.time() - self.latest_message_time
if time_delta > timeout:
# We haven't received a message in a while.
# We can't kill the listener thread from here, this is just for our own logging.
self.log.warning(f'No new messages for {time_delta:.0f}s')
if not timed_out:
# Only send the Slack message once
# Obviously this will fail if the network is down,
# it's more useful if there's an issue on the broker's end.
send_slack_msg(f'Sentinel reports no new messages for {time_delta:.0f}s')
timed_out = True
timed_out_time = time.time()
else:
if timed_out:
# We've started receiving messages again!
self.log.info('Connection restored')
time_delta = time.time() - timed_out_time
send_slack_msg(f'Sentinel connection restored after {time_delta:.0f}s')
timed_out = False
time.sleep(5)

self.log.info('Heartbeat thread stopped')

def _handler_thread(self):
"""Monitor the notice queue and handle any new notices."""
Expand Down Expand Up @@ -420,7 +454,6 @@ def _handler_thread(self):
time.sleep(0.1)

self.log.info('Alert handler thread stopped')
return

def _fermi_skymap_thread(self, notice, timeout=600):
"""Listen for the official skymap for Fermi notices."""
Expand Down

0 comments on commit 7bd0a9b

Please sign in to comment.