diff --git a/autoscaler/Dockerfile b/autoscaler/Dockerfile index 9d53b628..ce5c9099 100644 --- a/autoscaler/Dockerfile +++ b/autoscaler/Dockerfile @@ -15,7 +15,7 @@ COPY . . RUN go get github.com/v3io/scaler-types \ && cd $GOPATH/src/github.com/v3io/scaler-types \ - && git checkout v1.3.0 \ + && git checkout v1.5.0 \ && cd /go/src/github.com/v3io/app_resourcescaler RUN mkdir -p /home/app_resourcescaler/bin \ @@ -30,6 +30,6 @@ WORKDIR /home/app_resourcescaler ENV PATH=/home/app_resourcescaler:$PATH COPY --from=builder /home/app_resourcescaler/plugins/plugin.so /home/app_resourcescaler/plugins/plugin.so -COPY --from=quay.io/v3io/autoscaler:v0.1.2 /home/v3io/bin/autoscaler /home/app_resourcescaler/autoscaler +COPY --from=quay.io/v3io/autoscaler:v0.2.0 /home/v3io/bin/autoscaler /home/app_resourcescaler/autoscaler CMD ["autoscaler"] diff --git a/dlx/Dockerfile b/dlx/Dockerfile index 2b4b91b6..f447c2b4 100644 --- a/dlx/Dockerfile +++ b/dlx/Dockerfile @@ -15,7 +15,7 @@ COPY . . RUN go get github.com/v3io/scaler-types \ && cd $GOPATH/src/github.com/v3io/scaler-types \ - && git checkout v1.3.0 \ + && git checkout v1.5.0 \ && cd /go/src/github.com/v3io/app_resourcescaler RUN mkdir -p /home/app_resourcescaler/bin \ @@ -30,6 +30,6 @@ WORKDIR /home/app_resourcescaler ENV PATH=/home/app_resourcescaler:$PATH COPY --from=builder /home/app_resourcescaler/plugins/plugin.so /home/app_resourcescaler/plugins/plugin.so -COPY --from=quay.io/v3io/dlx:v0.1.2 /home/v3io/bin/dlx /home/app_resourcescaler/dlx +COPY --from=quay.io/v3io/dlx:v0.2.0 /home/v3io/bin/dlx /home/app_resourcescaler/dlx CMD ["dlx"] diff --git a/resourcescaler.go b/resourcescaler.go index 1423ecfe..e81dacfb 100644 --- a/resourcescaler.go +++ b/resourcescaler.go @@ -46,11 +46,15 @@ func New(kubeconfigPath string, namespace string) (scaler_types.ResourceScaler, }, nil } -func (s *AppResourceScaler) SetScale(resource scaler_types.Resource, scale int) error { +func (s *AppResourceScaler) SetScale(resources []scaler_types.Resource, scale int) error { + serviceNames := make([]string, 0) + for _, resource := range resources { + serviceNames = append(serviceNames, resource.Name) + } if scale == 0 { - return s.scaleServiceToZero(s.namespace, resource.Name) + return s.scaleServicesToZero(s.namespace, serviceNames) } - return s.scaleServiceFromZero(s.namespace, resource.Name) + return s.scaleServicesFromZero(s.namespace, serviceNames) } func (s *AppResourceScaler) GetResources() ([]scaler_types.Resource, error) { @@ -115,48 +119,23 @@ func (s *AppResourceScaler) GetConfig() (*scaler_types.ResourceScalerConfig, err return nil, nil } -func (s *AppResourceScaler) scaleServiceFromZero(namespace string, serviceName string) error { +func (s *AppResourceScaler) scaleServicesFromZero(namespace string, serviceNames []string) error { var jsonPatchMapper []map[string]interface{} - s.logger.DebugWith("Scaling from zero", "namespace", namespace, "serviceName", serviceName) - desiredStatePath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/desired_state", string(serviceName)) - markForRestartPath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/mark_for_restart", string(serviceName)) - scaleToZeroStatusPath := fmt.Sprintf("/status/services/%s/scale_to_zero", string(serviceName)) - lastScaleStatePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event", string(serviceName)) - lastScaleStateTimePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event_time", string(serviceName)) + s.logger.DebugWith("Scaling from zero", "namespace", namespace, "serviceNames", serviceNames) marshaledTime, err := time.Now().MarshalText() if err != nil { return errors.Wrap(err, "Failed to marshal time") } - jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ - "op": "add", - "path": desiredStatePath, - "value": "ready", - }) - jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ - "op": "add", - "path": markForRestartPath, - "value": false, - }) - jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ - "op": "add", - "path": "/status/state", - "value": "waitingForProvisioning", - }) - jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ - "op": "add", - "path": scaleToZeroStatusPath, - "value": map[string]interface{}{}, - }) - jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ - "op": "add", - "path": lastScaleStatePath, - "value": string(scaler_types.ScaleFromZeroStartedScaleEvent), - }) - jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ - "op": "add", - "path": lastScaleStateTimePath, - "value": string(marshaledTime), - }) + for _, serviceName := range serviceNames { + jsonPatchMapper, err = s.appendServiceStateChangeJsonPatchOperations(jsonPatchMapper, + serviceName, + "ready", + scaler_types.ScaleFromZeroStartedScaleEvent, + marshaledTime) + if err != nil { + return errors.Wrap(err, "Failed appending service state change json patch operations") + } + } err = s.patchIguazioTenantAppServiceSets(namespace, jsonPatchMapper) @@ -164,42 +143,65 @@ func (s *AppResourceScaler) scaleServiceFromZero(namespace string, serviceName s return errors.Wrap(err, "Failed to patch iguazio tenant app service sets") } - err = s.waitForServiceState(namespace, serviceName, "ready") + err = s.waitForServicesState(namespace, serviceNames, "ready") if err != nil { - return errors.Wrap(err, "Failed to wait for service readiness") + return errors.Wrap(err, "Failed to wait for services readiness") } return nil } -func (s *AppResourceScaler) scaleServiceToZero(namespace string, serviceName string) error { +func (s *AppResourceScaler) scaleServicesToZero(namespace string, serviceNames []string) error { var jsonPatchMapper []map[string]interface{} - s.logger.DebugWith("Scaling to zero", "namespace", namespace, "serviceName", serviceName) + s.logger.DebugWith("Scaling to zero", "namespace", namespace, "serviceNames", serviceNames) + marshaledTime, err := time.Now().MarshalText() + if err != nil { + return errors.Wrap(err, "Failed to marshal time") + } + for _, serviceName := range serviceNames { + + jsonPatchMapper, err = s.appendServiceStateChangeJsonPatchOperations(jsonPatchMapper, + serviceName, + "scaledToZero", + scaler_types.ScaleToZeroStartedScaleEvent, + marshaledTime) + if err != nil { + return errors.Wrap(err, "Failed appending service state change json patch operations") + } + } + + err = s.patchIguazioTenantAppServiceSets(namespace, jsonPatchMapper) + + if err != nil { + return errors.Wrap(err, "Failed to patch iguazio tenant app service sets") + } + + err = s.waitForServicesState(namespace, serviceNames, "scaledToZero") + + if err != nil { + return errors.Wrap(err, "Failed to wait for services to scale to zero") + } + + return nil +} + +func (s *AppResourceScaler) appendServiceStateChangeJsonPatchOperations(jsonPatchMapper []map[string]interface{}, serviceName string, desiredState string, scaleEvent scaler_types.ScaleEvent, marshaledTime []byte) ([]map[string]interface{}, error) { desiredStatePath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/desired_state", string(serviceName)) markForRestartPath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/mark_for_restart", string(serviceName)) scaleToZeroStatusPath := fmt.Sprintf("/status/services/%s/scale_to_zero", string(serviceName)) lastScaleStatePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event", string(serviceName)) lastScaleStateTimePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event_time", string(serviceName)) - marshaledTime, err := time.Now().MarshalText() - if err != nil { - return errors.Wrap(err, "Failed to marshal time") - } jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ "op": "add", "path": desiredStatePath, - "value": "scaledToZero", + "value": desiredState, }) jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ "op": "add", "path": markForRestartPath, "value": false, }) - jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ - "op": "add", - "path": "/status/state", - "value": "waitingForProvisioning", - }) jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ "op": "add", "path": scaleToZeroStatusPath, @@ -208,7 +210,7 @@ func (s *AppResourceScaler) scaleServiceToZero(namespace string, serviceName str jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ "op": "add", "path": lastScaleStatePath, - "value": string(scaler_types.ScaleToZeroStartedScaleEvent), + "value": string(scaleEvent), }) jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ "op": "add", @@ -216,22 +218,16 @@ func (s *AppResourceScaler) scaleServiceToZero(namespace string, serviceName str "value": string(marshaledTime), }) - err = s.patchIguazioTenantAppServiceSets(namespace, jsonPatchMapper) - - if err != nil { - return errors.Wrap(err, "Failed to patch iguazio tenant app service sets") - } - - err = s.waitForServiceState(namespace, serviceName, "scaledToZero") - - if err != nil { - return errors.Wrap(err, "Failed to wait for service to scale to zero") - } - - return nil + return jsonPatchMapper, nil } func (s *AppResourceScaler) patchIguazioTenantAppServiceSets(namespace string, jsonPatchMapper []map[string]interface{}) error { + jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{ + "op": "add", + "path": "/status/state", + "value": "waitingForProvisioning", + }) + err := s.waitForNoProvisioningInProcess(namespace) if err != nil { return errors.Wrap(err, "Failed waiting for IguazioTenantAppServiceSet to finish provisioning") @@ -256,60 +252,66 @@ func (s *AppResourceScaler) waitForNoProvisioningInProcess(namespace string) err timeout := time.After(5 * time.Minute) tick := time.Tick(10 * time.Second) for { + _, _, state, err := s.getIguazioTenantAppServiceSets() + if err != nil { + return errors.Wrap(err, "Failed to get iguazio tenant app service sets") + } + + if state == "ready" || state == "error" { + s.logger.DebugWith("IguazioTenantAppServiceSet finished provisioning") + return nil + } + + s.logger.DebugWith("IguazioTenantAppServiceSet is still provisioning", "state", state) + select { case <-timeout: return errors.New("Timed out waiting for IguazioTenantAppServiceSet to finish provisioning") case <-tick: - - _, _, state, err := s.getIguazioTenantAppServiceSets() - if err != nil { - return errors.Wrap(err, "Failed to get iguazio tenant app service sets") - } - - if state == "ready" || state == "error" { - s.logger.DebugWith("IguazioTenantAppServiceSet finished provisioning") - return nil - } - - s.logger.DebugWith("IguazioTenantAppServiceSet is still provisioning", "state", state) + continue } } } -func (s *AppResourceScaler) waitForServiceState(namespace string, serviceName string, state string) error { - s.logger.DebugWith("Waiting for service to reach state", "serviceName", serviceName, "state", state) - timeout := time.After(5 * time.Minute) +func (s *AppResourceScaler) waitForServicesState(namespace string, serviceNames []string, desiredState string) error { + s.logger.DebugWith("Waiting for services to reach desired state", "serviceNames", serviceNames, "desiredState", desiredState) + timeout := time.After(10 * time.Minute) tick := time.Tick(5 * time.Second) for { select { case <-timeout: - return errors.New("Timed out waiting for service state") + return errors.New("Timed out waiting for services to reach desired state") case <-tick: - + servicesToCheck := append([]string(nil), serviceNames...) _, statusServicesMap, _, err := s.getIguazioTenantAppServiceSets() if err != nil { return errors.Wrap(err, "Failed to get iguazio tenant app service sets") } - for statusServiceName, serviceStatus := range statusServicesMap { - if statusServiceName != serviceName { + for serviceName, serviceStatus := range statusServicesMap { + if !stringSliceContainsString(servicesToCheck, serviceName) { continue } - stateString, err := s.parseServiceState(serviceStatus) + currentState, err := s.parseServiceState(serviceStatus) if err != nil { return errors.Wrap(err, "Failed parsing the service state") } - if stateString == state { - s.logger.InfoWith("Service reached state", "serviceName", serviceName, "state", state) - return nil + if currentState != desiredState { + s.logger.DebugWith("Service did not reach desired state yet", + "serviceName", serviceName, + "currentState", currentState, + "desiredState", desiredState) + break } - s.logger.DebugWith("Service did not reach state yet", - "serviceName", serviceName, - "currentState", stateString, - "desiredState", state) + s.logger.DebugWith("Service reached desired state", "serviceName", serviceName, "desiredState", desiredState) + servicesToCheck = removeStringFromSlice(serviceName, servicesToCheck) + + if len(servicesToCheck) == 0 { + return nil + } } } } @@ -523,7 +525,7 @@ func (s *AppResourceScaler) parseScaleResources(serviceSpecInterface interface{} parsedScaleResource := scaler_types.ScaleResource{ MetricName: metricName, - WindowSize: windowSize, + WindowSize: scaler_types.Duration{Duration: windowSize}, Threshold: int(threshold), } @@ -532,3 +534,22 @@ func (s *AppResourceScaler) parseScaleResources(serviceSpecInterface interface{} return parsedScaleResources, nil } + +func removeStringFromSlice(someString string, slice []string) []string { + var newSlice []string + for _, item := range slice { + if item != someString { + newSlice = append(newSlice, item) + } + } + return newSlice +} + +func stringSliceContainsString(slice []string, str string) bool { + for _, stringInSlice := range slice { + if stringInSlice == str { + return true + } + } + return false +}