Skip to content

Commit

Permalink
Limit the number of kill retries
Browse files Browse the repository at this point in the history
There's an error condition on rqd where a frame that cannot be killed ends up preventing the host from picking up new jobs. This logic limits the number of repeated kill requests to give the host a chance to pick up new jobs. At the same time, blocked frames are logged to spcue.log so they can be handled manually.

(cherry picked from commit aea4864ef66aca494fb455a7c103e4a832b63d41)
  • Loading branch information
DiegoTavares committed Oct 4, 2023
1 parent ee9748c commit 75506df
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 169 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
* Copyright Contributors to the OpenCue Project
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, VirtualProc pro
*
* @param frame
*/
void updateFrameMemoryError(FrameInterface frame);
boolean updateFrameMemoryError(FrameInterface frame);

/**
* Update Memory usage data and LLU time for the given frame.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.dao.DataAccessException;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -184,7 +185,11 @@ public boolean increaseReservedMemory(ProcInterface p, long value) {

@Override
public boolean clearVirtualProcAssignement(ProcInterface proc) {
return procDao.clearVirtualProcAssignment(proc);
try {
return procDao.clearVirtualProcAssignment(proc);
} catch (DataAccessException e) {
return false;
}
}

@Transactional(propagation = Propagation.REQUIRED)
Expand Down Expand Up @@ -345,8 +350,8 @@ public void clearFrame(DispatchFrame frame) {

@Override
@Transactional(propagation = Propagation.REQUIRED)
public void updateFrameMemoryError(FrameInterface frame) {
frameDao.updateFrameMemoryError(frame);
public boolean updateFrameMemoryError(FrameInterface frame) {
return frameDao.updateFrameMemoryError(frame);
}

@Transactional(propagation = Propagation.SUPPORTS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,8 @@ public interface Dispatcher {
// without being penalized for it.
public static final long VIRTUAL_MEM_THRESHHOLD = CueUtil.GB2;

// Percentage of used memory to consider a risk for triggering oom-killer
public static final double OOM_MAX_SAFE_USED_MEMORY_THRESHOLD = 0.95;

// How much can a frame exceed its reserved memory.
// - 0.5 means 50% above reserve
// - -1.0 makes the feature inactive
public static final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = 0.6;
// How long to keep track of a frame kill request
public static final int FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES = 3;

// A higher number gets more deep booking but less spread on the cue.
public static final int DEFAULT_MAX_FRAMES_PER_PASS = 4;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ public DispatchRqdKillFrameMemory(String hostname, FrameInterface frame, String
public void run() {
long startTime = System.currentTimeMillis();
try {
if (!isTestMode) {
if (dispatchSupport.updateFrameMemoryError(frame) && !isTestMode) {
rqdClient.killFrame(hostname, frame.getFrameId(), message);
} else {
logger.warn("Could not update frame " + frame.getFrameId() +
" status to EXIT_STATUS_MEMORY_FAILURE. Canceling kill request!");
}
dispatchSupport.updateFrameMemoryError(frame);
} catch (RqdClientException e) {
logger.info("Failed to contact host " + hostname + ", " + e);
logger.warn("Failed to contact host " + hostname + ", " + e);
} finally {
long elapsedTime = System.currentTimeMillis() - startTime;
logger.info("RQD communication with " + hostname +
Expand Down
16 changes: 15 additions & 1 deletion cuebot/src/main/resources/opencue.properties
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,21 @@ dispatcher.booking_queue.max_pool_size=6
# Queue capacity for booking.
dispatcher.booking_queue.queue_capacity=1000

# Whether or not to satisfy dependents (*_ON_FRAME and *_ON_LAYER) only on Frame success
# Percentage of used memory to consider a risk for triggering oom-killer
dispatcher.oom_max_safe_used_memory_threshold=0.95

# How much can a frame exceed its reserved memory.
# - 0.5 means 50% above reserve
# - -1.0 makes the feature inactive
# This feature is being kept inactive for now as we work on improving the
# frame retry logic (See commit comment for more details).
dispatcher.oom_frame_overboard_allowed_threshold=-1.0

# How many times cuebot should send a kill request for the same frame-host pair before
# reporting the frame as stuck
dispatcher.frame_kill_retry_limit=3

# Whether to satisfy dependents (*_ON_FRAME and *_ON_LAYER) only on Frame success
depend.satisfy_only_on_frame_success=true

# Jobs will be archived to the history tables after being completed for this long.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,15 +458,17 @@ public void testMemoryAndLlu() {
@Rollback(true)
public void testMemoryAggressionRss() {
jobLauncher.testMode = true;
dispatcher.setTestMode(true);

jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml"));

DispatchHost host = getHost(hostname);
List<VirtualProc> procs = dispatcher.dispatchHost(host);
assertEquals(1, procs.size());
VirtualProc proc = procs.get(0);

long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved *
(1.0 + Dispatcher.OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD));
// 1.6 = 1 + dispatcher.oom_frame_overboard_allowed_threshold
long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved * 1.6);

// Test rss overboard
RunningFrameInfo info = RunningFrameInfo.newBuilder()
Expand All @@ -493,15 +495,17 @@ public void testMemoryAggressionRss() {
@Rollback(true)
public void testMemoryAggressionMaxRss() {
jobLauncher.testMode = true;
dispatcher.setTestMode(true);
jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml"));

DispatchHost host = getHost(hostname);
List<VirtualProc> procs = dispatcher.dispatchHost(host);
assertEquals(1, procs.size());
VirtualProc proc = procs.get(0);

// 0.6 = dispatcher.oom_frame_overboard_allowed_threshold
long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved *
(1.0 + (2 * Dispatcher.OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD)));
(1.0 + (2 * 0.6)));

// Test rss>90% and maxRss overboard
RunningFrameInfo info = RunningFrameInfo.newBuilder()
Expand Down
11 changes: 5 additions & 6 deletions cuebot/src/test/resources/opencue.properties
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,8 @@ dispatcher.kill_queue.queue_capacity=1000
dispatcher.booking_queue.core_pool_size=6
dispatcher.booking_queue.max_pool_size=6
dispatcher.booking_queue.queue_capacity=1000

# The minimum amount of free space in the temporary directory (mcp) to book a host.
# E.g: 1G = 1048576 kB => dispatcher.min_bookable_free_temp_dir_kb=1048576
# Default = 1G = 1048576 kB
# If equal to -1, the feature is turned off
dispatcher.min_bookable_free_temp_dir_kb=1048576
dispatcher.min_bookable_free_temp_dir_kb=1048576
dispatcher.min_bookable_free_mcp_kb=1048576
dispatcher.oom_max_safe_used_memory_threshold=0.95
dispatcher.oom_frame_overboard_allowed_threshold=0.6
dispatcher.frame_kill_retry_limit=3
2 changes: 2 additions & 0 deletions rqd/rqd/rqdservicers.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def KillRunningFrame(self, request, context):
frame = self.rqCore.getRunningFrame(request.frame_id)
if frame:
frame.kill(message=request.message)
else:
log.warning("Wasn't able to find frame(%s) to kill", request.frame_id)
return rqd.compiled_proto.rqd_pb2.RqdStaticKillRunningFrameResponse()

def ShutdownRqdNow(self, request, context):
Expand Down
1 change: 1 addition & 0 deletions rqd/rqd/rqnetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def kill(self, message=""):
else:
os.killpg(self.pid, rqd.rqconstants.KILL_SIGNAL)
finally:
log.warning("kill() successfully killed frameId=%s pid=%s", self.frameId, self.pid)
rqd.rqutil.permissionsLow()
except OSError as e:
log.warning(
Expand Down

0 comments on commit 75506df

Please sign in to comment.