diff --git a/xds/snapshotter.go b/xds/snapshotter.go index e9b1d2b..a40bae0 100644 --- a/xds/snapshotter.go +++ b/xds/snapshotter.go @@ -386,10 +386,9 @@ func (s *Snapshotter) OnDeltaStreamOpen(ctx context.Context, id int64, typ strin } func (s *Snapshotter) OnStreamDeltaRequest(id int64, r *discovery.DeltaDiscoveryRequest) error { - log.Logger.Info("OnStreamDeltaRequest") st, _ := s.streams.Load(id) stream := st.(*Stream) - log.Logger.Info("OnStreamRequest", + log.Logger.Info("OnStreamDeltaRequest", "id", id, "peer", stream.peerAddress, "received", r.GetTypeUrl(), @@ -401,7 +400,7 @@ func (s *Snapshotter) OnStreamDeltaRequest(id int64, r *discovery.DeltaDiscovery ) s.addOrUpdateNode(r.GetNode().GetId(), stream.peerAddress, id) - if len(r.GetResourceNamesSubscribe()) > 0 || len(r.GetResourceNamesUnsubscribe()) > 0 { + if s.needToUpdateDeltaSubscriptions(r.GetNode().GetId(), r.GetTypeUrl(), id, r.GetResourceNamesSubscribe(), r.GetResourceNamesUnsubscribe()) { if err := s.updateDeltaStreamNodeResources(r.GetNode().GetId(), r.GetTypeUrl(), id, r.GetResourceNamesSubscribe(), r.GetResourceNamesUnsubscribe()); err != nil { return err } @@ -409,8 +408,12 @@ func (s *Snapshotter) OnStreamDeltaRequest(id int64, r *discovery.DeltaDiscovery return nil } -func (s *Snapshotter) OnStreamDeltaResponse(i int64, request *discovery.DeltaDiscoveryRequest, response *discovery.DeltaDiscoveryResponse) { - log.Logger.Info("OnStreamDeltaResponse") +func (s *Snapshotter) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest, resp *discovery.DeltaDiscoveryResponse) { + log.Logger.Info("OnStreamDeltaResponse", + "id", id, + "type", resp.GetTypeUrl(), + "resources", len(resp.GetResources()), + ) } func (s *Snapshotter) OnFetchResponse(req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) { @@ -805,7 +808,7 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6 node := n.(*Node) sNodeResources, ok := node.resources[streamID] if !ok { - log.Logger.Warn("Cannot check if snapshot needs updating, strema not found", "id", streamID) + log.Logger.Warn("Cannot check if snapshot needs updating, stream not found", "id", streamID) return false } if mapTypeURL(typeURL) == "services" { @@ -817,6 +820,33 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6 return false } +// needToUpdateDeltaSubscriptions returns true if a delta request changes the +// list of subscribed resources +func (s *Snapshotter) needToUpdateDeltaSubscriptions(nodeID, typeURL string, streamID int64, subscribe, unsubscribe []string) bool { + n, ok := s.nodes.Load(nodeID) + if !ok { + return false + } + node := n.(*Node) + sNodeResources, ok := node.resources[streamID] + if !ok { + log.Logger.Warn("Cannot check if delta subscriptions changed, stream not found", "id", streamID) + return false + } + resources := sNodeResources.servicesNames[typeURL] + for _, r := range subscribe { + if !slices.Contains(resources, r) { + return true + } + } + for _, r := range unsubscribe { + if slices.Contains(resources, r) { + return true + } + } + return false +} + // makeDummyResources blindly creates default configuration resources to snapshot based on the requested names. // For endpoints it will create a single localhost one func (s *Snapshotter) makeDummyResources(typeURL string, resources []string) ([]types.Resource, error) {