Skip to content

Commit

Permalink
Merge branch 'development' into fix/fixed-content-gathering
Browse files Browse the repository at this point in the history
  • Loading branch information
adubovik committed Jan 14, 2025
2 parents fd017f2 + 65e1856 commit 07a54bd
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 42 deletions.
48 changes: 28 additions & 20 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
PORT = 5001
IMAGE_NAME = ai-dial-analytics-realtime
PORT ?= 5001
IMAGE_NAME ?= ai-dial-analytics-realtime
VENV ?= .venv
POETRY ?= $(VENV)/bin/poetry
POETRY_VERSION ?= 1.6.1
ARGS =


.PHONY: all build serve docker_build docker_serve lint format test test_all docs clean help
.PHONY: all init_env build serve docker_build docker_serve lint format test test_all docs clean help


all: build


build:
poetry build
init_env:
python -m venv $(VENV)
$(VENV)/bin/pip install poetry==$(POETRY_VERSION) --quiet


serve:
poetry install --only main
poetry run uvicorn aidial_analytics_realtime.app:app --port=$(PORT) --env-file .env
build: init_env
$(POETRY) build


serve: init_env
$(POETRY) install --only main
$(POETRY) run uvicorn aidial_analytics_realtime.app:app --reload --port=$(PORT) --env-file .env


docker_build:
Expand All @@ -26,24 +34,24 @@ docker_serve: docker_build
docker run --platform linux/amd64 --env-file ./.env --rm -p $(PORT):5000 $(IMAGE_NAME):dev


lint:
poetry install --only nox
poetry run nox -s lint
lint: init_env
$(POETRY) install --only nox
$(POETRY) run nox -s lint


format:
poetry install --only nox
poetry run nox -s format
format: init_env
$(POETRY) install --only nox
$(POETRY) run nox -s format


test:
poetry install --only nox
poetry run -- nox -s tests -- -m "not with_external" $(ARGS)
test: init_env
$(POETRY) install --only nox
$(POETRY) run -- nox -s tests -- -m "not with_external" $(ARGS)


test_all:
poetry install --only nox
poetry run -- nox -s tests -- $(ARGS)
test_all: init_env
$(POETRY) install --only nox
$(POETRY) run -- nox -s tests -- $(ARGS)


docs:
Expand Down
29 changes: 17 additions & 12 deletions aidial_analytics_realtime/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
)
from aidial_analytics_realtime.rates import RatesCalculator
from aidial_analytics_realtime.topic_model import TopicModel
from aidial_analytics_realtime.utils.concurrency import (
run_in_cpu_tasks_executor,
)

identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True)

Expand All @@ -25,7 +28,7 @@ class RequestType(Enum):
EMBEDDING = 2


def detect_lang(
async def detect_lang(
logger: Logger, request: dict, response: dict, request_type: RequestType
) -> str:
match request_type:
Expand All @@ -42,17 +45,17 @@ def detect_lang(
case _:
assert_never(request_type)

return to_string(detect_lang_by_text(text))
return to_string(await detect_lang_by_text(text))


def detect_lang_by_text(text: str) -> str | None:
async def detect_lang_by_text(text: str) -> str | None:
text = text.strip()

if not text:
return None

try:
lang, prob = identifier.classify(text)
lang, prob = await run_in_cpu_tasks_executor(identifier.classify, text)
if prob > 0.998:
return lang
except Exception:
Expand All @@ -69,7 +72,7 @@ def build_execution_path(path: list | None):
return "undefined" if not path else "/".join(map(to_string, path))


def make_point(
async def make_point(
logger: Logger,
deployment: str,
model: str,
Expand Down Expand Up @@ -106,7 +109,7 @@ def make_point(

if chat_id:
topic = to_string(
topic_model.get_topic_by_text(
await topic_model.get_topic_by_text(
"\n\n".join(request_contents + response_contents)
)
)
Expand All @@ -116,7 +119,9 @@ def make_point(
request_content = "\n".join(request_contents)
if chat_id:
topic = to_string(
topic_model.get_topic_by_text("\n\n".join(request_contents))
await topic_model.get_topic_by_text(
"\n\n".join(request_contents)
)
)
case _:
assert_never(request_type)
Expand Down Expand Up @@ -158,7 +163,7 @@ def make_point(
(
"undefined"
if not chat_id
else detect_lang(logger, request, response, request_type)
else await detect_lang(logger, request, response, request_type)
),
)
.tag("upstream", to_string(upstream_url))
Expand Down Expand Up @@ -274,7 +279,7 @@ async def on_message(

usage_per_model = await parse_usage_per_model(response)
if token_usage is not None:
point = make_point(
point = await make_point(
logger,
deployment,
model,
Expand All @@ -296,7 +301,7 @@ async def on_message(
)
await influx_writer(point)
elif len(usage_per_model) == 0:
point = make_point(
point = await make_point(
logger,
deployment,
model,
Expand All @@ -318,7 +323,7 @@ async def on_message(
)
await influx_writer(point)
else:
point = make_point(
point = await make_point(
logger,
deployment,
model,
Expand All @@ -341,7 +346,7 @@ async def on_message(
await influx_writer(point)

for usage in usage_per_model:
point = make_point(
point = await make_point(
logger,
deployment,
usage["model"],
Expand Down
16 changes: 9 additions & 7 deletions aidial_analytics_realtime/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from aidial_analytics_realtime.time import parse_time
from aidial_analytics_realtime.topic_model import TopicModel
from aidial_analytics_realtime.universal_api_utils import merge
from aidial_analytics_realtime.utils.concurrency import cpu_task_executor
from aidial_analytics_realtime.utils.log_config import configure_loggers, logger

RATE_PATTERN = r"/v1/(.+?)/rate"
Expand All @@ -31,16 +32,17 @@
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
influx_client, influx_writer = create_influx_writer()
async with influx_client:
app.dependency_overrides[InfluxWriterAsync] = lambda: influx_writer
with cpu_task_executor:
async with influx_client:
app.dependency_overrides[InfluxWriterAsync] = lambda: influx_writer

topic_model = TopicModel()
app.dependency_overrides[TopicModel] = lambda: topic_model
topic_model = TopicModel()
app.dependency_overrides[TopicModel] = lambda: topic_model

rates_calculator = RatesCalculator()
app.dependency_overrides[RatesCalculator] = lambda: rates_calculator
rates_calculator = RatesCalculator()
app.dependency_overrides[RatesCalculator] = lambda: rates_calculator

yield
yield


app = FastAPI(lifespan=lifespan)
Expand Down
13 changes: 11 additions & 2 deletions aidial_analytics_realtime/topic_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

from bertopic import BERTopic

from aidial_analytics_realtime.utils.concurrency import (
run_in_cpu_tasks_executor,
)


class TopicModel:
def __init__(
Expand All @@ -18,9 +22,14 @@ def __init__(
self.model = BERTopic.load(
topic_model_name, topic_embeddings_model_name
)
self.model.transform(["test"]) # Make sure the model is loaded

def get_topic_by_text(self, text: str) -> str | None:
# Make sure the model is loaded
self._get_topic_by_text("test")

async def get_topic_by_text(self, text: str) -> str | None:
return await run_in_cpu_tasks_executor(self._get_topic_by_text, text)

def _get_topic_by_text(self, text: str) -> str | None:
text = text.strip()
if not text:
return None
Expand Down
15 changes: 15 additions & 0 deletions aidial_analytics_realtime/utils/concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Callable, ParamSpec, TypeVar

_T = TypeVar("_T")
_P = ParamSpec("_P")

cpu_task_executor = ThreadPoolExecutor()


async def run_in_cpu_tasks_executor(
func: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs
) -> _T:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(cpu_task_executor, func, *args) # type: ignore
2 changes: 1 addition & 1 deletion tests/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ async def __call__(self, record):


class TestTopicModel:
def get_topic_by_text(self, text):
async def get_topic_by_text(self, text: str) -> str | None:
return text or None

0 comments on commit 07a54bd

Please sign in to comment.