Skip to content

Commit

Permalink
Merge pull request #89 from GOTO-OBS/gcn-heartbeat
Browse files Browse the repository at this point in the history
Add heartbeat thread for both Kafka brokers
  • Loading branch information
martinjohndyer authored Aug 5, 2024
2 parents ce05796 + dd9b3f7 commit 808265d
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 56 deletions.
142 changes: 91 additions & 51 deletions gtecs/alert/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(self):

# sentinel variables
self.running = False
self.latest_message_time = time.time()
self.notice_queue = []
self.latest_notice = None
self.received_notices = 0
Expand All @@ -49,25 +50,34 @@ def run(self, host, port, timeout=5):
"""Run the sentinel as a Pyro daemon."""
self.running = True

# Start threads
# TODO: Switch between socket & kafka, or even have both (as a backup)?
t1 = threading.Thread(
target=self._kafka_listener_thread,
args=(
params.KAFKA_USER,
params.KAFKA_PASSWORD,
params.KAFKA_BROKER,
params.KAFKA_GROUP_ID,
params.KAFKA_BACKDATE,
),
)
# t1 = threading.Thread(target=self._socket_listener_thread)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=self._handler_thread)
t2.daemon = True
t2.start()
# Start listener and handler threads
listener_mode = 'KAFKA' # TODO: params (or have both?)
if listener_mode == 'SOCKET':
listener_thread = threading.Thread(target=self._socket_listener_thread)
listener_thread.daemon = True
listener_thread.start()
elif listener_mode == 'KAFKA':
listener_thread = threading.Thread(
target=self._kafka_listener_thread,
args=(
params.KAFKA_USER,
params.KAFKA_PASSWORD,
params.KAFKA_BROKER,
params.KAFKA_GROUP_ID,
params.KAFKA_BACKDATE,
),
)
listener_thread.daemon = True
listener_thread.start()
# Also start a thread to monitor the latest message time
heartbeat_thread = threading.Thread(target=self._kafka_heartbeat_thread)
heartbeat_thread.daemon = True
heartbeat_thread.start()
else:
raise ValueError('Listener mode must be "SOCKET" or "KAFKA"')
handler_thread = threading.Thread(target=self._handler_thread)
handler_thread.daemon = True
handler_thread.start()

# Check the Pyro address is available
try:
Expand Down Expand Up @@ -122,6 +132,7 @@ def _socket_listener_thread(self):

# Define basic handler function to create a Notice instance and add it to the queue
def _handler(payload, _root):
self.latest_message_time = time.time()
notice = Notice.from_payload(payload)
self.notice_queue.append(notice)

Expand Down Expand Up @@ -179,7 +190,6 @@ def _listen(vo_socket, handler):
self.log.info('Closed socket connection')

self.log.info('Alert listener thread stopped')
return

def _kafka_listener_thread(
self,
Expand All @@ -204,7 +214,7 @@ def _kafka_listener_thread(
# There's also no way to select specific notice types
# SCIMMA have said they will be reworking their GCN system at some point...
topics = ['gcn.notice']
# Also subscribe to the heartbeat topic, do we can check if we're still connected
# Also subscribe to the heartbeat topic, so we can check if we're still connected
topics += ['sys.heartbeat']
elif broker == 'NASA':
broker_url = 'kafka://kafka.gcn.nasa.gov/'
Expand Down Expand Up @@ -247,6 +257,8 @@ def _kafka_listener_thread(
'gcn.classic.voevent.ICECUBE_ASTROTRACK_GOLD',
'gcn.classic.voevent.ICECUBE_CASCADE',
]
# Also subscribe to the heartbeat topic, so we can check if we're still connected
topics += ['gcn.heartbeat']
else:
raise ValueError('Broker must be "SCIMMA" or "NASA"')

Expand All @@ -271,21 +283,23 @@ def _kafka_listener_thread(
# # but it doesn't seem to be implemented in hop-client yet.
# start_position = datetime.now() - timedelta(hours=12)

# One of the advantages of the newer brokers is monitoring the heartbeat topic
# to make sure we're connected.
# However, if we backdate with a new group ID we'll get weeks and weeks of
# heartbeat messages, which is very annoying.
# So instead we'll sneaky read the latest heartbeat message right now.
# That gives the starting point for that topic, so when we start the stream
# below it'll only have to handle a few seconds of heartbeat messages
# rather than weeks.
if broker == 'SCIMMA':
# One of the advantages of the SCIMMA broker is monitoring the heartbeat topic
# to make sure we're connected.
# However, if we backdate with a new group ID we'll get weeks and weeks of
# heartbeat messages, which is very annoying.
# So instead we'll sneakily read the latest heartbeat message right now.
# That gives the starting point for that topic, so when we start the stream
# below it'll only have to handle a few seconds of heartbeat messages
# rather than weeks.
url = broker_url + 'sys.heartbeat'
stream = Stream(auth=auth, start_at=StartPosition.LATEST, until_eos=True)
consumer = stream.open(url, mode='r', group_id=group_id)
for payload, metadata in consumer.read_raw(metadata=True, autocommit=True):
if metadata.topic == 'sys.heartbeat':
break
elif broker == 'NASA':
url = broker_url + 'gcn.heartbeat'
stream = Stream(auth=auth, start_at=StartPosition.LATEST, until_eos=True)
consumer = stream.open(url, mode='r', group_id=group_id)
for payload, metadata in consumer.read_raw(metadata=True, autocommit=True):
if 'heartbeat' in metadata.topic:
break
else:
self.log.debug('Starting Kafka stream from latest message')
start_position = StartPosition.LATEST
Expand All @@ -306,22 +320,14 @@ def _kafka_listener_thread(
for payload, metadata in consumer.read_raw(metadata=True, autocommit=True):
if not self.running:
break
# Because of the heartbeat messages we should be getting a message
# every few seconds, so we can use this timestamp to check if we're
# still connected.
self.latest_message_time = time.time()

if broker == 'SCIMMA':
# We can use the system heartbeat to check if we're still connected
heartbeat_timeout = 60
latest_message_time = 0
# Because of the sys.heartbeat messages we should be getting a message
# every few seconds, so we can use this timestamp to check if we're
# still connected.
if (latest_message_time and
time.time() - latest_message_time > heartbeat_timeout):
raise TimeoutError(f'No heartbeat in {heartbeat_timeout}s')
latest_message_time = time.time()

if metadata.topic == 'sys.heartbeat':
# No need to process heartbeat messages
continue
if 'heartbeat' in metadata.topic:
# No need to process heartbeat messages
continue

# Create the notice and add it to the queue
try:
Expand Down Expand Up @@ -362,7 +368,38 @@ def _kafka_listener_thread(
self.log.info('Closed connection')

self.log.info('Alert listener thread stopped')
return

def _kafka_heartbeat_thread(self, timeout=60):
"""Monitor the latest message timestamp to check if we're still connected."""
self.log.info('Heartbeat thread started')

timed_out = False
while self.running:
# Because of the heartbeat messages we should be getting a message
# every few seconds, so we can use this timestamp to check if we're
# still connected.
time_delta = time.time() - self.latest_message_time
if time_delta > timeout:
# We haven't received a message in a while.
# We can't kill the listener thread from here, this is just for our own logging.
self.log.warning(f'No new messages for {time_delta:.0f}s')
if not timed_out:
# Only send the Slack message once
# Obviously this will fail if the network is down,
# it's more useful if there's an issue on the broker's end.
send_slack_msg(f'Sentinel reports no new messages for {time_delta:.0f}s')
timed_out = True
timed_out_time = time.time()
else:
if timed_out:
# We've started receiving messages again!
self.log.info('Connection restored')
time_delta = time.time() - timed_out_time
send_slack_msg(f'Sentinel connection restored after {time_delta:.0f}s')
timed_out = False
time.sleep(5)

self.log.info('Heartbeat thread stopped')

def _handler_thread(self):
"""Monitor the notice queue and handle any new notices."""
Expand Down Expand Up @@ -417,7 +454,6 @@ def _handler_thread(self):
time.sleep(0.1)

self.log.info('Alert handler thread stopped')
return

def _fermi_skymap_thread(self, notice, timeout=600):
"""Listen for the official skymap for Fermi notices."""
Expand Down Expand Up @@ -489,6 +525,10 @@ def get_queue(self):
# We could return raw payloads I guess...
return [notice.ivorn for notice in self.notice_queue]

def clear_queue(self):
    """Discard every notice currently waiting in the queue.

    The queue attribute is rebound to a fresh empty list rather than
    mutated in place, matching how it is assigned elsewhere.
    """
    self.notice_queue = []


def run():
"""Start the sentinel."""
Expand Down
19 changes: 14 additions & 5 deletions scripts/sentinel
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,19 @@ def query(command, args):
print(proxy.ingest_from_file(args[0]))

elif command == 'queue':
with Pyro4.Proxy(params.PYRO_URI) as proxy:
queue = proxy.get_queue()
print(f'There are {len(queue)} notices currently in the queue')
for i, ivorn in enumerate(queue):
print(i, ivorn)
if len(args) == 0:
with Pyro4.Proxy(params.PYRO_URI) as proxy:
queue = proxy.get_queue()
print(f'There are {len(queue)} notices currently in the queue')
for i, ivorn in enumerate(queue):
print(i, ivorn)
elif len(args) == 1 and args[0] == 'clear':
with Pyro4.Proxy(params.PYRO_URI) as proxy:
proxy.clear_queue()
print('Queue cleared')
else:
print('ERROR: Invalid arguments for "queue" command')
print('Usage: sentinel queue [clear]')

elif command == 'topics':
with Pyro4.Proxy(params.PYRO_URI) as proxy:
Expand All @@ -97,6 +105,7 @@ def print_instructions():
' shutdown shut down the sentinel',
' ingest [path|ivorn] add the given notice to the queue',
' queue print the notices currently in the queue',
' queue clear clear all notices currently in the queue',
' topics print all subscribed Kafka topics',
' log [tail args] print sentinel log (alias for tail)',
' help print these instructions',
Expand Down

0 comments on commit 808265d

Please sign in to comment.