process_searches.py
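"""Worker that processes queued searches.

Pops search ids from the Redis list ``search_queue``, runs the
corresponding OQL query, builds a results table, and writes the
updated search object back to Redis.
"""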
import hashlib
import json
import os
import time
from datetime import datetime, timezone

import redis
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration

import settings
from app import create_app
from combined_config import all_entities_config
from oql.query import Query
from oql.results_table import ResultTable
from oql.validate import OQOValidator

app = create_app()
redis_db = redis.Redis.from_url(settings.CACHE_REDIS_URL)
search_queue = "search_queue"

# enable Sentry error reporting
sentry_sdk.init(
    dsn=os.environ.get("SENTRY_DSN"),
    integrations=[FlaskIntegration()],
)
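# A producer is expected to write the search object under its id and then
# push the id onto the queue. A minimal sketch (hypothetical producer code,
# not part of this module; assumes rpush so the lpop below pops FIFO):
#
#     search = {"query": query, "timestamps": {"completed": None}}
#     redis_db.set(search_id, json.dumps(search))
#     redis_db.rpush(search_queue, search_id)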
def fetch_results(query):
    with app.app_context():
        # params
        entity = query.get("get_rows")
        filter_works = query.get("filter_works")
        filter_aggs = query.get("filter_aggs")
        show_columns = query.get("show_columns")
        sort_by_column = query.get("sort_by_column")
        sort_by_order = query.get("sort_by_order")

        # query object
        query = Query(
            entity=entity,
            filter_works=filter_works,
            filter_aggs=filter_aggs,
            show_columns=show_columns,
            sort_by_column=sort_by_column,
            sort_by_order=sort_by_order,
        )
        json_data = query.execute()

        # results table
        results_table = ResultTable(
            entity=entity,
            show_columns=show_columns,
            json_data=json_data,
            total_count=query.total_count,
            page=1,
            per_page=100,
        )
        results_table_response = results_table.response()
        results_table_response["source"] = query.source
        return results_table_response
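# The query dict passed to fetch_results mirrors the keys read above;
# an illustrative example (values are placeholders, not real data):
#
#     {
#         "get_rows": "works",
#         "filter_works": [...],
#         "filter_aggs": [...],
#         "show_columns": [...],
#         "sort_by_column": "...",
#         "sort_by_order": "asc",
#     }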
def process_searches():
    last_log_time = 0
    cache_expiration = 24 * 3600  # 24 hours in seconds
    while True:
        search_id = redis_db.lpop(search_queue)
        if not search_id:
            current_time = time.time()
            if current_time - last_log_time >= 60:
                print(f"Waiting for searches from queue {search_queue}")
                last_log_time = current_time
            time.sleep(0.1)
            continue

        search_json = redis_db.get(search_id)
        if not search_json:
            continue

        search = json.loads(search_json)

        # check whether bypass_cache is set or the cache is older than 24 hours
        bypass_cache = search.get("bypass_cache", False)
        print(f"Processing search {search_id} with bypass_cache={bypass_cache}")
        last_processed_time = search["timestamps"].get("completed")
        if last_processed_time:
            last_processed_time = datetime.fromisoformat(last_processed_time)
            time_since_processed = (datetime.now(timezone.utc) - last_processed_time).total_seconds()
        else:
            time_since_processed = cache_expiration + 1  # force recalculation if there is no timestamp

        # cache_valid = not bypass_cache and time_since_processed <= cache_expiration
        # caching is turned off for now
        cache_valid = False

        # if the cache is not valid or bypass_cache is true, clear old results and reset state
        if not cache_valid:
            print(f"Cache is not valid for search {search_id}")
            search["results"] = None
            search["results_header"] = None
            search["meta"] = None
            search["is_ready"] = False
            search["is_completed"] = False
            search["timestamps"]["completed"] = None

            # save the cleared search object back to Redis
            print(f"Clearing old results for search {search_id}")
            redis_db.set(search_id, json.dumps(search))

        # process only if results are not ready or the cache is invalid (bypass or older than 24 hours)
        if search.get("is_ready") and cache_valid:
            print(f"Search {search_id} is already processed and cache is valid, so skipping")
            continue

        try:
            results = fetch_results(search["query"])
            if "invalid_query_error" in results:
                # invalid query
                search["invalid_query_error"] = results["invalid_query_error"]
                search["is_ready"] = True
                search["is_completed"] = True
                search["timestamps"]["completed"] = datetime.now(timezone.utc).isoformat()
            else:
                # valid results
                search["results"] = results["results"]
                search["results_header"] = results["results_header"]
                search["meta"] = results["meta"]
                search["source"] = results["source"]
                search["is_ready"] = True
                search["is_completed"] = True
                search["timestamps"]["completed"] = datetime.now(timezone.utc).isoformat()
            print(f"Processed search {search_id} with {search}")
        except Exception as e:
            # backend error
            print(f"Error processing search {search_id}: {e}")
            search["backend_error"] = str(e)
            search["is_ready"] = True
            search["is_completed"] = True
            search["timestamps"]["completed"] = datetime.now(timezone.utc).isoformat()
            sentry_sdk.capture_exception(e)

        # save the updated search object back to Redis
        print(f"Saving search {search_id} to redis with {search}")
        redis_db.set(search_id, json.dumps(search))

        # wait briefly to avoid hammering the Redis server
        time.sleep(0.1)
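# Shape of the search object this worker reads and writes (fields as used
# above; values illustrative):
#
#     {
#         "query": {...},                   # input for fetch_results
#         "bypass_cache": False,            # optional, forces recomputation
#         "timestamps": {"completed": "<ISO-8601>" or None},
#         "results": [...], "results_header": [...], "meta": {...}, "source": "...",
#         "is_ready": True, "is_completed": True,
#         "invalid_query_error": "...",     # present only for invalid queries
#         "backend_error": "...",           # present only after an exception
#     }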
def generate_cache_key(query_dict):
    query_str = json.dumps(query_dict, sort_keys=True)
    return hashlib.md5(query_str.encode("utf-8")).hexdigest()
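# generate_cache_key is not called in this module; presumably the producer
# uses it to derive a stable search id. json.dumps(..., sort_keys=True)
# makes the key deterministic, so equivalent dicts hash to the same
# 32-character md5 hex digest, e.g.:
#
#     generate_cache_key({"get_rows": "works"})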
if __name__ == "__main__":
    print(f"Processing searches from queue {search_queue}")
    process_searches()