[release-1.11] Scheduler: fix reserved replicas handling, blocking autoscaler and overcommitted pods (#7374)

* Scheduler: fix reserved replicas handling, blocking autoscaler and overcommitted pods

The code changes themselves contain extensive comments explaining the rationale for each individual change.

- Properly handle overcommitted pods
- Don't block the scheduler when triggering the autoscaler if the autoscaler is already active
- Additional fix for #6733
- Various logging improvements (leader, state, actions, and context)

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add unit tests for overcommitted pods

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
knative-prow-robot and pierDipi authored Oct 17, 2023
1 parent c1626f1 commit 0dadfd9
Showing 4 changed files with 86 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/state/state.go
@@ -361,7 +361,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri
// Assert the pod is not overcommitted
if free[ordinal] < 0 {
// This should not happen anymore. Log as an error but do not interrupt the current scheduling.
s.logger.Errorw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last {
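For context, the check above guards the builder's per-pod free-capacity bookkeeping: free capacity is tracked per pod ordinal and goes negative when a pod carries more virtual replicas than it has capacity for, which is exactly the overcommit case the scheduler change below compensates for. A minimal, self-contained sketch of that bookkeeping, using hypothetical names rather than the actual stateBuilder type:

package main

import (
	"fmt"
	"log"
)

// updateFreeCapacity is a simplified stand-in for the real bookkeeping: free
// capacity per pod ordinal starts at the pod's capacity, shrinks as virtual
// replicas are placed, and may go negative if the pod is overcommitted.
func updateFreeCapacity(free []int32, ordinal, capacity, vreplicas int32) []int32 {
	// Grow the slice so this ordinal is addressable.
	for int32(len(free)) <= ordinal {
		free = append(free, capacity)
	}
	free[ordinal] -= vreplicas

	// Assert the pod is not overcommitted: warn, but keep scheduling.
	if free[ordinal] < 0 {
		log.Printf("warning: pod with ordinal %d is overcommitted (free=%d)", ordinal, free[ordinal])
	}
	return free
}

func main() {
	// 12 vreplicas placed on a pod whose capacity is 10 => free capacity -2.
	fmt.Println(updateFreeCapacity(nil, 0, 10, 12))
}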
11 changes: 9 additions & 2 deletions pkg/scheduler/statefulset/autoscaler.go
@@ -106,7 +106,7 @@ func (a *autoscaler) Demote(b reconciler.Bucket) {

func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) *autoscaler {
return &autoscaler{
logger: logging.FromContext(ctx),
logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")),
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace),
statefulSetName: cfg.StatefulSetName,
vpodLister: cfg.VPodLister,
@@ -133,8 +133,10 @@ func (a *autoscaler) Start(ctx context.Context) {
case <-ctx.Done():
return
case <-time.After(a.refreshPeriod):
a.logger.Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load()))
attemptScaleDown = true
case <-a.trigger:
a.logger.Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load()))
attemptScaleDown = false
}

@@ -145,9 +147,14 @@ func (a *autoscaler) Start(ctx context.Context) {
}

func (a *autoscaler) Autoscale(ctx context.Context) {
select {
// We trigger the autoscaler asynchronously by using the channel so that the scale down refresh
// period is reset.
a.trigger <- struct{}{}
case a.trigger <- struct{}{}:
default:
// We don't want to block if the channel's buffer is full, it will be triggered eventually.

}
}

func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) error {
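The select with a default branch added above makes the scale-up trigger non-blocking: if the trigger channel's buffer is already full, an autoscale pass is pending anyway, so the scheduler can return immediately instead of waiting on the send. A self-contained sketch of that pattern, with hypothetical names (miniAutoscaler, requestScale) rather than the project's actual autoscaler type:

package main

import (
	"fmt"
	"time"
)

type miniAutoscaler struct {
	trigger       chan struct{}
	refreshPeriod time.Duration
}

// requestScale asks for a scale-up pass without ever blocking the caller.
func (a *miniAutoscaler) requestScale() {
	select {
	case a.trigger <- struct{}{}:
		// Queued a scale-up pass.
	default:
		// Buffer full: a pass is already pending, so dropping the signal is safe.
	}
}

// run reacts to an explicit trigger (scale up) or, if no trigger arrives within
// the refresh period, performs a periodic pass that may scale down.
func (a *miniAutoscaler) run(stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case <-a.trigger:
			fmt.Println("autoscale: scale up requested")
		case <-time.After(a.refreshPeriod):
			fmt.Println("autoscale: periodic pass, may scale down")
		}
	}
}

func main() {
	a := &miniAutoscaler{trigger: make(chan struct{}, 1), refreshPeriod: 50 * time.Millisecond}
	stop := make(chan struct{})
	go a.run(stop)

	// Many callers can request scaling concurrently; none of them block.
	for i := 0; i < 5; i++ {
		a.requestScale()
	}
	time.Sleep(120 * time.Millisecond)
	close(stop)
}

Dropping the extra signals is safe here because, as the comment in the diff notes, a single pending trigger already guarantees an autoscale pass will run.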
66 changes: 50 additions & 16 deletions pkg/scheduler/statefulset/scheduler.go
@@ -228,15 +228,6 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla
s.reservedMu.Lock()
defer s.reservedMu.Unlock()

vpods, err := s.vpodLister()
if err != nil {
return nil, err
}
vpodFromLister := st.GetVPod(vpod.GetKey(), vpods)
if vpodFromLister != nil && vpod.GetResourceVersion() != vpodFromLister.GetResourceVersion() {
return nil, fmt.Errorf("vpod to schedule has resource version different from one in indexer")
}

placements, err := s.scheduleVPod(vpod)
if placements == nil {
return placements, err
@@ -253,7 +244,7 @@ }
}

func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) {
logger := s.logger.With("key", vpod.GetKey())
logger := s.logger.With("key", vpod.GetKey(), zap.String("component", "scheduler"))
// Get the current placements state
// Quite an expensive operation but safe and simple.
state, err := s.stateAccessor.State(s.reserved)
@@ -262,18 +253,60 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1
return nil, err
}

// Clean up reserved from removed resources that don't appear in the vpod list anymore and have
// no pending resources.
reserved := make(map[types.NamespacedName]map[string]int32)
for k, v := range s.reserved {
if pendings, ok := state.Pending[k]; ok {
if pendings == 0 {
reserved[k] = map[string]int32{}
} else {
reserved[k] = v
}
}
}
s.reserved = reserved

logger.Debugw("scheduling", zap.Any("state", state))

existingPlacements := vpod.GetPlacements()
var left int32

// Remove unschedulable pods from placements
// Remove unschedulable or adjust overcommitted pods from placements
var placements []duckv1alpha1.Placement
if len(existingPlacements) > 0 {
placements = make([]duckv1alpha1.Placement, 0, len(existingPlacements))
for _, p := range existingPlacements {
if state.IsSchedulablePod(st.OrdinalFromPodName(p.PodName)) {
placements = append(placements, *p.DeepCopy())
p := p.DeepCopy()
ordinal := st.OrdinalFromPodName(p.PodName)

if !state.IsSchedulablePod(ordinal) {
continue
}

// Handle overcommitted pods.
if state.FreeCap[ordinal] < 0 {
// vr > overcommit  => vr: 9, overcommit: 4 -> free: 0, vr: 5, pending: +4
// vr == overcommit => vr: 4, overcommit: 4 -> free: 0, vr: 0, pending: +4
// vr < overcommit  => vr: 3, overcommit: 4 -> free: -1, vr: 0, pending: +3

overcommit := -state.FreeCap[ordinal]

if p.VReplicas >= overcommit {
state.SetFree(ordinal, 0)
state.Pending[vpod.GetKey()] += overcommit

p.VReplicas = p.VReplicas - overcommit
} else {
state.SetFree(ordinal, p.VReplicas-overcommit)
state.Pending[vpod.GetKey()] += p.VReplicas

p.VReplicas = 0
}
}

if p.VReplicas > 0 {
placements = append(placements, *p)
}
}
}
@@ -312,7 +345,7 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1
} else { //Predicates and priorities must be used for scheduling
// Need less => scale down
if tr > vpod.GetVReplicas() && state.DeschedPolicy != nil {
logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()))
logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()))
placements = s.removeReplicasWithPolicy(vpod, tr-vpod.GetVReplicas(), placements)

// Do not trigger the autoscaler to avoid unnecessary churn
@@ -325,17 +358,18 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1
// Need more => scale up
// rebalancing needed for all vreps most likely since there are pending vreps from previous reconciliation
// can fall here when vreps scaled up or after eviction
logger.Debugw("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()))
logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()))
placements, left = s.rebalanceReplicasWithPolicy(vpod, vpod.GetVReplicas(), placements)
}
}

if left > 0 {
// Give time for the autoscaler to do its job
logger.Info("not enough pod replicas to schedule. Awaiting autoscaler", zap.Any("placement", placements), zap.Int32("left", left))
logger.Infow("not enough pod replicas to schedule")

// Trigger the autoscaler
if s.autoscaler != nil {
logger.Infow("Awaiting autoscaler", zap.Any("placement", placements), zap.Int32("left", left))
s.autoscaler.Autoscale(s.ctx)
}

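The overcommit branch added above moves excess virtual replicas from an overcommitted pod back into the vpod's pending count, so the autoscaler can later place them elsewhere. A small, self-contained sketch of just that arithmetic, with hypothetical names rather than the real scheduler state, mirroring the three cases in the code comment:

package main

import "fmt"

// adjustOvercommit mirrors the arithmetic for an overcommitted pod: freeCap is
// negative, so overcommit := -freeCap virtual replicas must be removed from the
// placement and re-counted as pending.
func adjustOvercommit(vreplicas, freeCap int32) (newVReplicas, newFreeCap, pendingDelta int32) {
	if freeCap >= 0 {
		return vreplicas, freeCap, 0 // not overcommitted, nothing to do
	}
	overcommit := -freeCap
	if vreplicas >= overcommit {
		// The placement can absorb the whole overcommit.
		return vreplicas - overcommit, 0, overcommit
	}
	// The placement is smaller than the overcommit: drop it entirely.
	return 0, vreplicas - overcommit, vreplicas
}

func main() {
	// vr 9, free -4 => vr 5, free 0, pending +4
	// vr 4, free -4 => vr 0, free 0, pending +4
	// vr 3, free -4 => vr 0, free -1, pending +3
	for _, c := range [][2]int32{{9, -4}, {4, -4}, {3, -4}} {
		vr, free, pending := adjustOvercommit(c[0], c[1])
		fmt.Printf("vr=%d free=%d -> vr=%d free=%d pending+=%d\n", c[0], c[1], vr, free, pending)
	}
}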
26 changes: 26 additions & 0 deletions pkg/scheduler/statefulset/scheduler_test.go
@@ -734,6 +734,32 @@ func TestStatefulsetScheduler(t *testing.T) {
},
},
},
{
name: "two replicas, 12 vreplicas, already scheduled on overcommitted pod, remove replicas",
vreplicas: 12,
replicas: int32(2),
placements: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 12},
},
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 10},
{PodName: "statefulset-name-1", VReplicas: 2},
},
schedulerPolicyType: scheduler.MAXFILLUP,
},
{
name: "one replica, 12 vreplicas, already scheduled on overcommitted pod, remove replicas",
vreplicas: 12,
replicas: int32(1),
placements: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 12},
},
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 10},
},
err: controller.NewRequeueAfter(5 * time.Second),
schedulerPolicyType: scheduler.MAXFILLUP,
},
}

for _, tc := range testCases {