Skip to content

Commit

Permalink
Fix unit tests and add core counting to recovery logic
Browse files Browse the repository at this point in the history
  • Loading branch information
DiegoTavares committed Dec 12, 2024
1 parent 7968673 commit 3b24dd1
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 32 deletions.
68 changes: 39 additions & 29 deletions rqd/rqd/rqcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ def recoverCache(self):
running_frame = rqd.rqnetwork.RunningFrame(self, run_frame)
running_frame.frameAttendantThread = FrameAttendantThread(
self, run_frame, running_frame, recovery_mode=True)
# Make sure cores are accounted for
# pylint: disable=no-member
self.cores.idle_cores -= run_frame.num_cores
self.cores.booked_cores += run_frame.num_cores
# pylint: enable=no-member

running_frame.frameAttendantThread.start()
except:
pass
Expand Down Expand Up @@ -1487,7 +1493,9 @@ def recoverDocker(self):
container_id = runFrame.attributes.get("container_id")

# Recovered frame will stream back logs into a new file, therefore write a new header
self.__createEnvVariables()
self.__writeHeader()

try:
log_stream = None
with self.rqCore.docker_lock:
Expand Down Expand Up @@ -1544,45 +1552,47 @@ def recoverDocker(self):
returncode = -1
msg = "Failed to recover frame container"
logging.exception(msg)
self.rqlog.write("%s - %s" % (msg, e),
prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP)
self.rqlog.write("%s - The frame might have finishes during rqd's reinitialization "
"- %s" % (msg, e),
prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP)
finally:
# Clear up container after if finishes
if container:
container_id = container.short_id
container.remove()
docker_client.close()

# Find exitStatus and exitSignal
if returncode < 0:
# Exited with a signal
frameInfo.exitStatus = 1
frameInfo.exitSignal = -returncode
else:
frameInfo.exitStatus = returncode
frameInfo.exitSignal = 0
if container:
# Find exitStatus and exitSignal
if returncode < 0:
# Exited with a signal
frameInfo.exitStatus = 1
frameInfo.exitSignal = -returncode
else:
frameInfo.exitStatus = returncode
frameInfo.exitSignal = 0

# Log frame start info
log.warning("Frame %s.%s(%s) with pid %s finished on container %s with exitStatus %s %s ",
runFrame.job_name,
runFrame.frame_name,
frameInfo.frameId,
frameInfo.pid,
container_id,
frameInfo.exitStatus,
"" if frameInfo.exitStatus == 0 else " - " + runFrame.log_dir_file)
# Log frame start info
log.warning("Frame %s.%s(%s) with pid %s finished on container %s with exitStatus %s %s ",
runFrame.job_name,
runFrame.frame_name,
frameInfo.frameId,
frameInfo.pid,
container_id,
frameInfo.exitStatus,
"" if frameInfo.exitStatus == 0 else " - " + runFrame.log_dir_file)

try:
with open(tempStatFile, "r", encoding='utf-8') as statFile:
frameInfo.realtime = statFile.readline().split()[1]
frameInfo.utime = statFile.readline().split()[1]
frameInfo.stime = statFile.readline().split()[1]
statFile.close()
# pylint: disable=broad-except
except Exception:
pass # This happens when frames are killed
try:
with open(tempStatFile, "r", encoding='utf-8') as statFile:
frameInfo.realtime = statFile.readline().split()[1]
frameInfo.utime = statFile.readline().split()[1]
frameInfo.stime = statFile.readline().split()[1]
statFile.close()
# pylint: disable=broad-except
except Exception:
pass # This happens when frames are killed

self.__writeFooter()
self.__writeFooter()
self.__cleanup()

def runRecovery(self):
Expand Down
10 changes: 7 additions & 3 deletions rqd/tests/rqcore_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,8 @@ def test_recoverCache_expiredFile(self, mockExists, mockTime, mockGetmtime):

self.assertEqual(len(self.rqcore._RqCore__cache), 0)


def test_recoverCache_validBackup(self):
@mock.patch("rqd.rqcore.FrameAttendantThread", autospec=True)
def test_recoverCache_validBackup(self, attendant_patch):
"""Test recoverCache skips frames that fail to parse"""
self.rqcore.backup_cache_path = 'cache.dat'

Expand All @@ -662,14 +662,18 @@ def test_recoverCache_validBackup(self):
job_id = "job_id",
job_name = "job_name",
frame_id = frameId,
frame_name = "frame_name"
frame_name = "frame_name",
num_cores = 4
)
running_frame = rqd.rqnetwork.RunningFrame(self.rqcore, frame)
self.rqcore.cores.idle_cores = 8
self.rqcore.storeFrame(frameId, running_frame)
self.rqcore.backupCache()
self.__cache = {}
self.rqcore.recoverCache()
self.assertIn('frame123', self.rqcore._RqCore__cache)
self.assertEqual(4, self.rqcore.cores.idle_cores)
self.assertEqual(4, self.rqcore.cores.booked_cores)

def test_recoverCache_invalidFrame(self):
"""Test recoverCache loads frame data from valid backup file"""
Expand Down

0 comments on commit 3b24dd1

Please sign in to comment.