Skip to content

Commit

Permalink
Updated logging according to new schema
Browse files Browse the repository at this point in the history
  • Loading branch information
namesty committed Jan 25, 2024
1 parent 66fee47 commit 88d8578
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 35 deletions.
27 changes: 21 additions & 6 deletions workers/fund_public_goods/db/tables/logs.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
from typing import Literal
from supabase import Client
import datetime

def insert(
def create(
db: Client,
run_id: str,
step: Literal["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"],
message: str
step_name: str,
):
db.table("logs").insert({
return db.table("logs").insert({
"run_id": run_id,
"step": step,
"value": message
"step_name": step_name,
}).execute()

def update(
db: Client,
log_id: str,
status: Literal["IN_PROGRESS", "COMPLETED", "ERRORED"],
value: str | None
):
ended_at = None
if status == "COMPLETED" or status == "ERRORED":
ended_at = datetime.datetime.now().isoformat()

return db.table("logs").update({
"status": status,
"value": value,
"ended_at": ended_at
}).eq("id", log_id).execute()
126 changes: 97 additions & 29 deletions workers/fund_public_goods/workers/functions/create_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,69 @@
from fund_public_goods.agents.researcher.functions.evaluate_projects import (
evaluate_projects,
)
from fund_public_goods.agents.researcher.models.answer import Answer
from fund_public_goods.agents.researcher.models.evaluated_project import (
EvaluatedProject,
)
from fund_public_goods.agents.researcher.models.project import Project
from fund_public_goods.agents.researcher.models.weighted_project import WeightedProject
from fund_public_goods.db.tables.steps import get_steps
import inngest
from fund_public_goods.db.tables.projects import get_projects
from fund_public_goods.db.tables.runs import get_prompt
from fund_public_goods.db.tables.strategy_entries import insert_multiple
from fund_public_goods.workers.events import CreateStrategyEvent
from fund_public_goods.db import client, logs
from supabase import Client
import typing


StepNames = typing.Literal["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"]
step_names: list[StepNames] = ["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"]


def fetch_projects_data(supabase: Client) -> list[Project]:
response = get_projects(supabase)
projects = []

projects: list[Project] = []

for item in response.data:
answers = []
answers: list[Answer] = []

for application in item.get("applications", []):
for answer in application.get("answers", []):
answers.append(
{
"question": answer.get("question", ""),
"answer": answer.get("answer", None),
}
)

answers.append(Answer(
question=answer.get("question", ""),
answer=answer.get("answer", None)
))

project = Project(
id=item.get("id", ""),
title=item.get("title", ""),
description=item.get("description", ""),
website=item.get("website", ""),
answers=answers,
)

projects.append(project)

return projects


def initialize_logs(supabase: Client, run_id: str) -> dict[StepNames, str]:
log_ids: dict[StepNames, str] = {}

for step_name in step_names:
new_log = logs.create(
db=supabase,
run_id=run_id,
step_name=step_name,
).data

log_ids[step_name] = new_log[0].id

return log_ids

@inngest.create_function(
fn_id="on_create_strategy",
trigger=CreateStrategyEvent.trigger,
Expand All @@ -55,22 +76,44 @@ async def create_strategy(
data = CreateStrategyEvent.Data.model_validate(ctx.event.data)
run_id = data.run_id
supabase = client.create_admin()

prompt = await step.run("extract_prompt", lambda: get_prompt(supabase, run_id))

log_ids = await step.run("initialize_logs", lambda: initialize_logs(supabase, run_id))

await step.run(
"start_fetch_projects_data",
lambda: logs.update(
db=supabase,
status="IN_PROGRESS",
log_id=log_ids["FETCH_PROJECTS"],
value=None,
),
)

json_projects = await step.run(
"fetch_projects_data", lambda: fetch_projects_data(supabase)
)

projects: list[Project] = [Project(**json_project) for json_project in json_projects] # type: ignore
projects = [Project(**json_project) for json_project in json_projects] # type: ignore

await step.run(
"fetched_projects_data",
lambda: logs.insert(
supabase,
run_id,
"FETCH_PROJECTS",
f"Found {len(projects)} projects",
"completed_fetch_projects_data",
lambda: logs.update(
db=supabase,
status="COMPLETED",
log_id=log_ids["FETCH_PROJECTS"],
value=f"Found {len(projects)} projects",
),
)

await step.run(
"start_assess_projects",
lambda: logs.update(
db=supabase,
status="IN_PROGRESS",
log_id=log_ids["EVALUATE_PROJECTS"],
value=None,
),
)

Expand All @@ -80,12 +123,22 @@ async def create_strategy(
assessed_projects = [EvaluatedProject(**x) for x in json_asessed_projects] # type: ignore

await step.run(
"assessed_projects",
lambda: logs.insert(
"completed_assess_projects",
lambda: logs.update(
db=supabase,
status="COMPLETED",
log_id=log_ids["EVALUATE_PROJECTS"],
value=f"Evaluated {len(assessed_projects)} projects",
),
)

await step.run(
"start_determine_funding",
lambda: logs.update(
supabase,
run_id,
"EVALUATE_PROJECTS",
f"Evaluated {len(assessed_projects)} projects",
status="IN_PROGRESS",
log_id=log_ids["ANALYZE_FUNDING"],
value=None,
),
)

Expand All @@ -95,12 +148,22 @@ async def create_strategy(
weighted_projects = [WeightedProject(**x) for x in json_weighted_projects] # type: ignore

await step.run(
"determined_funding",
lambda: logs.insert(
"completed_determine_funding",
lambda: logs.update(
supabase,
run_id,
"ANALYZE_FUNDING",
"Determined the relative funding that the best matching projects need",
status="COMPLETED",
log_id=log_ids["ANALYZE_FUNDING"],
value="Determined the relative funding that the best matching projects need",
),
)

await step.run(
"start_synthesize_results",
lambda: logs.update(
supabase,
status="IN_PROGRESS",
log_id=log_ids["SYNTHESIZE_RESULTS"],
value=None
),
)

Expand All @@ -109,8 +172,13 @@ async def create_strategy(
)

await step.run(
"saved_results_to_db",
lambda: logs.insert(supabase, run_id, "SYNTHESIZE_RESULTS", "Results generated"),
"completed_synthesize_results",
lambda: logs.update(
supabase,
status="COMPLETED",
log_id=log_ids["SYNTHESIZE_RESULTS"],
value="Results generated"
),
)

return "done"

0 comments on commit 88d8578

Please sign in to comment.