feat: support telemetry #83

Open · wants to merge 18 commits into base: `development`
16 changes: 11 additions & 5 deletions Makefile
@@ -1,21 +1,27 @@
PORT = 5001
IMAGE_NAME = ai-dial-analytics-realtime
ARGS =
PORT ?= 5001
IMAGE_NAME ?= ai-dial-analytics-realtime
DEV_PYTHON ?= 3.10
ARGS ?=


.PHONY: all build serve docker_build docker_serve lint format test test_all docs clean help
.PHONY: all install build serve docker_build docker_serve lint format test test_all docs clean help


all: build


install:
poetry env use python$(DEV_PYTHON)
poetry install


build:
poetry build


serve:
poetry install --only main
poetry run uvicorn aidial_analytics_realtime.app:app --port=$(PORT) --env-file .env
poetry run uvicorn aidial_analytics_realtime.app:app --port=$(PORT) --reload --env-file .env


docker_build:
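Since the Makefile now uses `?=` assignments, each of these variables can be overridden per invocation; for example (values below are illustrative):

```sh
# Create the dev venv with a specific Python and run the dev server on another port
make install DEV_PYTHON=3.11
make serve PORT=8080
```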
34 changes: 19 additions & 15 deletions README.md
@@ -4,7 +4,7 @@ Realtime analytics server for [AI DIAL](https://epam-rail.com). The service cons

Refer to [Documentation](https://github.com/epam/ai-dial/blob/main/docs/tutorials/realtime-analytics.md) to learn how to configure AI DIAL Core and other necessary components.

# Usage
## Usage

Check the [AI DIAL Core](https://github.com/epam/ai-dial-core) documentation to configure how logs are sent to the realtime analytics server instance.

@@ -25,17 +25,18 @@ The realtime analytics server analyzes the logs stream in the realtime and write
|user_hash| The unique hash for the user. |
|price| The calculated price of the request. |
|number_request_messages| The total number of messages in history for this request. |
|chat_id| The unique ID of this convestation. |
|chat_id| The unique ID of this conversation. |
|prompt_tokens| The number of tokens in the prompt, including the conversation history and the current message. |
|completion_tokens| The number of completion tokens generated for this request. |


# Configuration
## Configuration

Copy `.env.example` to `.env` and customize it for your environment.

### Connection to the InfluxDB

You need to specify the connection options for the InfluxDB instance using the following environment variables:

|Variable|Description|
|---|---|
|INFLUX_URL|URL of the InfluxDB instance to write the analytics data to |
@@ -46,13 +47,15 @@ You can follow the [InfluxDB documentation](https://docs.influxdata.com/influxdb/v2/get-started/) to setup InfluxDB locally and acquire the required configuration parameters.
You can follow the [InfluxDB documentation](https://docs.influxdata.com/influxdb/v2/get-started/) to set up InfluxDB locally and acquire the required configuration parameters.
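As an illustration, a local setup might put these options into `.env`. Only `INFLUX_URL` is visible in the table above (the remaining rows are collapsed in the diff), so the other variable names here are assumptions modeled on typical InfluxDB v2 settings:

```sh
INFLUX_URL=http://localhost:8086
# The names below are assumed, not confirmed by the collapsed table rows
INFLUX_ORG=my-org
INFLUX_BUCKET=analytics
INFLUX_API_TOKEN=changeme
```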

### Other configuration

Also, the following environment variables can be used to configure the service behavior:

|Variable|Default|Description|
|---|---|---|
|MODEL_RATES| {} | Specifies per-token price rates for models in JSON format|
|TOPIC_MODEL| ./topic_model | Specifies the name or path of the topic model. If the model is specified by name, it will be downloaded from [Hugging Face](https://huggingface.co/).|
|TOPIC_EMBEDDINGS_MODEL| None | Specifies the name or path of the embeddings model used with the topic model. If the model is specified by name, it will be downloaded from [Hugging Face](https://huggingface.co/). If None, the name from the topic model config will be used.|
|LOG_LEVEL|INFO|Log level. Use DEBUG for dev purposes and INFO in prod|

Example of the MODEL_RATES configuration:
```json
@@ -84,29 +87,28 @@
}
```
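The rate entries themselves are collapsed in the diff above; purely as an illustration, a `MODEL_RATES` value could look like the following (the model name, key names, and prices are assumptions, not taken from this repository):

```json
{
  "gpt-4": {
    "prompt_price": "0.00003",
    "completion_price": "0.00006"
  }
}
```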

## Developer environment

______
# Developer environment

This project uses [Python>=3.11](https://www.python.org/downloads/) and [Poetry>=1.6.1](https://python-poetry.org/) as a dependency manager.
This project uses [Python>=3.11](https://www.python.org/downloads/) and [Poetry>=1.6.1](https://python-poetry.org/) as a dependency manager.
Check out Poetry's [documentation on how to install it](https://python-poetry.org/docs/#installation) on your system before proceeding.

To install requirements:

```
```sh
poetry install
```

This will install all requirements for running the package, linting, formatting and tests.

# Build
## Build

To build the wheel packages, run:

```sh
make build
```

# Run
## Run

To run the development server locally, run:

@@ -116,21 +118,23 @@ make serve

The server will be running at http://localhost:5001

# Docker
## Docker

To build the Docker image, run:

```sh
make docker_build
```

To run the server locally from the Docker image, run:

```sh
make docker_serve
```

The server will be running at http://localhost:5001

# Lint
## Lint

Run the linting before committing:

@@ -144,15 +148,15 @@ To auto-fix formatting issues run:
make format
```

# Test
## Test

Run unit tests locally:

```sh
make test
```

# Clean
## Clean

To remove the virtual environment and build artifacts:

66 changes: 41 additions & 25 deletions aidial_analytics_realtime/analytics.py
@@ -1,8 +1,8 @@
import logging
from datetime import datetime
from decimal import Decimal
from enum import Enum
from logging import Logger
from typing import Awaitable, Callable
from uuid import uuid4

from influxdb_client import Point
@@ -14,8 +14,12 @@
get_chat_completion_response_contents,
get_embeddings_request_contents,
)
from aidial_analytics_realtime.influx_writer import InfluxWriterAsync
from aidial_analytics_realtime.rates import RatesCalculator
from aidial_analytics_realtime.topic_model import TopicModel
from aidial_analytics_realtime.utils.concurrency import make_async
from aidial_analytics_realtime.utils.log_config import with_prefix
from aidial_analytics_realtime.utils.timer import Timer

identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True)

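The helpers imported from `aidial_analytics_realtime.utils` are not included in this diff. A minimal sketch of what the call sites below imply, assuming straightforward implementations (these are guesses, not the actual module contents):

```python
import asyncio
import logging
import time
from typing import Callable


def with_prefix(logger: logging.Logger, prefix: str) -> logging.LoggerAdapter:
    # Assumed: wraps the logger so every message carries a fixed prefix
    class _PrefixAdapter(logging.LoggerAdapter):
        def process(self, msg, kwargs):
            return f"{self.extra['prefix']} {msg}", kwargs

    return _PrefixAdapter(logger, {"prefix": prefix})


class Timer:
    # Assumed: context manager reporting elapsed wall-clock time to `log`
    def __init__(self, log: Callable[[str], None]):
        self._log = log

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc):
        self._log(f"elapsed {time.perf_counter() - self._start:.3f}s")


async def make_async(func, *args):
    # Assumed: offloads a blocking callable to a worker thread (Python 3.9+)
    return await asyncio.to_thread(func, *args)
```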
@@ -25,7 +29,7 @@ class RequestType(Enum):
EMBEDDING = 2


def detect_lang(
async def detect_lang(
logger: Logger, request: dict, response: dict, request_type: RequestType
) -> str:
match request_type:
@@ -42,20 +46,29 @@ def detect_lang(
case _:
assert_never(request_type)

return to_string(detect_lang_by_text(text))
return to_string(await detect_lang_by_text(logger, text))


def detect_lang_by_text(text: str) -> str | None:
async def detect_lang_by_text(logger: logging.Logger, text: str) -> str | None:
text = text.strip()

if not text:
return None

logger = with_prefix(logger, "[langid]")

try:
lang, prob = identifier.classify(text)

def _task(text: str):
with Timer(logger.info):
return identifier.classify(text)

lang, prob = await make_async(_task, text)

if prob > 0.998:
return lang
except Exception:
except Exception as e:
logger.error(f"error: {str(e)}")
pass

return None
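With `detect_lang_by_text` now a coroutine, callers need a running event loop; a small usage sketch (the import path matches this module, the driver itself is hypothetical):

```python
import asyncio
import logging

from aidial_analytics_realtime.analytics import detect_lang_by_text


async def main():
    logger = logging.getLogger("example")
    # Returns a language code only when the classifier is > 0.998 confident
    lang = await detect_lang_by_text(logger, "Bonjour tout le monde")
    print(lang)


asyncio.run(main())
```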
@@ -69,7 +82,7 @@ def build_execution_path(path: list | None):
return "undefined" if not path else "/".join(map(to_string, path))


def make_point(
async def make_point(
logger: Logger,
deployment: str,
model: str,
@@ -105,16 +118,21 @@ def make_point(
response_content = "\n".join(response_contents)

if chat_id:
topic = topic_model.get_topic_by_text(
"\n\n".join(request_contents + response_contents)
topic = to_string(
await topic_model.get_topic_by_text(
logger,
"\n\n".join(request_contents + response_contents),
)
)
case RequestType.EMBEDDING:
request_contents = get_embeddings_request_contents(logger, request)

request_content = "\n".join(request_contents)
if chat_id:
topic = topic_model.get_topic_by_text(
"\n\n".join(request_contents)
topic = to_string(
await topic_model.get_topic_by_text(
logger, "\n\n".join(request_contents)
)
)
case _:
assert_never(request_type)
@@ -156,7 +174,7 @@ def make_point(
(
"undefined"
if not chat_id
else detect_lang(logger, request, response, request_type)
else await detect_lang(logger, request, response, request_type)
),
)
.tag("upstream", to_string(upstream_url))
@@ -232,7 +250,7 @@ def make_rate_point(
return point


async def parse_usage_per_model(response: dict):
def parse_usage_per_model(response: dict) -> list[dict]:
statistics = response.get("statistics")
if statistics is None:
return []
@@ -249,7 +267,7 @@

async def on_message(
logger: Logger,
influx_writer: Callable[[Point], Awaitable[None]],
influx_writer: InfluxWriterAsync,
deployment: str,
model: str,
project_id: str,
@@ -268,11 +286,9 @@
trace: dict | None,
execution_path: list | None,
):
logger.info(f"Chat completion response length {len(response)}")

usage_per_model = await parse_usage_per_model(response)
usage_per_model = parse_usage_per_model(response)
if token_usage is not None:
point = make_point(
point = await make_point(
logger,
deployment,
model,
@@ -292,9 +308,9 @@
trace,
execution_path,
)
await influx_writer(point)
await influx_writer(logger, point)
elif len(usage_per_model) == 0:
point = make_point(
point = await make_point(
logger,
deployment,
model,
@@ -314,9 +330,9 @@
trace,
execution_path,
)
await influx_writer(point)
await influx_writer(logger, point)
else:
point = make_point(
point = await make_point(
logger,
deployment,
model,
@@ -336,10 +352,10 @@
trace,
execution_path,
)
await influx_writer(point)
await influx_writer(logger, point)

for usage in usage_per_model:
point = make_point(
point = await make_point(
logger,
deployment,
usage["model"],
Expand All @@ -359,4 +375,4 @@ async def on_message(
trace,
execution_path,
)
await influx_writer(point)
await influx_writer(logger, point)
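`InfluxWriterAsync` itself is defined in `aidial_analytics_realtime.influx_writer`, which is not part of this diff. Judging by the call sites above, it now accepts the logger alongside the point; a sketch of the implied interface (an assumption, not the actual definition):

```python
from logging import Logger
from typing import Protocol

from influxdb_client import Point


class InfluxWriterAsync(Protocol):
    # Implied by the call sites: await influx_writer(logger, point)
    async def __call__(self, logger: Logger, point: Point) -> None: ...
```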