From 88d85782aa6bf8655137cd43cf6aa0c75435a62c Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Thu, 25 Jan 2024 03:26:18 +0100 Subject: [PATCH] Updated logging according to new schema --- workers/fund_public_goods/db/tables/logs.py | 27 +++- .../workers/functions/create_strategy.py | 126 ++++++++++++++---- 2 files changed, 118 insertions(+), 35 deletions(-) diff --git a/workers/fund_public_goods/db/tables/logs.py b/workers/fund_public_goods/db/tables/logs.py index 71f11ec..93e9967 100644 --- a/workers/fund_public_goods/db/tables/logs.py +++ b/workers/fund_public_goods/db/tables/logs.py @@ -1,14 +1,29 @@ from typing import Literal from supabase import Client +import datetime -def insert( +def create( db: Client, run_id: str, - step: Literal["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"], - message: str + step_name: str, ): - db.table("logs").insert({ + return db.table("logs").insert({ "run_id": run_id, - "step": step, - "value": message + "step_name": step_name, }).execute() + +def update( + db: Client, + log_id: str, + status: Literal["IN_PROGRESS", "COMPLETED", "ERRORED"], + value: str | None +): + ended_at = None + if status == "COMPLETED" or status == "ERRORED": + ended_at = datetime.datetime.now().isoformat() + + return db.table("logs").update({ + "status": status, + "value": value, + "ended_at": ended_at + }).eq("id", log_id).execute() \ No newline at end of file diff --git a/workers/fund_public_goods/workers/functions/create_strategy.py b/workers/fund_public_goods/workers/functions/create_strategy.py index 7a6018b..9bb18f0 100644 --- a/workers/fund_public_goods/workers/functions/create_strategy.py +++ b/workers/fund_public_goods/workers/functions/create_strategy.py @@ -2,11 +2,13 @@ from fund_public_goods.agents.researcher.functions.evaluate_projects import ( evaluate_projects, ) +from fund_public_goods.agents.researcher.models.answer import Answer from fund_public_goods.agents.researcher.models.evaluated_project import ( EvaluatedProject, ) from fund_public_goods.agents.researcher.models.project import Project from fund_public_goods.agents.researcher.models.weighted_project import WeightedProject +from fund_public_goods.db.tables.steps import get_steps import inngest from fund_public_goods.db.tables.projects import get_projects from fund_public_goods.db.tables.runs import get_prompt @@ -14,24 +16,28 @@ from fund_public_goods.workers.events import CreateStrategyEvent from fund_public_goods.db import client, logs from supabase import Client +import typing + + +StepNames = typing.Literal["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"] +step_names: list[StepNames] = ["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"] def fetch_projects_data(supabase: Client) -> list[Project]: response = get_projects(supabase) - projects = [] + + projects: list[Project] = [] for item in response.data: - answers = [] + answers: list[Answer] = [] for application in item.get("applications", []): for answer in application.get("answers", []): - answers.append( - { - "question": answer.get("question", ""), - "answer": answer.get("answer", None), - } - ) - + answers.append(Answer( + question=answer.get("question", ""), + answer=answer.get("answer", None) + )) + project = Project( id=item.get("id", ""), title=item.get("title", ""), @@ -39,11 +45,26 @@ def fetch_projects_data(supabase: Client) -> list[Project]: website=item.get("website", ""), answers=answers, ) + projects.append(project) return projects +def initialize_logs(supabase: Client, run_id: str) -> dict[StepNames, str]: + log_ids: dict[StepNames, str] = {} + + for step_name in step_names: + new_log = logs.create( + db=supabase, + run_id=run_id, + step_name=step_name, + ).data + + log_ids[step_name] = new_log[0].id + + return log_ids + @inngest.create_function( fn_id="on_create_strategy", trigger=CreateStrategyEvent.trigger, @@ -55,22 +76,44 @@ async def create_strategy( data = CreateStrategyEvent.Data.model_validate(ctx.event.data) run_id = data.run_id supabase = client.create_admin() - + prompt = await step.run("extract_prompt", lambda: get_prompt(supabase, run_id)) + + log_ids = await step.run("initialize_logs", lambda: initialize_logs(supabase, run_id)) + + await step.run( + "start_fetch_projects_data", + lambda: logs.update( + db=supabase, + status="IN_PROGRESS", + log_id=log_ids["FETCH_PROJECTS"], + value=None, + ), + ) json_projects = await step.run( "fetch_projects_data", lambda: fetch_projects_data(supabase) ) - projects: list[Project] = [Project(**json_project) for json_project in json_projects] # type: ignore + projects = [Project(**json_project) for json_project in json_projects] # type: ignore await step.run( - "fetched_projects_data", - lambda: logs.insert( - supabase, - run_id, - "FETCH_PROJECTS", - f"Found {len(projects)} projects", + "completed_fetch_projects_data", + lambda: logs.update( + db=supabase, + status="COMPLETED", + log_id=log_ids["FETCH_PROJECTS"], + value=f"Found {len(projects)} projects", + ), + ) + + await step.run( + "start_assess_projects", + lambda: logs.update( + db=supabase, + status="IN_PROGRESS", + log_id=log_ids["EVALUATE_PROJECTS"], + value=None, ), ) @@ -80,12 +123,22 @@ async def create_strategy( assessed_projects = [EvaluatedProject(**x) for x in json_asessed_projects] # type: ignore await step.run( - "assessed_projects", - lambda: logs.insert( + "completed_assess_projects", + lambda: logs.update( + db=supabase, + status="COMPLETED", + log_id=log_ids["EVALUATE_PROJECTS"], + value=f"Evaluated {len(assessed_projects)} projects", + ), + ) + + await step.run( + "start_determine_funding", + lambda: logs.update( supabase, - run_id, - "EVALUATE_PROJECTS", - f"Evaluated {len(assessed_projects)} projects", + status="IN_PROGRESS", + log_id=log_ids["ANALYZE_FUNDING"], + value=None, ), ) @@ -95,12 +148,22 @@ async def create_strategy( weighted_projects = [WeightedProject(**x) for x in json_weighted_projects] # type: ignore await step.run( - "determined_funding", - lambda: logs.insert( + "completed_determine_funding", + lambda: logs.update( supabase, - run_id, - "ANALYZE_FUNDING", - "Determined the relative funding that the best matching projects need", + status="COMPLETED", + log_id=log_ids["ANALYZE_FUNDING"], + value="Determined the relative funding that the best matching projects need", + ), + ) + + await step.run( + "start_synthesize_results", + lambda: logs.update( + supabase, + status="IN_PROGRESS", + log_id=log_ids["SYNTHESIZE_RESULTS"], + value=None ), ) @@ -109,8 +172,13 @@ async def create_strategy( ) await step.run( - "saved_results_to_db", - lambda: logs.insert(supabase, run_id, "SYNTHESIZE_RESULTS", "Results generated"), + "completed_synthesize_results", + lambda: logs.update( + supabase, + status="COMPLETED", + log_id=log_ids["SYNTHESIZE_RESULTS"], + value="Results generated" + ), ) return "done"