Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT: Unified Notifications Feature Implementation #1089

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions backend/account_v2/authentication_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from django.db.utils import IntegrityError
from django.middleware import csrf
from django.shortcuts import redirect
from logs_helper.log_service import LogService
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
Expand Down Expand Up @@ -268,6 +269,8 @@ def make_user_organization_display_name(self, user_name: str) -> str:
return self.auth_service.make_user_organization_display_name(user_name)

def user_logout(self, request: Request) -> Response:
session_id: str = UserSessionUtils.get_session_id(request=request)
LogService.remove_logs_on_logout(session_id=session_id)
response = self.auth_service.user_logout(request=request)
organization_id = UserSessionUtils.get_organization_id(request)
user_id = UserSessionUtils.get_user_id(request)
Expand Down
3 changes: 3 additions & 0 deletions backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ def get_required_setting(
get_required_setting("LOG_HISTORY_CONSUMER_INTERVAL", "60")
)
LOGS_BATCH_LIMIT = int(get_required_setting("LOGS_BATCH_LIMIT", "30"))
LOGS_EXPIRATION_TIME_IN_SECOND = int(
get_required_setting("LOGS_EXPIRATION_TIME_IN_SECOND")
)
CELERY_BROKER_URL = get_required_setting(
"CELERY_BROKER_URL", f"redis://{REDIS_HOST}:{REDIS_PORT}"
)
Expand Down
1 change: 1 addition & 0 deletions backend/backend/urls_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
path("api/", include("api_v2.urls")),
path("usage/", include("usage_v2.urls")),
path("notifications/", include("notification_v2.urls")),
path("logs/", include("logs_helper.urls")),
path(
UrlPathConstants.PROMPT_STUDIO,
include("prompt_studio.prompt_profile_manager_v2.urls"),
Expand Down
Empty file added backend/logs_helper/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions backend/logs_helper/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from django.apps import AppConfig


class LogsHelperConfig(AppConfig):
name = "logs_helper"
3 changes: 3 additions & 0 deletions backend/logs_helper/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class LogsHelperKeys:
LOG = "LOG"
LOG_EVENTS_ID = "log_events_id"
24 changes: 24 additions & 0 deletions backend/logs_helper/log_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from utils.cache_service import CacheService


class LogService:
@staticmethod
def remove_logs_on_logout(session_id: str) -> None:

if session_id:
key_pattern = f"{LogService.generate_redis_key(session_id=session_id)}*"

# Delete keys matching the pattern
CacheService.clear_cache(key_pattern=key_pattern)

@staticmethod
def generate_redis_key(session_id):
"""Generate a Redis key for logs based on the provided session_id.
Parameters:
session_id (str): The session identifier to include in the Redis key.
Returns:
str: The constructed Redis key.
"""
return f"logs:{session_id}"
5 changes: 5 additions & 0 deletions backend/logs_helper/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from rest_framework import serializers


class StoreLogMessagesSerializer(serializers.Serializer):
log = serializers.CharField()
16 changes: 16 additions & 0 deletions backend/logs_helper/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from django.urls import path
from rest_framework.urlpatterns import format_suffix_patterns

from .views import LogsHelperViewSet

logs_helper = LogsHelperViewSet.as_view({"get": "get_logs", "post": "store_log"})

urlpatterns = format_suffix_patterns(
[
path(
"",
logs_helper,
name="logs-helper",
),
]
)
66 changes: 66 additions & 0 deletions backend/logs_helper/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import json
import logging
from datetime import datetime, timezone

from django.conf import settings
from django.http import HttpRequest
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from utils.cache_service import CacheService
from utils.user_session import UserSessionUtils

from .log_service import LogService
from .serializers import StoreLogMessagesSerializer

logger = logging.getLogger(__name__)


class LogsHelperViewSet(viewsets.ModelViewSet):
"""Viewset to handle all Tool Studio prompt related API logics."""

@action(detail=False, methods=["get"])
def get_logs(self, request: HttpRequest) -> Response:
# Extract the session ID
session_id: str = UserSessionUtils.get_session_id(request=request)

# Construct the Redis key pattern to match keys
# associated with the session ID
redis_key = LogService.generate_redis_key(session_id=session_id)

# Retrieve keys matching the pattern
keys = CacheService.get_all_keys(f"{redis_key}*")

# Retrieve values corresponding to the keys and sort them by timestamp
logs = []
for key in keys:
log_data = CacheService.get_key(key)
logs.append(log_data)

# Sort logs based on timestamp
sorted_logs = sorted(logs, key=lambda x: x["timestamp"])

return Response({"data": sorted_logs}, status=status.HTTP_200_OK)

@action(detail=False, methods=["post"])
def store_log(self, request: HttpRequest) -> Response:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tahierhussain if we are already storing logs in redis before publishing the message - why do we need this API?

"""Store log message in Redis."""
# Extract the session ID
logs_expiry = settings.LOGS_EXPIRATION_TIME_IN_SECOND
session_id: str = UserSessionUtils.get_session_id(request=request)

serializer = StoreLogMessagesSerializer(data=request.data)
serializer.is_valid(raise_exception=True)

# Extract the log message from the validated data
log: str = serializer.validated_data.get("log")
log_data = json.loads(log)
timestamp = datetime.now(timezone.utc).timestamp()

redis_key = (
f"{LogService.generate_redis_key(session_id=session_id)}:{timestamp}"
)

CacheService.set_key(redis_key, log_data, logs_expiry)

return Response({"message": "Successfully stored the message in redis"})
2 changes: 2 additions & 0 deletions backend/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ ENABLE_LOG_HISTORY=True
LOG_HISTORY_CONSUMER_INTERVAL=30
# Maximum number of logs to insert in a single batch.
LOGS_BATCH_LIMIT=30
# Logs Expiry of 24 hours
LOGS_EXPIRATION_TIME_IN_SECOND=86400

# Celery Configuration
CELERY_BROKER_URL = "redis://unstract-redis:6379"
Expand Down
6 changes: 6 additions & 0 deletions backend/utils/cache_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ def set_key(
expire,
)

@staticmethod
def get_all_keys(key_pattern: str) -> Any:
keys = redis_cache.keys(key_pattern)
# Ensure all keys are strings
return [key.decode("utf-8") if isinstance(key, bytes) else key for key in keys]

@staticmethod
def clear_cache(key_pattern: str) -> Any:
"""Delete keys in bulk based on the key pattern."""
Expand Down
4 changes: 4 additions & 0 deletions backend/utils/user_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ def set_organization_member_role(
) -> None:
request.session["role"] = member.role

@staticmethod
def get_session_id(request: HttpRequest) -> Optional[str]:
return request.session.session_key

@staticmethod
def get_organization_member_role(request: HttpRequest) -> Optional[str]:
return request.session.get("role")
Expand Down
11 changes: 11 additions & 0 deletions frontend/src/App.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import PostHogPageviewTracker from "./PostHogPageviewTracker.js";
import { PageTitle } from "./components/widgets/page-title/PageTitle.jsx";
import { useEffect } from "react";
import CustomMarkdown from "./components/helpers/custom-markdown/CustomMarkdown.jsx";
import { useSocketLogsStore } from "./store/socket-logs-store.js";

let GoogleTagManagerHelper;
try {
Expand All @@ -24,6 +25,7 @@ function App() {
const { defaultAlgorithm, darkAlgorithm } = theme;
const { sessionDetails } = useSessionStore();
const { alertDetails } = useAlertStore();
const { pushLogMessages } = useSocketLogsStore();

const btn = (
<>
Expand Down Expand Up @@ -55,6 +57,15 @@ function App() {
btn,
key: alertDetails?.key,
});

pushLogMessages([
{
timestamp: Math.floor(Date.now() / 1000),
level: alertDetails?.type ? alertDetails?.type.toUpperCase() : "",
message: alertDetails.content,
type: "NOTIFICATION",
},
]);
}, [alertDetails]);

return (
Expand Down
79 changes: 3 additions & 76 deletions frontend/src/components/agency/agency/Agency.jsx
Original file line number Diff line number Diff line change
@@ -1,79 +1,33 @@
import { Button, Collapse, Layout, Modal } from "antd";
import {
FullscreenExitOutlined,
FullscreenOutlined,
LeftOutlined,
RightOutlined,
} from "@ant-design/icons";
import { Button, Layout } from "antd";
import { LeftOutlined, RightOutlined } from "@ant-design/icons";
import Sider from "antd/es/layout/Sider";
import { useEffect, useState } from "react";

import { IslandLayout } from "../../../layouts/island-layout/IslandLayout";
import { Actions } from "../actions/Actions";
import { WorkflowExecution } from "../workflow-execution/WorkflowExecution";
import "./Agency.css";
import { useSocketLogsStore } from "../../../store/socket-logs-store";
import { useSocketMessagesStore } from "../../../store/socket-messages-store";
import { useWorkflowStore } from "../../../store/workflow-store";
import { LogsLabel } from "../logs-label/LogsLabel";
import { SidePanel } from "../side-panel/SidePanel";
import { DisplayLogs } from "../display-logs/DisplayLogs";
import { PageTitle } from "../../widgets/page-title/PageTitle";

function Agency() {
const [isCollapsed, setIsCollapsed] = useState(false);
const [activeKey, setActiveKey] = useState([]);
const [steps, setSteps] = useState([]);
const [inputMd, setInputMd] = useState("");
const [outputMd, setOutputMd] = useState("");
const [statusBarMsg, setStatusBarMsg] = useState("");
const [sourceMsg, setSourceMsg] = useState("");
const [destinationMsg, setDestinationMsg] = useState("");
const { message, setDefault } = useSocketMessagesStore();
const { emptyLogs } = useSocketLogsStore();
const workflowStore = useWorkflowStore();
const { details, loadingType, projectName } = workflowStore;
const prompt = details?.prompt_text;
const [activeToolId, setActiveToolId] = useState("");
const [prevLoadingType, setPrevLoadingType] = useState("");
const [isUpdateSteps, setIsUpdateSteps] = useState(false);
const [stepLoader, setStepLoader] = useState(false);
const [showLogsModal, setShowLogsModal] = useState(false);

const openLogsModal = () => {
setShowLogsModal(true);
};

const closeLogsModal = () => {
setShowLogsModal(false);
};

const genExtra = () => (
<FullscreenOutlined
onClick={(event) => {
// If you don't want click extra trigger collapse, you can prevent this:
openLogsModal();
event.stopPropagation();
}}
/>
);

const getItems = () => [
{
key: "1",
label: activeKey?.length > 0 ? <LogsLabel /> : "Logs",
children: (
<div className="agency-ide-logs">
<DisplayLogs />
</div>
),
extra: genExtra(),
},
];

const handleCollapse = (keys) => {
setActiveKey(keys);
};

useEffect(() => {
if (prevLoadingType !== "EXECUTE") {
Expand Down Expand Up @@ -103,7 +57,6 @@ function Agency() {
setOutputMd("");
setStatusBarMsg("");
setDefault();
emptyLogs();
setSourceMsg("");
setDestinationMsg("");
};
Expand All @@ -112,7 +65,6 @@ function Agency() {
// Clean up function to clear all the socket messages
return () => {
setDefault();
emptyLogs();
};
}, []);

Expand All @@ -139,7 +91,6 @@ function Agency() {
}

if (msgComp === "SOURCE" && state === "RUNNING") {
setActiveKey("");
setSourceMsg("");
setDestinationMsg("");
const newSteps = [...steps].map((step) => {
Expand Down Expand Up @@ -227,31 +178,7 @@ function Agency() {
stepLoader={stepLoader}
/>
</div>
<div className="agency-footer">
<Collapse
className="agency-ide-collapse-panel"
size="small"
activeKey={activeKey}
items={getItems()}
expandIconPosition="end"
onChange={handleCollapse}
bordered={false}
/>
<Modal
title="Logs"
open={showLogsModal}
onCancel={closeLogsModal}
className="agency-ide-log-modal"
footer={null}
width={1000}
closeIcon={<FullscreenExitOutlined />}
>
<LogsLabel />
<div className="agency-ide-logs">
<DisplayLogs />
</div>
</Modal>
</div>
<div className="height-20" />
</div>
);
}
Expand Down
Loading
Loading