Skip to content

Commit

Permalink
Breaking: Replace Callbacks interface by Callbacks struct (server)
Browse files Browse the repository at this point in the history
This continues work start in open-telemetry#324 for Client.

The interface has the following downsides:
- Impossible to define non-trivial default behavior. Here is an example where it was needed: open-telemetry#269 (comment)
- Adding new callbacks requires expanding the interface, which is a breaking change for existing client users.

Getting rid of the interface and keeping just a struct
for callbacks solves both problems:
- Arbitrarily complex default behavior can be now defined on the struct if the user does not provide the particular callback func.
- Adding new callback funcs is not a braking change, existing users won't be affected.
  • Loading branch information
tigrannajaryan committed Dec 19, 2024
1 parent b7beae5 commit fbfd0ff
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 177 deletions.
10 changes: 5 additions & 5 deletions internal/examples/server/opampsrv/opampsrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func NewServer(agents *data.Agents) *Server {
func (srv *Server) Start() {
settings := server.StartSettings{
Settings: server.Settings{
Callbacks: server.CallbacksStruct{
OnConnectingFunc: func(request *http.Request) types.ConnectionResponse {
Callbacks: types.Callbacks{
OnConnecting: func(request *http.Request) types.ConnectionResponse {
return types.ConnectionResponse{
Accept: true,
ConnectionCallbacks: server.ConnectionCallbacksStruct{
OnMessageFunc: srv.onMessage,
OnConnectionCloseFunc: srv.onDisconnect,
ConnectionCallbacks: types.ConnectionCallbacks{
OnMessage: srv.onMessage,
OnConnectionClose: srv.onDisconnect,
},
}
},
Expand Down
63 changes: 0 additions & 63 deletions server/callbacks.go

This file was deleted.

71 changes: 30 additions & 41 deletions server/serverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func New(logger types.Logger) *server {

func (s *server) Attach(settings Settings) (HTTPHandlerFunc, ConnContext, error) {
s.settings = settings
s.settings.Callbacks.SetDefaults()
s.wsUpgrader = websocket.Upgrader{
EnableCompression: settings.EnableCompression,
}
Expand Down Expand Up @@ -169,26 +170,25 @@ func (s *server) Addr() net.Addr {

func (s *server) httpHandler(w http.ResponseWriter, req *http.Request) {
var connectionCallbacks serverTypes.ConnectionCallbacks
if s.settings.Callbacks != nil {
resp := s.settings.Callbacks.OnConnecting(req)
if !resp.Accept {
// HTTP connection is not accepted. Set the response headers.
for k, v := range resp.HTTPResponseHeader {
w.Header().Set(k, v)
}
// And write the response status code.
w.WriteHeader(resp.HTTPStatusCode)
return
resp := s.settings.Callbacks.OnConnecting(req)
if !resp.Accept {
// HTTP connection is not accepted. Set the response headers.
for k, v := range resp.HTTPResponseHeader {
w.Header().Set(k, v)
}
// use connection-specific handler provided by ConnectionResponse
connectionCallbacks = resp.ConnectionCallbacks
// And write the response status code.
w.WriteHeader(resp.HTTPStatusCode)
return
}
// use connection-specific handler provided by ConnectionResponse
connectionCallbacks = resp.ConnectionCallbacks
connectionCallbacks.SetDefaults()

// HTTP connection is accepted. Check if it is a plain HTTP request.

if req.Header.Get(headerContentType) == contentTypeProtobuf {
// Yes, a plain HTTP request.
s.handlePlainHTTPRequest(req, w, connectionCallbacks)
s.handlePlainHTTPRequest(req, w, &connectionCallbacks)
return
}

Expand All @@ -201,10 +201,10 @@ func (s *server) httpHandler(w http.ResponseWriter, req *http.Request) {

// Return from this func to reduce memory usage.
// Handle the connection on a separate goroutine.
go s.handleWSConnection(req.Context(), conn, connectionCallbacks)
go s.handleWSConnection(req.Context(), conn, &connectionCallbacks)
}

func (s *server) handleWSConnection(reqCtx context.Context, wsConn *websocket.Conn, connectionCallbacks serverTypes.ConnectionCallbacks) {
func (s *server) handleWSConnection(reqCtx context.Context, wsConn *websocket.Conn, connectionCallbacks *serverTypes.ConnectionCallbacks) {
agentConn := wsConnection{wsConn: wsConn, connMutex: &sync.Mutex{}}

defer func() {
Expand All @@ -216,14 +216,10 @@ func (s *server) handleWSConnection(reqCtx context.Context, wsConn *websocket.Co
}
}()

if connectionCallbacks != nil {
connectionCallbacks.OnConnectionClose(agentConn)
}
connectionCallbacks.OnConnectionClose(agentConn)
}()

if connectionCallbacks != nil {
connectionCallbacks.OnConnected(reqCtx, agentConn)
}
connectionCallbacks.OnConnected(reqCtx, agentConn)

sentCustomCapabilities := false

Expand Down Expand Up @@ -254,21 +250,19 @@ func (s *server) handleWSConnection(reqCtx context.Context, wsConn *websocket.Co
continue
}

if connectionCallbacks != nil {
response := connectionCallbacks.OnMessage(msgContext, agentConn, &request)
if len(response.InstanceUid) == 0 {
response.InstanceUid = request.InstanceUid
}
if !sentCustomCapabilities {
response.CustomCapabilities = &protobufs.CustomCapabilities{
Capabilities: s.settings.CustomCapabilities,
}
sentCustomCapabilities = true
}
err = agentConn.Send(msgContext, response)
if err != nil {
s.logger.Errorf(msgContext, "Cannot send message to WebSocket: %v", err)
response := connectionCallbacks.OnMessage(msgContext, agentConn, &request)
if len(response.InstanceUid) == 0 {
response.InstanceUid = request.InstanceUid
}
if !sentCustomCapabilities {
response.CustomCapabilities = &protobufs.CustomCapabilities{
Capabilities: s.settings.CustomCapabilities,
}
sentCustomCapabilities = true
}
err = agentConn.Send(msgContext, response)
if err != nil {
s.logger.Errorf(msgContext, "Cannot send message to WebSocket: %v", err)
}
}
}
Expand Down Expand Up @@ -310,7 +304,7 @@ func compressGzip(data []byte) ([]byte, error) {
return buf.Bytes(), nil
}

func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter, connectionCallbacks serverTypes.ConnectionCallbacks) {
func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter, connectionCallbacks *serverTypes.ConnectionCallbacks) {
bodyBytes, err := s.readReqBody(req)
if err != nil {
s.logger.Debugf(req.Context(), "Cannot read HTTP body: %v", err)
Expand All @@ -331,11 +325,6 @@ func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter
conn: connFromRequest(req),
}

if connectionCallbacks == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

connectionCallbacks.OnConnected(req.Context(), agentConn)

defer func() {
Expand Down
Loading

0 comments on commit fbfd0ff

Please sign in to comment.