correctly handle domains from NodePools when honoring taints
jmdeal committed Aug 15, 2024
1 parent c4a7cb0 commit bc2d87a
Showing 6 changed files with 270 additions and 65 deletions.
pkg/controllers/provisioning/provisioner.go (14 additions, 21 deletions)

@@ -33,7 +33,6 @@ import (
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/klog/v2"
     controllerruntime "sigs.k8s.io/controller-runtime"
@@ -235,8 +234,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
     nodePoolList.OrderByWeight()

     instanceTypes := map[string][]*cloudprovider.InstanceType{}
-    domains := map[string]sets.Set[string]{}
-    var notReadyNodePools []string
+    domainGroups := map[string]scheduler.TopologyDomainGroup{}
     for _, nodePool := range nodePoolList.Items {
         // Get instance type options
         instanceTypeOptions, err := p.cloudProvider.GetInstanceTypes(ctx, lo.ToPtr(nodePool))
@@ -252,6 +250,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
             continue
         }
         instanceTypes[nodePool.Name] = append(instanceTypes[nodePool.Name], instanceTypeOptions...)
+        nodePoolTaints := nodePool.Spec.Template.Spec.Taints

         // Construct Topology Domains
         for _, instanceType := range instanceTypeOptions {
@@ -261,15 +260,12 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
             requirements.Add(scheduling.NewLabelRequirements(nodePool.Spec.Template.Labels).Values()...)
             requirements.Add(instanceType.Requirements.Values()...)

-            for key, requirement := range requirements {
-                // This code used to execute a Union between domains[key] and requirement.Values().
-                // The downside of this is that Union is immutable and takes a copy of the set it is executed upon.
-                // This resulted in a lot of memory pressure on the heap and poor performance
-                // https://github.com/aws/karpenter/issues/3565
-                if domains[key] == nil {
-                    domains[key] = sets.New(requirement.Values()...)
-                } else {
-                    domains[key].Insert(requirement.Values()...)
+            for topologyKey, requirement := range requirements {
+                if _, ok := domainGroups[topologyKey]; !ok {
+                    domainGroups[topologyKey] = scheduler.NewTopologyDomainGroup()
+                }
+                for _, domain := range requirement.Values() {
+                    domainGroups[topologyKey].Insert(domain, nodePoolTaints...)
                 }
             }
         }
@@ -278,23 +274,20 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
         requirements.Add(scheduling.NewLabelRequirements(nodePool.Spec.Template.Labels).Values()...)
         for key, requirement := range requirements {
             if requirement.Operator() == corev1.NodeSelectorOpIn {
-                // The following is a performance optimisation, for the explanation see the comment above
-                if domains[key] == nil {
-                    domains[key] = sets.New(requirement.Values()...)
-                } else {
-                    domains[key].Insert(requirement.Values()...)
+                if _, ok := domainGroups[key]; !ok {
+                    domainGroups[key] = scheduler.NewTopologyDomainGroup()
+                }
+                for _, value := range requirement.Values() {
+                    domainGroups[key].Insert(value, nodePoolTaints...)
                 }
             }
         }
     }
-    if len(notReadyNodePools) > 0 {
-        log.FromContext(ctx).WithValues("nodePools", nodePoolList).Info("skipped nodePools, not ready")
-    }
     // inject topology constraints
     pods = p.injectVolumeTopologyRequirements(ctx, pods)

     // Calculate cluster topology
-    topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domains, pods)
+    topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domainGroups, pods)
     if err != nil {
         return nil, fmt.Errorf("tracking topology counts, %w", err)
     }
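The TopologyDomainGroup type that provisioner.go now populates is defined in one of the changed files that is not loaded in this view. Going only by the calls visible above (scheduler.NewTopologyDomainGroup() and Insert(domain, nodePoolTaints...)), a minimal sketch of such a container might look like the following; the map layout and method body are assumptions for illustration, not the repository's actual implementation.

package scheduling

import corev1 "k8s.io/api/core/v1"

// TopologyDomainGroup (sketch): for each topology domain, remember the taint sets of the
// NodePools that can provide that domain, so those taints can be honored later when the
// domain is offered to a pod.
type TopologyDomainGroup map[string][][]corev1.Taint

// NewTopologyDomainGroup returns an empty group, mirroring the constructor call in provisioner.go.
func NewTopologyDomainGroup() TopologyDomainGroup {
    return TopologyDomainGroup{}
}

// Insert records that the given domain is offered by a NodePool carrying the given taints.
// An untainted NodePool contributes an empty taint set, marking the domain as usable by any pod.
func (t TopologyDomainGroup) Insert(domain string, taints ...corev1.Taint) {
    t[domain] = append(t[domain], taints)
}

Compared with the old map[string]sets.Set[string], the extra dimension is exactly what the commit title describes: a domain is no longer just "known", it is known together with the taints a pod would have to tolerate to land there.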
(second changed file; its name was not captured in this view: a scheduling benchmark test)

@@ -33,7 +33,6 @@ import (
     "github.com/samber/lo"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
-    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/tools/record"
     "k8s.io/utils/clock"
     fakecr "sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -167,7 +166,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
     client := fakecr.NewFakeClient()
     pods := makeDiversePods(podCount)
     cluster = state.NewCluster(&clock.RealClock{}, client)
-    domains := map[string]sets.Set[string]{}
+    domains := map[string]scheduling.TopologyDomainGroup{}
     topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods)
     if err != nil {
         b.Fatalf("creating topology, %s", err)
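For code that calls scheduling.NewTopology directly, such as the benchmark above, the migration is mechanical: the map is still keyed by topology key, but each value is now a TopologyDomainGroup populated through Insert instead of a sets.Set[string]. The snippet below illustrates that translation; the zone names and the taint are invented, and the karpenter module import path is assumed.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/sets"

    scheduler "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
)

func main() {
    // Old shape, no longer accepted by scheduling.NewTopology: bare domain names per topology key.
    _ = map[string]sets.Set[string]{
        corev1.LabelTopologyZone: sets.New("zone-a", "zone-b"),
    }

    // New shape: each domain is inserted together with the taints of the NodePool that offers it.
    nodePoolTaints := []corev1.Taint{{Key: "example.com/dedicated", Value: "team-a", Effect: corev1.TaintEffectNoSchedule}}
    domainGroups := map[string]scheduler.TopologyDomainGroup{}
    domainGroups[corev1.LabelTopologyZone] = scheduler.NewTopologyDomainGroup()
    domainGroups[corev1.LabelTopologyZone].Insert("zone-a", nodePoolTaints...)
    domainGroups[corev1.LabelTopologyZone].Insert("zone-b") // domain offered by an untainted NodePool

    fmt.Printf("tracking %d topology keys\n", len(domainGroups))
}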
pkg/controllers/provisioning/scheduling/topology.go (10 additions, 6 deletions)

@@ -52,18 +52,18 @@ type Topology struct {
     // in some cases.
     inverseTopologies map[uint64]*TopologyGroup
     // The universe of domains by topology key
-    domains map[string]sets.Set[string]
+    domainGroups map[string]TopologyDomainGroup
     // excludedPods are the pod UIDs of pods that are excluded from counting. This is used so we can simulate
     // moving pods to prevent them from being double counted.
     excludedPods sets.Set[string]
     cluster      *state.Cluster
 }

-func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domains map[string]sets.Set[string], pods []*corev1.Pod) (*Topology, error) {
+func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domainGroups map[string]TopologyDomainGroup, pods []*corev1.Pod) (*Topology, error) {
     t := &Topology{
         kubeClient:        kubeClient,
         cluster:           cluster,
-        domains:           domains,
+        domainGroups:      domainGroups,
         topologies:        map[uint64]*TopologyGroup{},
         inverseTopologies: map[uint64]*TopologyGroup{},
         excludedPods:      sets.New[string](),
@@ -233,7 +233,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po
             return err
         }

-        tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domains[term.TopologyKey])
+        tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey])

         hash := tg.Hash()
         if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -269,6 +269,10 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
     // capture new domain values from existing nodes that may not have any pods selected by the topology group
     // scheduled to them already
     t.cluster.ForEachNode(func(n *state.StateNode) bool {
+        // ignore state nodes which are tracking in-flight NodeClaims
+        if n.Node == nil {
+            return true
+        }
         // ignore the node if it doesn't match the topology group
         if !tg.nodeFilter.Matches(n.Node) {
             return true
@@ -330,7 +334,7 @@ func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup {
     var topologyGroups []*TopologyGroup
     for _, cs := range p.Spec.TopologySpreadConstraints {
         topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace),
-            cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domains[cs.TopologyKey]))
+            cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domainGroups[cs.TopologyKey]))
     }
     return topologyGroups
 }
@@ -367,7 +371,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo
             if err != nil {
                 return nil, err
             }
-            topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domains[term.TopologyKey]))
+            topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey]))
         }
     }
     return topologyGroups, nil
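The part that actually honors these taints, i.e. the TopologyGroup changes that consume a TopologyDomainGroup when building a pod's universe of domains, sits in the remaining files that are not loaded here. Conceptually, for a topology spread constraint with nodeTaintsPolicy: Honor, a domain should only count for a pod if at least one NodePool offering it carries taints the pod tolerates. The sketch below shows that filtering on the TopologyDomainGroup shape assumed earlier; the helper names toleratesAll and ForEachToleratedDomain are invented for illustration and rely only on corev1.Toleration.ToleratesTaint from k8s.io/api.

package scheduling

import corev1 "k8s.io/api/core/v1"

// toleratesAll reports whether the pod tolerates every taint in the given set.
func toleratesAll(pod *corev1.Pod, taints []corev1.Taint) bool {
    for i := range taints {
        tolerated := false
        for j := range pod.Spec.Tolerations {
            if pod.Spec.Tolerations[j].ToleratesTaint(&taints[i]) {
                tolerated = true
                break
            }
        }
        if !tolerated {
            return false
        }
    }
    return true
}

// ForEachToleratedDomain (sketch) visits only the domains that at least one offering NodePool
// exposes with a taint set the pod tolerates; an empty taint set always qualifies.
func (t TopologyDomainGroup) ForEachToleratedDomain(pod *corev1.Pod, f func(domain string)) {
    for domain, taintSets := range t {
        for _, taints := range taintSets {
            if toleratesAll(pod, taints) {
                f(domain)
                break
            }
        }
    }
}

Under this reading, domains backed by untainted NodePools behave as before, while domains reachable only through tainted NodePools stop being counted for pods that do not tolerate those taints, which appears to be the mishandling the commit title refers to.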
(diffs for the remaining three changed files were not loaded in this view)
