Include all_kube_services route for potential envoy clients
ffilippopoulos committed Dec 10, 2024
1 parent f541708 commit 5fa3265
Showing 4 changed files with 67 additions and 16 deletions.
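At a glance, the change adds a single catch-all RouteConfiguration named "all_kube_routes" that carries one virtual host per Service port known to the controller. A minimal sketch of what that resource looks like for one hypothetical Service (foo in namespace bar on port 80) is shown below; the virtual host, domain and cluster strings are assumptions about the makeVirtualHostName, makeGlobalServiceDomain and makeClusterName formats, not values taken from this commit.

package example

import (
	routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
)

// Approximate resource emitted for a single Service port; the structure
// mirrors makeAllKubeServicesRouteConfig in xds/service.go (diffed below),
// while the concrete names are illustrative assumptions.
var exampleAllKubeRoutes = &routev3.RouteConfiguration{
	Name: "all_kube_routes",
	VirtualHosts: []*routev3.VirtualHost{{
		Name:    "foo.bar:80",                         // assumed makeVirtualHostName format
		Domains: []string{"foo.bar.cluster.local:80"}, // assumed makeGlobalServiceDomain format
		Routes: []*routev3.Route{{
			// An empty prefix matches every path, so each virtual host simply
			// forwards all traffic for its domain to the matching cluster.
			Match: &routev3.RouteMatch{
				PathSpecifier: &routev3.RouteMatch_Prefix{Prefix: ""},
			},
			Action: &routev3.Route_Route{
				Route: &routev3.RouteAction{
					ClusterSpecifier: &routev3.RouteAction_Cluster{
						Cluster: "foo.bar.80", // assumed makeClusterName format
					},
				},
			},
		}},
	}},
}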
22 changes: 11 additions & 11 deletions controller/controller_test.go
@@ -60,7 +60,7 @@ func TestReconcileServices_LabelledService(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Verify the default round robin policy is set on the clusters
for _, cl := range snap.GetResources(resource.ClusterType) {
cluster, err := xds.UnmarshalResourceToCluster(cl)
@@ -103,7 +103,7 @@ func TestReconcileServices_LabelledServiceLbPolicy(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Verify the correct lb policy (ring hash) is set on the clusters
for _, cl := range snap.GetResources(resource.ClusterType) {
cluster, err := xds.UnmarshalResourceToCluster(cl)
@@ -140,7 +140,7 @@ func TestReconcileServices_LabelledServiceInvalidLbPolicy(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Verify the default round robin policy is set on the clusters
for _, cl := range snap.GetResources(resource.ClusterType) {
cluster, err := xds.UnmarshalResourceToCluster(cl)
@@ -177,7 +177,7 @@ func TestReconcileServices_XdsService(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Verify the default round robin policy is set on the clusters
for _, cl := range snap.GetResources(resource.ClusterType) {
cluster, err := xds.UnmarshalResourceToCluster(cl)
@@ -254,7 +254,7 @@ func TestReconcileServices_XdsServiceDelete(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Verify we will have one Endpoint resource in the snapshot
snap, err = snapshotter.EndpointsSnapshot(testNodeID)
if err != nil {
@@ -304,7 +304,7 @@ func TestReconcileLocalEndpointSlice_SnapOnUpdate(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
snap, err = snapshotter.EndpointsSnapshot(testNodeID)
if err != nil {
t.Fatal(err)
@@ -394,7 +394,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Verify we will have 1 Endpoint resource in the snapshot containing
// addresses from both local(2) and remote(2). 4 lbEndpoint addresses in
// total. Also verify that all priorities are set to 0.
@@ -457,7 +457,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints_NoRemoteEndpoints(t *te
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Verify we will have 1 Endpoint resource in the snapshot containing
// only local client addresses.
snap, err = snapshotter.EndpointsSnapshot(testNodeID)
@@ -508,7 +508,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpoints(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Verify we will have 1 Endpoint resource in the snapshot containing
// only remote addresses (2).
snap, err = snapshotter.EndpointsSnapshot(testNodeID)
@@ -560,7 +560,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpointsAndLocalPriority(t *test
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Verify we will have 1 Endpoint resource in the snapshot containing
// addresses for local endpoints with priority 0 and for remote ones
// with priority 1.
@@ -622,7 +622,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpointsAndLocalPriority(t *
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Verify we will have 1 Endpoint resource in the snapshot containing
// addresses for remote endpoints with priority 0, regardless of
// PriorityStrategy set to local-first.
45 changes: 45 additions & 0 deletions xds/service.go
@@ -95,6 +95,7 @@ func makeRouteConfig(name, namespace, authority string, port int32, retry *route
}
return routeConfig(routeName, clusterName, virtualHostName, domains, retry, hashPolicies)
}

func routeConfig(routeName, clusterName, virtualHostName string, domains []string, retry *routev3.RetryPolicy, hashPolicies []*routev3.RouteAction_HashPolicy) *routev3.RouteConfiguration {
return &routev3.RouteConfiguration{
Name: routeName,
@@ -123,6 +124,47 @@ func routeConfig(routeName, clusterName, virtualHostName string, domains []strin
}
}

// makeAllKubeServicesRouteConfig will return all the available routes in a
// Kubernetes cluster. It is meant to be combined with envoy clients that will
// also configure on-demand CDS and EDS for lazy resources discovery.
func makeAllKubeServicesRouteConfig(serviceStore XdsServiceStore) *routev3.RouteConfiguration {
vh := []*routev3.VirtualHost{}
for _, s := range serviceStore.All() {
for _, port := range s.Service.Spec.Ports {
clusterName := makeClusterName(s.Service.Name, s.Service.Namespace, port.Port)
virtualHostName := makeVirtualHostName(s.Service.Name, s.Service.Namespace, port.Port)
domains := []string{makeGlobalServiceDomain(s.Service.Name, s.Service.Namespace, port.Port)}
vh = append(vh, &routev3.VirtualHost{
Name: virtualHostName,
Domains: domains,
Routes: []*routev3.Route{{
Match: &routev3.RouteMatch{
PathSpecifier: &routev3.RouteMatch_Prefix{
Prefix: "",
},
},
Action: &routev3.Route_Route{
Route: &routev3.RouteAction{
ClusterSpecifier: &routev3.RouteAction_Cluster{
Cluster: clusterName,
},
HashPolicy: s.RingHashPolicies,
},
},
}},
RetryPolicy: s.Retry,
})
}
}
if len(vh) > 0 {
return &routev3.RouteConfiguration{
Name: "all_kube_routes",
VirtualHosts: vh,
}
}
return nil
}

func makeManager(routeConfig *routev3.RouteConfiguration) (*anypb.Any, error) {
router, _ := anypb.New(&routerv3.Router{})
return anypb.New(&managerv3.HttpConnectionManager{
@@ -216,5 +258,8 @@ func servicesToResources(serviceStore XdsServiceStore, authority string) ([]type
cls = append(cls, cluster)
}
}
if allKubeRoutes := makeAllKubeServicesRouteConfig(serviceStore); allKubeRoutes != nil {
rds = append(rds, allKubeRoutes)
}
return cls, rds, lsnr, nil
}
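As the doc comment on makeAllKubeServicesRouteConfig notes, the catch-all route config is aimed at Envoy clients that pair it with on-demand CDS/EDS for lazy resource discovery. Below is a minimal, illustrative sketch (not part of this commit) of how such a client might subscribe to the new resource by name via RDS over ADS; the stat prefix, filter wiring and config source choices are assumptions, and the on-demand discovery filters themselves are not shown.

package example

import (
	corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	routerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
	managerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
	"google.golang.org/protobuf/types/known/anypb"
)

// exampleClientManager builds an HTTP connection manager that subscribes to
// the catch-all "all_kube_routes" route configuration through ADS.
func exampleClientManager() (*managerv3.HttpConnectionManager, error) {
	router, err := anypb.New(&routerv3.Router{})
	if err != nil {
		return nil, err
	}
	return &managerv3.HttpConnectionManager{
		StatPrefix: "all_kube_routes", // illustrative
		RouteSpecifier: &managerv3.HttpConnectionManager_Rds{
			Rds: &managerv3.Rds{
				// Request the route config added by this commit by name.
				RouteConfigName: "all_kube_routes",
				ConfigSource: &corev3.ConfigSource{
					ResourceApiVersion: corev3.ApiVersion_V3,
					ConfigSourceSpecifier: &corev3.ConfigSource_Ads{
						Ads: &corev3.AggregatedConfigSource{},
					},
				},
			},
		},
		HttpFilters: []*managerv3.HttpFilter{{
			Name:       "envoy.filters.http.router",
			ConfigType: &managerv3.HttpFilter_TypedConfig{TypedConfig: router},
		}},
	}, nil
}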
2 changes: 2 additions & 0 deletions xds/snapshotter_metrics_test.go
@@ -120,7 +120,9 @@ func TestSnapMetricsCollector(t *testing.T) {
fmt.Sprintf(`semaphore_xds_snapshot_listener{name="%s",node_id="%s",route_config="%s",type="%s"} 1`, expectHttpListenerName, "test-node", expectHttpRouteName, resource.ListenerType),
fmt.Sprintf(`semaphore_xds_snapshot_listener{name="%s",node_id="%s",route_config="%s",type="%s"} 1`, expectHttpsListenerName, "test-node", expectHttpsRouteName, resource.ListenerType),
fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpClusterName, expectHttpDomains, expectHttpRouteName, EmptyNodeID, resource.RouteType, expectHttpVhost),
fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpClusterName, expectHttpDomains, "all_kube_routes", EmptyNodeID, resource.RouteType, expectHttpVhost),
fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpsClusterName, expectHttpsDomains, expectHttpsRouteName, EmptyNodeID, resource.RouteType, expectHttpsVhost),
fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpsClusterName, expectHttpsDomains, "all_kube_routes", EmptyNodeID, resource.RouteType, expectHttpsVhost),
fmt.Sprintf(`semaphore_xds_snapshot_cluster{discovery_type="eds",lb_policy="round_robin",name="%s",node_id="%s",type="%s"} 1`, expectHttpClusterName, EmptyNodeID, resource.ClusterType),
fmt.Sprintf(`semaphore_xds_snapshot_cluster{discovery_type="eds",lb_policy="round_robin",name="%s",node_id="%s",type="%s"} 1`, expectHttpsClusterName, EmptyNodeID, resource.ClusterType),
fmt.Sprintf(`semaphore_xds_snapshot_endpoint{cluster_name="%s",health_status="healthy",lb_address="%s",locality_subzone="foo-xzf",locality_zone="test",node_id="%s",priority="0",type="%s"} 1`, expectHttpClusterName, "10.2.1.1:80", EmptyNodeID, resource.EndpointType),
14 changes: 9 additions & 5 deletions xds/snapshotter_test.go
@@ -63,7 +63,7 @@ func TestSnapServices_SingleService(t *testing.T) {
// cluster
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
}

func TestSnapServices_NoServicePorts(t *testing.T) {
@@ -124,7 +124,7 @@ func TestSnapServices_MultipleServicePorts(t *testing.T) {
// per port
assert.Equal(t, 2, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 2, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
}

func TestSnapEndpoints_EmptyEndpointStore(t *testing.T) {
@@ -332,7 +332,7 @@ func TestSnapServices_NodeSnapshotResources(t *testing.T) {
}
assert.Equal(t, 2, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 2, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
// Client requesting more resources again should bring them back into the node snapshot
assert.Equal(t, true, snapshotter.needToUpdateSnapshot(nodeID, resource.ListenerType, streamID, []string{"fooA.bar:80", "fooB.bar:80"}))
if err := snapshotter.updateStreamNodeResources(nodeID, resource.ListenerType, streamID, []string{"fooA.bar:80", "fooB.bar:80"}); err != nil {
@@ -504,8 +504,12 @@ func TestSnapServices_SingleServiceWithAuthoritySet(t *testing.T) {
}
assert.Contains(t, expectedClusters, cluster.Name)
}
expectedRoutes := []string{makeRouteConfigName("foo", "bar", int32(80)), makeXdstpRouteConfigName("foo", "bar", "test-authority", int32(80))}
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType)))
expectedRoutes := []string{
makeRouteConfigName("foo", "bar", int32(80)),
makeXdstpRouteConfigName("foo", "bar", "test-authority", int32(80)),
"all_kube_routes",
}
assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
for _, res := range snap.GetResources(resource.RouteType) {
route, err := UnmarshalResourceToRouteConfiguration(res)
if err != nil {
