From f0554f72f2a681f87865cb65196ddc20d6fd8976 Mon Sep 17 00:00:00 2001 From: Anton Dubovik Date: Fri, 27 Dec 2024 14:20:07 +0000 Subject: [PATCH 1/3] feat: improved logs --- aidial_analytics_realtime/analytics.py | 34 +++--- aidial_analytics_realtime/app.py | 104 +++++++++++++++--- aidial_analytics_realtime/influx_writer.py | 11 +- aidial_analytics_realtime/topic_model.py | 26 +++-- aidial_analytics_realtime/utils/log_config.py | 37 ++++++- aidial_analytics_realtime/utils/timer.py | 40 +++++++ pyproject.toml | 3 + tests/influx_writer_mock.py | 2 +- tests/test_app.py | 2 +- 9 files changed, 210 insertions(+), 49 deletions(-) create mode 100644 aidial_analytics_realtime/utils/timer.py diff --git a/aidial_analytics_realtime/analytics.py b/aidial_analytics_realtime/analytics.py index 9ecda3f..18c4225 100644 --- a/aidial_analytics_realtime/analytics.py +++ b/aidial_analytics_realtime/analytics.py @@ -1,8 +1,8 @@ +import logging from datetime import datetime from decimal import Decimal from enum import Enum from logging import Logger -from typing import Awaitable, Callable from uuid import uuid4 from influxdb_client import Point @@ -14,8 +14,11 @@ get_chat_completion_response_contents, get_embeddings_request_contents, ) +from aidial_analytics_realtime.influx_writer import InfluxWriterAsync from aidial_analytics_realtime.rates import RatesCalculator from aidial_analytics_realtime.topic_model import TopicModel +from aidial_analytics_realtime.utils.log_config import with_prefix +from aidial_analytics_realtime.utils.timer import Timer identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True) @@ -42,20 +45,25 @@ def detect_lang( case _: assert_never(request_type) - return to_string(detect_lang_by_text(text)) + return to_string(detect_lang_by_text(logger, text)) -def detect_lang_by_text(text: str) -> str | None: +def detect_lang_by_text(logger: logging.Logger, text: str) -> str | None: text = text.strip() if not text: return None + logger = with_prefix(logger, "[langid]") + try: - lang, prob = identifier.classify(text) + with Timer(logger.info): + lang, prob = identifier.classify(text) + if prob > 0.998: return lang - except Exception: + except Exception as e: + logger.error(f"error: {str(e)}") pass return None @@ -106,7 +114,7 @@ def make_point( if chat_id: topic = topic_model.get_topic_by_text( - "\n\n".join(request_contents + response_contents) + logger, "\n\n".join(request_contents + response_contents) ) case RequestType.EMBEDDING: request_contents = get_embeddings_request_contents(logger, request) @@ -114,7 +122,7 @@ def make_point( request_content = "\n".join(request_contents) if chat_id: topic = topic_model.get_topic_by_text( - "\n\n".join(request_contents) + logger, "\n\n".join(request_contents) ) case _: assert_never(request_type) @@ -249,7 +257,7 @@ async def parse_usage_per_model(response: dict): async def on_message( logger: Logger, - influx_writer: Callable[[Point], Awaitable[None]], + influx_writer: InfluxWriterAsync, deployment: str, model: str, project_id: str, @@ -268,8 +276,6 @@ async def on_message( trace: dict | None, execution_path: list | None, ): - logger.info(f"Chat completion response length {len(response)}") - usage_per_model = await parse_usage_per_model(response) if token_usage is not None: point = make_point( @@ -292,7 +298,7 @@ async def on_message( trace, execution_path, ) - await influx_writer(point) + await influx_writer(logger, point) elif len(usage_per_model) == 0: point = make_point( logger, @@ -314,7 +320,7 @@ async def on_message( trace, execution_path, ) - await influx_writer(point) + await influx_writer(logger, point) else: point = make_point( logger, @@ -336,7 +342,7 @@ async def on_message( trace, execution_path, ) - await influx_writer(point) + await influx_writer(logger, point) for usage in usage_per_model: point = make_point( @@ -359,4 +365,4 @@ async def on_message( trace, execution_path, ) - await influx_writer(point) + await influx_writer(logger, point) diff --git a/aidial_analytics_realtime/app.py b/aidial_analytics_realtime/app.py index b36426a..107fd51 100644 --- a/aidial_analytics_realtime/app.py +++ b/aidial_analytics_realtime/app.py @@ -1,9 +1,12 @@ +import asyncio import contextlib import json import logging import re from datetime import datetime +import aiohttp +import starlette.requests import uvicorn from fastapi import Depends, FastAPI, Request from fastapi.responses import JSONResponse @@ -21,7 +24,12 @@ from aidial_analytics_realtime.time import parse_time from aidial_analytics_realtime.topic_model import TopicModel from aidial_analytics_realtime.universal_api_utils import merge -from aidial_analytics_realtime.utils.log_config import configure_loggers, logger +from aidial_analytics_realtime.utils.log_config import ( + app_logger, + configure_loggers, + with_prefix, +) +from aidial_analytics_realtime.utils.timer import Timer RATE_PATTERN = r"/v1/(.+?)/rate" CHAT_COMPLETION_PATTERN = r"/openai/deployments/(.+?)/chat/completions" @@ -49,6 +57,7 @@ async def lifespan(app: FastAPI): async def on_rate_message( + logger: logging.Logger, deployment: str, project_id: str, chat_id: str, @@ -59,7 +68,7 @@ async def on_rate_message( response: dict, influx_writer: InfluxWriterAsync, ): - logger.info(f"Rate message length {len(request) + len(response)}") + app_logger.info(f"Rate message length {len(request) + len(response)}") request_body = json.loads(request["body"]) point = make_rate_point( deployment, @@ -70,10 +79,11 @@ async def on_rate_message( timestamp, request_body, ) - await influx_writer(point) + await influx_writer(logger, point) async def on_chat_completion_message( + logger: logging.Logger, deployment: str, project_id: str, chat_id: str, @@ -149,6 +159,7 @@ async def on_chat_completion_message( async def on_embedding_message( + logger: logging.Logger, deployment: str, project_id: str, chat_id: str, @@ -193,6 +204,7 @@ async def on_embedding_message( async def on_log_message( + logger: logging.Logger, message: dict, influx_writer: InfluxWriterAsync, topic_model: TopicModel, @@ -219,6 +231,7 @@ async def on_log_message( if re.search(RATE_PATTERN, uri): await on_rate_message( + logger, deployment, project_id, chat_id, @@ -232,6 +245,7 @@ async def on_log_message( elif re.search(CHAT_COMPLETION_PATTERN, uri): await on_chat_completion_message( + logger, deployment, project_id, chat_id, @@ -252,6 +266,7 @@ async def on_log_message( elif re.search(EMBEDDING_PATTERN, uri): await on_embedding_message( + logger, deployment, project_id, chat_id, @@ -271,7 +286,7 @@ async def on_log_message( ) else: - logger.warning(f"Unsupported message type: {uri!r}") + app_logger.warning(f"Unsupported message type: {uri!r}") @app.post("/data") @@ -281,28 +296,81 @@ async def on_log_messages( topic_model: TopicModel = Depends(), rates_calculator: RatesCalculator = Depends(), ): + request_logger = app_logger + data = await request.json() - statuses = [] - for idx, item in enumerate(data): - try: - await on_log_message( - json.loads(item["message"]), - influx_writer, - topic_model, - rates_calculator, - ) - except Exception as e: - logging.exception(f"Error processing message #{idx}") - statuses.append({"status": "error", "error": str(e)}) - else: - statuses.append({"status": "success"}) + n = len(data) + request_logger.info(f"number of messages: {n}") + + statuses: list[dict] = [] + + async with Timer(request_logger.info): + for i, item in enumerate(data, start=1): + message_logger = with_prefix(request_logger, f"[{i}/{n}]") + + async with Timer(message_logger.info): + status = await process_message( + message_logger, + json.loads(item["message"]), + influx_writer, + topic_model, + rates_calculator, + ) + + statuses.append(status) + + if request_logger.isEnabledFor(logging.DEBUG): + request_logger.debug(f"response: {json.dumps(statuses)}") # Returning 200 code even if processing of some messages has failed, # since the log broker that sends the messages may decide to retry the failed requests. return JSONResponse(content=statuses, status_code=200) +async def process_message( + logger: logging.Logger, + message: dict, + influx_writer: InfluxWriterAsync, + topic_model: TopicModel, + rates_calculator: RatesCalculator, +) -> dict: + try: + await on_log_message( + logger, + message, + influx_writer, + topic_model, + rates_calculator, + ) + logger.info("success") + return {"status": "success"} + except starlette.requests.ClientDisconnect as e: + logger.error("client disconnect") + return { + "status": "error", + "error": str(e), + "reason": "client disconnect", + } + except aiohttp.ClientConnectionError as e: + logger.error("connection error") + return { + "status": "error", + "error": str(e), + "reason": "connection error", + } + except asyncio.TimeoutError as e: + logger.error("timeout") + return { + "status": "error", + "error": str(e), + "reason": "timeout", + } + except Exception as e: + logger.exception("caught exception") + return {"status": "error", "error": str(e)} + + @app.get("/health") def health(): return {"status": "ok"} diff --git a/aidial_analytics_realtime/influx_writer.py b/aidial_analytics_realtime/influx_writer.py index a1da8a7..9765a92 100644 --- a/aidial_analytics_realtime/influx_writer.py +++ b/aidial_analytics_realtime/influx_writer.py @@ -1,10 +1,14 @@ import os +from logging import Logger from typing import Awaitable, Callable, Tuple from influxdb_client import Point from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync -InfluxWriterAsync = Callable[[Point], Awaitable[None]] +from aidial_analytics_realtime.utils.log_config import with_prefix +from aidial_analytics_realtime.utils.timer import Timer + +InfluxWriterAsync = Callable[[Logger, Point], Awaitable[None]] def create_influx_writer() -> Tuple[InfluxDBClientAsync, InfluxWriterAsync]: @@ -18,7 +22,8 @@ def create_influx_writer() -> Tuple[InfluxDBClientAsync, InfluxWriterAsync]: ) influx_write_api = client.write_api() - async def influx_writer_impl(record: Point): - await influx_write_api.write(bucket=influx_bucket, record=record) + async def influx_writer_impl(logger: Logger, record: Point): + with Timer(with_prefix(logger, "[influx]").info): + await influx_write_api.write(bucket=influx_bucket, record=record) return client, influx_writer_impl diff --git a/aidial_analytics_realtime/topic_model.py b/aidial_analytics_realtime/topic_model.py index b0d6c22..a9916a8 100644 --- a/aidial_analytics_realtime/topic_model.py +++ b/aidial_analytics_realtime/topic_model.py @@ -1,7 +1,11 @@ +import logging import os from bertopic import BERTopic +from aidial_analytics_realtime.utils.log_config import with_prefix +from aidial_analytics_realtime.utils.timer import Timer + class TopicModel: def __init__( @@ -18,14 +22,20 @@ def __init__( self.model = BERTopic.load( topic_model_name, topic_embeddings_model_name ) - self.model.transform(["test"]) # Make sure the model is loaded - def get_topic_by_text(self, text): - topics, _ = self.model.transform([text]) - topic = self.model.get_topic_info(topics[0]) + # Disable tqdm progress bars on batch encoding + self.model.verbose = False + + # Make sure the model is loaded + self.model.transform(["test"]) + + def get_topic_by_text(self, logger: logging.Logger, text): + with Timer(with_prefix(logger, "[topic]").info): + topics, _ = self.model.transform([text]) + topic = self.model.get_topic_info(topics[0]) - if "GeneratedName" in topic: - # "GeneratedName" is an expected name for the human readable topic representation - return topic["GeneratedName"][0][0][0] + if "GeneratedName" in topic: + # "GeneratedName" is an expected name for the human readable topic representation + return topic["GeneratedName"][0][0][0] - return topic["Name"][0] + return topic["Name"][0] diff --git a/aidial_analytics_realtime/utils/log_config.py b/aidial_analytics_realtime/utils/log_config.py index 652a2f4..bf34b56 100644 --- a/aidial_analytics_realtime/utils/log_config.py +++ b/aidial_analytics_realtime/utils/log_config.py @@ -1,9 +1,13 @@ import logging +import os import sys +from typing import Callable +from typing_extensions import override from uvicorn.logging import DefaultFormatter -logger = logging.getLogger("app") +app_logger = logging.getLogger("app") +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") def configure_loggers(): @@ -14,8 +18,8 @@ def configure_loggers(): log.handlers = [] log.propagate = True - # Setting up log levels - logger.setLevel(logging.DEBUG) + # Setting log levels for the analytics application + app_logger.setLevel(LOG_LEVEL) # Configuring the root logger root = logging.getLogger() @@ -31,9 +35,34 @@ def configure_loggers(): # if they are already configured if not root_has_stderr_handler: formatter = DefaultFormatter( - fmt="%(asctime)s [%(levelname)s] - %(message)s" + fmt="%(levelprefix)s | %(asctime)s | %(process)d | %(name)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + use_colors=True, ) handler = logging.StreamHandler(sys.stderr) handler.setFormatter(formatter) root.addHandler(handler) + + +class _MessageHookLogger(logging.LoggerAdapter): + _on_message: Callable[[str], str] + + def __init__( + self, logger: logging.Logger, on_message: Callable[[str], str] + ): + super().__init__(logger) + self._on_message = on_message + + @override + def process(self, msg, kwargs): + return self._on_message(msg), kwargs + + +def with_prefix(logger: logging.Logger, prefix: str) -> logging.Logger: + def on_message(msg: str) -> str: + if msg and msg[0].isalnum(): + return f"{prefix} {msg}" + return f"{prefix}{msg}" + + return _MessageHookLogger(logger, on_message) # type: ignore diff --git a/aidial_analytics_realtime/utils/timer.py b/aidial_analytics_realtime/utils/timer.py new file mode 100644 index 0000000..7ae764b --- /dev/null +++ b/aidial_analytics_realtime/utils/timer.py @@ -0,0 +1,40 @@ +import time +from typing import Callable + + +class Timer: + start: float + format: str + printer: Callable[[str], None] + + def __init__( + self, + printer: Callable[[str], None] = print, + *, + format: str = "{elapsed}", + ): + self.format = format + self.printer = printer + + def _elapsed(self) -> str: + elapsed = time.perf_counter() - self.start + return f"{elapsed:.3f}s" + + def _on_enter(self): + self.start = time.perf_counter() + return self + + def _on_exit(self): + self.printer(self.format.format(elapsed=self._elapsed())) + + async def __aenter__(self): + return self._on_enter() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + self._on_exit() + + def __enter__(self): + return self._on_enter() + + def __exit__(self, exc_type, exc_val, exc_tb): + self._on_exit() diff --git a/pyproject.toml b/pyproject.toml index 313bbd3..cb93232 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,3 +101,6 @@ exclude = [ markers = [ "with_external: marks tests may require external resources, like the download models (deselect with '-m \"not with_external\"')", ] +filterwarnings = [ + "ignore::DeprecationWarning:bertopic._bertopic:17" +] \ No newline at end of file diff --git a/tests/influx_writer_mock.py b/tests/influx_writer_mock.py index 3c51e47..ae25cd6 100644 --- a/tests/influx_writer_mock.py +++ b/tests/influx_writer_mock.py @@ -2,5 +2,5 @@ class InfluxWriterMock: def __init__(self): self.points = [] - async def __call__(self, record): + async def __call__(self, logger, record): self.points.append(str(record)) diff --git a/tests/test_app.py b/tests/test_app.py index 83fb2e6..692578a 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -8,7 +8,7 @@ class TestTopicModel: - def get_topic_by_text(self, text): + def get_topic_by_text(self, logger, text): return "TestTopic" From 719e01727841c1e8822b9f43b5c4399784cfd7e9 Mon Sep 17 00:00:00 2001 From: Anton Dubovik Date: Mon, 30 Dec 2024 11:48:05 +0000 Subject: [PATCH 2/3] feat: sending time's logs to debug --- aidial_analytics_realtime/analytics.py | 2 +- aidial_analytics_realtime/app.py | 4 ++-- aidial_analytics_realtime/influx_writer.py | 2 +- aidial_analytics_realtime/topic_model.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/aidial_analytics_realtime/analytics.py b/aidial_analytics_realtime/analytics.py index 18c4225..9d3e596 100644 --- a/aidial_analytics_realtime/analytics.py +++ b/aidial_analytics_realtime/analytics.py @@ -57,7 +57,7 @@ def detect_lang_by_text(logger: logging.Logger, text: str) -> str | None: logger = with_prefix(logger, "[langid]") try: - with Timer(logger.info): + with Timer(logger.debug): lang, prob = identifier.classify(text) if prob > 0.998: diff --git a/aidial_analytics_realtime/app.py b/aidial_analytics_realtime/app.py index 107fd51..4d45b37 100644 --- a/aidial_analytics_realtime/app.py +++ b/aidial_analytics_realtime/app.py @@ -305,11 +305,11 @@ async def on_log_messages( statuses: list[dict] = [] - async with Timer(request_logger.info): + async with Timer(request_logger.debug): for i, item in enumerate(data, start=1): message_logger = with_prefix(request_logger, f"[{i}/{n}]") - async with Timer(message_logger.info): + async with Timer(message_logger.debug): status = await process_message( message_logger, json.loads(item["message"]), diff --git a/aidial_analytics_realtime/influx_writer.py b/aidial_analytics_realtime/influx_writer.py index 9765a92..7f9b086 100644 --- a/aidial_analytics_realtime/influx_writer.py +++ b/aidial_analytics_realtime/influx_writer.py @@ -23,7 +23,7 @@ def create_influx_writer() -> Tuple[InfluxDBClientAsync, InfluxWriterAsync]: influx_write_api = client.write_api() async def influx_writer_impl(logger: Logger, record: Point): - with Timer(with_prefix(logger, "[influx]").info): + with Timer(with_prefix(logger, "[influx]").debug): await influx_write_api.write(bucket=influx_bucket, record=record) return client, influx_writer_impl diff --git a/aidial_analytics_realtime/topic_model.py b/aidial_analytics_realtime/topic_model.py index a9916a8..48fcfbe 100644 --- a/aidial_analytics_realtime/topic_model.py +++ b/aidial_analytics_realtime/topic_model.py @@ -30,7 +30,7 @@ def __init__( self.model.transform(["test"]) def get_topic_by_text(self, logger: logging.Logger, text): - with Timer(with_prefix(logger, "[topic]").info): + with Timer(with_prefix(logger, "[topic]").debug): topics, _ = self.model.transform([text]) topic = self.model.get_topic_info(topics[0]) From 8e0370fb9eae6bea504c16caf0f74dd7149ebb4c Mon Sep 17 00:00:00 2001 From: Anton Dubovik Date: Mon, 30 Dec 2024 11:57:50 +0000 Subject: [PATCH 3/3] feat: added milliseconds to the logs --- aidial_analytics_realtime/utils/log_config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aidial_analytics_realtime/utils/log_config.py b/aidial_analytics_realtime/utils/log_config.py index bf34b56..fe11b52 100644 --- a/aidial_analytics_realtime/utils/log_config.py +++ b/aidial_analytics_realtime/utils/log_config.py @@ -36,7 +36,6 @@ def configure_loggers(): if not root_has_stderr_handler: formatter = DefaultFormatter( fmt="%(levelprefix)s | %(asctime)s | %(process)d | %(name)s | %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", use_colors=True, )