-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnections.py
48 lines (40 loc) · 1.83 KB
/
connections.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from streamlit.connections import ExperimentalBaseConnection
from deta import Deta
from streamlit.runtime.caching import cache_data
import pandas as pd
import streamlit as st
class DetaBaseConnection(ExperimentalBaseConnection[Deta]):
def _connect(self, **kwargs) -> Deta:
"""
Connect to the Deta Base using the provided project_key.
Args:
**kwargs: Additional keyword arguments passed to the connection.
Returns:
Deta: An instance of the Deta class representing the Deta Base.
"""
if '_project_key' in kwargs:
project_key = kwargs.pop('_project_key')
else:
project_key = kwargs['project_key']
# Connect to Deta Base
deta = Deta(project_key=project_key)
return deta
def fetch(self, collection_name: str, query: dict, ttl: int = 3600, **kwargs) -> pd.DataFrame:
"""
Fetch data from the Deta Base collection and cache it for the specified time.
Args:
collection_name (str): Name of the collection to fetch data from.
query (dict): Query to filter the data from the collection.
ttl (int, optional): Time to live (in seconds) for the cached data. Defaults to 3600 seconds (1 hour).
**kwargs: Additional keyword arguments passed to the fetch function.
Returns:
pd.DataFrame: A DataFrame containing the fetched data.
"""
@st.cache_data(ttl=ttl)
def _query(collection_name: str, query: dict, **kwargs) -> pd.DataFrame:
db = self._instance
collection = db.Base(collection_name)
response = collection.fetch(query)
data = list(response.items) # Convert FetchResponse to a list of items
return pd.DataFrame(data)
return _query(collection_name, query, **kwargs)