Skip to content

Commit

Permalink
[release-1.9] Extract scheduler config in a dedicate struct instead o…
Browse files Browse the repository at this point in the history
…f many parameters (#6740)

This is an automated cherry-pick of #6736

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
knative-prow-robot and pierDipi authored Feb 9, 2023
1 parent 4efda71 commit 1a221ab
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 40 deletions.
26 changes: 10 additions & 16 deletions pkg/scheduler/statefulset/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"

"knative.dev/eventing/pkg/scheduler"
st "knative.dev/eventing/pkg/scheduler/state"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/scheduler"
st "knative.dev/eventing/pkg/scheduler/state"
)

type Autoscaler interface {
Expand Down Expand Up @@ -60,24 +61,17 @@ type autoscaler struct {
lock sync.Locker
}

func NewAutoscaler(ctx context.Context,
namespace, name string,
lister scheduler.VPodLister,
stateAccessor st.StateAccessor,
evictor scheduler.Evictor,
refreshPeriod time.Duration,
capacity int32) Autoscaler {

func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) Autoscaler {
return &autoscaler{
logger: logging.FromContext(ctx),
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace),
statefulSetName: name,
vpodLister: lister,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace),
statefulSetName: cfg.StatefulSetName,
vpodLister: cfg.VPodLister,
stateAccessor: stateAccessor,
evictor: evictor,
evictor: cfg.Evictor,
trigger: make(chan int32, 1),
capacity: capacity,
refreshPeriod: refreshPeriod,
capacity: cfg.PodCapacity,
refreshPeriod: cfg.RefreshPeriod,
lock: new(sync.Mutex),
}
}
Expand Down
33 changes: 29 additions & 4 deletions pkg/scheduler/statefulset/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
v1 "k8s.io/client-go/listers/core/v1"
gtesting "k8s.io/client-go/testing"

listers "knative.dev/eventing/pkg/reconciler/testing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"

listers "knative.dev/eventing/pkg/reconciler/testing/v1"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/eventing/pkg/scheduler"
"knative.dev/eventing/pkg/scheduler/state"
Expand Down Expand Up @@ -377,7 +378,15 @@ func TestAutoscaler(t *testing.T) {
return nil
}

autoscaler := NewAutoscaler(ctx, testNs, sfsName, vpodClient.List, stateAccessor, noopEvictor, 10*time.Second, int32(10)).(*autoscaler)
cfg := &Config{
StatefulSetNamespace: testNs,
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
Evictor: noopEvictor,
RefreshPeriod: 10 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler)

for _, vpod := range tc.vpods {
vpodClient.Append(vpod)
Expand Down Expand Up @@ -425,7 +434,15 @@ func TestAutoscalerScaleDownToZero(t *testing.T) {
return nil
}

autoscaler := NewAutoscaler(ctx, testNs, sfsName, vpodClient.List, stateAccessor, noopEvictor, 2*time.Second, int32(10)).(*autoscaler)
cfg := &Config{
StatefulSetNamespace: testNs,
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
Evictor: noopEvictor,
RefreshPeriod: 2 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler)

done := make(chan bool)
go func() {
Expand Down Expand Up @@ -846,7 +863,15 @@ func TestCompactor(t *testing.T) {
return nil
}

autoscaler := NewAutoscaler(ctx, testNs, sfsName, vpodClient.List, stateAccessor, recordEviction, 10*time.Second, int32(10)).(*autoscaler)
cfg := &Config{
StatefulSetNamespace: testNs,
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
Evictor: recordEviction,
RefreshPeriod: 10 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler)

for _, vpod := range tc.vpods {
vpodClient.Append(vpod)
Expand Down
75 changes: 57 additions & 18 deletions pkg/scheduler/statefulset/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,40 @@ import (
_ "knative.dev/eventing/pkg/scheduler/plugins/kafka/nomaxresourcecount"
)

type Config struct {
StatefulSetNamespace string `json:"statefulSetNamespace"`
StatefulSetName string `json:"statefulSetName"`

// PodCapacity max capacity for each StatefulSet's pod.
PodCapacity int32 `json:"podCapacity"`
// Autoscaler refresh period
RefreshPeriod time.Duration `json:"refreshPeriod"`

SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"`
SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"`
DeschedPolicy *scheduler.SchedulerPolicy `json:"deschedPolicy"`

Evictor scheduler.Evictor `json:"-"`

VPodLister scheduler.VPodLister `json:"-"`
NodeLister corev1listers.NodeLister `json:"-"`
}

func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) {

podInformer := podinformer.Get(ctx)
podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace)

stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)

go autoscaler.Start(ctx)

return newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler, podLister), nil
}

// NewScheduler creates a new scheduler with pod autoscaling enabled.
// Deprecated: Use New
func NewScheduler(ctx context.Context,
namespace, name string,
lister scheduler.VPodLister,
Expand All @@ -69,15 +102,21 @@ func NewScheduler(ctx context.Context,
schedPolicy *scheduler.SchedulerPolicy,
deschedPolicy *scheduler.SchedulerPolicy) scheduler.Scheduler {

podInformer := podinformer.Get(ctx)
podLister := podInformer.Lister().Pods(namespace)

stateAccessor := st.NewStateBuilder(ctx, namespace, name, lister, capacity, schedulerPolicy, schedPolicy, deschedPolicy, podLister, nodeLister)
autoscaler := NewAutoscaler(ctx, namespace, name, lister, stateAccessor, evictor, refreshPeriod, capacity)

go autoscaler.Start(ctx)

return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler, podLister)
cfg := &Config{
StatefulSetNamespace: namespace,
StatefulSetName: name,
PodCapacity: capacity,
RefreshPeriod: refreshPeriod,
SchedulerPolicy: schedulerPolicy,
SchedPolicy: schedPolicy,
DeschedPolicy: deschedPolicy,
Evictor: evictor,
VPodLister: lister,
NodeLister: nodeLister,
}

s, _ := New(ctx, cfg)
return s
}

// StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods
Expand Down Expand Up @@ -106,20 +145,20 @@ type StatefulSetScheduler struct {
reserved map[types.NamespacedName]map[string]int32
}

func NewStatefulSetScheduler(ctx context.Context,
namespace, name string,
lister scheduler.VPodLister,
func newStatefulSetScheduler(ctx context.Context,
cfg *Config,
stateAccessor st.StateAccessor,
autoscaler Autoscaler, podlister corev1listers.PodNamespaceLister) scheduler.Scheduler {
autoscaler Autoscaler,
podlister corev1listers.PodNamespaceLister) scheduler.Scheduler {

scheduler := &StatefulSetScheduler{
ctx: ctx,
logger: logging.FromContext(ctx),
statefulSetNamespace: namespace,
statefulSetName: name,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace),
statefulSetNamespace: cfg.StatefulSetNamespace,
statefulSetName: cfg.StatefulSetName,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace),
podLister: podlister,
vpodLister: lister,
vpodLister: cfg.VPodLister,
pending: make(map[types.NamespacedName]int32),
lock: new(sync.Mutex),
stateAccessor: stateAccessor,
Expand All @@ -130,7 +169,7 @@ func NewStatefulSetScheduler(ctx context.Context,
// Monitor our statefulset
statefulsetInformer := statefulsetinformer.Get(ctx)
statefulsetInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(namespace, name),
FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName),
Handler: controller.HandleAll(scheduler.updateStatefulset),
})

Expand Down
14 changes: 12 additions & 2 deletions pkg/scheduler/statefulset/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,12 @@ func TestStatefulsetScheduler(t *testing.T) {
lsp := listers.NewListers(podlist)
lsn := listers.NewListers(nodelist)
sa := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister())
s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler)
cfg := &Config{
StatefulSetNamespace: testNs,
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
}
s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler)
if tc.pending != nil {
s.pending = tc.pending
}
Expand Down Expand Up @@ -867,7 +872,12 @@ func TestReservePlacements(t *testing.T) {
vpodClient := tscheduler.NewVPodClient()
vpodClient.Append(tc.vpod)

s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, nil, nil, nil).(*StatefulSetScheduler)
cfg := &Config{
StatefulSetNamespace: testNs,
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
}
s := newStatefulSetScheduler(ctx, cfg, nil, nil, nil).(*StatefulSetScheduler)
s.reservePlacements(tc.vpod, tc.vpod.GetPlacements()) //initial reserve

s.reservePlacements(tc.vpod, tc.placements) //new reserve
Expand Down

0 comments on commit 1a221ab

Please sign in to comment.