Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove synthetic schedule adjustment relative to organics #228

Open
wants to merge 8 commits into
base: production
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions validator/control_node/src/cycle/refresh_contenders.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ async def _get_contenders_from_nodes(config: Config, nodes: list[Node]) -> List[
total_requests_made=0,
requests_429=0,
requests_500=0,

period_score=None,
)
)
Expand Down
19 changes: 5 additions & 14 deletions validator/control_node/src/cycle/schedule_synthetic_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,12 @@ async def schedule_synthetics_until_done(config: Config):
requests_to_skip = schedule.remaining_requests - latest_remaining_requests
GAUGE_REQUESTS_TO_SKIP.set(requests_to_skip, {"task": schedule.task})

if requests_to_skip > 0:
logger.info(f"Skipping {requests_to_skip} requests for task {schedule.task}")
await _schedule_synthetic_query(config.redis_db, schedule.task, max_len=100)

schedule.next_schedule_time += schedule.interval * requests_to_skip
schedule.remaining_requests = latest_remaining_requests
heapq.heappush(task_schedules, schedule)
continue
else:
await _schedule_synthetic_query(config.redis_db, schedule.task, max_len=100)


remaining_requests = latest_remaining_requests - 1
await _update_redis_remaining_requests(config.redis_db, schedule.task, remaining_requests)
schedule.next_schedule_time = time.time() + schedule.interval
schedule.remaining_requests = remaining_requests
remaining_requests = latest_remaining_requests - 1
await _update_redis_remaining_requests(config.redis_db, schedule.task, remaining_requests)
schedule.next_schedule_time = time.time() + schedule.interval
schedule.remaining_requests = remaining_requests

GAUGE_TOTAL_REQUESTS.set(schedule.total_requests, {"task": schedule.task})
GAUGE_SCHEDULE_REMAINING_REQUESTS.set(schedule.remaining_requests, {"task": schedule.task})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- migrate:up

ALTER TABLE contenders ADD COLUMN tasks_queued_for_checking FLOAT DEFAULT 0;
ALTER TABLE contenders_history ADD COLUMN tasks_queued_for_checking FLOAT DEFAULT 0;
ALTER TABLE tasks ADD COLUMN is_synthetic BOOLEAN NOT NULL DEFAULT TRUE;

-- migrate:down
ALTER TABLE contenders DROP COLUMN tasks_queued_for_checking;
ALTER TABLE contenders_history DROP COLUMN tasks_queued_for_checking;
-- Fixed: table name was garbled ("contetasksnders"); is_synthetic was added to `tasks` above.
ALTER TABLE tasks DROP COLUMN is_synthetic;
28 changes: 19 additions & 9 deletions validator/db/src/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from core.models import utility_models
from core import task_config as tcfg

from validator.models import Contender
from validator.db.src.database import PSQLDB
from validator.db.src.sql.rewards_and_scores import (
insert_task,
Expand All @@ -17,16 +17,19 @@
select_recent_reward_data_for_a_task,
select_recent_reward_data,
delete_specific_task,
get_synthetic_count,
get_contender_task_count
)
from asyncpg import Connection

from validator.models import RewardData


# Maximum number of task results kept in the DB for scoring; per-task quotas
# are a weight-scaled fraction of this cap.
MAX_TASKS_IN_DB_STORE = 1000

# Share of each task's storage quota reserved for synthetic queries; the
# remaining (1 - SYNTHETIC_FRACTION) share is reserved for organic queries.
SYNTHETIC_FRACTION = 0.6

db_lock = asyncio.Lock()

async def insert_task_results(
connection: Connection, task: str, result: utility_models.QueryResult, synthetic_query: bool, payload: dict
) -> None:
Expand All @@ -44,23 +47,31 @@ async def insert_task_results(
if hotkey is None:
return None
data = json.dumps(data_to_store)
await insert_task(connection, task, data, hotkey)
await insert_task(connection, task, data, hotkey, synthetic_query)


async def potentially_store_result_in_db(
    psql_db: PSQLDB, contender: Contender, result: utility_models.QueryResult, task: str, synthetic_query: bool, payload: dict
) -> None:
    """Store a query result for later checking, subject to per-task quotas.

    The per-task quota is ``MAX_TASKS_IN_DB_STORE * task weight``.  Within it,
    SYNTHETIC_FRACTION of the slots are reserved for synthetic queries and the
    remaining (1 - SYNTHETIC_FRACTION) for organics, and at most one result is
    stored per contender (if the contender already has a queued task, the other
    category is assumed to have covered it).
    TODO: how do we deal with cases when there's not enough organic traffic for the task?
    """
    task_config = tcfg.get_enabled_task_config(task)
    if task_config is None:
        return
    target_percentage = task_config.weight
    target_number_of_tasks_to_store = int(MAX_TASKS_IN_DB_STORE * target_percentage)
    async with await psql_db.connection() as connection:
        number_of_these_tasks_already_stored = await select_count_rows_of_task_stored_for_scoring(connection, task)
        synthetic_count = await get_synthetic_count(connection, task)
        contender_task_count = await get_contender_task_count(connection, contender.id)

        if contender_task_count != 0:
            # This contender already has a stored result; don't double-store.
            return

        if synthetic_query:
            under_quota = synthetic_count <= SYNTHETIC_FRACTION * target_number_of_tasks_to_store
        else:
            # BUG FIX: the original organic branch was
            # `not synthetic_query and (1 - SYNTHETIC_FRACTION) * target_number_of_tasks_to_store`
            # — a bare truthy constant with no comparison, so organics always
            # passed. Compare the organic count against the organic share.
            organic_count = number_of_these_tasks_already_stored - synthetic_count
            under_quota = organic_count <= (1 - SYNTHETIC_FRACTION) * target_number_of_tasks_to_store

        if under_quota:
            await insert_task_results(
                connection=connection, task=task, result=result, payload=payload, synthetic_query=synthetic_query
            )


async def select_and_delete_task_result(psql_db: PSQLDB, task: str) -> tuple[list[dict[str, Any]], str] | None:
Expand All @@ -77,7 +88,6 @@ async def select_and_delete_task_result(psql_db: PSQLDB, task: str) -> tuple[lis




# TODO: Implement this
async def clean_tables_of_hotkeys(connection: Connection, node_hotkeys: List[str]) -> None:
...
Expand Down
2 changes: 2 additions & 0 deletions validator/db/src/sql/contenders.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async def migrate_contenders_to_contender_history(connection: Connection) -> Non
{dcst.REQUESTS_429},
{dcst.REQUESTS_500},
{dcst.PERIOD_SCORE},
{dcst.COLUMN_TASKS_QUEUED_FOR_CHECKING},
{dcst.CREATED_AT},
{dcst.UPDATED_AT}
)
Expand All @@ -92,6 +93,7 @@ async def migrate_contenders_to_contender_history(connection: Connection) -> Non
{dcst.REQUESTS_429},
{dcst.REQUESTS_500},
{dcst.PERIOD_SCORE},
{dcst.COLUMN_TASKS_QUEUED_FOR_CHECKING},
{dcst.CREATED_AT},
{dcst.UPDATED_AT}
FROM {dcst.CONTENDERS_TABLE}
Expand Down
27 changes: 23 additions & 4 deletions validator/db/src/sql/rewards_and_scores.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,35 @@ async def insert_uid_record(connection: Connection, data: List[tuple]) -> None:
)


async def insert_task(connection: Connection, task_name: str, checking_data: str, hotkey: str, is_synthetic: bool) -> None:
    """Insert one row into the tasks table.

    Args:
        connection: Open asyncpg connection.
        task_name: Name of the task the result belongs to.
        checking_data: JSON-encoded result/payload data to be checked later.
        hotkey: The miner's hotkey the result came from.
        is_synthetic: Whether the result came from a synthetic query.
    """
    # BUG FIX: the original called executemany with ((a, b, c, d)) — the
    # trailing comma was dropped, so the argument was a single 4-tuple and
    # executemany iterated over its *elements* as rows. Use execute for the
    # single row instead.
    await connection.execute(
        f"""
        INSERT INTO {dcst.TABLE_TASKS} ({dcst.COLUMN_TASK_NAME}, {dcst.COLUMN_CHECKING_DATA}, {dcst.COLUMN_MINER_HOTKEY}, {dcst.COLUMN_IS_SYNTHETIC})
        VALUES ($1, $2, $3, $4)
        """,
        task_name,
        checking_data,
        hotkey,
        is_synthetic,
    )


async def get_synthetic_count(connection: Connection, task_name: str) -> int:
    """Return how many stored rows for ``task_name`` came from synthetic queries.

    Returns 0 when the query yields no value.
    """
    query = f"""
        SELECT COUNT(*) FROM {dcst.TABLE_TASKS} WHERE {dcst.COLUMN_TASK_NAME} = $1 AND {dcst.COLUMN_IS_SYNTHETIC} = true
        """
    count = await connection.fetchval(query, task_name)
    if count is None:
        return 0
    return count

async def get_contender_task_count(connection: Connection, contender: str) -> int:
    """Return the contender's ``tasks_queued_for_checking`` value.

    Args:
        connection: Open asyncpg connection.
        contender: The contender's id (``Contender.id``).

    Returns:
        The contender's queued-task count, or 0 if no row exists.
        NOTE(review): the column is declared FLOAT in the migration but this
        is annotated ``int`` — confirm the intended type.
    """
    # Use the dcst constants for identifiers rather than hardcoding them,
    # consistent with the rest of this module.
    # TODO: use a dcst constant for "contender_id" once one is defined.
    query = f"""
        SELECT {dcst.COLUMN_TASKS_QUEUED_FOR_CHECKING}
        FROM {dcst.CONTENDERS_TABLE}
        WHERE contender_id = $1
    """
    result = await connection.fetchval(query, contender)
    return result if result is not None else 0


##### Delete stuff


Expand Down
1 change: 1 addition & 0 deletions validator/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Contender(BaseModel):
requests_429: int = Field(0, description="HTTP 429 requests")
requests_500: int = Field(0, description="HTTP 500 requests")
period_score: Optional[float] = Field(None, description="Period score")
tasks_queued_for_checking: int = Field(0, description="Tasks queued for checking")

@property
def id(self) -> str:
Expand Down
2 changes: 1 addition & 1 deletion validator/query_node/src/process_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async def process_task(config: Config, message: rdc.QueryQueueMessage):
logger.debug(f"Acknowledging job id : {message.job_id}")
await _acknowledge_job(config.redis_db, message.job_id)
logger.debug(f"Successfully acknowledged job id : {message.job_id} ✅")
await _decrement_requests_remaining(config.redis_db, task)
# await _decrement_requests_remaining(config.redis_db, task)
else:
logger.info("Got synthetic query, processing...")

Expand Down
2 changes: 1 addition & 1 deletion validator/query_node/src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def adjust_contender_from_result(
await update_contender_capacities(config.psql_db, contender, capacity_consumed)

await db_functions.potentially_store_result_in_db(
config.psql_db, query_result, query_result.task, synthetic_query=synthetic_query, payload=payload
config.psql_db, contender, query_result, query_result.task, synthetic_query=synthetic_query, payload=payload
)
logger.debug(f"Adjusted node {contender.node_id} for task {query_result.task}.")

Expand Down
2 changes: 2 additions & 0 deletions validator/utils/database/database_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@
COLUMN_ID = "id"
COLUMN_CREATED_AT = "created_at"
COLUMN_MINER_HOTKEY = "node_hotkey"

# `tasks` table column names
COLUMN_TASK_NAME = "task_name"
COLUMN_CHECKING_DATA = "checking_data"
# Moved here: is_synthetic is added to the `tasks` table by the migration.
COLUMN_IS_SYNTHETIC = "is_synthetic"

# `contenders` / `contenders_history` column (added by the same migration);
# it is not a `tasks` column, so it gets its own section.
COLUMN_TASKS_QUEUED_FOR_CHECKING = "tasks_queued_for_checking"

# `reward_data` table column names
COLUMN_TASK = "task"
Expand Down