Skip to content

Commit

Permalink
Update Delta snapshots less often
Browse files Browse the repository at this point in the history
  • Loading branch information
ffilippopoulos committed Dec 13, 2024
1 parent c14eb41 commit 8f906e7
Showing 1 changed file with 40 additions and 6 deletions.
46 changes: 40 additions & 6 deletions xds/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,9 @@ func (s *Snapshotter) OnDeltaStreamOpen(ctx context.Context, id int64, typ strin
}

func (s *Snapshotter) OnStreamDeltaRequest(id int64, r *discovery.DeltaDiscoveryRequest) error {
log.Logger.Info("OnStreamDeltaRequest")
st, _ := s.streams.Load(id)
stream := st.(*Stream)
log.Logger.Info("OnStreamRequest",
log.Logger.Info("OnStreamDeltaRequest",
"id", id,
"peer", stream.peerAddress,
"received", r.GetTypeUrl(),
Expand All @@ -401,16 +400,20 @@ func (s *Snapshotter) OnStreamDeltaRequest(id int64, r *discovery.DeltaDiscovery
)

s.addOrUpdateNode(r.GetNode().GetId(), stream.peerAddress, id)
if len(r.GetResourceNamesSubscribe()) > 0 || len(r.GetResourceNamesUnsubscribe()) > 0 {
if s.needToUpdateDeltaSubscriptions(r.GetNode().GetId(), r.GetTypeUrl(), id, r.GetResourceNamesSubscribe(), r.GetResourceNamesUnsubscribe()) {
if err := s.updateDeltaStreamNodeResources(r.GetNode().GetId(), r.GetTypeUrl(), id, r.GetResourceNamesSubscribe(), r.GetResourceNamesUnsubscribe()); err != nil {
return err
}
}
return nil
}

func (s *Snapshotter) OnStreamDeltaResponse(i int64, request *discovery.DeltaDiscoveryRequest, response *discovery.DeltaDiscoveryResponse) {
log.Logger.Info("OnStreamDeltaResponse")
func (s *Snapshotter) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest, resp *discovery.DeltaDiscoveryResponse) {
log.Logger.Info("OnStreamDeltaResponse",
"id", id,
"type", resp.GetTypeUrl(),
"resources", len(resp.GetResources()),
)
}

func (s *Snapshotter) OnFetchResponse(req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
Expand Down Expand Up @@ -614,6 +617,10 @@ func (s *Snapshotter) updateNodeDeltaStreamServiceResources(nodeID, typeURL stri
}
}

log.Logger.Info("Updating service snapshot after delta discovery request",
"resources", newTypedResources,
"len(resources)", len(newSnapResources),
)
nodeResources[streamID].services[typeURL] = newSnapResources
nodeResources[streamID].servicesNames[typeURL] = newTypedResources
updatedNode := &Node{
Expand Down Expand Up @@ -805,7 +812,7 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6
node := n.(*Node)
sNodeResources, ok := node.resources[streamID]
if !ok {
log.Logger.Warn("Cannot check if snapshot needs updating, strema not found", "id", streamID)
log.Logger.Warn("Cannot check if snapshot needs updating, stream not found", "id", streamID)
return false
}
if mapTypeURL(typeURL) == "services" {
Expand All @@ -817,6 +824,33 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6
return false
}

// needToUpdateDeltaSubscriptions returns true if a delta request changes the
// list of subscribed resources
func (s *Snapshotter) needToUpdateDeltaSubscriptions(nodeID, typeURL string, streamID int64, subscribe, unsubscribe []string) bool {
n, ok := s.nodes.Load(nodeID)
if !ok {
return false
}
node := n.(*Node)
sNodeResources, ok := node.resources[streamID]
if !ok {
log.Logger.Warn("Cannot check if delta subscriptions changed, stream not found", "id", streamID)
return false
}
resources := sNodeResources.servicesNames[typeURL]
for _, r := range subscribe {
if !slices.Contains(resources, r) {
return true
}
}
for _, r := range unsubscribe {
if slices.Contains(resources, r) {
return true
}
}
return false
}

// makeDummyResources blindly creates default configuration resources to snapshot based on the requested names.
// For endpoints it will create a single localhost one
func (s *Snapshotter) makeDummyResources(typeURL string, resources []string) ([]types.Resource, error) {
Expand Down

0 comments on commit 8f906e7

Please sign in to comment.