-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
141 lines (114 loc) · 4.02 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel, EmailStr
from celery import Celery
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import os
from dotenv import load_dotenv
import time
import random
app = FastAPI()
load_dotenv("./.env")
celery_app = Celery(
"main",
backend=os.getenv("CELERY_BACKEND_URL"),
broker=os.getenv("CELERY_BROKER_URL"),
)
celery_app.conf.task_track_started = True
def send_email_sync(email: str, subject: str, body: str):
"""
Sends an email synchronously.
Args:
email (str): The recipient's email address.
subject (str): The subject of the email.
body (str): The body content of the email.
Returns:
None
"""
msg = MIMEMultipart()
msg["From"] = os.getenv("MAIL_FROM")
msg["To"] = email
msg["Subject"] = subject
msg.attach(MIMEText(body, "html"))
with smtplib.SMTP(os.getenv("MAIL_SERVER"), os.getenv("MAIL_PORT")) as server:
server.starttls()
server.login(os.getenv("MAIL_USERNAME"), os.getenv("MAIL_PASSWORD"))
server.send_message(msg)
class EmailSchema(BaseModel):
"""
Represents the schema for an email.
Attributes:
email (EmailStr): The email address.
subject (str): The subject of the email. Default is "Hello from FastAPI".
body (str): The body of the email. Default is "This is a test email sent from a background Celery task."
"""
email: EmailStr
subject: str = "Hello from FastAPI"
body: str = "This is a test email sent from a background Celery task."
@celery_app.task
def send_async_email(email: str, subject: str, body: str):
"""
Sends an email asynchronously using Celery.
Args:
email (str): The recipient's email address.
subject (str): The subject of the email.
body (str): The body of the email.
Raises:
Exception: If an error occurs while sending the email.
"""
try:
send_email_sync(email, subject, body)
except Exception as e:
raise e
@app.post("/send-email/")
async def send_email(email: EmailSchema):
"""
Sends an email asynchronously using Celery.
Args:
email (EmailSchema): The email details.
Returns:
dict: A dictionary containing the message and task ID.
"""
task = send_async_email.delay(email.email, email.subject, email.body)
return {"message": "Email sending initiated!", "task_id": task.id}
@celery_app.task(bind=True)
def long_task(self):
"""
Perform a long-running task.
This function simulates a long-running task by iterating a certain number of times and updating the task state
with progress information. It sleeps for 1 second between iterations.
Returns:
dict: A dictionary containing the current progress, total progress, status message, and result of the task.
"""
verb, adjective, noun = (
["Starting up", "Booting", "Repairing", "Loading", "Checking"],
["master", "radiant", "silent", "harmonic", "fast"],
["solar array", "particle reshaper", "cosmic ray", "orbiter", "bit"],
)
total = random.randint(10, 50)
for i in range(total):
message = (
f"{random.choice(verb)} {random.choice(adjective)} {random.choice(noun)}..."
)
self.update_state(
state="PROGRESS", meta={"current": i, "total": total, "status": message}
)
time.sleep(1)
return {"current": 100, "total": 100, "status": "Task completed!", "result": 42}
@app.get("/status/{task_id}")
async def task_status(task_id: str):
"""
Get the status of a task.
Args:
task_id (str): The ID of the task.
Returns:
JSONResponse: The status of the task.
"""
task = long_task.AsyncResult(task_id)
if task.state == "PENDING":
task = long_task.AsyncResult(task_id)
response = {"state": task.state, "status": "Pending..."}
response["status"] = str(task.info)
return JSONResponse(response)