[refactor] Improve backend service URL handling and logging #2246

Open · wants to merge 14 commits into base: dev/v0.7.0
@@ -1,4 +1,6 @@
import logging
import time
import uuid

import httpx
import traceback
@@ -12,6 +14,19 @@


class FedMLHttpInference:
_http_client = None # Class variable for shared HTTP client

@classmethod
async def get_http_client(cls):
if cls._http_client is None:
limits = httpx.Limits(
max_keepalive_connections=100,
max_connections=1000,
keepalive_expiry=60
)
cls._http_client = httpx.AsyncClient(limits=limits)
return cls._http_client

def __init__(self):
pass

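A process-wide AsyncClient like this is normally paired with an explicit shutdown hook so the pooled connections are released when the worker exits. A minimal sketch, assuming a hypothetical close_http_client classmethod that is not part of this diff:

    @classmethod
    async def close_http_client(cls):
        # Release the shared connection pool; httpx.AsyncClient.aclose() closes it cleanly.
        if cls._http_client is not None:
            await cls._http_client.aclose()
            cls._http_client = None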
@@ -28,8 +43,9 @@ async def is_inference_ready(inference_url, path="ready", timeout=None):

# TODO (Raphael): Support more methods and return codes rules.
try:
async with httpx.AsyncClient() as client:
ready_response = await client.get(url=ready_url, timeout=timeout)
# Reuse the shared pooled client instead of creating a new AsyncClient per request.
client = await FedMLHttpInference.get_http_client()
ready_response = await client.get(url=ready_url, timeout=timeout)

if isinstance(ready_response, (Response, StreamingResponse)):
error_code = ready_response.status_code
@@ -88,23 +104,35 @@ async def run_http_inference_with_curl_request(


async def stream_generator(inference_url, input_json, method="POST"):
async with httpx.AsyncClient() as client:
async with client.stream(method, inference_url, json=input_json,
timeout=ClientConstants.WORKER_STREAM_API_TIMEOUT) as response:
async for chunk in response.aiter_lines():
# we consumed a newline, need to put it back
yield f"{chunk}\n"
# Reuse the shared pooled client instead of creating a new AsyncClient per stream.
client = await FedMLHttpInference.get_http_client()
async with client.stream(method, inference_url, json=input_json,
timeout=ClientConstants.WORKER_STREAM_API_TIMEOUT) as response:
async for chunk in response.aiter_lines():
# we consumed a newline, need to put it back
yield f"{chunk}\n"

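Because stream_generator is an async generator that yields newline-terminated chunks, callers typically hand it straight to a streaming response object. A minimal consumption sketch; the FastAPI wrapper and media type here are assumptions, not taken from this diff:

    from fastapi.responses import StreamingResponse

    async def proxy_stream(inference_url: str, input_json: dict):
        # Relay the upstream line stream to the caller as the chunks arrive.
        return StreamingResponse(
            stream_generator(inference_url, input_json),
            media_type="text/event-stream",
        )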

async def redirect_non_stream_req_to_worker(inference_type, inference_url, model_api_headers, model_inference_json,
timeout=None, method="POST"):
response_ok = True
# request_id = str(uuid.uuid4())[:8]
# start_time = time.time()
# logging.info(f"[Request-{request_id}] Starting HTTP request to {inference_url}")

try:
async with httpx.AsyncClient() as client:
response = await client.request(
method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout
)
# Reuse the shared pooled client instead of creating a new AsyncClient per request.
client = await FedMLHttpInference.get_http_client()
response = await client.request(
method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout
)
# end_time = time.time()
# elapsed_time = end_time - start_time
# logging.info(f"[Request-{request_id}] Completed HTTP request. Time taken: {elapsed_time:.3f} seconds")
except Exception as e:
# end_time = time.time()
# elapsed_time = end_time - start_time
# logging.error(f"[Request-{request_id}] Failed HTTP request after {elapsed_time:.3f} seconds. Error: {str(e)}")
response_ok = False
model_inference_result = {"error": e}
return response_ok, model_inference_result
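The commented-out lines above sketch per-request latency logging; if that is re-enabled, a small context manager keeps the try/except body readable. A sketch under that assumption (the helper name is hypothetical):

    import logging
    import time
    import uuid
    from contextlib import contextmanager

    @contextmanager
    def log_request_duration(url):
        # Tag each outbound call with a short id and log its wall-clock duration.
        request_id = str(uuid.uuid4())[:8]
        start = time.time()
        try:
            yield request_id
        finally:
            logging.info(f"[Request-{request_id}] {url} took {time.time() - start:.3f}s")

redirect_non_stream_req_to_worker could then wrap its client.request call in a single with log_request_duration(inference_url): block instead of duplicating the timing code on the success and error paths.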
@@ -59,7 +59,8 @@ def serve_model_on_premise(self, model_name, endpoint_name, master_device_ids,

if use_remote:
if not self.deploy_model(model_name, device_type, target_devices, "", user_api_key,
additional_params_dict, use_local_deployment, endpoint_id=endpoint_id):
additional_params_dict, use_local_deployment, endpoint_name=endpoint_name,
endpoint_id=endpoint_id):
print("Failed to deploy model")
return False
return True
@@ -65,6 +65,10 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
request_json = dict()
logging.info("[Worker] Model deployment is starting...")

logging.info("=" * 80)
logging.info("[Device Model Deployment] Received start deployment request: {}".format(request_json))
logging.info("=" * 80)

# Real gpu per replica (container-level)
num_gpus = gpu_per_replica
gpu_ids, gpu_attach_cmd = None, ""
@@ -213,6 +217,25 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
detach=True,
command=customized_image_entry_cmd,
)

logging.info("=" * 80)
logging.info("[Device Model Deployment] Creating container with following parameters:")
logging.info("=" * 80)
logging.info("Image: {}".format(inference_image_name))
logging.info("Container name: {}".format(default_server_container_name))
logging.info("Volumes:")
for vol in volumes:
logging.info(" - {}".format(vol))
logging.info("Ports: [{}]".format(port_inside_container))
logging.info("Environment variables:")
for key, value in environment.items():
logging.info(" {} = {}".format(key, value))
logging.info("Host config:")
for key, value in host_config_dict.items():
logging.info(" {} = {}".format(key, value))
logging.info("Command: {}".format(customized_image_entry_cmd))
logging.info("=" * 80)

client.api.start(container=new_container.get("Id"))
except Exception as e:
logging.error(f"Failed to create the container with exception {e}, traceback : {traceback.format_exc()}")
@@ -4,6 +4,7 @@
import time
import traceback
import os
import uuid

from typing import Any, Mapping, MutableMapping, Union
from urllib.parse import urlparse
@@ -46,46 +47,46 @@ class Settings:
redis_password=Settings.redis_password)


@api.middleware("http")
async def auth_middleware(request: Request, call_next):
if "/inference" in request.url.path or "/api/v1/predict" in request.url.path:
try:
# Attempt to parse the JSON body.
request_json = await request.json()
except json.JSONDecodeError:
return JSONResponse(
{"error": True, "message": "Invalid JSON."},
status_code=status.HTTP_400_BAD_REQUEST)

# Get endpoint's total pending requests.
end_point_id = request_json.get("end_point_id", None)
pending_requests_num = FEDML_MODEL_CACHE.get_pending_requests_counter(end_point_id)
if pending_requests_num:
# Fetch metrics of the past k=3 requests.
pask_k_metrics = FEDML_MODEL_CACHE.get_endpoint_metrics(
end_point_id=end_point_id,
k_recent=3)

# Get the request timeout from the endpoint settings.
request_timeout_s = FEDML_MODEL_CACHE.get_endpoint_settings(end_point_id) \
.get(ServerConstants.INFERENCE_REQUEST_TIMEOUT_KEY, ServerConstants.INFERENCE_REQUEST_TIMEOUT_DEFAULT)

# Only proceed if the past k metrics collection is not empty.
if pask_k_metrics:
# Measure the average latency in seconds(!), hence the 0.001 multiplier.
past_k_latencies_sec = \
[float(j_obj["current_latency"]) * 0.001 for j_obj in pask_k_metrics]
mean_latency = sum(past_k_latencies_sec) / len(past_k_latencies_sec)

# If timeout threshold is exceeded then cancel and return time out error.
should_block = (mean_latency * pending_requests_num) > request_timeout_s
if should_block:
return JSONResponse(
{"error": True, "message": "Request timed out."},
status_code=status.HTTP_504_GATEWAY_TIMEOUT)

response = await call_next(request)
return response
# @api.middleware("http")
# async def auth_middleware(request: Request, call_next):
# if "/inference" in request.url.path or "/api/v1/predict" in request.url.path:
# try:
# # Attempt to parse the JSON body.
# request_json = await request.json()
# except json.JSONDecodeError:
# return JSONResponse(
# {"error": True, "message": "Invalid JSON."},
# status_code=status.HTTP_400_BAD_REQUEST)

# # Get endpoint's total pending requests.
# end_point_id = request_json.get("end_point_id", None)
# pending_requests_num = FEDML_MODEL_CACHE.get_pending_requests_counter(end_point_id)
# if pending_requests_num:
# # Fetch metrics of the past k=3 requests.
# pask_k_metrics = FEDML_MODEL_CACHE.get_endpoint_metrics(
# end_point_id=end_point_id,
# k_recent=3)

# # Get the request timeout from the endpoint settings.
# request_timeout_s = FEDML_MODEL_CACHE.get_endpoint_settings(end_point_id) \
# .get(ServerConstants.INFERENCE_REQUEST_TIMEOUT_KEY, ServerConstants.INFERENCE_REQUEST_TIMEOUT_DEFAULT)

# # Only proceed if the past k metrics collection is not empty.
# if pask_k_metrics:
# # Measure the average latency in seconds(!), hence the 0.001 multiplier.
# past_k_latencies_sec = \
# [float(j_obj["current_latency"]) * 0.001 for j_obj in pask_k_metrics]
# mean_latency = sum(past_k_latencies_sec) / len(past_k_latencies_sec)

# # If timeout threshold is exceeded then cancel and return time out error.
# should_block = (mean_latency * pending_requests_num) > request_timeout_s
# if should_block:
# return JSONResponse(
# {"error": True, "message": "Request timed out."},
# status_code=status.HTTP_504_GATEWAY_TIMEOUT)

# response = await call_next(request)
# return response

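If the back-pressure guard ever returns, the expensive part of the disabled middleware is parsing the JSON body on every request. A leaner sketch keyed only on the pending-requests counter; the header name and threshold are assumptions, and the other names are reused from the disabled code above:

    @api.middleware("http")
    async def backpressure_middleware(request: Request, call_next):
        if "/inference" in request.url.path or "/api/v1/predict" in request.url.path:
            # Hypothetical: read the endpoint id from a header to avoid consuming the body.
            end_point_id = request.headers.get("x-fedml-endpoint-id")
            if end_point_id:
                pending = FEDML_MODEL_CACHE.get_pending_requests_counter(end_point_id)
                if pending and pending > 1024:  # illustrative threshold
                    return JSONResponse(
                        {"error": True, "message": "Too many pending requests."},
                        status_code=status.HTTP_429_TOO_MANY_REQUESTS)
        return await call_next(request)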

@api.on_event("startup")
@@ -198,7 +199,6 @@ async def _predict(
# Always increase the pending requests counter on a new incoming request.
FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, increase=True)
inference_response = {}

try:
in_end_point_id = end_point_id
in_end_point_name = input_json.get("end_point_name", None)
@@ -259,6 +259,11 @@ async def _predict(
input_list["stream"] = input_list.get("stream", stream_flag)
output_list = input_json.get("outputs", [])

# request_uuid = str(uuid.uuid4()) # Generate unique request ID
# inference_start_time = time.time()
# start_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(inference_start_time))
# logging.info(f"[Request {request_uuid}] Starting send_inference_request at {start_time_str}")

# main execution of redirecting the inference request to the idle device
inference_response = await send_inference_request(
idle_device,
@@ -269,6 +274,11 @@
inference_type=in_return_type,
connectivity_type=connectivity_type,
path=path, request_method=request_method)

# inference_end_time = time.time()
# end_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(inference_end_time))
# inference_duration = inference_end_time - inference_start_time
# logging.info(f"[Request {request_uuid}] Completed send_inference_request at {end_time_str}, duration: {inference_duration:.3f} seconds")

# Calculate model metrics
try:
@@ -43,31 +43,34 @@ def __init__(self, topic, payload):
request_json = json.loads(payload)
self.msg_topic = topic
self.request_json = request_json
self.run_id = request_json["end_point_id"]
self.end_point_name = request_json["end_point_name"]
self.token = request_json["token"]
self.user_id = request_json["user_id"]
self.user_name = request_json["user_name"]
self.device_ids = request_json["device_ids"]
self.device_objs = request_json["device_objs"]
self.run_id = request_json.get("end_point_id")
self.end_point_name = request_json.get("end_point_name", "")
self.token = request_json.get("token", "")
self.user_id = request_json.get("user_id")
self.user_name = request_json.get("user_name", "")
self.device_ids = request_json.get("device_ids", [])
self.device_objs = request_json.get("device_objs", [])

self.model_config = request_json["model_config"]
self.model_name = self.model_config["model_name"]
self.model_id = self.model_config["model_id"]
self.model_version = self.model_config["model_version"]
self.model_storage_url = self.model_config["model_storage_url"]
self.scale_min = self.model_config.get("instance_scale_min", 0)
self.scale_max = self.model_config.get("instance_scale_max", 0)
self.inference_engine = self.model_config.get("inference_engine", 0)
self.inference_end_point_id = self.run_id
# check if model_config is in request_json and is not None
self.scale_min = 1
self.max_unavailable_rate = 0.1
if "model_config" in request_json and request_json["model_config"] is not None:
self.model_config = request_json["model_config"]
self.model_name = self.model_config["model_name"]
self.model_id = self.model_config["model_id"]
self.model_version = self.model_config["model_version"]
self.model_storage_url = self.model_config.get("model_storage_url", "")
self.scale_min = self.model_config.get("instance_scale_min", 1)
self.scale_max = self.model_config.get("instance_scale_max", 1)
self.inference_engine = self.model_config.get("inference_engine")
self.max_unavailable_rate = self.model_config.get("max_unavailable_rate", 0.1)

self.inference_end_point_id = self.run_id
self.request_json["run_id"] = self.run_id

self.gpu_topology = self.get_devices_avail_gpus()
self.gpu_per_replica = self.get_gpu_per_replica()

self.max_unavailable_rate = self.model_config.get("max_unavailable_rate", 0.1)

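For context on the switch to .get: a cleanup or delete payload can omit most of these keys, and bracket indexing raised KeyError on the first missing one. A small standalone illustration with a hypothetical payload:

    import json

    # Hypothetical minimal payload: no token, user, or device fields.
    request_json = json.loads(json.dumps({
        "end_point_id": 123,
        "model_config": {"model_name": "demo", "model_id": 1, "model_version": "v1"},
    }))

    try:
        request_json["token"]                                   # old style: raises KeyError
    except KeyError:
        pass
    token = request_json.get("token", "")                       # new style: ""
    device_ids = request_json.get("device_ids", [])             # []
    scale_min = request_json["model_config"].get("instance_scale_min", 1)  # 1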
def get_devices_avail_gpus(self):
"""
{
@@ -11,6 +11,7 @@
import fedml
from fedml.core.mlops import MLOpsRuntimeLog, MLOpsConfigs
from fedml.core.mlops.mlops_runtime_log import MLOpsFormatter
from .device_model_msg_object import FedMLModelMsgObject
from .device_client_constants import ClientConstants
from .device_model_cache import FedMLModelCache
from .device_server_constants import ServerConstants
@@ -278,6 +279,17 @@ def process_deployment_result_message(self, topic=None, payload=None):
end_point_id, end_point_name, payload_json["model_name"], "",
ServerConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED,
message_center=self.message_center)

# When reporting the failure to MLOps, also delete any replicas that were deployed successfully and release their GPUs.
model_config = dict()
model_config["model_name"] = payload_json["model_name"]
model_config["model_id"] = payload_json["model_id"]
model_config["model_version"] = payload_json["model_version"]
# add model_config to the payload for the delete request
payload_json["model_config"] = model_config
payload_for_del_deploy = json.dumps(payload_json)
model_msg_object = FedMLModelMsgObject(topic, payload_for_del_deploy)
self.send_deployment_delete_request_to_edges(payload_for_del_deploy, model_msg_object, message_center=self.message_center)
return

# Failure handler, send the rollback message to the worker devices only if it has not been rollback
@@ -442,21 +454,27 @@ def start_device_inference_gateway():
python_program = get_python_program()
inference_port = ServerConstants.get_inference_master_gateway_port()
if not ServerConstants.is_running_on_k8s():
logging.info(f"start the model inference gateway...")
inference_gw_cmd = "fedml.computing.scheduler.model_scheduler.device_model_inference:api"
inference_gateway_pids = RunProcessUtils.get_pid_from_cmd_line(inference_gw_cmd)
if inference_gateway_pids is None or len(inference_gateway_pids) <= 0:
cur_dir = os.path.dirname(__file__)
fedml_base_dir = os.path.dirname(os.path.dirname(os.path.dirname(cur_dir)))
inference_gateway_process = ServerConstants.exec_console_with_script(f"{python_program} "
f"-m uvicorn {inference_gw_cmd} "
f"--host 0.0.0.0 "
f"--port {str(inference_port)} "
f"--reload --reload-delay 3 "
f"--reload-dir {fedml_base_dir} "
f"--log-level info",
should_capture_stdout=False,
should_capture_stderr=False)
workers = 4
logging.info(f"start the model inference gateway workers[{workers}] no uvloop/httptools...")
inference_gateway_process = ServerConstants.exec_console_with_script(
f"{python_program} -m uvicorn {inference_gw_cmd} "
f"--host 0.0.0.0 "
f"--port {str(inference_port)} "
f"--workers {workers} "
# f"--loop uvloop "
# f"--http httptools "
f"--limit-concurrency 1024 "
f"--backlog 2048 "
f"--timeout-keep-alive 60 "
f"--log-level warning ",
should_capture_stdout=False,
should_capture_stderr=False
)
return inference_gateway_process
else:
return inference_gateway_pids[0]
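The same gateway configuration can be expressed programmatically, which is sometimes easier to test than a shell string; this is a sketch, not what the diff ships, and assumes uvicorn is importable in the gateway process:

    import uvicorn

    def run_inference_gateway(port: int, workers: int = 4):
        # Mirrors the CLI flags above: worker count, concurrency cap, backlog,
        # keep-alive timeout, and a quieter log level.
        uvicorn.run(
            "fedml.computing.scheduler.model_scheduler.device_model_inference:api",
            host="0.0.0.0",
            port=port,
            workers=workers,
            limit_concurrency=1024,
            backlog=2048,
            timeout_keep_alive=60,
            log_level="warning",
        )

Note that with four workers each process gets its own FedMLHttpInference._http_client, since class attributes are per-process; the connection pool limits above therefore apply per worker.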
@@ -155,6 +155,11 @@ def callback_start_deployment(self, topic, payload):

# Get deployment params
request_json = json.loads(payload)

logging.info("=" * 80)
logging.info("[Master Protocol Manager] Received start deployment request: {}".format(request_json))
logging.info("=" * 80)

run_id = request_json["end_point_id"]
end_point_name = request_json["end_point_name"]
token = request_json["token"]