Streaming smoothness #102

Open · wants to merge 72 commits into base: production

Commits (72; the diff below shows changes from 71 commits):
d73635e  Capacity frontend fix (#64) - tripathiarpan20, Oct 12, 2024
a2436c3  Merge branch 'production' of https://github.com/namoray/vision-privat… - tripathiarpan20, Oct 14, 2024
9d9ec00  Merge branch 'production' of https://github.com/namoray/vision-privat… - tripathiarpan20, Oct 15, 2024
fa5c465  Merge branch 'production' of https://github.com/namoray/vision-privat… - tripathiarpan20, Oct 16, 2024
4fef461  Merge branch 'production' of https://github.com/namoray/vision-privat… - tripathiarpan20, Oct 17, 2024
015228a  Revert "Feat: Make synth data generation more customiseable (#67)" - tripathiarpan20, Oct 17, 2024
131925b  Merge branch 'production' of https://github.com/namoray/vision-privat… - tripathiarpan20, Oct 22, 2024
85da6ac  added penalty for bigger chunks - tripathiarpan20, Oct 28, 2024
827d164  Update streaming.py - tripathiarpan20, Oct 29, 2024
0745726  Update streaming.py - tripathiarpan20, Oct 29, 2024
c29e811  Update streaming.py - tripathiarpan20, Oct 29, 2024
1522a94  added streaming chunking penalty to reward_data table - tripathiarpan20, Oct 31, 2024
2b983d4  added multiplier to contenders_stats table - tripathiarpan20, Oct 31, 2024
5d6736e  simplified penalty factor - tripathiarpan20, Nov 1, 2024
55ccd03  tweaked scoring stats table - tripathiarpan20, Nov 1, 2024
9662210  debug - tripathiarpan20, Nov 1, 2024
3ef74f9  debug - tripathiarpan20, Nov 1, 2024
eb74418  debug nonstream - tripathiarpan20, Nov 1, 2024
aedecd9  debug - tripathiarpan20, Nov 2, 2024
4fcc073  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 5, 2024
7cbf8ab  revised penalty criteria - tripathiarpan20, Nov 6, 2024
66572cb  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 6, 2024
c5e8614  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 13, 2024
c598855  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 13, 2024
8e92050  Merge branch 'production' into feature/streaming-smoothness - tripathiarpan20, Nov 13, 2024
a72f207  Update streaming.py - tripathiarpan20, Nov 13, 2024
eff4edc  Update streaming.py - tripathiarpan20, Nov 13, 2024
fc57f8c  Merge branch 'feature/streaming-smoothness-dev' into feature/streamin… - tripathiarpan20, Nov 13, 2024
928f81a  Update calculations.py - tripathiarpan20, Nov 13, 2024
792d98b  tweaked migration - tripathiarpan20, Nov 13, 2024
f3c9f6a  Update 20241113230700_metric_bonus_scaling.sql - tripathiarpan20, Nov 13, 2024
1428e34  Update 20241113230700_metric_bonus_scaling.sql - tripathiarpan20, Nov 13, 2024
6e2e496  tweaks - tripathiarpan20, Nov 14, 2024
51047e2  Update weights.py - tripathiarpan20, Nov 14, 2024
5d08240  Update calculations.py - tripathiarpan20, Nov 14, 2024
3452271  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 14, 2024
7e30ab0  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 14, 2024
2ba59ba  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 15, 2024
99b4e53  Merge branch 'production' into feature/streaming-smoothness - tripathiarpan20, Nov 15, 2024
da87d36  added ttfb penalty - tripathiarpan20, Nov 15, 2024
ca3c561  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 18, 2024
ee26167  Merge branch 'production' into feature/streaming-smoothness - tripathiarpan20, Nov 18, 2024
5c28570  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 19, 2024
10737f5  Merge branch 'production' into feature/streaming-smoothness - tripathiarpan20, Nov 19, 2024
3028e03  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 20, 2024
b4e3c7b  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 20, 2024
5486f20  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 21, 2024
efbe10a  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 21, 2024
4a9b87a  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 21, 2024
c430cd3  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 25, 2024
0bc088a  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 27, 2024
26ae982  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 27, 2024
0b5d595  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 28, 2024
26c7657  Merge branch 'production' into feature/streaming-smoothness - tripathiarpan20, Nov 28, 2024
0e178e0  make `response_time_penalty_multiplier` actually impose a penalty lol - tripathiarpan20, Nov 28, 2024
6168ab9  Merge branch 'namoray:production' into production - tripathiarpan20, Nov 30, 2024
7aa5318  Merge branch 'production' into feature/streaming-smoothness - tripathiarpan20, Dec 2, 2024
3d92303  Merge branch 'namoray:production' into production - tripathiarpan20, Dec 4, 2024
fb3905e  Merge branch 'namoray:production' into production - tripathiarpan20, Dec 5, 2024
54a6cb0  Merge branch 'namoray:production' into production - tripathiarpan20, Dec 6, 2024
1974464  Merge branch 'production' into feature/streaming-smoothness - tripathiarpan20, Dec 6, 2024
39ef8c1  Merge branch 'production' of https://github.com/tripathiarpan20/ninet… - tripathiarpan20, Dec 8, 2024
b5ff5cf  Merge branch 'namoray:production' into production - tripathiarpan20, Dec 8, 2024
fe1d09e  Merge branch 'namoray:production' into production - tripathiarpan20, Dec 9, 2024
4721a64  Merge branch 'namoray:production' into production - tripathiarpan20, Dec 9, 2024
1db5819  Merge branch 'namoray:production' into production - tripathiarpan20, Dec 9, 2024
99f1666  Merge branch 'production' of https://github.com/tripathiarpan20/ninet… - tripathiarpan20, Dec 11, 2024
517297e  Merge branch 'production' into feature/streaming-smoothness - tripathiarpan20, Dec 12, 2024
eb3da24  merge revisions - tripathiarpan20, Dec 12, 2024
de87d3e  Update weights.py - tripathiarpan20, Dec 12, 2024
a354e4d  Update constants.py - tripathiarpan20, Dec 12, 2024
9c14289  review changes - tripathiarpan20, Dec 12, 2024

2 changes: 1 addition & 1 deletion core/constants.py
@@ -1,5 +1,5 @@
SCORING_PERIOD_TIME = 60 * 30 # 30 mins
VERSION_KEY = 62_100
VERSION_KEY = 62_200

PROD_NETUID = 19

1 change: 1 addition & 0 deletions core/models/utility_models.py
@@ -27,6 +27,7 @@ class QueryResult(BaseModel):
node_id: Optional[int]
node_hotkey: Optional[str]
response_time: Optional[float]
response_time_penalty_multiplier: Optional[float]
stream_time: Optional[float]
task: str
status_code: Optional[int]
40 changes: 29 additions & 11 deletions validator/control_node/src/cycle/calculations.py
@@ -32,8 +32,9 @@ class QualityScores:
combined_quality_scores: dict[str, float]
average_weighted_quality_scores: dict[str, float]
metric_bonuses: dict[str, float]
metrics: dict[str, float]
stream_metrics: dict[str, float]
metrics: dict[str, list[float]]
stream_metrics: dict[str, list[float]]
response_time_penalty_multipliers: dict[str, list[float]]


logger = get_logger(__name__)
@@ -99,11 +100,12 @@ async def _get_period_scores(

async def _calculate_metrics_and_quality_score(
psql_db: PSQLDB, task: str, netuid: int
) -> tuple[dict[str, float], dict[str, float], dict[str, float]]:
) -> tuple[dict[str, list[float]], dict[str, list[float]], dict[str, list[float]], dict[str, list[float]]]:
reward_datas: list[RewardData] = await _get_reward_datas(psql_db, task, netuid)

metrics = {}
quality_scores = {}
response_time_penalty_multipliers = {}
stream_metrics = {}
for reward_data in reward_datas:
if reward_data.metric is None or reward_data.quality_score is None:
@@ -117,11 +119,14 @@ async def _calculate_metrics_and_quality_score(
]
quality_scores[reward_data.node_hotkey] = quality_scores.get(
reward_data.node_hotkey, []
) + [reward_data.quality_score]
) + [reward_data.quality_score / reward_data.response_time_penalty_multiplier]
response_time_penalty_multipliers[reward_data.node_hotkey] = response_time_penalty_multipliers.get(
reward_data.node_hotkey, []
) + [reward_data.response_time_penalty_multiplier]
stream_metrics[reward_data.node_hotkey] = stream_metrics.get(
reward_data.node_hotkey, []
) + [reward_data.stream_metric]
return metrics, quality_scores, stream_metrics
return metrics, quality_scores, stream_metrics, response_time_penalty_multipliers


async def _calculate_metric_bonuses(metrics: dict[str, float]) -> dict[str, float]:
@@ -132,7 +137,6 @@ async def _calculate_metric_bonuses(metrics: dict[str, float]) -> dict[str, float]:
metric_bonuses = _get_metric_bonuses(metric_scores)
return metric_bonuses


async def _calculate_normalised_period_score(
psql_db: PSQLDB, task: str, node_hotkey: str
) -> tuple[float, float]:
@@ -180,7 +184,7 @@ def _calculate_hotkey_effective_volume_for_task(
async def _process_quality_scores(
psql_db: PSQLDB, task: str, netuid: int
) -> QualityScores:
metrics, quality_scores, stream_metrics = await _calculate_metrics_and_quality_score(
metrics, quality_scores, stream_metrics, response_time_penalty_multipliers = await _calculate_metrics_and_quality_score(
psql_db, task, netuid
)

@@ -204,7 +208,8 @@ async def _process_quality_scores(
average_weighted_quality_scores,
metric_bonuses,
metrics,
stream_metrics
stream_metrics,
response_time_penalty_multipliers
)


@@ -301,11 +306,12 @@ async def calculate_scores_for_settings_weights(
await _process_quality_scores(psql_db, task, netuid)
)

combined_quality_scores, average_quality_scores, metric_bonuses, metrics, stream_metrics = quality_scores.combined_quality_scores,\
combined_quality_scores, average_quality_scores, metric_bonuses, metrics, stream_metrics, response_time_penalty_multipliers = quality_scores.combined_quality_scores,\
quality_scores.average_weighted_quality_scores, \
quality_scores.metric_bonuses, \
quality_scores.metrics, \
quality_scores.stream_metrics
quality_scores.stream_metrics, \
quality_scores.response_time_penalty_multipliers

effective_volumes, normalised_period_scores, period_score_multipliers = (
await _calculate_effective_volumes_for_task(
@@ -333,6 +339,16 @@ async def calculate_scores_for_settings_weights(
average_metric = (
sum(hotkey_metrics) / len(hotkey_metrics) if hotkey_metrics else 0
)
response_time_penalty_multipliers_hotkey = response_time_penalty_multipliers.get(
hotkey, []
)
average_response_time_penalty_multiplier = (
sum(response_time_penalty_multipliers_hotkey)
/ len(response_time_penalty_multipliers_hotkey)
if response_time_penalty_multipliers_hotkey
else 1
)

average_stream_metric = (
sum(hotkey_stream_metrics) / len(hotkey_stream_metrics) if hotkey_stream_metrics else 0
)
@@ -345,6 +361,7 @@
task=task,
average_quality_score=average_quality_scores.get(hotkey, 0),
metric_bonus=metric_bonuses.get(hotkey, 0),
average_response_time_penalty_multiplier=average_response_time_penalty_multiplier,
metric=average_metric,
stream_metric=average_stream_metric,
combined_quality_score=combined_quality_scores.get(hotkey, 0),
@@ -381,7 +398,8 @@ async def calculate_scores_for_settings_weights(
await _post_scoring_stats_to_local_db(config_main, contender_weights_info_objects, miner_weights_objects)
await _post_scoring_stats_to_nineteen(config_main, contender_weights_info_objects, miner_weights_objects)
except Exception as e:
logger.error(f"Failed to post scoring stats to nineteen.ai: {e}")
logger.error(f"Failed to post scoring stats to local db or nineteen: {e}")

scoring_stats_to_delete_locally = datetime.now() - timedelta(days=7)
async with await config_main.psql_db.connection() as connection:
await delete_weights_info_older_than(connection, scoring_stats_to_delete_locally)
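
To make the new multiplier's effect concrete, here is a small worked example of the division performed in _calculate_metrics_and_quality_score above (numbers illustrative, not taken from this PR):

quality_score = 0.9
response_time_penalty_multiplier = 1.35  # e.g. the chunking penalty was applied
effective_quality_score = quality_score / response_time_penalty_multiplier  # ~0.667
# With no penalty the multiplier stays at 1 and the score passes through unchanged;
# hotkeys with no reward rows likewise default to an average multiplier of 1 above.
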
1 change: 1 addition & 0 deletions validator/control_node/src/score_results/score_results.py
@@ -143,6 +143,7 @@ async def _process_and_store_score(
node_hotkey=node_hotkey,
synthetic_query=synthetic_query,
response_time=result["response_time"],
response_time_penalty_multiplier=result["response_time_penalty_multiplier"],
volume=volume,
metric=metric,
stream_metric=stream_metric,
9 changes: 9 additions & 0 deletions 20241113230700_metric_bonus_scaling.sql
@@ -0,0 +1,9 @@
-- migrate:up
ALTER TABLE reward_data ADD COLUMN response_time_penalty_multiplier FLOAT DEFAULT 1;
ALTER TABLE contenders_weights_stats ADD COLUMN average_response_time_penalty_multiplier FLOAT DEFAULT 1;
ALTER TABLE reward_data ALTER COLUMN response_time_penalty_multiplier DROP DEFAULT;
ALTER TABLE contenders_weights_stats ALTER COLUMN average_response_time_penalty_multiplier DROP DEFAULT;

-- migrate:down
ALTER TABLE reward_data DROP COLUMN response_time_penalty_multiplier;
ALTER TABLE contenders_weights_stats DROP COLUMN average_response_time_penalty_multiplier;
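
The -- migrate:up / -- migrate:down markers above follow the dbmate migration format (an assumption based on the markers; the tool is not named in this diff). Adding each column with DEFAULT 1 backfills existing rows with a neutral multiplier, and dropping the default immediately afterwards forces all new inserts to supply the value explicitly.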
5 changes: 3 additions & 2 deletions validator/db/src/functions.py
@@ -112,8 +112,9 @@ async def fetch_recent_most_rewards(
metric=row[7],
stream_metric=row[8],
response_time=row[9],
volume=row[10],
created_at=row[11],
response_time_penalty_multiplier=row[10],
volume=row[11],
created_at=row[12],
)
for row in priority_results + fill_results
]
8 changes: 6 additions & 2 deletions validator/db/src/sql/rewards_and_scores.py
@@ -14,8 +14,9 @@ async def sql_insert_reward_data(connection: Connection, data: RewardData) -> None:
{dcst.COLUMN_ID}, {dcst.COLUMN_TASK}, {dcst.COLUMN_NODE_ID},
{dcst.COLUMN_QUALITY_SCORE}, {dcst.COLUMN_VALIDATOR_HOTKEY},
{dcst.COLUMN_MINER_HOTKEY}, {dcst.COLUMN_SYNTHETIC_QUERY},
{dcst.COLUMN_METRIC}, {dcst.COLUMN_STREAM_METRIC}, {dcst.COLUMN_RESPONSE_TIME}, {dcst.COLUMN_VOLUME}
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
{dcst.COLUMN_METRIC}, {dcst.COLUMN_STREAM_METRIC}, {dcst.COLUMN_RESPONSE_TIME},
{dcst.COLUMN_RESPONSE_TIME_PENALTY_MULTIPLIER}, {dcst.COLUMN_VOLUME}
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
RETURNING {dcst.COLUMN_ID}
""",
data.id,
@@ -28,6 +29,7 @@
data.metric,
data.stream_metric,
data.response_time,
data.response_time_penalty_multiplier,
data.volume,
)

@@ -217,6 +219,7 @@ async def select_recent_reward_data_for_a_task(
{dcst.COLUMN_METRIC},
{dcst.COLUMN_STREAM_METRIC},
{dcst.COLUMN_RESPONSE_TIME},
{dcst.COLUMN_RESPONSE_TIME_PENALTY_MULTIPLIER},
{dcst.COLUMN_VOLUME},
{dcst.COLUMN_CREATED_AT}
FROM {dcst.TABLE_REWARD_DATA}
@@ -253,6 +256,7 @@ async def select_recent_reward_data(
{dcst.COLUMN_METRIC},
{dcst.COLUMN_STREAM_METRIC},
{dcst.COLUMN_RESPONSE_TIME},
{dcst.COLUMN_RESPONSE_TIME_PENALTY_MULTIPLIER},
{dcst.COLUMN_VOLUME},
{dcst.COLUMN_CREATED_AT}
FROM {dcst.TABLE_REWARD_DATA}
4 changes: 3 additions & 1 deletion validator/db/src/sql/weights.py
@@ -27,9 +27,10 @@ async def insert_scoring_stats(connection: Connection, scoring_stats: list[Conte
{dcst.COLUMN_CONTENDER_CAPACITY},
{dcst.COLUMN_NORMALISED_NET_SCORE},
{dcst.COLUMN_METRIC},
{dcst.COLUMN_AVG_RESPONSE_TIME_MULTIPLIER},
{dcst.COLUMN_STREAM_METRIC}
)
VALUES ($1, $2, $3, NOW(), $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
VALUES ($1, $2, $3, NOW(), $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
""",
[
(
@@ -46,6 +47,7 @@
stat.contender_capacity,
stat.normalised_net_score,
stat.metric,
stat.average_response_time_penalty_multiplier,
stat.stream_metric
)
for stat in scoring_stats
2 changes: 2 additions & 0 deletions validator/models.py
@@ -77,6 +77,7 @@ class RewardData(BaseModel):
metric: float | None = None
stream_metric: float | None = None
response_time: float | None = None
response_time_penalty_multiplier: float = 1
volume: float | None = None
created_at: datetime = Field(default_factory=datetime.now)

@@ -92,6 +93,7 @@ def dict(self): # type: ignore
"metric": self.metric,
"stream_metric": self.stream_metric,
"response_time": self.response_time,
"response_time_penalty_multiplier": self.response_time_penalty_multiplier,
"volume": self.volume,
"created_at": self.created_at.isoformat(), # Convert datetime to ISO string
}
3 changes: 3 additions & 0 deletions validator/query_node/src/query/nonstream.py
@@ -24,6 +24,7 @@ def _get_500_query_result(node_id: int, contender: Contender) -> utility_models.QueryResult:
node_id=node_id,
node_hotkey=contender.node_hotkey,
response_time=None,
response_time_penalty_multiplier=1,
stream_time=None,
task=contender.task,
status_code=500,
@@ -147,6 +148,7 @@ async def query_nonstream(
node_id=node_id,
node_hotkey=contender.node_hotkey,
response_time=response_time,
response_time_penalty_multiplier=1,
stream_time=response_time,
task=contender.task,
status_code=response.status_code,
@@ -167,6 +169,7 @@ async def query_nonstream(
node_id=node_id,
node_hotkey=contender.node_hotkey,
response_time=None,
response_time_penalty_multiplier=1,
stream_time=None,
task=contender.task,
status_code=response.status_code,
57 changes: 54 additions & 3 deletions validator/query_node/src/query/streaming.py
@@ -17,11 +17,17 @@
from validator.utils.redis import redis_constants as rcst

from fiber.logging_utils import get_logger

import statistics
from validator.utils.query.query_utils import load_sse_jsons

logger = get_logger(__name__)

TTFB_PENALTY_FACTOR = 1.1
CHUNKING_PERCENTAGE_PENALTY_FACTOR = 1.35

# Example: at a uniform 200 tokens/sec (e.g. an 8B model), the mean inter-token interval is 1/200 = 0.005 s, so TTFB must stay below 200 * 0.005 = 1 second to avoid being assigned TTFB_PENALTY_FACTOR
TTFB_THRESHOLD_MULTIPLIER = 200.0

GAUGE_ORGANIC_TOKENS_PER_SEC = metrics.get_meter(__name__).create_gauge(
"validator.query_node.query.streaming.organic.tokens_per_sec",
description="Average tokens per second metric for LLM streaming for any organic query"
@@ -94,6 +100,7 @@ def construct_500_query_result(node: Node, task: str) -> utility_models.QueryResult:
success=False,
node_hotkey=node.hotkey,
formatted_response=None,
response_time_penalty_multiplier=1,
status_code=500,
response_time=None,
stream_time=None
@@ -131,10 +138,16 @@

return False

text_jsons, status_code, first_message = [], 200, True # TODO: remove unused variable

text_jsons, status_code, first_message = [], 200, True
ttfb = None
stream_time_init = None
try:

total_chunks = 0
bundled_chunks = 0
time_between_chunks = []

latest_counter = None
async for text in async_chain(first_chunk, generator):
if isinstance(text, bytes):
text = text.decode()
@@ -145,7 +158,11 @@
if isinstance(loaded_jsons, dict):
status_code = loaded_jsons.get(gcst.STATUS_CODE) # noqa
break
total_chunks += 1

if len(loaded_jsons) > 1:
bundled_chunks += 1

except (IndexError, json.JSONDecodeError) as e:
logger.warning(f"Error {e} when trying to load text: {text}")
break
@@ -183,6 +200,16 @@

tokens += 1

if latest_counter:
time_between_chunks.append(time.time() - latest_counter)
latest_counter = time.time()
else:
ttfb = time.time() - start_time
latest_counter = time.time()

response_time_penalty_multiplier = 1.0

if len(text_jsons) > 0:
last_payload = _get_formatted_payload("", False, add_finish_reason=True, task = task)
await _handle_event(
@@ -194,6 +221,29 @@
logger.info(f" 👀 Queried node: {node.node_id} for task: {task}. Success: {not first_message}.")

response_time = time.time() - start_time

# Penalize for inconsistent interval between chunks
[Collaborator review comment on the line above] Can make this a function please with configurable params

if len(time_between_chunks) > 1:  # stdev needs at least two intervals
mean_interval = sum(time_between_chunks) / len(time_between_chunks)
std_dev_interval = statistics.stdev(time_between_chunks, mean_interval)

sporadic_count = sum(1 for interval in time_between_chunks if abs(interval - mean_interval) > std_dev_interval)
extra_sporadic_count = sum(1 for interval in time_between_chunks if abs(interval - mean_interval) > 3 * std_dev_interval)

# Assign a penalty for inconsistent streaming, i.e. if any of the following hold:
# (i) the inter-chunk interval of more than 10% of chunks falls outside one standard deviation of the mean
# (ii) bundled chunks make up more than 10% of total chunks
# (iii) the sporadic outliers mostly lie beyond 3x the std as well as 1x the std (meaning chunky streaming spikes pushed the overall mean higher)
if bundled_chunks > 0.1 * total_chunks or sporadic_count > 0.1 * len(time_between_chunks) or (sporadic_count > 0 and extra_sporadic_count >= 0.5 * sporadic_count):
response_time_penalty_multiplier = response_time_penalty_multiplier * CHUNKING_PERCENTAGE_PENALTY_FACTOR

try:
# If TTFB exceeds TTFB_THRESHOLD_MULTIPLIER times the mean interval between consecutive streamed tokens, apply the TTFB penalty
if ttfb is not None and ttfb > TTFB_THRESHOLD_MULTIPLIER * mean_interval:
response_time_penalty_multiplier *= TTFB_PENALTY_FACTOR
except Exception as e:
logger.error(f"Error in calculating TTFB: {e} ;")

try:
stream_time = time.time() - stream_time_init
except Exception as e:
@@ -204,6 +254,7 @@
formatted_response=text_jsons if len(text_jsons) > 0 else None,
node_id=node.node_id,
response_time=response_time,
response_time_penalty_multiplier=response_time_penalty_multiplier,
stream_time=stream_time,
task=task,
success=not first_message,
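
Following up on the review comment in the streaming.py diff above, here is a minimal sketch of how the penalty logic could be pulled out of consume_generator into a helper with configurable parameters. The function name, parameter names, and defaults are illustrative, not part of this diff; the defaults mirror the constants at the top of streaming.py:

import statistics

def compute_response_time_penalty(
    ttfb: float | None,
    time_between_chunks: list[float],
    bundled_chunks: int,
    total_chunks: int,
    chunking_penalty_factor: float = 1.35,  # CHUNKING_PERCENTAGE_PENALTY_FACTOR
    ttfb_penalty_factor: float = 1.1,  # TTFB_PENALTY_FACTOR
    ttfb_threshold_multiplier: float = 200.0,  # TTFB_THRESHOLD_MULTIPLIER
    bundled_fraction: float = 0.1,
    sporadic_fraction: float = 0.1,
    extra_sporadic_ratio: float = 0.5,
) -> float:
    """Return a multiplier >= 1.0 penalising bundled, sporadic, or slow-to-start streams."""
    multiplier = 1.0
    if len(time_between_chunks) < 2:
        return multiplier  # stdev needs at least two intervals

    mean_interval = sum(time_between_chunks) / len(time_between_chunks)
    std_dev = statistics.stdev(time_between_chunks, mean_interval)
    sporadic = sum(1 for t in time_between_chunks if abs(t - mean_interval) > std_dev)
    extra_sporadic = sum(1 for t in time_between_chunks if abs(t - mean_interval) > 3 * std_dev)

    # Chunking penalty: too many bundled chunks, too many sporadic intervals,
    # or outliers concentrated far beyond the mean.
    if (
        bundled_chunks > bundled_fraction * total_chunks
        or sporadic > sporadic_fraction * len(time_between_chunks)
        or (sporadic > 0 and extra_sporadic >= extra_sporadic_ratio * sporadic)
    ):
        multiplier *= chunking_penalty_factor

    # TTFB penalty: the first token arrived much later than the steady-state cadence.
    if ttfb is not None and ttfb > ttfb_threshold_multiplier * mean_interval:
        multiplier *= ttfb_penalty_factor

    return multiplier

consume_generator would then reduce the whole block above to a single call, e.g. response_time_penalty_multiplier = compute_response_time_penalty(ttfb, time_between_chunks, bundled_chunks, total_chunks).
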
4 changes: 3 additions & 1 deletion validator/utils/database/database_constants.py
@@ -136,6 +136,7 @@
COLUMN_METRIC = "metric"
COLUMN_STREAM_METRIC = "stream_metric"
COLUMN_RESPONSE_TIME = "response_time"
COLUMN_RESPONSE_TIME_PENALTY_MULTIPLIER = "response_time_penalty_multiplier"
COLUMN_VOLUME = "volume"

# UID record columns
@@ -156,4 +157,5 @@
COLUMN_PERIOD_SCORE_MULTIPLIER = "period_score_multiplier"
COLUMN_NORMALISED_PERIOD_SCORE = "normalised_period_score"
COLUMN_CONTENDER_CAPACITY = "contender_capacity"
COLUMN_NORMALISED_NET_SCORE = "normalised_net_score"
COLUMN_NORMALISED_NET_SCORE = "normalised_net_score"
COLUMN_AVG_RESPONSE_TIME_MULTIPLIER = "average_response_time_penalty_multiplier"
1 change: 1 addition & 0 deletions validator/utils/post/nineteen.py
@@ -127,6 +127,7 @@ class ContenderWeightsInfoPostObject(BaseModel):

average_quality_score: float
metric_bonus: float
average_response_time_penalty_multiplier: float
metric: float
stream_metric: float
combined_quality_score: float
Expand Down