Skip to content

Commit

Permalink
Update Delta snapshots less often
Browse files Browse the repository at this point in the history
  • Loading branch information
ffilippopoulos committed Dec 13, 2024
1 parent c14eb41 commit ae68210
Showing 1 changed file with 73 additions and 32 deletions.
105 changes: 73 additions & 32 deletions xds/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,9 @@ func (s *Snapshotter) OnDeltaStreamOpen(ctx context.Context, id int64, typ strin
}

func (s *Snapshotter) OnStreamDeltaRequest(id int64, r *discovery.DeltaDiscoveryRequest) error {
log.Logger.Info("OnStreamDeltaRequest")
st, _ := s.streams.Load(id)
stream := st.(*Stream)
log.Logger.Info("OnStreamRequest",
log.Logger.Info("OnStreamDeltaRequest",
"id", id,
"peer", stream.peerAddress,
"received", r.GetTypeUrl(),
Expand All @@ -401,16 +400,20 @@ func (s *Snapshotter) OnStreamDeltaRequest(id int64, r *discovery.DeltaDiscovery
)

s.addOrUpdateNode(r.GetNode().GetId(), stream.peerAddress, id)
if len(r.GetResourceNamesSubscribe()) > 0 || len(r.GetResourceNamesUnsubscribe()) > 0 {
if s.needToUpdateDeltaSubscriptions(r.GetNode().GetId(), r.GetTypeUrl(), id, r.GetResourceNamesSubscribe(), r.GetResourceNamesUnsubscribe()) {
if err := s.updateDeltaStreamNodeResources(r.GetNode().GetId(), r.GetTypeUrl(), id, r.GetResourceNamesSubscribe(), r.GetResourceNamesUnsubscribe()); err != nil {
return err
}
}
return nil
}

func (s *Snapshotter) OnStreamDeltaResponse(i int64, request *discovery.DeltaDiscoveryRequest, response *discovery.DeltaDiscoveryResponse) {
log.Logger.Info("OnStreamDeltaResponse")
func (s *Snapshotter) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest, resp *discovery.DeltaDiscoveryResponse) {
log.Logger.Info("OnStreamDeltaResponse",
"id", id,
"type", resp.GetTypeUrl(),
"resources", len(resp.GetResources()),
)
}

func (s *Snapshotter) OnFetchResponse(req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
Expand Down Expand Up @@ -587,42 +590,53 @@ func (s *Snapshotter) updateNodeDeltaStreamServiceResources(nodeID, typeURL stri
}
// Calculate new resources to track based on subscribed and unsubscribed list
currTypedResources := nodeResources[streamID].servicesNames[typeURL]
for _, resourceName := range subscribe {
if !slices.Contains(currTypedResources, resourceName) {
currTypedResources = append(currTypedResources, resourceName)
// Snap on every change
newSnapResources, err := s.getResourcesFromCache(typeURL, currTypedResources)
if err != nil {
return fmt.Errorf("Cannot get resources from cache: %s", err)
}
nodeResources[streamID].services[typeURL] = newSnapResources
nodeResources[streamID].servicesNames[typeURL] = currTypedResources
updatedNode := &Node{
address: node.address,
resources: nodeResources,
serviceSnapVersion: node.serviceSnapVersion,
endpointsSnapVersion: node.endpointsSnapVersion,
}
s.nodes.Store(nodeID, updatedNode)
return s.nodeServiceSnapshot(nodeID)
}

}

// Unsubscribe resources
newTypedResources := []string{}
for _, resourceName := range currTypedResources {
if !slices.Contains(unsubscribe, resourceName) {
newTypedResources = append(newTypedResources, resourceName)
}
}
for _, resourceName := range subscribe {
if !slices.Contains(newTypedResources, resourceName) {
newTypedResources = append(newTypedResources, resourceName)
}
}

var newSnapResources []types.Resource
var err error
if s.localhostEndpoints {
newSnapResources, err = s.makeDummyResources(typeURL, newTypedResources)
if err != nil {
return fmt.Errorf("Cannot make dummy resources for localhost mode: %s", err)
}
log.Logger.Debug("Created dummy resources", "type", typeURL, "resources", newSnapResources, "count", len(newSnapResources))
} else {
newSnapResources, err = s.getResourcesFromCache(typeURL, newTypedResources)
if len(newTypedResources) < len(currTypedResources) {
newSnapResources, err := s.getResourcesFromCache(typeURL, newTypedResources)
if err != nil {
return fmt.Errorf("Cannot get resources from cache: %s", err)
}
nodeResources[streamID].services[typeURL] = newSnapResources
nodeResources[streamID].servicesNames[typeURL] = newTypedResources
updatedNode := &Node{
address: node.address,
resources: nodeResources,
serviceSnapVersion: node.serviceSnapVersion,
endpointsSnapVersion: node.endpointsSnapVersion,
}
s.nodes.Store(nodeID, updatedNode)
if err := s.nodeServiceSnapshot(nodeID); err != nil {
return fmt.Errorf("Failed to snap resources to cache for node %s: %s", nodeID, err)
}
}

nodeResources[streamID].services[typeURL] = newSnapResources
nodeResources[streamID].servicesNames[typeURL] = newTypedResources
updatedNode := &Node{
address: node.address,
resources: nodeResources,
serviceSnapVersion: node.serviceSnapVersion,
endpointsSnapVersion: node.endpointsSnapVersion,
}
s.nodes.Store(nodeID, updatedNode)
return nil
}

Expand Down Expand Up @@ -805,7 +819,7 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6
node := n.(*Node)
sNodeResources, ok := node.resources[streamID]
if !ok {
log.Logger.Warn("Cannot check if snapshot needs updating, strema not found", "id", streamID)
log.Logger.Warn("Cannot check if snapshot needs updating, stream not found", "id", streamID)
return false
}
if mapTypeURL(typeURL) == "services" {
Expand All @@ -817,6 +831,33 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6
return false
}

// needToUpdateDeltaSubscriptions returns true if a delta request changes the
// list of subscribed resources
func (s *Snapshotter) needToUpdateDeltaSubscriptions(nodeID, typeURL string, streamID int64, subscribe, unsubscribe []string) bool {
n, ok := s.nodes.Load(nodeID)
if !ok {
return false
}
node := n.(*Node)
sNodeResources, ok := node.resources[streamID]
if !ok {
log.Logger.Warn("Cannot check if delta subscriptions changed, stream not found", "id", streamID)
return false
}
resources := sNodeResources.servicesNames[typeURL]
for _, r := range subscribe {
if !slices.Contains(resources, r) {
return true
}
}
for _, r := range unsubscribe {
if slices.Contains(resources, r) {
return true
}
}
return false
}

// makeDummyResources blindly creates default configuration resources to snapshot based on the requested names.
// For endpoints it will create a single localhost one
func (s *Snapshotter) makeDummyResources(typeURL string, resources []string) ([]types.Resource, error) {
Expand Down

0 comments on commit ae68210

Please sign in to comment.