diff --git a/fund-public-good-ai.code-workspace b/fund-public-good-ai.code-workspace new file mode 100644 index 0000000..d230356 --- /dev/null +++ b/fund-public-good-ai.code-workspace @@ -0,0 +1,12 @@ +{ + "folders": [ + { + "name": "workers", + "path": "workers" + }, + { + "name": "web", + "path": "web" + } + ] +} \ No newline at end of file diff --git a/web/app/actions/startWorker.ts b/web/app/actions/startWorker.ts index 0234c2f..a8f9586 100644 --- a/web/app/actions/startWorker.ts +++ b/web/app/actions/startWorker.ts @@ -9,6 +9,7 @@ export const startWorker = async ( prompt: string ): Promise => { const response = await fetch(`${process.env.WORKERS_URL}/api/workers`, { + cache: "no-store", method: "POST", body: JSON.stringify({ prompt }), headers: { @@ -38,6 +39,7 @@ export const regenerateStrategy = async ( const response = await fetch( `${process.env.WORKERS_URL}/api/workers/${workerId}/runs`, { + cache: "no-store", method: "POST", body: JSON.stringify({ prompt }), headers: { diff --git a/workers/fund_public_goods/api/runs.py b/workers/fund_public_goods/api/runs.py index 38ee5d8..88a7b3a 100644 --- a/workers/fund_public_goods/api/runs.py +++ b/workers/fund_public_goods/api/runs.py @@ -1,5 +1,5 @@ from fund_public_goods.inngest_client import inngest_client -from fund_public_goods.events import CreateStrategyEvent +from fund_public_goods.workers.events import CreateStrategyEvent from fund_public_goods.db import client, tables from fastapi import APIRouter, HTTPException from pydantic import BaseModel diff --git a/workers/fund_public_goods/api/workers.py b/workers/fund_public_goods/api/workers.py index 34d2ba7..53eaccb 100644 --- a/workers/fund_public_goods/api/workers.py +++ b/workers/fund_public_goods/api/workers.py @@ -1,5 +1,5 @@ from fund_public_goods.inngest_client import inngest_client -from fund_public_goods.events import CreateStrategyEvent +from fund_public_goods.workers.events import CreateStrategyEvent from fund_public_goods.db import client, tables from fastapi import APIRouter, HTTPException from pydantic import BaseModel diff --git a/workers/fund_public_goods/build_check.py b/workers/fund_public_goods/build_check.py index 1af3f3b..39ac56a 100644 --- a/workers/fund_public_goods/build_check.py +++ b/workers/fund_public_goods/build_check.py @@ -3,7 +3,7 @@ def run(): # Run mypy check - result = subprocess.run(["mypy", "--follow-imports=skip", "."], capture_output=True) + result = subprocess.run(["mypy", "."], capture_output=True) print(result.stdout.decode()) if result.returncode != 0: print("Type checking failed") diff --git a/workers/fund_public_goods/db/tables/projects.py b/workers/fund_public_goods/db/tables/projects.py index 6d6178c..dbd9b04 100644 --- a/workers/fund_public_goods/db/tables/projects.py +++ b/workers/fund_public_goods/db/tables/projects.py @@ -1,20 +1,29 @@ -from supabase import Client +from typing import Any, Dict +from supabase import Client, PostgrestAPIResponse import uuid def insert( - db: Client, - title: str, - recipient: str, - description: str, - website: str + db: Client, title: str, recipient: str, description: str, website: str ) -> str: id = str(uuid.uuid4()) - db.table("projects").insert({ - "id": id, - "title": title, - "recipient": recipient, - "description": description, - "website": website - }).execute() + db.table("projects").insert( + { + "id": id, + "title": title, + "recipient": recipient, + "description": description, + "website": website, + } + ).execute() return id + + +def get_projects(db: Client) -> PostgrestAPIResponse[Dict[str, Any]]: + return ( + db.table("projects") + .select( + "id, title, description, website, applications(id, recipient, round, answers)" + ) + .execute() + ) diff --git a/workers/fund_public_goods/db/tables/runs.py b/workers/fund_public_goods/db/tables/runs.py index 1cd75ea..caa9f7a 100644 --- a/workers/fund_public_goods/db/tables/runs.py +++ b/workers/fund_public_goods/db/tables/runs.py @@ -1,12 +1,23 @@ from supabase import Client import uuid +from ..client import create_admin def insert(db: Client, worker_id: str, prompt: str) -> str: id = str(uuid.uuid4()) - db.table("runs").insert({ - "id": id, - "worker_id": worker_id, - "prompt": prompt - }).execute() + db.table("runs").insert( + {"id": id, "worker_id": worker_id, "prompt": prompt} + ).execute() return id + + +def get_prompt(db: Client, run_id: str) -> str: + return ( + db.table("runs") + .select("prompt") + .eq("id", run_id) + .limit(1) + .single() + .execute() + .data + ) diff --git a/workers/fund_public_goods/db/tables/strategy_entries.py b/workers/fund_public_goods/db/tables/strategy_entries.py index 391ce01..536b762 100644 --- a/workers/fund_public_goods/db/tables/strategy_entries.py +++ b/workers/fund_public_goods/db/tables/strategy_entries.py @@ -1,4 +1,5 @@ from supabase import Client +from fund_public_goods.agents.researcher.models.weighted_project import WeightedProject def insert( @@ -8,13 +9,31 @@ def insert( reasoning: str, impact: float, interest: float, - weight: float + weight: float, ): - db.table("strategy_entries").insert({ - "run_id": run_id, - "project_id": project_id, - "reasoning": reasoning, - "impact": impact, - "interest": interest, - "weight": weight, - }).execute() + db.table("strategy_entries").insert( + { + "run_id": run_id, + "project_id": project_id, + "reasoning": reasoning, + "impact": impact, + "interest": interest, + "weight": weight, + } + ).execute() + + +def insert_multiple(db: Client, run_id: str, strategies: list[WeightedProject]) -> None: + db.table("strategy_entries").insert( + [ + { + "run_id": run_id, + "reasoning": entry.evaluation.reasoning, + "weight": entry.weight, + "impact": entry.evaluation.impact, + "interest": entry.evaluation.interest, + "project_id": entry.project.id, + } + for entry in strategies + ] + ).execute() diff --git a/workers/fund_public_goods/functions/create_strategy.py b/workers/fund_public_goods/functions/create_strategy.py deleted file mode 100644 index feecda5..0000000 --- a/workers/fund_public_goods/functions/create_strategy.py +++ /dev/null @@ -1,143 +0,0 @@ -from fund_public_goods.agents.researcher.functions.assign_weights import assign_weights -from fund_public_goods.agents.researcher.functions.evaluate_projects import evaluate_projects -from fund_public_goods.agents.researcher.models.evaluated_project import EvaluatedProject -from fund_public_goods.agents.researcher.models.project import Project -from fund_public_goods.agents.researcher.models.weighted_project import WeightedProject -import inngest -from fund_public_goods.events import CreateStrategyEvent -from fund_public_goods.db import client, logs -from supabase import Client - -def save_strategy_to_db(supabase: Client, run_id: str, entries: list[WeightedProject]): - supabase.table('strategy_entries').insert([{ - "run_id": run_id, - "reasoning": entry.evaluation.reasoning, - "weight": entry.weight, - "impact": entry.evaluation.impact, - "interest": entry.evaluation.interest, - "project_id": entry.project.id - } for entry in entries]).execute() - -def fetch_projects_data(supabase: Client): - response = supabase.table("projects").select("id, title, description, website, applications(id, recipient, round, answers)").execute() - projects = [] - - for item in response.data: - project_data = item.get('data', {}) - project_id = item.get('id', '') - - answers = [] - for application in item.get('applications', []): - for ans in application.get('answers', []): - answer = { - "question": ans.get('question', ''), - "answer": ans.get('answer', None) - } - answers.append(answer) - - project = { - "id": project_id, - "title": project_data.get('title', ''), - "description": project_data.get('description', ''), - "website": project_data.get('website', ''), - "answers": answers - } - projects.append(project) - - return projects - - -def extract_prompt(supabase: Client, run_id: str): - return supabase.table('runs').select("prompt").eq("id", run_id).limit(1).single().execute().data - - -@inngest.create_function( - fn_id="on_create_strategy", - trigger=CreateStrategyEvent.trigger, -) -async def create_strategy( - ctx: inngest.Context, - step: inngest.Step, -) -> str | None: - data = CreateStrategyEvent.Data.model_validate(ctx.event.data) - run_id = data.run_id - supabase = client.create_admin() - - await step.run( - "extracting_prompt", - lambda: logs.insert( - supabase, - run_id, - "Extracting prompt from run_id" - ), - ) - - prompt = await step.run( - "extract_prompt", - lambda: extract_prompt(supabase, run_id) - ) - - await step.run( - "fetching_projects_info", - lambda: logs.insert( - supabase, - run_id, - "Getting information from data sources" - ), - ) - - json_projects = await step.run( - "fetch_projects_data", - lambda: fetch_projects_data(supabase) - ) - - projects: list[Project] = [Project(**json_project) for json_project in json_projects] - - await step.run( - "assessing", - lambda: logs.insert( - supabase, - run_id, - "Assessing impact of each project realted to the users interest", - ), - ) - - json_asessed_projects = await step.run( - "assess_projects", - lambda: evaluate_projects(prompt, projects) - ) - assessed_projects = [EvaluatedProject(**x) for x in json_asessed_projects] # type: ignore - - await step.run( - "determining_funding", - lambda: logs.insert( - supabase, - run_id, - "Determining the relative funding that the best matching projects need", - ), - ) - - json_weighted_projects: list[WeightedProject] = await step.run( - "determine_funding", - lambda: assign_weights(assessed_projects) - ) - weighted_projects = [WeightedProject(**x) for x in json_weighted_projects] # type: ignore - - await step.run("saving_results_to_db", lambda: logs.insert( - supabase, - run_id, - "Generating results" - )) - - await step.run( - "save_strategy_to_db", - lambda: save_strategy_to_db(supabase, run_id, weighted_projects) - ) - - await step.run("result", lambda: logs.insert( - supabase, - run_id, - "STRATEGY_CREATED" - )) - - return "done" diff --git a/workers/fund_public_goods/gitcoin/functions/index_gitcoin_page.py b/workers/fund_public_goods/gitcoin/functions/index_gitcoin_page.py index e264510..69a6d10 100644 --- a/workers/fund_public_goods/gitcoin/functions/index_gitcoin_page.py +++ b/workers/fund_public_goods/gitcoin/functions/index_gitcoin_page.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import List, cast +from typing import cast import inngest from pydantic import parse_obj_as from fund_public_goods.gitcoin.models import ApplicationInfo, ProjectApplicationInfo, ProjectInfo, RoundInfo @@ -31,7 +31,7 @@ async def index_gitcoin_page( data = IndexGitcoinPageEvent.Data.model_validate(ctx.event.data) rounds = await step.run("fetch_rounds", lambda: fetch_rounds(data.url, first=1, skip=data.skip_rounds)) - rounds = parse_obj_as(List[RoundInfo], rounds) + rounds = parse_obj_as(list[RoundInfo], rounds) if not rounds: await step.run("stop_job", lambda: stop_job(data.job_id)) @@ -42,7 +42,7 @@ async def index_gitcoin_page( apps = await step.run("fetch_project_applications", lambda: fetch_project_applications(data.url, round.id, first=data.project_page_size, skip=data.skip_projects)) - apps = parse_obj_as(List[ApplicationInfo], apps) + apps = parse_obj_as(list[ApplicationInfo], apps) if not apps: await step.run("update_job_progress", lambda: update_job_progress(data.job_id, data.skip_rounds + 1, 0)) diff --git a/workers/fund_public_goods/gitcoin/models.py b/workers/fund_public_goods/gitcoin/models.py index 397896e..0e57356 100644 --- a/workers/fund_public_goods/gitcoin/models.py +++ b/workers/fund_public_goods/gitcoin/models.py @@ -1,4 +1,3 @@ -from typing import List from pydantic import BaseModel, ConfigDict, Field class GitcoinIndexingJob(BaseModel): diff --git a/workers/fund_public_goods/gitcoin/scrapers.py b/workers/fund_public_goods/gitcoin/scrapers.py index 019ba49..a9ed3b2 100644 --- a/workers/fund_public_goods/gitcoin/scrapers.py +++ b/workers/fund_public_goods/gitcoin/scrapers.py @@ -1,4 +1,3 @@ -from typing import List import requests from fund_public_goods.gitcoin.models import RoundInfo, ApplicationInfo @@ -11,7 +10,7 @@ def fetch_json_from_ipfs(pointer: str) -> dict: else: raise Exception(f"Failed to fetch data from IPFS for pointer {pointer} with status code: {response.status_code}") -def fetch_rounds(url: str, skip: int, first: int) -> List[RoundInfo]: +def fetch_rounds(url: str, skip: int, first: int) -> list[RoundInfo]: data = { "query": """ query GetRounds($first: Int, $skip: Int) { @@ -38,7 +37,7 @@ def fetch_rounds(url: str, skip: int, first: int) -> List[RoundInfo]: else: raise Exception(f"Request failed with status code: {response.status_code}") -def fetch_project_applications(url: str, round_id: str, skip: int, first: int) -> List[ApplicationInfo]: +def fetch_project_applications(url: str, round_id: str, skip: int, first: int) -> list[ApplicationInfo]: data = { "query": """ query GetRoundById($roundId: String, $first: Int, $skip: Int) { diff --git a/workers/fund_public_goods/main.py b/workers/fund_public_goods/main.py index 7b2a5ca..9ab6e2b 100644 --- a/workers/fund_public_goods/main.py +++ b/workers/fund_public_goods/main.py @@ -2,9 +2,9 @@ from fastapi import FastAPI import inngest.fast_api from mangum import Mangum -from fund_public_goods.gitcoin.functions import functions as gitcoin_functions from .inngest_client import inngest_client -from .functions import functions +from fund_public_goods.gitcoin.functions import functions as gitcoin_functions +from fund_public_goods.workers.functions import functions as worker_functions from .api import workers, runs from .get_version import router as get_version_router @@ -12,13 +12,8 @@ app = FastAPI() -functions += gitcoin_functions -inngest.fast_api.serve( - app, - inngest_client, - functions + gitcoin_functions, -) +inngest.fast_api.serve(app, inngest_client, [*worker_functions, *gitcoin_functions]) app.include_router(workers.router) app.include_router(runs.router) app.include_router(get_version_router) diff --git a/workers/fund_public_goods/workers/__init__.py b/workers/fund_public_goods/workers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/workers/fund_public_goods/events/__init__.py b/workers/fund_public_goods/workers/events/__init__.py similarity index 100% rename from workers/fund_public_goods/events/__init__.py rename to workers/fund_public_goods/workers/events/__init__.py diff --git a/workers/fund_public_goods/events/create_strategy_event.py b/workers/fund_public_goods/workers/events/create_strategy_event.py similarity index 100% rename from workers/fund_public_goods/events/create_strategy_event.py rename to workers/fund_public_goods/workers/events/create_strategy_event.py diff --git a/workers/fund_public_goods/functions/__init__.py b/workers/fund_public_goods/workers/functions/__init__.py similarity index 100% rename from workers/fund_public_goods/functions/__init__.py rename to workers/fund_public_goods/workers/functions/__init__.py diff --git a/workers/fund_public_goods/workers/functions/create_strategy.py b/workers/fund_public_goods/workers/functions/create_strategy.py new file mode 100644 index 0000000..944cc60 --- /dev/null +++ b/workers/fund_public_goods/workers/functions/create_strategy.py @@ -0,0 +1,116 @@ +from fund_public_goods.agents.researcher.functions.assign_weights import assign_weights +from fund_public_goods.agents.researcher.functions.evaluate_projects import ( + evaluate_projects, +) +from fund_public_goods.agents.researcher.models.evaluated_project import ( + EvaluatedProject, +) +from fund_public_goods.agents.researcher.models.project import Project +from fund_public_goods.agents.researcher.models.weighted_project import WeightedProject +import inngest +from fund_public_goods.db.tables.projects import get_projects +from fund_public_goods.db.tables.runs import get_prompt +from fund_public_goods.db.tables.strategy_entries import insert_multiple +from fund_public_goods.workers.events import CreateStrategyEvent +from fund_public_goods.db import client, logs +from supabase import Client + + +def fetch_projects_data(supabase: Client) -> list[Project]: + response = get_projects(supabase) + projects = [] + + for item in response.data: + answers = [] + + for application in item.get("applications", []): + for answer in application.get("answers", []): + answers.append( + { + "question": answer.get("question", ""), + "answer": answer.get("answer", None), + } + ) + + project = Project( + id=item.get("id", ""), + title=item.get("title", ""), + description=item.get("description", ""), + website=item.get("website", ""), + answers=answers, + ) + projects.append(project) + + return projects + + +@inngest.create_function( + fn_id="on_create_strategy", + trigger=CreateStrategyEvent.trigger, +) +async def create_strategy( + ctx: inngest.Context, + step: inngest.Step, +) -> str | None: + data = CreateStrategyEvent.Data.model_validate(ctx.event.data) + run_id = data.run_id + supabase = client.create_admin() + + await step.run( + "extracting_prompt", + lambda: logs.insert(supabase, run_id, "Extracting prompt from run_id"), + ) + + prompt = await step.run("extract_prompt", lambda: get_prompt(supabase, run_id)) + + await step.run( + "fetching_projects_info", + lambda: logs.insert(supabase, run_id, "Getting information from data sources"), + ) + + json_projects = await step.run( + "fetch_projects_data", lambda: fetch_projects_data(supabase) + ) + + projects: list[Project] = [Project(**json_project) for json_project in json_projects] # type: ignore + + await step.run( + "assessing", + lambda: logs.insert( + supabase, + run_id, + "Assessing impact of each project realted to the users interest", + ), + ) + + json_asessed_projects = await step.run( + "assess_projects", lambda: evaluate_projects(prompt, projects) + ) + assessed_projects = [EvaluatedProject(**x) for x in json_asessed_projects] # type: ignore + + await step.run( + "determining_funding", + lambda: logs.insert( + supabase, + run_id, + "Determining the relative funding that the best matching projects need", + ), + ) + + json_weighted_projects: list[WeightedProject] = await step.run( + "determine_funding", lambda: assign_weights(assessed_projects) + ) + weighted_projects = [WeightedProject(**x) for x in json_weighted_projects] # type: ignore + + await step.run( + "saving_results_to_db", + lambda: logs.insert(supabase, run_id, "Generating results"), + ) + + await step.run( + "save_strategy_to_db", lambda: insert_multiple(supabase, run_id, weighted_projects) + ) + + await step.run("result", lambda: logs.insert(supabase, run_id, "STRATEGY_CREATED")) + + return "done" diff --git a/workers/pydantic.mypy b/workers/pydantic.mypy new file mode 100644 index 0000000..895701c --- /dev/null +++ b/workers/pydantic.mypy @@ -0,0 +1,2 @@ +[mypy] +plugins = pydantic.mypy \ No newline at end of file diff --git a/workers/pyproject.toml b/workers/pyproject.toml index 50506d0..6ba3adb 100644 --- a/workers/pyproject.toml +++ b/workers/pyproject.toml @@ -26,6 +26,9 @@ types-requests = "^2.31.0.20240106" [tool.mypy] check_untyped_defs = true ignore_missing_imports = true +plugins = [ + "pydantic.mypy" +] [tool.poetry.scripts] build-check = "fund_public_goods.build_check:run"