Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for setting circuit breakers #229

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codegen.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.21
FROM golang:1.22

ARG USER=$USER
ARG UID=$UID
Expand Down
22 changes: 22 additions & 0 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
"google.golang.org/protobuf/types/known/wrapperspb"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -209,6 +210,7 @@ func (c *Controller) servicesToXdsServiceStore() (xds.XdsServiceStore, error) {
policy := xds.ParseToClusterLbPolicy(xdsSvc.Spec.LoadBalancing.Policy)

store.AddOrUpdate(svc, xds.Service{
CircuitBreakers: extractCircuitBreakers(xdsSvc.Spec.CircuitBreakers),
EnableRemoteEndpoints: pointer.BoolDeref(xdsSvc.Spec.EnableRemoteEndpoints, false),
Policy: policy,
RingHash: extractRingHashConfig(policy, xdsSvc.Spec.LoadBalancing.RingHash),
Expand Down Expand Up @@ -347,3 +349,23 @@ func extractRetryPolicy(policy *v1alpha1.XdsServiceSpecRetry) *routev3.RetryPoli
RetryBackOff: xds.ParseRetryBackOff(policy.RetryBackOff.BaseInterval, policy.RetryBackOff.MaxInterval),
}
}

func extractCircuitBreakers(circuitbreakers *v1alpha1.XdsServiceSpecCircuitBreakers) *clusterv3.CircuitBreakers {
if circuitbreakers == nil {
return nil
}
if circuitbreakers.Thresholds == nil {
return nil
}
return &clusterv3.CircuitBreakers{
Thresholds: []*clusterv3.CircuitBreakers_Thresholds{
&clusterv3.CircuitBreakers_Thresholds{
Priority: corev3.RoutingPriority_DEFAULT,
MaxConnections: xds.ParseCircuitBreakerAttribute(circuitbreakers.Thresholds.MaxConnections),
MaxPendingRequests: xds.ParseCircuitBreakerAttribute(circuitbreakers.Thresholds.MaxPendingRequests),
MaxRequests: xds.ParseCircuitBreakerAttribute(circuitbreakers.Thresholds.MaxRequests),
MaxRetries: xds.ParseCircuitBreakerAttribute(circuitbreakers.Thresholds.MaxRetries),
},
},
}
}
37 changes: 37 additions & 0 deletions deploy/kustomize/cluster/semaphore-xds.uw.systems_xdsservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,43 @@ spec:
description: XdsServiceSpec defines the desired config for a service served
via xDS
properties:
circuitBreakers:
description: |-
CircuitBreakers define limits for client upstream connections to the
service servers.
properties:
thresholds:
description: Specifies limits which apply to each all hosts in
a cluster.
properties:
maxConnections:
description: |-
The maximum number of connections that the client will make to the
upstream cluster. If not specified, the default is 1024.
format: int32
type: integer
maxPendingRequests:
description: |-
The maximum number of pending requests that the client will allow to
the upstream cluster. If not specified, the default is 1024.
This limit is applied as a connection limit for non-HTTP traffic.
format: int32
type: integer
maxRequests:
description: |-
The maximum number of parallel requests the client will make to the
upstream cluster. If not specified, the default is 1024.
This limit does not apply to non-HTTP traffic.
format: int32
type: integer
maxRetries:
description: |-
The maximum number of parallel retries that the client will allow to
the upstream cluster. If not specified, the default is 3.
format: int32
type: integer
type: object
type: object
enableRemoteEndpoints:
default: false
description: |-
Expand Down
30 changes: 30 additions & 0 deletions pkg/apis/semaphorexds/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,38 @@ type XdsServiceSpecRetry struct {
RetryBackOff XdsServiceSpecRetryBackoffPolicy `json:"backoff,omitempty"`
}

// XdsServiceSpecCircuitBreakersThresholds defines threshold values for targets
// of all priorities
type XdsServiceSpecCircuitBreakersThresholds struct {
// The maximum number of connections that the client will make to the
// upstream cluster. If not specified, the default is 1024.
MaxConnections *uint32 `json:"maxConnections,omitempty"`
// The maximum number of pending requests that the client will allow to
// the upstream cluster. If not specified, the default is 1024.
// This limit is applied as a connection limit for non-HTTP traffic.
MaxPendingRequests *uint32 `json:"maxPendingRequests,omitempty"`
// The maximum number of parallel requests the client will make to the
// upstream cluster. If not specified, the default is 1024.
// This limit does not apply to non-HTTP traffic.
MaxRequests *uint32 `json:"maxRequests,omitempty"`
// The maximum number of parallel retries that the client will allow to
// the upstream cluster. If not specified, the default is 3.
MaxRetries *uint32 `json:"maxRetries,omitempty"`
}

// XdsServiceSpecCircuitBreakers defines circuit breaker values for upstream
// client connections to target servers
type XdsServiceSpecCircuitBreakers struct {
// Specifies limits which apply to each all hosts in a cluster.
Thresholds *XdsServiceSpecCircuitBreakersThresholds `json:"thresholds,omitempty"`
}

// XdsServiceSpec defines the desired config for a service served via xDS
type XdsServiceSpec struct {
// CircuitBreakers define limits for client upstream connections to the
// service servers.
// +optional
CircuitBreakers *XdsServiceSpecCircuitBreakers `json:"circuitBreakers,omitempty"`
// EnableRemoteEndpoints determines whether this Service should look for
// endpoints (EndpointSlices) in remote clusters.
// +optional
Expand Down
62 changes: 62 additions & 0 deletions pkg/apis/semaphorexds/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions xds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
// Service holds the data we need to represent a Kubernetes Service in xds
// configuration
type Service struct {
CircuitBreakers *clusterv3.CircuitBreakers
EnableRemoteEndpoints bool
Policy clusterv3.Cluster_LbPolicy
RingHash *clusterv3.Cluster_RingHashLbConfig
Expand Down Expand Up @@ -150,14 +151,15 @@ func makeListener(name, namespace, authority string, port int32, manager *anypb.
}
}

func makeCluster(name, namespace, authority string, port int32, policy clusterv3.Cluster_LbPolicy, ringHash *clusterv3.Cluster_RingHashLbConfig) *clusterv3.Cluster {
func makeCluster(name, namespace, authority string, port int32, policy clusterv3.Cluster_LbPolicy, ringHash *clusterv3.Cluster_RingHashLbConfig, cbs *clusterv3.CircuitBreakers) *clusterv3.Cluster {
clusterName := makeClusterName(name, namespace, port)
if authority != "" {
clusterName = makeXdstpClusterName(name, namespace, authority, port)
}
cluster := &clusterv3.Cluster{
Name: clusterName,
ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: clusterv3.Cluster_EDS},
CircuitBreakers: cbs,
LbPolicy: policy,
EdsClusterConfig: &clusterv3.Cluster_EdsClusterConfig{
EdsConfig: &corev3.ConfigSource{
Expand Down Expand Up @@ -201,7 +203,7 @@ func servicesToResources(serviceStore XdsServiceStore, authority string) ([]type
}
listener := makeListener(s.Service.Name, s.Service.Namespace, authority, port.Port, manager)
lsnr = append(lsnr, listener)
cluster := makeCluster(s.Service.Name, s.Service.Namespace, authority, port.Port, s.Policy, s.RingHash)
cluster := makeCluster(s.Service.Name, s.Service.Namespace, authority, port.Port, s.Policy, s.RingHash, s.CircuitBreakers)
cls = append(cls, cluster)
}
}
Expand Down
7 changes: 7 additions & 0 deletions xds/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,13 @@ func ParseRetryBackOff(base, max string) *routev3.RetryPolicy_RetryBackOff {
}
}

func ParseCircuitBreakerAttribute(a *uint32) *wrappers.UInt32Value {
if a == nil {
return nil
}
return &wrappers.UInt32Value{Value: *a}
}

// resourcesMatch checks if 2 slices contain the same set of resources
func resourcesMatch(a, b []string) bool {
if len(a) != len(b) {
Expand Down