diff --git a/cloudtest/aws.go b/cloudtest/aws.go deleted file mode 100644 index 2879247f..00000000 --- a/cloudtest/aws.go +++ /dev/null @@ -1,68 +0,0 @@ -package cloudtest - -import ( - "errors" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/autoscaling" - "github.com/aws/aws-sdk-go/service/ec2" - "github.com/aws/aws-sdk-go/service/ec2/ec2iface" -) - -type MockEC2Service struct { - ec2iface.EC2API - Templates []*ec2.LaunchTemplate -} - -func (m *MockEC2Service) DescribeLaunchTemplates(_ *ec2.DescribeLaunchTemplatesInput) (*ec2.DescribeLaunchTemplatesOutput, error) { - output := &ec2.DescribeLaunchTemplatesOutput{ - LaunchTemplates: m.Templates, - } - return output, nil -} - -func (m *MockEC2Service) DescribeLaunchTemplateByID(input *ec2.DescribeLaunchTemplatesInput) (*ec2.LaunchTemplate, error) { - for _, template := range m.Templates { - if template.LaunchTemplateId == input.LaunchTemplateIds[0] { - return template, nil - } - if template.LaunchTemplateName == input.LaunchTemplateNames[0] { - return template, nil - } - } - return nil, errors.New("not found") -} - -func CreateTestAutoScalingGroup(name, launchConfigurationName string, launchTemplateSpecification *autoscaling.LaunchTemplateSpecification, instances []*autoscaling.Instance) *autoscaling.Group { - asg := &autoscaling.Group{ - AutoScalingGroupName: aws.String(name), - Instances: instances, - } - if len(launchConfigurationName) != 0 { - asg.SetLaunchConfigurationName(launchConfigurationName) - } - if launchTemplateSpecification != nil { - asg.SetLaunchTemplate(launchTemplateSpecification) - } - return asg -} - -func CreateTestAutoScalingInstance(id, launchConfigurationName string, launchTemplateSpecification *autoscaling.LaunchTemplateSpecification, healthStatus string) *autoscaling.Instance { - instance := &autoscaling.Instance{ - HealthStatus: aws.String(healthStatus), - InstanceId: aws.String(id), - } - if len(launchConfigurationName) != 0 { - instance.SetLaunchConfigurationName(launchConfigurationName) - } - if launchTemplateSpecification != nil { - instance.SetLaunchTemplate(launchTemplateSpecification) - } - return instance -} - -func CreateTestEc2Instance(id string) *ec2.Instance { - instance := &ec2.Instance{ - InstanceId: aws.String(id), - } - return instance -} diff --git a/cloudtest/cloudtest.go b/cloudtest/cloudtest.go new file mode 100644 index 00000000..499934e5 --- /dev/null +++ b/cloudtest/cloudtest.go @@ -0,0 +1,132 @@ +package cloudtest + +import ( + "errors" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" +) + +type MockEC2Service struct { + ec2iface.EC2API + + Counter map[string]int64 + Templates []*ec2.LaunchTemplate +} + +func NewMockEC2Service(templates []*ec2.LaunchTemplate) *MockEC2Service { + return &MockEC2Service{ + Counter: make(map[string]int64), + Templates: templates, + } +} + +func (m *MockEC2Service) DescribeLaunchTemplates(_ *ec2.DescribeLaunchTemplatesInput) (*ec2.DescribeLaunchTemplatesOutput, error) { + m.Counter["DescribeLaunchTemplates"]++ + output := &ec2.DescribeLaunchTemplatesOutput{ + LaunchTemplates: m.Templates, + } + return output, nil +} + +func (m *MockEC2Service) DescribeLaunchTemplateByID(input *ec2.DescribeLaunchTemplatesInput) (*ec2.LaunchTemplate, error) { + m.Counter["DescribeLaunchTemplateByID"]++ + for _, template := range m.Templates { + if template.LaunchTemplateId == input.LaunchTemplateIds[0] { + return template, nil + } + if template.LaunchTemplateName == input.LaunchTemplateNames[0] { + return template, nil + } + } + return nil, errors.New("not found") +} + +func CreateTestEc2Instance(id string) *ec2.Instance { + instance := &ec2.Instance{ + InstanceId: aws.String(id), + } + return instance +} + +type MockAutoScalingService struct { + autoscalingiface.AutoScalingAPI + + Counter map[string]int64 + AutoScalingGroups map[string]*autoscaling.Group +} + +func NewMockAutoScalingService(autoScalingGroups []*autoscaling.Group) *MockAutoScalingService { + service := &MockAutoScalingService{ + Counter: make(map[string]int64), + AutoScalingGroups: make(map[string]*autoscaling.Group), + } + for _, autoScalingGroup := range autoScalingGroups { + service.AutoScalingGroups[aws.StringValue(autoScalingGroup.AutoScalingGroupName)] = autoScalingGroup + } + return service +} + +func (m *MockAutoScalingService) TerminateInstanceInAutoScalingGroup(_ *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) { + m.Counter["TerminateInstanceInAutoScalingGroup"]++ + return &autoscaling.TerminateInstanceInAutoScalingGroupOutput{}, nil +} + +func (m *MockAutoScalingService) DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error) { + m.Counter["DescribeAutoScalingGroups"]++ + var autoScalingGroups []*autoscaling.Group + for _, autoScalingGroupName := range input.AutoScalingGroupNames { + for _, autoScalingGroup := range m.AutoScalingGroups { + if aws.StringValue(autoScalingGroupName) == aws.StringValue(autoScalingGroup.AutoScalingGroupName) { + autoScalingGroups = append(autoScalingGroups, autoScalingGroup) + } + } + } + return &autoscaling.DescribeAutoScalingGroupsOutput{ + AutoScalingGroups: autoScalingGroups, + }, nil +} + +func (m *MockAutoScalingService) SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) { + m.Counter["SetDesiredCapacity"]++ + m.AutoScalingGroups[aws.StringValue(input.AutoScalingGroupName)].SetDesiredCapacity(aws.Int64Value(input.DesiredCapacity)) + return &autoscaling.SetDesiredCapacityOutput{}, nil +} + +func (m *MockAutoScalingService) UpdateAutoScalingGroup(_ *autoscaling.UpdateAutoScalingGroupInput) (*autoscaling.UpdateAutoScalingGroupOutput, error) { + m.Counter["UpdateAutoScalingGroup"]++ + return &autoscaling.UpdateAutoScalingGroupOutput{}, nil +} + +func CreateTestAutoScalingGroup(name, launchConfigurationName string, launchTemplateSpecification *autoscaling.LaunchTemplateSpecification, instances []*autoscaling.Instance) *autoscaling.Group { + asg := &autoscaling.Group{ + AutoScalingGroupName: aws.String(name), + Instances: instances, + DesiredCapacity: aws.Int64(int64(len(instances))), + MinSize: aws.Int64(0), + MaxSize: aws.Int64(999), + } + if len(launchConfigurationName) != 0 { + asg.SetLaunchConfigurationName(launchConfigurationName) + } + if launchTemplateSpecification != nil { + asg.SetLaunchTemplate(launchTemplateSpecification) + } + return asg +} + +func CreateTestAutoScalingInstance(id, launchConfigurationName string, launchTemplateSpecification *autoscaling.LaunchTemplateSpecification, lifeCycleState string) *autoscaling.Instance { + instance := &autoscaling.Instance{ + LifecycleState: aws.String(lifeCycleState), + InstanceId: aws.String(id), + } + if len(launchConfigurationName) != 0 { + instance.SetLaunchConfigurationName(launchConfigurationName) + } + if launchTemplateSpecification != nil { + instance.SetLaunchTemplate(launchTemplateSpecification) + } + return instance +} diff --git a/k8stest/k8stest.go b/k8stest/k8stest.go index 6cdfc008..11cfe8cb 100644 --- a/k8stest/k8stest.go +++ b/k8stest/k8stest.go @@ -8,27 +8,38 @@ import ( type MockKubernetesClient struct { Counter map[string]int64 - nodes []v1.Node - pods []v1.Pod + Nodes map[string]v1.Node + Pods map[string]v1.Pod } func NewMockKubernetesClient(nodes []v1.Node, pods []v1.Pod) *MockKubernetesClient { - return &MockKubernetesClient{ + client := &MockKubernetesClient{ Counter: make(map[string]int64), - nodes: nodes, - pods: pods, + Nodes: make(map[string]v1.Node), + Pods: make(map[string]v1.Pod), } + for _, node := range nodes { + client.Nodes[node.Name] = node + } + for _, pod := range pods { + client.Pods[pod.Name] = pod + } + return client } func (mock *MockKubernetesClient) GetNodes() ([]v1.Node, error) { mock.Counter["GetNodes"]++ - return mock.nodes, nil + var nodes []v1.Node + for _, node := range mock.Nodes { + nodes = append(nodes, node) + } + return nodes, nil } func (mock *MockKubernetesClient) GetPodsInNode(node string) ([]v1.Pod, error) { mock.Counter["GetPodsInNode"]++ var pods []v1.Pod - for _, pod := range mock.pods { + for _, pod := range mock.Pods { if pod.Spec.NodeName == node { pods = append(pods, pod) } @@ -38,7 +49,7 @@ func (mock *MockKubernetesClient) GetPodsInNode(node string) ([]v1.Pod, error) { func (mock *MockKubernetesClient) GetNodeByHostName(hostName string) (*v1.Node, error) { mock.Counter["GetNodeByHostName"]++ - for _, node := range mock.nodes { + for _, node := range mock.Nodes { // For the sake of simplicity, we'll just assume that the host name is the same as the node name if node.Name == hostName { return &node, nil @@ -49,6 +60,7 @@ func (mock *MockKubernetesClient) GetNodeByHostName(hostName string) (*v1.Node, func (mock *MockKubernetesClient) UpdateNode(node *v1.Node) error { mock.Counter["UpdateNode"]++ + mock.Nodes[node.Name] = *node return nil } @@ -68,6 +80,7 @@ func CreateTestNode(name string, allocatableCpu, allocatableMemory string) v1.No }, } node.SetName(name) + node.SetAnnotations(make(map[string]string)) return node } diff --git a/main.go b/main.go index 88588ad8..a2a1da9b 100644 --- a/main.go +++ b/main.go @@ -86,7 +86,11 @@ func HandleRollingUpgrade(kubernetesClient k8s.KubernetesClientApi, ec2Service e log.Printf("[%s] outdated=%d; updated=%d; updatedAndReady=%d; asgCurrent=%d; asgDesired=%d; asgMax=%d", aws.StringValue(autoScalingGroup.AutoScalingGroupName), len(outdatedInstances), len(updatedInstances), len(updatedReadyNodes), len(autoScalingGroup.Instances), aws.Int64Value(autoScalingGroup.DesiredCapacity), aws.Int64Value(autoScalingGroup.MaxSize)) } - // XXX: this should be configurable (i.e. SLOW_ROLLING_UPDATE) + if int64(len(autoScalingGroup.Instances)) < aws.Int64Value(autoScalingGroup.DesiredCapacity) { + log.Printf("[%s] Skipping because ASG has a desired capacity of %d, but only has %d instances", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.Int64Value(autoScalingGroup.DesiredCapacity), len(autoScalingGroup.Instances)) + continue + } + if numberOfNonReadyNodesOrInstances != 0 { log.Printf("[%s] ASG has %d non-ready updated nodes/instances, waiting until all nodes/instances are ready", aws.StringValue(autoScalingGroup.AutoScalingGroupName), numberOfNonReadyNodesOrInstances) continue @@ -109,12 +113,9 @@ func HandleRollingUpgrade(kubernetesClient k8s.KubernetesClientApi, ec2Service e err := k8s.AnnotateNodeByHostName(kubernetesClient, aws.StringValue(outdatedInstance.InstanceId), k8s.RollingUpdateStartedTimestampAnnotationKey, time.Now().Format(time.RFC3339)) if err != nil { log.Printf("[%s][%s] Unable to annotate node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error()) - // XXX: should we really skip here? log.Printf("[%s][%s] Skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId)) continue } - // TODO: increase desired instance by 1 (to create a new updated instance) - } else { log.Printf("[%s][%s] Node already started rollout process", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId)) // check if existing updatedInstances have the capacity to support what's inside this node @@ -129,6 +130,7 @@ func HandleRollingUpgrade(kubernetesClient k8s.KubernetesClientApi, ec2Service e log.Printf("[%s][%s] Skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId)) continue } else { + // Only annotate if no error was encountered _ = k8s.AnnotateNodeByHostName(kubernetesClient, aws.StringValue(outdatedInstance.InstanceId), k8s.RollingUpdateDrainedTimestampAnnotationKey, time.Now().Format(time.RFC3339)) } } else { @@ -142,28 +144,33 @@ func HandleRollingUpgrade(kubernetesClient k8s.KubernetesClientApi, ec2Service e log.Printf("[%s][%s] Ran into error while terminating node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error()) continue } else { + // Only annotate if no error was encountered _ = k8s.AnnotateNodeByHostName(kubernetesClient, aws.StringValue(outdatedInstance.InstanceId), k8s.RollingUpdateTerminatedTimestampAnnotationKey, time.Now().Format(time.RFC3339)) } } else { log.Printf("[%s][%s] Node is already in the process of being terminated since %d minutes ago, skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), minutesSinceTerminated) - continue // TODO: check if minutesSinceTerminated > 10. If that happens, then there's clearly a problem, so we should do something about it + // The node has already been terminated, there's nothing to do here, continue to the next one + continue } + // If this code is reached, it means that the current node has been successfully drained and + // scheduled for termination. + // As a result, we return here to make sure that multiple old instances didn't use the same updated + // instances to calculate resources available + log.Printf("[%s][%s] Node has been drained and scheduled for termination successfully", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId)) return } else { log.Printf("[%s][%s] Updated nodes do not have enough resources available, increasing desired count by 1", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId)) // TODO: check if desired capacity matches (updatedInstances + outdatedInstances + 1) - err := cloud.SetAutoScalingGroupDesiredCount(autoScalingService, autoScalingGroup, *autoScalingGroup.DesiredCapacity+1) + err := cloud.SetAutoScalingGroupDesiredCount(autoScalingService, autoScalingGroup, aws.Int64Value(autoScalingGroup.DesiredCapacity)+1) if err != nil { log.Printf("[%s][%s] Unable to increase ASG desired size: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error()) log.Printf("[%s][%s] Skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId)) continue } - return } } } - // TODO: Check if ASG hit max, and then decide what to do (patience or violence) } } @@ -171,20 +178,23 @@ func getReadyNodesAndNumberOfNonReadyNodesOrInstances(updatedInstances []*autosc var updatedReadyNodes []*v1.Node numberOfNonReadyNodesOrInstances := 0 for _, updatedInstance := range updatedInstances { - if *updatedInstance.LifecycleState != "InService" { + if aws.StringValue(updatedInstance.LifecycleState) != "InService" { numberOfNonReadyNodesOrInstances++ log.Printf("[%s][%s] Skipping because instance is not in LifecycleState 'InService', but is in '%s' instead", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(updatedInstance.InstanceId), aws.StringValue(updatedInstance.LifecycleState)) continue } updatedNode, err := kubernetesClient.GetNodeByHostName(aws.StringValue(updatedInstance.InstanceId)) if err != nil { - log.Printf("[%s][%s] Unable to get updated node from Kubernetes: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(updatedInstance.InstanceId), err.Error()) - log.Printf("[%s][%s] Skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(updatedInstance.InstanceId)) + numberOfNonReadyNodesOrInstances++ + log.Printf("[%s][%s] Skipping because unable to get updated node from Kubernetes: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(updatedInstance.InstanceId), err.Error()) continue } // Check if Kubelet is ready to accept pods on that node conditions := updatedNode.Status.Conditions - if kubeletCondition := conditions[len(conditions)-1]; kubeletCondition.Type == v1.NodeReady && kubeletCondition.Status == v1.ConditionTrue { + if len(conditions) == 0 { + log.Printf("[%s][%s] For some magical reason, %s doesn't have any conditions, therefore it is impossible to determine whether the node is ready to accept new pods or not", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(updatedInstance.InstanceId), updatedNode.Name) + numberOfNonReadyNodesOrInstances++ + } else if kubeletCondition := conditions[len(conditions)-1]; kubeletCondition.Type == v1.NodeReady && kubeletCondition.Status == v1.ConditionTrue { updatedReadyNodes = append(updatedReadyNodes, updatedNode) } else { numberOfNonReadyNodesOrInstances++ @@ -264,7 +274,9 @@ func SeparateOutdatedFromUpdatedInstances(asg *autoscaling.Group, ec2Svc ec2ifac targetLaunchConfiguration := asg.LaunchConfigurationName targetLaunchTemplate := asg.LaunchTemplate if targetLaunchTemplate == nil && asg.MixedInstancesPolicy != nil && asg.MixedInstancesPolicy.LaunchTemplate != nil { - log.Printf("[%s] using mixed instances policy launch template", aws.StringValue(asg.AutoScalingGroupName)) + if config.Get().Debug { + log.Printf("[%s] using mixed instances policy launch template", aws.StringValue(asg.AutoScalingGroupName)) + } targetLaunchTemplate = asg.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification } if targetLaunchTemplate != nil { @@ -283,13 +295,13 @@ func SeparateOutdatedFromUpdatedInstancesUsingLaunchTemplate(targetLaunchTemplat err error ) switch { - case targetLaunchTemplate.LaunchTemplateId != nil && *targetLaunchTemplate.LaunchTemplateId != "": - if targetTemplate, err = cloud.DescribeLaunchTemplateByID(ec2Svc, *targetLaunchTemplate.LaunchTemplateId); err != nil { - return nil, nil, fmt.Errorf("error retrieving information about launch template %s: %v", *targetLaunchTemplate.LaunchTemplateId, err) + case targetLaunchTemplate.LaunchTemplateId != nil && aws.StringValue(targetLaunchTemplate.LaunchTemplateId) != "": + if targetTemplate, err = cloud.DescribeLaunchTemplateByID(ec2Svc, aws.StringValue(targetLaunchTemplate.LaunchTemplateId)); err != nil { + return nil, nil, fmt.Errorf("error retrieving information about launch template %s: %v", aws.StringValue(targetLaunchTemplate.LaunchTemplateId), err) } - case targetLaunchTemplate.LaunchTemplateName != nil && *targetLaunchTemplate.LaunchTemplateName != "": - if targetTemplate, err = cloud.DescribeLaunchTemplateByName(ec2Svc, *targetLaunchTemplate.LaunchTemplateName); err != nil { - return nil, nil, fmt.Errorf("error retrieving information about launch template name %s: %v", *targetLaunchTemplate.LaunchTemplateName, err) + case targetLaunchTemplate.LaunchTemplateName != nil && aws.StringValue(targetLaunchTemplate.LaunchTemplateName) != "": + if targetTemplate, err = cloud.DescribeLaunchTemplateByName(ec2Svc, aws.StringValue(targetLaunchTemplate.LaunchTemplateName)); err != nil { + return nil, nil, fmt.Errorf("error retrieving information about launch template name %s: %v", aws.StringValue(targetLaunchTemplate.LaunchTemplateName), err) } default: return nil, nil, fmt.Errorf("invalid launch template name") diff --git a/main_test.go b/main_test.go index 5bd06272..714630af 100644 --- a/main_test.go +++ b/main_test.go @@ -2,14 +2,17 @@ package main import ( "github.com/TwinProduction/aws-eks-asg-rolling-update-handler/cloudtest" + "github.com/TwinProduction/aws-eks-asg-rolling-update-handler/k8s" + "github.com/TwinProduction/aws-eks-asg-rolling-update-handler/k8stest" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/aws/aws-sdk-go/service/ec2" + v1 "k8s.io/api/core/v1" "testing" ) func TestSeparateOutdatedFromUpdatedInstancesUsingLaunchConfiguration_whenInstanceIsOutdated(t *testing.T) { - instance := cloudtest.CreateTestAutoScalingInstance("instance", "v1", nil, "Healthy") + instance := cloudtest.CreateTestAutoScalingInstance("instance", "v1", nil, "InService") outdated, updated, err := SeparateOutdatedFromUpdatedInstancesUsingLaunchConfiguration(aws.String("v2"), []*autoscaling.Instance{instance}) if err != nil { t.Fatal("Shouldn't have returned an error, but returned", err) @@ -20,7 +23,7 @@ func TestSeparateOutdatedFromUpdatedInstancesUsingLaunchConfiguration_whenInstan } func TestSeparateOutdatedFromUpdatedInstancesUsingLaunchConfiguration_whenInstanceIsUpdated(t *testing.T) { - instance := cloudtest.CreateTestAutoScalingInstance("instance", "v1", nil, "Healthy") + instance := cloudtest.CreateTestAutoScalingInstance("instance", "v1", nil, "InService") outdated, updated, err := SeparateOutdatedFromUpdatedInstancesUsingLaunchConfiguration(aws.String("v1"), []*autoscaling.Instance{instance}) if err != nil { t.Fatal("Shouldn't have returned an error, but returned", err) @@ -31,9 +34,9 @@ func TestSeparateOutdatedFromUpdatedInstancesUsingLaunchConfiguration_whenInstan } func TestSeparateOutdatedFromUpdatedInstancesUsingLaunchConfiguration_whenOneInstanceIsUpdatedAndTwoInstancesAreOutdated(t *testing.T) { - firstInstance := cloudtest.CreateTestAutoScalingInstance("old-1", "v1", nil, "Healthy") - secondInstance := cloudtest.CreateTestAutoScalingInstance("old-2", "v1", nil, "Healthy") - thirdInstance := cloudtest.CreateTestAutoScalingInstance("new", "v2", nil, "Healthy") + firstInstance := cloudtest.CreateTestAutoScalingInstance("old-1", "v1", nil, "InService") + secondInstance := cloudtest.CreateTestAutoScalingInstance("old-2", "v1", nil, "InService") + thirdInstance := cloudtest.CreateTestAutoScalingInstance("new", "v2", nil, "InService") outdated, updated, err := SeparateOutdatedFromUpdatedInstancesUsingLaunchConfiguration(aws.String("v2"), []*autoscaling.Instance{firstInstance, secondInstance, thirdInstance}) if err != nil { t.Fatal("Shouldn't have returned an error, but returned", err) @@ -63,8 +66,8 @@ func TestSeparateOutdatedFromUpdatedInstancesUsingLaunchTemplate_whenInstanceIsO LaunchTemplateId: updatedLaunchTemplate.LaunchTemplateId, LaunchTemplateName: updatedLaunchTemplate.LaunchTemplateName, } - instance := cloudtest.CreateTestAutoScalingInstance("instance", "", outdatedLaunchTemplate, "Healthy") - outdated, updated, err := SeparateOutdatedFromUpdatedInstancesUsingLaunchTemplate(updatedLaunchTemplate, []*autoscaling.Instance{instance}, &cloudtest.MockEC2Service{Templates: []*ec2.LaunchTemplate{updatedEc2LaunchTemplate}}) + instance := cloudtest.CreateTestAutoScalingInstance("instance", "", outdatedLaunchTemplate, "InService") + outdated, updated, err := SeparateOutdatedFromUpdatedInstancesUsingLaunchTemplate(updatedLaunchTemplate, []*autoscaling.Instance{instance}, cloudtest.NewMockEC2Service([]*ec2.LaunchTemplate{updatedEc2LaunchTemplate})) if err != nil { t.Fatal("Shouldn't have returned an error, but returned:", err) } @@ -85,8 +88,8 @@ func TestSeparateOutdatedFromUpdatedInstancesUsingLaunchTemplate_whenInstanceIsU LaunchTemplateId: updatedLaunchTemplate.LaunchTemplateId, LaunchTemplateName: updatedLaunchTemplate.LaunchTemplateName, } - instance := cloudtest.CreateTestAutoScalingInstance("instance", "", updatedLaunchTemplate, "Healthy") - outdated, updated, err := SeparateOutdatedFromUpdatedInstancesUsingLaunchTemplate(updatedLaunchTemplate, []*autoscaling.Instance{instance}, &cloudtest.MockEC2Service{Templates: []*ec2.LaunchTemplate{updatedEc2LaunchTemplate}}) + instance := cloudtest.CreateTestAutoScalingInstance("instance", "", updatedLaunchTemplate, "InService") + outdated, updated, err := SeparateOutdatedFromUpdatedInstancesUsingLaunchTemplate(updatedLaunchTemplate, []*autoscaling.Instance{instance}, cloudtest.NewMockEC2Service([]*ec2.LaunchTemplate{updatedEc2LaunchTemplate})) if err != nil { t.Fatal("Shouldn't have returned an error, but returned:", err) } @@ -96,9 +99,9 @@ func TestSeparateOutdatedFromUpdatedInstancesUsingLaunchTemplate_whenInstanceIsU } func TestSeparateOutdatedFromUpdatedInstances_withLaunchConfigurationWhenOneInstanceIsUpdatedAndTwoInstancesAreOutdated(t *testing.T) { - firstInstance := cloudtest.CreateTestAutoScalingInstance("old-1", "v1", nil, "Healthy") - secondInstance := cloudtest.CreateTestAutoScalingInstance("old-2", "v1", nil, "Healthy") - thirdInstance := cloudtest.CreateTestAutoScalingInstance("new", "v2", nil, "Healthy") + firstInstance := cloudtest.CreateTestAutoScalingInstance("old-1", "v1", nil, "InService") + secondInstance := cloudtest.CreateTestAutoScalingInstance("old-2", "v1", nil, "InService") + thirdInstance := cloudtest.CreateTestAutoScalingInstance("new", "v2", nil, "InService") asg := cloudtest.CreateTestAutoScalingGroup("asg", "v2", nil, []*autoscaling.Instance{firstInstance, secondInstance, thirdInstance}) @@ -113,3 +116,102 @@ func TestSeparateOutdatedFromUpdatedInstances_withLaunchConfigurationWhenOneInst t.Error("1 instance should've been outdated") } } + +func TestHandleRollingUpgrade(t *testing.T) { + oldInstance := cloudtest.CreateTestAutoScalingInstance("old-1", "v1", nil, "InService") + asg := cloudtest.CreateTestAutoScalingGroup("asg", "v2", nil, []*autoscaling.Instance{oldInstance}) + + oldNode := k8stest.CreateTestNode(aws.StringValue(oldInstance.InstanceId), "1000m", "1000Mi") + oldNodePod := k8stest.CreateTestPod("old-pod-1", oldNode.Name, "100m", "100Mi") + + mockKubernetesClient := k8stest.NewMockKubernetesClient([]v1.Node{oldNode}, []v1.Pod{oldNodePod}) + mockEc2Service := cloudtest.NewMockEC2Service(nil) + mockAutoScalingService := cloudtest.NewMockAutoScalingService([]*autoscaling.Group{asg}) + + // First run (Node rollout process gets marked as started) + HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg}) + if mockKubernetesClient.Counter["UpdateNode"] != 1 { + t.Error("Node should've been annotated, meaning that UpdateNode should've been called once") + } + oldNodeAfterFirstRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)] + if _, ok := oldNodeAfterFirstRun.GetAnnotations()[k8s.RollingUpdateStartedTimestampAnnotationKey]; !ok { + t.Error("Node should've been annotated with", k8s.RollingUpdateStartedTimestampAnnotationKey) + } + if _, ok := oldNodeAfterFirstRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok { + t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey) + } + if _, ok := oldNodeAfterFirstRun.GetAnnotations()[k8s.RollingUpdateTerminatedTimestampAnnotationKey]; ok { + t.Error("Node shouldn't have been terminated yet, therefore shouldn't have been annotated with", k8s.RollingUpdateTerminatedTimestampAnnotationKey) + } + + // Second run (ASG's desired capacity gets increased) + HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg}) + if mockAutoScalingService.Counter["SetDesiredCapacity"] != 1 { + t.Error("ASG should've been increased because there's no updated nodes yet") + } + asgAfterSecondRun := mockAutoScalingService.AutoScalingGroups[aws.StringValue(asg.AutoScalingGroupName)] + if aws.Int64Value(asgAfterSecondRun.DesiredCapacity) != 2 { + t.Error("The desired capacity of the ASG should've been increased to 2") + } + oldNodeAfterSecondRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)] + if _, ok := oldNodeAfterSecondRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok { + t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey) + } + + // Third run (Nothing changed) + HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg}) + if mockAutoScalingService.Counter["SetDesiredCapacity"] != 1 { + t.Error("Desired capacity shouldn't have been updated") + } + asgAfterThirdRun := mockAutoScalingService.AutoScalingGroups[aws.StringValue(asg.AutoScalingGroupName)] + if aws.Int64Value(asgAfterThirdRun.DesiredCapacity) != 2 { + t.Error("The desired capacity of the ASG should've stayed at 2") + } + oldNodeAfterThirdRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)] + if _, ok := oldNodeAfterThirdRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok { + t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey) + } + + // Fourth run (new instance has been registered to ASG, but is pending) + newInstance := cloudtest.CreateTestAutoScalingInstance("new-1", "v2", nil, "Pending") + asg.Instances = append(asg.Instances, newInstance) + HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg}) + if mockAutoScalingService.Counter["SetDesiredCapacity"] != 1 { + t.Error("Desired capacity shouldn't have been updated") + } + oldNodeAfterFourthRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)] + if _, ok := oldNodeAfterFourthRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok { + t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey) + } + + // Fifth run (new instance is now InService, but node has still not joined cluster (GetNodeByHostName should return not found)) + newInstance.SetLifecycleState("InService") + HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg}) + oldNodeAfterFifthRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)] + if _, ok := oldNodeAfterFifthRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok { + t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey) + } + + // Sixth run (new instance has joined the cluster, but Kubelet isn't ready to accept pods yet) + newNode := k8stest.CreateTestNode(aws.StringValue(newInstance.InstanceId), "1000m", "1000Mi") + newNode.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}} + mockKubernetesClient.Nodes[newNode.Name] = newNode + HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg}) + oldNodeAfterSixthRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)] + if _, ok := oldNodeAfterSixthRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok { + t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey) + } + + // Seventh run (Kubelet is ready to accept new pods. Old node gets drained and terminated) + newNodeAfterSeventhRun := mockKubernetesClient.Nodes[newNode.Name] + newNodeAfterSeventhRun.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}} + mockKubernetesClient.Nodes[newNode.Name] = newNodeAfterSeventhRun + HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg}) + oldNodeAfterSeventhRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)] + if _, ok := oldNodeAfterSeventhRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; !ok { + t.Error("Node should've been drained") + } + if _, ok := oldNodeAfterSeventhRun.GetAnnotations()[k8s.RollingUpdateTerminatedTimestampAnnotationKey]; !ok { + t.Error("Node should've been terminated") + } +}