Skip to content

Commit

Permalink
Demo Workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
manthanguptaa committed Dec 3, 2024
1 parent 60b2733 commit 4d2f24e
Show file tree
Hide file tree
Showing 10 changed files with 470 additions and 7 deletions.
18 changes: 18 additions & 0 deletions api/routes/workflow_playground.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from os import getenv
from phi.playground import Playground

from workflows.blog_post import BlogPostGenerator
from workflows.hackernews import HackerNewsReporter
from workflows.investment import InvestmentAnalyst

######################################################
## Router for the workflow playground
######################################################

# Create a playground instance serving the three demo workflows.
# NOTE(review): Workflow *classes* (not instances) are passed here — confirm
# this Playground version accepts classes; framework examples often pass instances.
playground = Playground(workflows=[BlogPostGenerator, HackerNewsReporter, InvestmentAnalyst])
# Log the playground endpoint with phidata.app
# Only register the local endpoint when running in the dev environment.
if getenv("RUNTIME_ENV") == "dev":
    playground.create_endpoint("http://localhost:8008")

# FastAPI router exposing the playground routes; mounted by the app elsewhere.
playground_router = playground.get_router()
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ dependencies = [
"exa_py",
"fastapi[standard]",
"google-search-results",
"lxml_html_clean",
"mypy",
"nest_asyncio",
"newspaper4k",
"openai",
"pgvector",
"phidata[aws]==2.5.7",
"phidata[aws]==2.5.33",
"pillow",
"psycopg[binary]",
"pypdf",
Expand Down
22 changes: 16 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# This file was autogenerated by uv via the following command:
# ./scripts/generate_requirements.sh upgrade
# ./scripts/generate_requirements.sh
alembic==1.13.3
annotated-types==0.7.0
anyio==4.6.2.post1
Expand All @@ -16,8 +16,11 @@ duckdb==1.1.2
duckduckgo-search==6.3.2
email-validator==2.2.0
exa-py==1.4.0
exceptiongroup==1.2.2
fastapi==0.115.2
fastapi-cli==0.0.5
feedparser==6.0.11
filelock==3.16.1
frozendict==2.4.6
gitdb==4.0.11
gitpython==3.1.43
Expand All @@ -32,7 +35,9 @@ iniconfig==2.0.0
jinja2==3.1.4
jiter==0.6.1
jmespath==1.0.1
joblib==1.4.2
lxml==5.3.0
lxml-html-clean==0.4.1
mako==1.3.5
markdown-it-py==3.0.0
markupsafe==3.0.1
Expand All @@ -41,19 +46,21 @@ multitasking==0.0.11
mypy==1.12.0
mypy-extensions==1.0.0
nest-asyncio==1.6.0
numpy==2.1.2
newspaper4k==0.9.3.1
nltk==3.9.1
numpy==1.26.4
openai==1.52.0
packaging==24.1
pandas==2.2.3
peewee==3.17.7
pgvector==0.3.5
phidata==2.5.7
phidata==2.5.33
pillow==11.0.0
platformdirs==4.3.6
pluggy==1.5.0
primp==0.6.4
psycopg==3.1.19
psycopg-binary==3.1.19
psycopg==3.1.18
psycopg-binary==3.1.18
pydantic==2.9.2
pydantic-core==2.23.4
pydantic-settings==2.6.0
Expand All @@ -68,9 +75,11 @@ pytz==2024.2
pyyaml==6.0.2
regex==2024.9.11
requests==2.32.3
requests-file==2.1.0
rich==13.9.2
ruff==0.7.0
s3transfer==0.10.3
sgmllib3k==1.0.0
shellingham==1.5.4
six==1.16.0
smmap==5.0.1
Expand All @@ -79,6 +88,7 @@ soupsieve==2.6
sqlalchemy==2.0.36
starlette==0.40.0
tiktoken==0.8.0
tldextract==5.1.3
tomli==2.0.2
tqdm==4.66.5
typer==0.12.5
Expand All @@ -87,7 +97,7 @@ types-html5lib==1.1.11.20241018
types-pillow==10.2.0.20240822
typing-extensions==4.12.2
tzdata==2024.2
urllib3==2.2.3
urllib3==1.26.20
uvicorn==0.32.0
uvloop==0.21.0
watchfiles==0.24.0
Expand Down
Empty file added workflows/__init__.py
Empty file.
118 changes: 118 additions & 0 deletions workflows/blog_post.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import json
from typing import Optional, Iterator

from pydantic import BaseModel, Field

from phi.agent import Agent
from phi.workflow import Workflow, RunResponse, RunEvent
from phi.storage.workflow.postgres import PgWorkflowStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger

from db.session import db_url


class NewsArticle(BaseModel):
    """A single news article returned by the searcher agent."""

    title: str = Field(..., description="Title of the article.")
    url: str = Field(..., description="Link to the article.")
    # BUG FIX: was Field(...), which makes the field *required* even though the
    # type is Optional and the description says "if available" — validation
    # would fail whenever the searcher omits a summary. Default to None.
    summary: Optional[str] = Field(None, description="Summary of the article if available.")


class SearchResults(BaseModel):
    """Structured response model for the searcher agent: the articles it found."""

    articles: list[NewsArticle]


class BlogPostGenerator(Workflow):
    """Two-step workflow: search the web for a topic, then write a blog post.

    Finished posts are cached in ``session_state`` keyed by topic, so a
    repeat run for the same topic can be served without re-searching.
    """

    description: str = "Generate a blog post on a given topic."

    # Agent that searches the web; its output is parsed into the
    # SearchResults model via response_model.
    searcher: Agent = Agent(
        tools=[DuckDuckGo()],
        instructions=["Given a topic, search for 20 articles and return the 5 most relevant articles."],
        response_model=SearchResults,
    )

    # Agent that writes the final blog post from the searcher's articles.
    writer: Agent = Agent(
        instructions=[
            "You will be provided with a topic and a list of top articles on that topic.",
            "Carefully read each article and generate a New York Times worthy blog post on that topic.",
            "Break the blog post into sections and provide key takeaways at the end.",
            "Make sure the title is catchy and engaging.",
            "Always provide sources, do not make up information or sources.",
        ],
    )

    def get_cached_blog_post(self, topic: str) -> Optional[dict]:
        """Return the cached entry ({"topic", "blog_post"} dict) for *topic*, or None."""
        if "blog_posts" in self.session_state:
            for cached_blog_post in self.session_state["blog_posts"]:
                if cached_blog_post["topic"] == topic:
                    return cached_blog_post
        return None

    def add_cached_blog_post(self, topic: str, blog_post: Optional[str]) -> None:
        """Append a {"topic", "blog_post"} entry to the session-state cache."""
        if "blog_posts" not in self.session_state:
            self.session_state["blog_posts"] = []
        self.session_state["blog_posts"].append({"topic": topic, "blog_post": blog_post})

    def search_web(self, topic: str) -> Optional[SearchResults]:
        """Run the searcher agent for *topic*; return results or None after 3 failed tries."""
        search_results: Optional[SearchResults] = None
        num_tries = 0

        # Retry until the searcher returns a well-formed SearchResults payload
        # or 3 attempts are exhausted; errors are logged, never raised.
        while search_results is None and num_tries < 3:
            try:
                num_tries += 1
                searcher_response: RunResponse = self.searcher.run(topic)
                if (
                    searcher_response
                    and searcher_response.content
                    and isinstance(searcher_response.content, SearchResults)
                ):
                    logger.info(f"Searcher found {len(searcher_response.content.articles)} articles.")
                    search_results = searcher_response.content
                else:
                    logger.warning("Searcher response invalid, trying again...")
            except Exception as e:
                logger.warning(f"Error running searcher: {e}")

        return search_results

    def run(self, topic: str, use_cache: bool = True) -> Iterator[RunResponse]:
        """Generate a blog post on *topic*, yielding streamed RunResponse events.

        When *use_cache* is True and a post for this topic is cached, the
        cached post is yielded immediately and no agents are invoked.
        """
        logger.info(f"Generating a blog post on: {topic}")

        # Use the cached blog post if use_cache is True
        if use_cache and (cached_blog_post := self.get_cached_blog_post(topic)):
            logger.info("Found cached blog post")
            yield RunResponse(
                run_id=self.run_id,
                event=RunEvent.workflow_completed,
                content=cached_blog_post["blog_post"],
            )
            return

        # Step 1: Search the web for articles on the topic
        search_results = self.search_web(topic)

        # If no search_results are found for the topic, end the workflow
        if search_results is None or len(search_results.articles) == 0:
            yield RunResponse(
                run_id=self.run_id,
                event=RunEvent.workflow_completed,
                content=f"Sorry, could not find any articles on the topic: {topic}",
            )
            return

        # Step 2: Write a blog post
        logger.info("Writing blog post")
        # Prepare the input for the writer
        writer_input = {
            "topic": topic,
            "articles": [v.model_dump() for v in search_results.articles],
        }
        # Run the writer and yield the response
        yield from self.writer.run(json.dumps(writer_input, indent=4), stream=True)

        # Save the blog post in the session state for future runs
        content: Optional[str] = self.writer.run_response.content
        if content:
            self.add_cached_blog_post(topic, content)

71 changes: 71 additions & 0 deletions workflows/hackernews.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import json
import httpx
from typing import Iterator

from phi.agent import Agent, RunResponse
from phi.workflow import Workflow
from phi.tools.newspaper4k import Newspaper4k
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger


class HackerNewsReporter(Workflow):
    """Fetch the top Hacker News stories and stream a written report on them."""

    description: str = "Get the top stories from Hacker News and write a report on them."

    # Agent that calls the get_top_hackernews_stories tool (attached in run()).
    hn_agent: Agent = Agent(
        description="Get the top stories from hackernews. "
        "Share all possible information, including url, score, title and summary if available.",
        show_tool_calls=True,
    )

    # Agent that reads each story (via Newspaper4k) and writes the report.
    writer: Agent = Agent(
        tools=[Newspaper4k()],
        description="Write an engaging report on the top stories from hackernews.",
        instructions=[
            "You will be provided with top stories and their links.",
            "Carefully read each article and think about the contents",
            "Then generate a final New York Times worthy article",
            "Break the article into sections and provide key takeaways at the end.",
            "Make sure the title is catchy and engaging.",
            "Share score, title, url and summary of every article.",
            # BUG FIX: a missing trailing comma here previously merged this
            # instruction with the next one via implicit string concatenation.
            "Give the section relevant titles and provide details/facts/processes in each section.",
            "Ignore articles that you cannot read or understand.",
            "REMEMBER: you are writing for the New York Times, so the quality of the article is important.",
        ],
    )

    def get_top_hackernews_stories(self, num_stories: int = 10) -> str:
        """Use this function to get top stories from Hacker News.
        Args:
            num_stories (int): Number of stories to return. Defaults to 10.
        Returns:
            str: JSON string of top stories.
        """

        # Fetch top story IDs
        response = httpx.get("https://hacker-news.firebaseio.com/v0/topstories.json")
        story_ids = response.json()

        # Fetch story details
        stories = []
        for story_id in story_ids[:num_stories]:
            story_response = httpx.get(f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json")
            story = story_response.json()
            # BUG FIX: deleted/dead items can come back as null or without a
            # "by" field; skip/degrade gracefully instead of raising KeyError.
            if not story:
                continue
            story["username"] = story.get("by", "")
            stories.append(story)
        return json.dumps(stories)

    def run(self, num_stories: int = 5) -> Iterator[RunResponse]:
        """Report on the top *num_stories* HN stories, yielding streamed RunResponse events."""
        # Set the tools for hn_agent here to avoid circular reference
        self.hn_agent.tools = [self.get_top_hackernews_stories]

        logger.info(f"Getting top {num_stories} stories from HackerNews.")
        top_stories: RunResponse = self.hn_agent.run(num_stories=num_stories)
        if top_stories is None or not top_stories.content:
            yield RunResponse(run_id=self.run_id, content="Sorry, could not get the top stories.")
            return

        logger.info("Reading each story and writing a report.")
        yield from self.writer.run(top_stories.content, stream=True)
79 changes: 79 additions & 0 deletions workflows/investment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from typing import Iterator
from pathlib import Path
from shutil import rmtree

from phi.agent import Agent, RunResponse
from phi.workflow import Workflow
from phi.tools.yfinance import YFinanceTools
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger


# Directory that holds the generated investment reports; wiped on import so
# every workflow run starts from a clean slate.
reports_dir = Path(__file__).parent / "reports" / "investment"
if reports_dir.is_dir():
    rmtree(path=reports_dir, ignore_errors=True)
reports_dir.mkdir(parents=True, exist_ok=True)

# String paths for each agent's saved markdown report (used by
# save_response_to_file, which expects str rather than Path).
stock_analyst_report = str(reports_dir / "stock_analyst_report.md")
research_analyst_report = str(reports_dir / "research_analyst_report.md")
investment_report = str(reports_dir / "investment_report.md")


class InvestmentAnalyst(Workflow):
    """Three-agent pipeline: research companies, rank them by investment
    potential, then draft an investment proposal. Each agent also saves its
    markdown report to disk via save_response_to_file.
    """

    description: str = (
        "Produce a research report on a list of companies and then rank them based on investment potential."
    )

    # Step 1: gather company info, analyst recommendations and news.
    stock_analyst: Agent = Agent(
        tools=[YFinanceTools(company_info=True, analyst_recommendations=True, company_news=True)],
        description="You are a Senior Investment Analyst for Goldman Sachs tasked with producing a research report for a very important client.",
        instructions=[
            "You will be provided with a list of companies to write a report on.",
            "Get the company information, analyst recommendations and news for each company",
            # BUG FIX: a missing trailing comma here previously merged this
            # instruction with the next one via implicit string concatenation.
            "Generate an in-depth report for each company in markdown format with all the facts and details.",
            "Note: This is only for educational purposes.",
        ],
        expected_output="Report in markdown format",
        save_response_to_file=stock_analyst_report,
    )

    # Step 2: rank the analyzed companies by investment potential.
    research_analyst: Agent = Agent(
        name="Research Analyst",
        description="You are a Senior Investment Analyst for Goldman Sachs tasked with producing a ranked list of companies based on their investment potential.",
        instructions=[
            "You will write a research report based on the information provided by the Stock Analyst.",
            "Think deeply about the value of each stock.",
            "Be discerning, you are a skeptical investor focused on maximising growth.",
            "Then rank the companies in order of investment potential, with as much detail about your decision as possible.",
            "Prepare a markdown report with your findings with as much detail as possible.",
        ],
        expected_output="Report in markdown format",
        save_response_to_file=research_analyst_report,
    )

    # Step 3: allocate the budget across the ranked companies and justify it.
    investment_lead: Agent = Agent(
        name="Investment Lead",
        description="You are a Senior Investment Lead for Goldman Sachs tasked with investing $100,000 for a very important client.",
        instructions=[
            "You have a stock analyst and a research analyst on your team.",
            "The stock analyst has produced a preliminary report on a list of companies, and then the research analyst has ranked the companies based on their investment potential.",
            # BUG FIX: "produce a investment proposal" -> "an investment proposal".
            "Review the report provided by the research analyst and produce an investment proposal for the client.",
            # BUG FIX: "the amount you'll exist in each company" -> "invest in".
            "Provide the amount you'll invest in each company and a report on why.",
        ],
        save_response_to_file=investment_report,
    )

    def run(self, companies: str) -> Iterator[RunResponse]:
        """Run the three-step pipeline for *companies*, streaming the final proposal.

        Ends early with an apology message if either upstream agent returns
        no content.
        """
        logger.info(f"Getting investment reports for companies: {companies}")
        initial_report: RunResponse = self.stock_analyst.run(companies)
        if initial_report is None or not initial_report.content:
            yield RunResponse(run_id=self.run_id, content="Sorry, could not get the stock analyst report.")
            return

        logger.info("Ranking companies based on investment potential.")
        ranked_companies: RunResponse = self.research_analyst.run(initial_report.content)
        if ranked_companies is None or not ranked_companies.content:
            yield RunResponse(run_id=self.run_id, content="Sorry, could not get the ranked companies.")
            return

        logger.info("Reviewing the research report and producing an investment proposal.")
        yield from self.investment_lead.run(ranked_companies.content, stream=True)
Loading

0 comments on commit 4d2f24e

Please sign in to comment.