Skip to content

Commit

Permalink
Fix SSE session handling (#373)
Browse files Browse the repository at this point in the history
* feat: add debug mode for web server configuration

- Introduced a new debug flag in the WebServer settings to enable detailed logging.
- Set the default value for the debug mode to false in the configuration.
- Updated validation logic to include the new debug flag for web server settings.

* feat: enhance SSEHandler with debug logging for client connections

- Added debug logging to the SSEHandler for tracking client connections and notifications.
- Logs the total number of connected clients when a new client connects, a notification is sent, or a client disconnects, contingent on the debug mode being enabled in the web server configuration.

* feat: enhance SSEHandler with improved connection management and debug logging

- Added context cancellation for better cleanup of client connections.
- Implemented heartbeat mechanism to keep connections alive and detect issues.
- Enhanced debug logging for client connection events, including successful notifications and errors during marshaling and writing.
- Updated client channel handling to prevent blocking and improve notification delivery.

* feat: implement singleton SSE manager for improved notification handling

- Added a singleton SSEManager to manage server-sent events (SSE) for real-time notifications.
- Implemented methods for subscribing and unsubscribing notification callbacks.
- Enhanced the initialization process to ensure a single event source is used and cleaned up on page unload.
- Updated the component initialization to utilize the new SSEManager for handling notifications.
  • Loading branch information
tphakala authored Jan 8, 2025
1 parent ffd5d25 commit 3251595
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 15 deletions.
1 change: 1 addition & 0 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ type Settings struct {
Realtime RealtimeSettings // Realtime processing settings

WebServer struct {
Debug bool // true to enable debug mode
Enabled bool // true to enable web server
Port string // port for web server
Log LogConfig // logging configuration for web server
Expand Down
1 change: 1 addition & 0 deletions internal/conf/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func setDefaultConfig() {
viper.SetDefault("realtime.telemetry.listen", "0.0.0.0:8090")

// Webserver configuration
viper.SetDefault("webserver.debug", false)
viper.SetDefault("webserver.enabled", true)
viper.SetDefault("webserver.port", "8080")

Expand Down
1 change: 1 addition & 0 deletions internal/conf/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func validateBirdNETSettings(settings *BirdNETConfig) error {

// validateWebServerSettings validates the WebServer-specific settings
func validateWebServerSettings(settings *struct {
Debug bool
Enabled bool
Port string
Log LogConfig
Expand Down
77 changes: 67 additions & 10 deletions internal/httpcontroller/handlers/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
package handlers

import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"

"github.com/labstack/echo/v4"
"github.com/tphakala/birdnet-go/internal/conf"
)

type Notification struct {
Expand All @@ -19,55 +23,108 @@ type Notification struct {
type SSEHandler struct {
clients map[chan Notification]bool
clientsMux sync.Mutex
debug bool
}

func NewSSEHandler() *SSEHandler {
return &SSEHandler{
clients: make(map[chan Notification]bool),
debug: conf.Setting().WebServer.Debug,
}
}

func (h *SSEHandler) ServeSSE(c echo.Context) error {
h.Debug("SSE: New connection request from %s", c.Request().RemoteAddr)

c.Response().Header().Set("Content-Type", "text/event-stream")
c.Response().Header().Set("Cache-Control", "no-cache")
c.Response().Header().Set("Connection", "keep-alive")
c.Response().WriteHeader(http.StatusOK)

clientChan := make(chan Notification)
clientChan := make(chan Notification, 100)
h.addClient(clientChan)
defer h.removeClient(clientChan)

c.Response().Flush()
// Use a context with cancel for cleanup
ctx, cancel := context.WithCancel(c.Request().Context())
defer func() {
cancel()
h.removeClient(clientChan)
h.Debug("SSE: Connection closed for %s", c.Request().RemoteAddr)
}()

// Add heartbeat
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()

for {
select {
case <-ctx.Done():
h.Debug("SSE: Context cancelled for %s", c.Request().RemoteAddr)
return nil
case notification := <-clientChan:
data, _ := json.Marshal(notification)
fmt.Fprintf(c.Response(), "data: %s\n\n", data)
data, err := json.Marshal(notification)
if err != nil {
h.Debug("SSE: Error marshaling notification: %v", err)
continue
}
_, err = fmt.Fprintf(c.Response(), "data: %s\n\n", data)
if err != nil {
h.Debug("SSE: Write error for %s: %v", c.Request().RemoteAddr, err)
return err
}
c.Response().Flush()
case <-heartbeat.C:
_, err := fmt.Fprintf(c.Response(), ":\n\n")
if err != nil {
h.Debug("SSE: Heartbeat error for %s: %v", c.Request().RemoteAddr, err)
return err
}
c.Response().Flush()
case <-c.Request().Context().Done():
return nil
}
}
}

func (h *SSEHandler) SendNotification(notification Notification) {
h.clientsMux.Lock()
defer h.clientsMux.Unlock()
clientCount := len(h.clients)
log.Printf("SSE: Starting to broadcast notification to %d clients", clientCount)

for clientChan := range h.clients {
clientChan <- notification
select {
case clientChan <- notification:
h.Debug("SSE: Successfully sent notification to client channel")
default:
h.Debug("SSE: Warning - Client channel is blocked, skipping notification")
// Optionally, you might want to remove blocked clients
// go h.removeClient(clientChan)
}
}
h.clientsMux.Unlock()
h.Debug("SSE: Finished broadcasting notification to all clients")
}

func (h *SSEHandler) addClient(clientChan chan Notification) {
h.clientsMux.Lock()
defer h.clientsMux.Unlock()
h.clients[clientChan] = true
h.clientsMux.Unlock()
h.Debug("SSE: New client connected. Channel buffer remaining: %d. Total clients: %d", cap(clientChan)-len(clientChan), len(h.clients))
}

func (h *SSEHandler) removeClient(clientChan chan Notification) {
h.clientsMux.Lock()
delete(h.clients, clientChan)
close(clientChan)
h.clientsMux.Unlock()

h.Debug("SSE: Client disconnected. Total clients: %d", len(h.clients))
}

func (h *SSEHandler) Debug(format string, v ...interface{}) {
if h.debug {
if len(v) == 0 {
log.Print(format)
} else {
log.Printf(format, v...)
}
}
}
56 changes: 51 additions & 5 deletions views/settings/settingsBase.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,43 @@
}, 50); // Small delay to ensure Alpine has finished rendering
});
});

// Create a singleton SSE manager
window.SSEManager = window.SSEManager || {
eventSource: null,
notificationCallbacks: new Set(),

init() {
if (this.eventSource) {
return; // Already initialized
}

this.eventSource = new EventSource('/sse');
this.eventSource.onmessage = (event) => {
const notification = JSON.parse(event.data);
this.notificationCallbacks.forEach(callback => callback(notification));
};

// Clean up on page unload
window.addEventListener('unload', () => this.cleanup());
},

cleanup() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
this.notificationCallbacks.clear();
},

subscribe(callback) {
this.notificationCallbacks.add(callback);
},

unsubscribe(callback) {
this.notificationCallbacks.delete(callback);
}
};
</script>
<style>
[x-cloak] {
Expand Down Expand Up @@ -68,13 +105,22 @@
notifications: [],
hasChanges: false,
saving: false,
initSSE() {
const eventSource = new EventSource('/sse');
eventSource.onmessage = (event) => {
const notification = JSON.parse(event.data);
init() {
// Subscribe to notifications using the singleton SSE manager
const handleNotification = (notification) => {
this.addNotification(notification.message, notification.type);
};
window.SSEManager.subscribe(handleNotification);
window.SSEManager.init(); // Initialize if not already done
// Cleanup on component destroy
this.$cleanup(() => {
window.SSEManager.unsubscribe(handleNotification);
});
},
addNotification(message, type) {
const id = Date.now();
this.notifications.push({ id, message, type });
Expand Down Expand Up @@ -144,7 +190,7 @@
}
});
}
}" x-init="initSSE()" x-bind:class="$store.pageLoaded.loaded ? 'page-loaded' : ''">
}" x-init="init()" x-bind:class="$store.pageLoaded.loaded ? 'page-loaded' : ''">
<form id="settingsForm" @submit.prevent="saveSettings()">
<div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-2">
<!-- Settings content rendered here -->
Expand Down

0 comments on commit 3251595

Please sign in to comment.