[bugfix-combination] Add model configuration details for deployment failure handling
charlieyl committed Feb 13, 2025
1 parent 3bc0666 commit ad22de4
Showing 5 changed files with 46 additions and 34 deletions.
@@ -43,31 +43,34 @@ def __init__(self, topic, payload):
         request_json = json.loads(payload)
         self.msg_topic = topic
         self.request_json = request_json
-        self.run_id = request_json["end_point_id"]
-        self.end_point_name = request_json["end_point_name"]
-        self.token = request_json["token"]
-        self.user_id = request_json["user_id"]
-        self.user_name = request_json["user_name"]
-        self.device_ids = request_json["device_ids"]
-        self.device_objs = request_json["device_objs"]
+        self.run_id = request_json.get("end_point_id")
+        self.end_point_name = request_json.get("end_point_name", "")
+        self.token = request_json.get("token", "")
+        self.user_id = request_json.get("user_id")
+        self.user_name = request_json.get("user_name", "")
+        self.device_ids = request_json.get("device_ids", [])
+        self.device_objs = request_json.get("device_objs", [])

-        self.model_config = request_json["model_config"]
-        self.model_name = self.model_config["model_name"]
-        self.model_id = self.model_config["model_id"]
-        self.model_version = self.model_config["model_version"]
-        self.model_storage_url = self.model_config["model_storage_url"]
-        self.scale_min = self.model_config.get("instance_scale_min", 0)
-        self.scale_max = self.model_config.get("instance_scale_max", 0)
-        self.inference_engine = self.model_config.get("inference_engine", 0)
-        self.inference_end_point_id = self.run_id
+        # check if model_config is in request_json and is not None
+        self.scale_min = 1
+        self.max_unavailable_rate = 0.1
+        if "model_config" in request_json and request_json["model_config"] is not None:
+            self.model_config = request_json["model_config"]
+            self.model_name = self.model_config["model_name"]
+            self.model_id = self.model_config["model_id"]
+            self.model_version = self.model_config["model_version"]
+            self.model_storage_url = self.model_config["model_storage_url"]
+            self.scale_min = self.model_config.get("instance_scale_min", 0)
+            self.scale_max = self.model_config.get("instance_scale_max", 0)
+            self.inference_engine = self.model_config.get("inference_engine", 0)
+            self.max_unavailable_rate = self.model_config.get("max_unavailable_rate", 0.1)

+        self.inference_end_point_id = self.run_id
         self.request_json["run_id"] = self.run_id

         self.gpu_topology = self.get_devices_avail_gpus()
         self.gpu_per_replica = self.get_gpu_per_replica()

-        self.max_unavailable_rate = self.model_config.get("max_unavailable_rate", 0.1)
-
     def get_devices_avail_gpus(self):
         """
         {
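The net effect of this hunk: every required-key lookup becomes an optional one, and the model_config block only runs when the key is present and non-None, so a payload without it (such as the delete request the master now sends on deployment failure) no longer raises KeyError in the constructor. A minimal sketch of the pattern, with a hypothetical class name and only a few of the fields:

import json

class MsgObjectSketch:
    """Sketch only, not the FedML source: defaults first, config second."""
    def __init__(self, payload: str):
        request_json = json.loads(payload)
        self.run_id = request_json.get("end_point_id")        # None if absent
        self.device_ids = request_json.get("device_ids", [])  # safe default
        # Set defaults up front so the attributes exist even when
        # model_config is missing from the payload.
        self.scale_min = 1
        self.max_unavailable_rate = 0.1
        if request_json.get("model_config") is not None:
            cfg = request_json["model_config"]
            self.model_name = cfg["model_name"]
            self.scale_min = cfg.get("instance_scale_min", 0)
            self.max_unavailable_rate = cfg.get("max_unavailable_rate", 0.1)

# A payload without model_config now parses instead of raising KeyError:
obj = MsgObjectSketch(json.dumps({"end_point_id": 42}))
print(obj.run_id, obj.scale_min, obj.max_unavailable_rate)  # 42 1 0.1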
@@ -279,9 +279,17 @@ def process_deployment_result_message(self, topic=None, payload=None):
                     end_point_id, end_point_name, payload_json["model_name"], "",
                     ServerConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED,
                     message_center=self.message_center)

                 # when report failed to the MLOps, need to delete the replica has successfully deployed and release the gpu
-                model_msg_object = FedMLModelMsgObject(topic, payload)
-                self.send_deployment_delete_request_to_edges(payload, model_msg_object, message_center=self.message_center)
+                model_config = dict()
+                model_config["model_name"] = payload_json["model_name"]
+                model_config["model_id"] = payload_json["model_id"]
+                model_config["model_version"] = payload_json["model_version"]
+                # add model_config to the payload for the delete request
+                payload_json["model_config"] = model_config
+                payload_for_del_deploy = json.dumps(payload_json)
+                model_msg_object = FedMLModelMsgObject(topic, payload_for_del_deploy)
+                self.send_deployment_delete_request_to_edges(payload_for_del_deploy, model_msg_object, message_center=self.message_center)
                 return

         # Failure handler, send the rollback message to the worker devices only if it has not been rollback
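The master-side half of the fix: the failure path used to forward the original payload straight into FedMLModelMsgObject, which before this commit required model_config and would throw. The hunk now rebuilds the payload with a minimal embedded model_config before constructing the message object and sending the delete request. A hedged sketch of just that step (field names are taken from the diff; the standalone function wrapper is illustrative):

import json

def build_delete_payload(payload_json: dict) -> str:
    # Embed a minimal model_config so the receiving side's
    # FedMLModelMsgObject can parse the delete request.
    payload_json["model_config"] = {
        "model_name": payload_json["model_name"],
        "model_id": payload_json["model_id"],
        "model_version": payload_json["model_version"],
    }
    return json.dumps(payload_json)

# Example: a failure report becomes a parseable delete request.
payload = {"end_point_id": 42, "model_name": "llm-demo",
           "model_id": 7, "model_version": "v1"}
print(build_delete_payload(payload))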
@@ -261,8 +261,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center,

             # Send failed result back to master
             _ = self.send_deployment_results(
-                end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED,
-                model_id, model_name, inference_output_url, inference_model_version, inference_port,
+                end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED,
+                model_id, model_name, inference_output_url, model_version, inference_port,
                 inference_engine, model_metadata, model_config)

             self.status_reporter.run_id = self.run_id
@@ -272,7 +272,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
             # Send failed successful result back to master
             logging.info("Finished deployment, continue to send results to master...")
             result_payload = self.send_deployment_results(
-                end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
+                end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
                 model_id, model_name, inference_output_url, model_version, inference_port_external,
                 inference_engine, model_metadata, model_config, replica_no=rank + 1,
                 connectivity=connectivity
@@ -283,7 +283,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
                 logging.info("inference_port_external {} != inference_port {}".format(
                     inference_port_external, inference_port))
                 result_payload = self.construct_deployment_results(
-                    end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
+                    end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
                     model_id, model_name, inference_output_url, model_version, inference_port,
                     inference_engine, model_metadata, model_config, replica_no=rank + 1,
                     connectivity=connectivity
@@ -317,7 +317,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center,

             # Report the deletion msg to master
             result_payload = self.send_deployment_results(
-                end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DELETED,
+                end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DELETED,
                 model_id, model_name, inference_output_url, model_version, inference_port_external,
                 inference_engine, model_metadata, model_config, replica_no=rank_to_delete + 1)

@@ -395,8 +395,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
                     run_id, self.edge_id, replica_occupied_gpu_ids)

                 self.send_deployment_results(
-                    end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED,
-                    model_id, model_name, inference_output_url, inference_model_version, inference_port,
+                    end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED,
+                    model_id, model_name, inference_output_url, model_version, inference_port,
                     inference_engine, model_metadata, model_config)

                 self.status_reporter.run_id = self.run_id
@@ -407,7 +407,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
             else:
                 logging.info("Finished deployment, continue to send results to master...")
                 result_payload = self.send_deployment_results(
-                    end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
+                    end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
                     model_id, model_name, inference_output_url, model_version, inference_port_external,
                     inference_engine, model_metadata, model_config, replica_no=rank + 1,
                     connectivity=connectivity
@@ -417,7 +417,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
                 logging.info("inference_port_external {} != inference_port {}".format(
                     inference_port_external, inference_port))
                 result_payload = self.construct_deployment_results(
-                    end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
+                    end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
                     model_id, model_name, inference_output_url, model_version, inference_port,
                     inference_engine, model_metadata, model_config, replica_no=rank + 1,
                     connectivity=connectivity
@@ -441,12 +441,13 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
             logging.error(f"Unsupported op {op} with op num {op_num}")
             return False

-    def construct_deployment_results(self, end_point_name, device_id, model_status,
+    def construct_deployment_results(self, end_point_name, device_id, device_ids, model_status,
                                      model_id, model_name, model_inference_url,
                                      model_version, inference_port, inference_engine,
                                      model_metadata, model_config, replica_no=1,
                                      connectivity=ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT):
         deployment_results_payload = {"end_point_id": self.run_id, "end_point_name": end_point_name,
+                                      "device_ids": device_ids,
                                       "model_id": model_id, "model_name": model_name,
                                       "model_url": model_inference_url, "model_version": model_version,
                                       "port": inference_port,
@@ -460,7 +461,7 @@ def construct_deployment_results(self, end_point_name, device_id, model_status,
         }
         return deployment_results_payload

-    def send_deployment_results(self, end_point_name, device_id, model_status,
+    def send_deployment_results(self, end_point_name, device_id, device_ids, model_status,
                                 model_id, model_name, model_inference_url,
                                 model_version, inference_port, inference_engine,
                                 model_metadata, model_config, replica_no=1,
@@ -469,7 +470,7 @@ def send_deployment_results(self, end_point_name, device_id, model_status,
             self.run_id, device_id)

         deployment_results_payload = self.construct_deployment_results(
-            end_point_name, device_id, model_status,
+            end_point_name, device_id, device_ids, model_status,
             model_id, model_name, model_inference_url,
             model_version, inference_port, inference_engine,
             model_metadata, model_config, replica_no=replica_no, connectivity=connectivity)
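Every hunk in this file makes the same mechanical change: device_ids is threaded through send_deployment_results into construct_deployment_results and lands in the results payload, so the master can tie a deployed, failed, or deleted replica back to the endpoint's full device list. A reduced sketch of the new payload shape (a subset of the fields, with names taken from the diff):

def construct_deployment_results_sketch(run_id, end_point_name,
                                        device_ids, model_status,
                                        replica_no=1):
    # Reduced field set; the real payload also carries model id/name,
    # inference URL, port, engine, and metadata.
    return {
        "end_point_id": run_id,
        "end_point_name": end_point_name,
        "device_ids": device_ids,  # new: all devices behind this endpoint
        "model_status": model_status,
        "replica_no": replica_no,
    }

payload = construct_deployment_results_sketch(
    42, "demo-endpoint", device_ids=[1, 2, 3], model_status="FAILED")
print(payload["device_ids"])  # [1, 2, 3]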
@@ -170,7 +170,7 @@ def callback_start_deployment(self, topic, payload):
         ClientConstants.save_run_process(run_id, process.pid)

     def callback_delete_deployment(self, topic, payload):
-        logging.info("[Worker] callback_delete_deployment")
+        logging.info("[Worker] callback_delete_deployment, topic: {}, payload: {}".format(topic, payload))

         # Parse payload as the model message object.
         model_msg_object = FedMLModelMsgObject(topic, payload)
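On the worker, the delete callback now logs the full topic and payload before parsing, which makes the failure-triggered deletions above traceable end to end. A small illustrative sketch of that flow (the topic string and function name are hypothetical, not the FedML MQTT topics):

import json
import logging

logging.basicConfig(level=logging.INFO)

def callback_delete_deployment_sketch(topic: str, payload: str) -> None:
    # Log the raw request first, so even a malformed payload is visible.
    logging.info("[Worker] callback_delete_deployment, topic: %s, payload: %s",
                 topic, payload)
    request = json.loads(payload)
    # With the guarded constructor (first hunk), a missing model_config
    # no longer aborts parsing; here we just read what the master sent.
    model_config = request.get("model_config", {})
    logging.info("deleting model %s", model_config.get("model_name"))

callback_delete_deployment_sketch(
    "delete_deployment/edge-3",  # hypothetical topic
    json.dumps({"end_point_id": 42,
                "model_config": {"model_name": "llm-demo"}}))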
python/setup.py (2 changes: 1 addition & 1 deletion)
@@ -126,7 +126,7 @@ def finalize_options(self):

 setup(
     name="fedml",
-    version="0.9.2",
+    version="0.9.6-dev",
     author="FedML Team",
     author_email="[email protected]",
     description="A research and production integrated edge-cloud library for "
