-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfailed_chute_cleanup.py
93 lines (86 loc) · 2.72 KB
/
failed_chute_cleanup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import json
import asyncio
from loguru import logger
from sqlalchemy import text, select
import api.database.orms # noqa
from api.config import settings
from api.database import get_session
from api.chute.schemas import Chute
# Chutes whose *current* version has never produced a verified instance, yet
# at least five distinct miners attempted it with instance_audit records older
# than 90 minutes. Each condition is computed exactly once (in the derived
# table) and then filtered on by name, so the returned flag columns can never
# disagree with the filter that selected the row.
# Output columns (in order): name, chute_id, version, created_at,
# never_deployed, has_five_miners. The two flags are TRUE for every returned
# row; they are kept so the result shape is unchanged for consumers.
QUERY = """
SELECT
    name,
    chute_id,
    version,
    created_at,
    never_deployed,
    has_five_miners
FROM (
    SELECT
        chutes.name,
        chutes.chute_id,
        chutes.version,
        chutes.created_at,
        NOT EXISTS (
            SELECT 1
            FROM instance_audit
            WHERE instance_audit.chute_id = chutes.chute_id
              AND instance_audit.version = chutes.version
              AND instance_audit.verified_at IS NOT NULL
        ) AS never_deployed,
        (
            SELECT COUNT(DISTINCT instance_audit.miner_hotkey)
            FROM instance_audit
            WHERE instance_audit.chute_id = chutes.chute_id
              AND instance_audit.version = chutes.version
              AND instance_audit.created_at < NOW() - INTERVAL '90 minutes'
        ) >= 5 AS has_five_miners
    FROM chutes
) AS candidates
WHERE never_deployed
  AND has_five_miners
"""
async def clean_failed_chutes():
    """
    Delete chutes whose current version has repeatedly failed to deploy.

    A chute is selected by ``QUERY`` when no instance of its current version
    has ever been verified, yet at least 5 distinct miners attempted it with
    instance_audit records older than 90 minutes. Such chutes will never work
    and are culled.

    Deletions are committed in one transaction. Only after that commit is each
    deleted chute announced on the ``miner_broadcast`` Redis channel, with
    reason ``chute_deleted`` and its ``chute_id``/``version``.
    """
    to_broadcast = []
    async with get_session() as session:
        result = await session.execute(text(QUERY))
        # Read columns by label, not position. The query returns more columns
        # than are used here, so positional unpacking with the wrong arity
        # raises ValueError (and can silently bind the wrong column).
        # Materialize the rows before issuing more queries on this session.
        for row in result.mappings().all():
            chute_id = row["chute_id"]
            chute = (
                (await session.execute(select(Chute).where(Chute.chute_id == chute_id)))
                .unique()
                .scalar_one_or_none()
            )
            if not chute:
                continue  # how would it possibly get here?
            if chute.version != row["version"]:
                # The failure evidence applies to the version the query saw.
                # If the chute was updated since, leave the new version alone.
                continue
            logger.warning(
                f"Chute {row['name']} {chute_id} created {row['created_at']} failed to deploy, wiping..."
            )
            to_broadcast.append(
                {
                    "chute_id": chute.chute_id,
                    "version": chute.version,
                }
            )
            await session.delete(chute)
        await session.commit()

    # Notify miners only after the deletions are durable.
    for data in to_broadcast:
        await settings.redis_client.publish(
            "miner_broadcast",
            json.dumps(
                {
                    "reason": "chute_deleted",
                    "data": data,
                }
            ),
        )
    if to_broadcast:
        logger.success(f"Successfully purged {len(to_broadcast)} chutes that failed to deploy.")
if __name__ == "__main__":
    # Script entry point: perform a single cleanup pass on a fresh event loop.
    cleanup = clean_failed_chutes()
    asyncio.run(cleanup)