feat: add admin endpoint for bulk indexing
ydennisy committed Jun 30, 2024
1 parent 172af53 commit 7e94d95
Showing 11 changed files with 83 additions and 57 deletions.
1 change: 1 addition & 0 deletions backend/.env.example
@@ -1,5 +1,6 @@
TOKENIZERS_PARALLELISM=false
OPENAI_API_KEY=...
ADMIN_API_KEY=...
SUPABASE_URL=http://127.0.0.1:54321
SUPABASE_KEY=...
SUPABASE_AUTH_EXTERNAL_GITHUB_SECRET=...
10 changes: 10 additions & 0 deletions backend/app/db.py
@@ -145,3 +145,13 @@ def get_user_profile_by_id(self, user_id: str):
if len(result.data) != 1:
return None
return result.data[0]

def get_text_node_by_url(self, url: str):
result = (
self._client.table("text_nodes")
.select("id")
.eq("url", url)
.limit(1)
.execute()
)
return result.data[0] if result.data else None
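
A minimal usage sketch of the new lookup, assuming a ready `DB` instance named `db` (the URL below is illustrative):

node = db.get_text_node_by_url("https://example.com/some-article")
if node is None:
    ...  # not indexed yet; safe to create a new text node
else:
    print(f"already indexed as text node {node['id']}")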
5 changes: 4 additions & 1 deletion backend/app/domain/url.py
@@ -6,9 +6,9 @@

class URLStatus(Enum):
RECEIVED_AWAITING_INDEXING = "RECEIVED_AWAITING_INDEXING"
INDEXING_SKIPPED_AS_RECENT_DUPLICATE = "INDEXING_SKIPPED_AS_RECENT_DUPLICATE"
INDEXED_SUCCESSFULLY = "INDEXED_SUCCESSFULLY"
INDEXING_FAILED = "INDEXING_FAILED"
INDEXING_SKIPPED_AS_DUPLICATE = "INDEXING_SKIPPED_AS_DUPLICATE"


class URLSource(Enum):
@@ -56,6 +56,9 @@ def set_indexing_success(self):
def set_indexing_failure(self):
self._status = URLStatus.INDEXING_FAILED

def set_indexing_skipped_due_to_duplicate(self):
self._status = URLStatus.INDEXING_SKIPPED_AS_DUPLICATE

def to_persistence(self) -> URLPersistence:
return {
"id": self._id,
30 changes: 28 additions & 2 deletions backend/app/main.py
@@ -1,3 +1,4 @@
import os
import json
from typing import List, Literal, Annotated

@@ -9,6 +10,7 @@
Request,
BackgroundTasks,
Query,
Header,
)
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
@@ -18,7 +20,12 @@

from app.db import DB
from app.llm import answer_with_context
from app.utils import NodeEmbedder, get_current_user, parse_urls_from_text
from app.utils import (
NodeEmbedder,
get_current_user,
parse_urls_from_text,
get_user_by_id,
)
from app.domain import URL, URLSource
from app.services import IndexingService

@@ -142,7 +149,6 @@ async def post_index_route(
try:
user_id = user.id
urls = [URL(url=url, source=URLSource.WEB) for url in payload.urls]
# await indexing_service.index(urls, user_id)
background_tasks.add_task(indexing_service.index, urls, user_id)
except Exception as ex:
raise HTTPException(500) from ex
@@ -182,3 +188,23 @@ async def get_profile_route(user=Depends(get_current_user)):
if not profile:
raise HTTPException(404)
return profile


@app.post("/api/admin/index", status_code=status.HTTP_202_ACCEPTED)
async def admin_index_route(
payload: IndexPayload,
user_id: str,
    background_tasks: BackgroundTasks,
    x_admin_api_key: str = Header(...),
):
if x_admin_api_key != os.getenv("ADMIN_API_KEY"):
raise HTTPException(status_code=401, detail="Invalid admin key")

try:
get_user_by_id(user_id)
    except Exception as ex:
        raise HTTPException(status_code=404, detail="User not found") from ex

urls = [URL(url=url, source=URLSource.WEB) for url in payload.urls]
background_tasks.add_task(indexing_service.index, urls, user_id)
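
A hedged sketch of how a client can call the new endpoint (the key and user id are placeholders; `user_id` travels as a query parameter because it is declared as a plain function argument):

import os
import requests

headers = {"x-admin-api-key": os.environ["ADMIN_API_KEY"]}
resp = requests.post(
    "http://localhost:8000/api/admin/index",
    headers=headers,
    params={"user_id": "00000000-0000-0000-0000-000000000000"},  # placeholder user id
    json={"urls": ["https://example.com/some-article"]},
)
assert resp.status_code == 202  # accepted; indexing runs as a background task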
14 changes: 14 additions & 0 deletions backend/app/services/indexing.py
@@ -24,6 +24,14 @@ async def index(self, urls: list[URL], user_id: str):
for idx, processed_url in enumerate(processed_urls):
try:
if isinstance(processed_url, URLProcessingResult):
existing_node = db.get_text_node_by_url(processed_url.url)
if existing_node:
log.info(
f"URL already indexed (duplicate), will be skipped: {processed_url.url}"
)
urls[idx].set_indexing_skipped_due_to_duplicate()
continue

text_node = TextNode(
url=processed_url.url,
url_feed_id=urls[idx].id,
@@ -42,6 +50,12 @@ async def index(self, urls: list[URL], user_id: str):
urls[idx].set_indexing_failure()

if len(nodes) > 0:
# TODO: a race condition is still possible here: a duplicate submitted in
# quick succession (or twice within the same request) passes the exists
# check in the loop above, but the DB unique constraint then throws, and
# the URL is never even marked as failed.

# A good solution is to implement UPSERT instead of duplicate detection...
db.create_text_nodes(nodes=nodes, user_id=user_id)

db.update_urls(urls=urls)
log.info(f"Finished indexing {len(urls)} nodes, submitted by {user_id}")
3 changes: 2 additions & 1 deletion backend/app/utils/__init__.py
@@ -4,7 +4,7 @@
from app.utils.embedder import NodeEmbedder
from app.utils.processor import URLProcessor
from app.utils.processor import URLProcessingResult
from app.utils.auth import get_current_user
from app.utils.auth import get_current_user, get_user_by_id
from app.utils.parse import parse_urls_from_text

__all__ = [
@@ -16,4 +16,5 @@
"URLProcessingResult",
"get_current_user",
"parse_urls_from_text",
"get_user_by_id",
]
4 changes: 4 additions & 0 deletions backend/app/utils/auth.py
@@ -25,3 +25,7 @@ def get_current_user(authorization: Annotated[str | None, Header()] = None):
raise HTTPException(status_code=401, detail="User not found.")

return user.user


def get_user_by_id(id: str):
return client.auth.admin.get_user_by_id(id)
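
A small usage sketch; per the Supabase admin API (see the notebook output removed below), this returns a `UserResponse` wrapping the `User`, and raises for unknown ids (the id here is a placeholder):

response = get_user_by_id("00000000-0000-0000-0000-000000000000")  # placeholder id
user = response.user
print(user.id, user.role)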
6 changes: 3 additions & 3 deletions backend/supabase/migrations/20240114120525_init.sql
@@ -3,9 +3,9 @@ create extension vector;
create type
url_status as enum (
'RECEIVED_AWAITING_INDEXING',
'INDEXING_SKIPPED_AS_RECENT_DUPLICATE',
'INDEXED_SUCCESSFULLY',
'INDEXING_FAILED'
'INDEXING_FAILED',
'INDEXING_SKIPPED_AS_DUPLICATE'
);

create type
@@ -32,7 +32,6 @@ create index
user_id
);

-- TODO: do not allow dupe URLs?
create table
public.text_nodes (
id uuid primary key,
@@ -48,6 +47,7 @@ create table
embedding vector(256)
);

alter table text_nodes add constraint unique_url unique (url);
alter table text_nodes enable row level security;

create index
1 change: 1 addition & 0 deletions frontend/components/NavBar.vue
@@ -14,6 +14,7 @@ const toggleMobileMenu = () => {
isMobileMenuOpen.value = !isMobileMenuOpen.value;
};
// TODO: this behaviour is buggy, we cannot log out if we are already logged out!
const logout = async () => {
const { error } = await supabaseClient.auth.signOut();
1 change: 1 addition & 0 deletions indexer/.env.example
@@ -1,2 +1,3 @@
OPENAI_API_KEY=...
DIFFBOT_API_KEY=...
ADMIN_API_KEY=...
65 changes: 15 additions & 50 deletions indexer/data.ipynb
@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -12,49 +12,7 @@
"import requests\n",
"import pandas as pd\n",
"from dotenv import load_dotenv; load_dotenv()\n",
"from supabase import create_client, Client"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"supabase: Client = create_client(\n",
" os.environ.get(\"SUPABASE_URL\"),\n",
" os.environ.get(\"SUPABASE_KEY\")\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"USER_ID = \"c9e3aa85-b7f9-4ee7-b6ab-2673b46662aa\"\n",
"user = supabase.auth.admin.get_user_by_id(USER_ID)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"UserResponse(user=User(id='c9e3aa85-b7f9-4ee7-b6ab-2673b46662aa', app_metadata={'provider': 'github', 'providers': ['github']}, user_metadata={'avatar_url': 'https://avatars.githubusercontent.com/u/10243849?v=4', 'email': '[email protected]', 'email_verified': True, 'full_name': 'Dennis', 'iss': 'https://api.github.com', 'name': 'Dennis', 'phone_verified': False, 'preferred_username': 'ydennisy', 'provider_id': '10243849', 'sub': '10243849', 'user_name': 'ydennisy'}, aud='authenticated', confirmation_sent_at=None, recovery_sent_at=None, email_change_sent_at=None, new_email=None, invited_at=None, action_link=None, email='[email protected]', phone='', created_at=datetime.datetime(2024, 6, 16, 14, 4, 2, 590913, tzinfo=TzInfo(UTC)), confirmed_at=datetime.datetime(2024, 6, 16, 14, 4, 2, 595941, tzinfo=TzInfo(UTC)), email_confirmed_at=datetime.datetime(2024, 6, 16, 14, 4, 2, 595941, tzinfo=TzInfo(UTC)), phone_confirmed_at=None, last_sign_in_at=datetime.datetime(2024, 6, 16, 14, 4, 26, 290729, tzinfo=TzInfo(UTC)), role='authenticated', updated_at=datetime.datetime(2024, 6, 29, 6, 48, 47, 932910, tzinfo=TzInfo(UTC)), identities=[UserIdentity(id='10243849', identity_id='0b699a0e-4f94-41fb-b2c5-f5936c9a2e8b', user_id='c9e3aa85-b7f9-4ee7-b6ab-2673b46662aa', identity_data={'avatar_url': 'https://avatars.githubusercontent.com/u/10243849?v=4', 'email': '[email protected]', 'email_verified': True, 'full_name': 'Dennis', 'iss': 'https://api.github.com', 'name': 'Dennis', 'phone_verified': False, 'preferred_username': 'ydennisy', 'provider_id': '10243849', 'sub': '10243849', 'user_name': 'ydennisy'}, provider='github', created_at=datetime.datetime(2024, 6, 16, 14, 4, 2, 594030, tzinfo=TzInfo(UTC)), last_sign_in_at=datetime.datetime(2024, 6, 16, 14, 4, 2, 594012, tzinfo=TzInfo(UTC)), updated_at=datetime.datetime(2024, 6, 16, 14, 4, 26, 35347, tzinfo=TzInfo(UTC)))], factors=None))"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"supabase.auth.set_session()"
"from tqdm import tqdm"
]
},
{
@@ -106,7 +64,9 @@
"outputs": [],
"source": [
"final = pd.DataFrame({ \"url\": results })\n",
"final.to_csv(\"urls-processed-20240616.csv\", index=False)"
"final = final.drop_duplicates().dropna()\n",
"final.to_csv(\"urls-processed-20240616.csv\", index=False)\n",
"print(f\"There are {len(final)} URLs to index.\")"
]
},
{
@@ -115,13 +75,18 @@
"metadata": {},
"outputs": [],
"source": [
"token = os.getenv(\"TOKEN\")\n",
"admin_api_key = os.getenv(\"ADMIN_API_KEY\")\n",
"user_id = \"832c407c-280f-4976-8840-1a7f81be20b5\"\n",
"headers = {\n",
" \"Authorization\": f\"Bearer {token}\"\n",
" \"x-admin-api-key\": admin_api_key\n",
"}\n",
"for url in results:\n",
" requests.post(\"http://localhost:8000/api/index\", headers=headers, json={\"urls\": [url]})\n",
" time.sleep(1)\n"
"\n",
"for url in tqdm(results, desc=\"Indexing URLs...\"):\n",
" try:\n",
" requests.post(\"http://localhost:8000/api/admin/index\", headers=headers, json={\"urls\": [url]}, params={\"user_id\": user_id})\n",
" time.sleep(2)\n",
" except Exception as ex:\n",
" print(ex)"
]
},
{