Skip to content

Commit

Permalink
Supporting scaler 0.2.0 (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hedingber authored Dec 29, 2019
1 parent e85fc49 commit 55f3ea9
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 100 deletions.
4 changes: 2 additions & 2 deletions autoscaler/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ COPY . .

RUN go get github.com/v3io/scaler-types \
&& cd $GOPATH/src/github.com/v3io/scaler-types \
&& git checkout v1.3.0 \
&& git checkout v1.5.0 \
&& cd /go/src/github.com/v3io/app_resourcescaler

RUN mkdir -p /home/app_resourcescaler/bin \
Expand All @@ -30,6 +30,6 @@ WORKDIR /home/app_resourcescaler
ENV PATH=/home/app_resourcescaler:$PATH

COPY --from=builder /home/app_resourcescaler/plugins/plugin.so /home/app_resourcescaler/plugins/plugin.so
COPY --from=quay.io/v3io/autoscaler:v0.1.2 /home/v3io/bin/autoscaler /home/app_resourcescaler/autoscaler
COPY --from=quay.io/v3io/autoscaler:v0.2.0 /home/v3io/bin/autoscaler /home/app_resourcescaler/autoscaler

CMD ["autoscaler"]
4 changes: 2 additions & 2 deletions dlx/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ COPY . .

RUN go get github.com/v3io/scaler-types \
&& cd $GOPATH/src/github.com/v3io/scaler-types \
&& git checkout v1.3.0 \
&& git checkout v1.5.0 \
&& cd /go/src/github.com/v3io/app_resourcescaler

RUN mkdir -p /home/app_resourcescaler/bin \
Expand All @@ -30,6 +30,6 @@ WORKDIR /home/app_resourcescaler
ENV PATH=/home/app_resourcescaler:$PATH

COPY --from=builder /home/app_resourcescaler/plugins/plugin.so /home/app_resourcescaler/plugins/plugin.so
COPY --from=quay.io/v3io/dlx:v0.1.2 /home/v3io/bin/dlx /home/app_resourcescaler/dlx
COPY --from=quay.io/v3io/dlx:v0.2.0 /home/v3io/bin/dlx /home/app_resourcescaler/dlx

CMD ["dlx"]
213 changes: 117 additions & 96 deletions resourcescaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ func New(kubeconfigPath string, namespace string) (scaler_types.ResourceScaler,
}, nil
}

func (s *AppResourceScaler) SetScale(resource scaler_types.Resource, scale int) error {
func (s *AppResourceScaler) SetScale(resources []scaler_types.Resource, scale int) error {
serviceNames := make([]string, 0)
for _, resource := range resources {
serviceNames = append(serviceNames, resource.Name)
}
if scale == 0 {
return s.scaleServiceToZero(s.namespace, resource.Name)
return s.scaleServicesToZero(s.namespace, serviceNames)
}
return s.scaleServiceFromZero(s.namespace, resource.Name)
return s.scaleServicesFromZero(s.namespace, serviceNames)
}

func (s *AppResourceScaler) GetResources() ([]scaler_types.Resource, error) {
Expand Down Expand Up @@ -115,91 +119,89 @@ func (s *AppResourceScaler) GetConfig() (*scaler_types.ResourceScalerConfig, err
return nil, nil
}

func (s *AppResourceScaler) scaleServiceFromZero(namespace string, serviceName string) error {
func (s *AppResourceScaler) scaleServicesFromZero(namespace string, serviceNames []string) error {
var jsonPatchMapper []map[string]interface{}
s.logger.DebugWith("Scaling from zero", "namespace", namespace, "serviceName", serviceName)
desiredStatePath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/desired_state", string(serviceName))
markForRestartPath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/mark_for_restart", string(serviceName))
scaleToZeroStatusPath := fmt.Sprintf("/status/services/%s/scale_to_zero", string(serviceName))
lastScaleStatePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event", string(serviceName))
lastScaleStateTimePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event_time", string(serviceName))
s.logger.DebugWith("Scaling from zero", "namespace", namespace, "serviceNames", serviceNames)
marshaledTime, err := time.Now().MarshalText()
if err != nil {
return errors.Wrap(err, "Failed to marshal time")
}
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": desiredStatePath,
"value": "ready",
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": markForRestartPath,
"value": false,
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": "/status/state",
"value": "waitingForProvisioning",
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": scaleToZeroStatusPath,
"value": map[string]interface{}{},
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": lastScaleStatePath,
"value": string(scaler_types.ScaleFromZeroStartedScaleEvent),
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": lastScaleStateTimePath,
"value": string(marshaledTime),
})
for _, serviceName := range serviceNames {
jsonPatchMapper, err = s.appendServiceStateChangeJsonPatchOperations(jsonPatchMapper,
serviceName,
"ready",
scaler_types.ScaleFromZeroStartedScaleEvent,
marshaledTime)
if err != nil {
return errors.Wrap(err, "Failed appending service state change json patch operations")
}
}

err = s.patchIguazioTenantAppServiceSets(namespace, jsonPatchMapper)

if err != nil {
return errors.Wrap(err, "Failed to patch iguazio tenant app service sets")
}

err = s.waitForServiceState(namespace, serviceName, "ready")
err = s.waitForServicesState(namespace, serviceNames, "ready")

if err != nil {
return errors.Wrap(err, "Failed to wait for service readiness")
return errors.Wrap(err, "Failed to wait for services readiness")
}

return nil
}

func (s *AppResourceScaler) scaleServiceToZero(namespace string, serviceName string) error {
func (s *AppResourceScaler) scaleServicesToZero(namespace string, serviceNames []string) error {
var jsonPatchMapper []map[string]interface{}
s.logger.DebugWith("Scaling to zero", "namespace", namespace, "serviceName", serviceName)
s.logger.DebugWith("Scaling to zero", "namespace", namespace, "serviceNames", serviceNames)
marshaledTime, err := time.Now().MarshalText()
if err != nil {
return errors.Wrap(err, "Failed to marshal time")
}
for _, serviceName := range serviceNames {

jsonPatchMapper, err = s.appendServiceStateChangeJsonPatchOperations(jsonPatchMapper,
serviceName,
"scaledToZero",
scaler_types.ScaleToZeroStartedScaleEvent,
marshaledTime)
if err != nil {
return errors.Wrap(err, "Failed appending service state change json patch operations")
}
}

err = s.patchIguazioTenantAppServiceSets(namespace, jsonPatchMapper)

if err != nil {
return errors.Wrap(err, "Failed to patch iguazio tenant app service sets")
}

err = s.waitForServicesState(namespace, serviceNames, "scaledToZero")

if err != nil {
return errors.Wrap(err, "Failed to wait for services to scale to zero")
}

return nil
}

func (s *AppResourceScaler) appendServiceStateChangeJsonPatchOperations(jsonPatchMapper []map[string]interface{}, serviceName string, desiredState string, scaleEvent scaler_types.ScaleEvent, marshaledTime []byte) ([]map[string]interface{}, error) {
desiredStatePath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/desired_state", string(serviceName))
markForRestartPath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/mark_for_restart", string(serviceName))
scaleToZeroStatusPath := fmt.Sprintf("/status/services/%s/scale_to_zero", string(serviceName))
lastScaleStatePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event", string(serviceName))
lastScaleStateTimePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event_time", string(serviceName))
marshaledTime, err := time.Now().MarshalText()
if err != nil {
return errors.Wrap(err, "Failed to marshal time")
}
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": desiredStatePath,
"value": "scaledToZero",
"value": desiredState,
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": markForRestartPath,
"value": false,
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": "/status/state",
"value": "waitingForProvisioning",
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": scaleToZeroStatusPath,
Expand All @@ -208,30 +210,24 @@ func (s *AppResourceScaler) scaleServiceToZero(namespace string, serviceName str
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": lastScaleStatePath,
"value": string(scaler_types.ScaleToZeroStartedScaleEvent),
"value": string(scaleEvent),
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": lastScaleStateTimePath,
"value": string(marshaledTime),
})

err = s.patchIguazioTenantAppServiceSets(namespace, jsonPatchMapper)

if err != nil {
return errors.Wrap(err, "Failed to patch iguazio tenant app service sets")
}

err = s.waitForServiceState(namespace, serviceName, "scaledToZero")

if err != nil {
return errors.Wrap(err, "Failed to wait for service to scale to zero")
}

return nil
return jsonPatchMapper, nil
}

func (s *AppResourceScaler) patchIguazioTenantAppServiceSets(namespace string, jsonPatchMapper []map[string]interface{}) error {
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": "/status/state",
"value": "waitingForProvisioning",
})

err := s.waitForNoProvisioningInProcess(namespace)
if err != nil {
return errors.Wrap(err, "Failed waiting for IguazioTenantAppServiceSet to finish provisioning")
Expand All @@ -256,60 +252,66 @@ func (s *AppResourceScaler) waitForNoProvisioningInProcess(namespace string) err
timeout := time.After(5 * time.Minute)
tick := time.Tick(10 * time.Second)
for {
_, _, state, err := s.getIguazioTenantAppServiceSets()
if err != nil {
return errors.Wrap(err, "Failed to get iguazio tenant app service sets")
}

if state == "ready" || state == "error" {
s.logger.DebugWith("IguazioTenantAppServiceSet finished provisioning")
return nil
}

s.logger.DebugWith("IguazioTenantAppServiceSet is still provisioning", "state", state)

select {
case <-timeout:
return errors.New("Timed out waiting for IguazioTenantAppServiceSet to finish provisioning")
case <-tick:

_, _, state, err := s.getIguazioTenantAppServiceSets()
if err != nil {
return errors.Wrap(err, "Failed to get iguazio tenant app service sets")
}

if state == "ready" || state == "error" {
s.logger.DebugWith("IguazioTenantAppServiceSet finished provisioning")
return nil
}

s.logger.DebugWith("IguazioTenantAppServiceSet is still provisioning", "state", state)
continue
}
}
}

func (s *AppResourceScaler) waitForServiceState(namespace string, serviceName string, state string) error {
s.logger.DebugWith("Waiting for service to reach state", "serviceName", serviceName, "state", state)
timeout := time.After(5 * time.Minute)
func (s *AppResourceScaler) waitForServicesState(namespace string, serviceNames []string, desiredState string) error {
s.logger.DebugWith("Waiting for services to reach desired state", "serviceNames", serviceNames, "desiredState", desiredState)
timeout := time.After(10 * time.Minute)
tick := time.Tick(5 * time.Second)
for {
select {
case <-timeout:
return errors.New("Timed out waiting for service state")
return errors.New("Timed out waiting for services to reach desired state")
case <-tick:

servicesToCheck := append([]string(nil), serviceNames...)
_, statusServicesMap, _, err := s.getIguazioTenantAppServiceSets()
if err != nil {
return errors.Wrap(err, "Failed to get iguazio tenant app service sets")
}

for statusServiceName, serviceStatus := range statusServicesMap {
if statusServiceName != serviceName {
for serviceName, serviceStatus := range statusServicesMap {
if !stringSliceContainsString(servicesToCheck, serviceName) {
continue
}

stateString, err := s.parseServiceState(serviceStatus)
currentState, err := s.parseServiceState(serviceStatus)
if err != nil {
return errors.Wrap(err, "Failed parsing the service state")
}

if stateString == state {
s.logger.InfoWith("Service reached state", "serviceName", serviceName, "state", state)
return nil
if currentState != desiredState {
s.logger.DebugWith("Service did not reach desired state yet",
"serviceName", serviceName,
"currentState", currentState,
"desiredState", desiredState)
break
}

s.logger.DebugWith("Service did not reach state yet",
"serviceName", serviceName,
"currentState", stateString,
"desiredState", state)
s.logger.DebugWith("Service reached desired state", "serviceName", serviceName, "desiredState", desiredState)
servicesToCheck = removeStringFromSlice(serviceName, servicesToCheck)

if len(servicesToCheck) == 0 {
return nil
}
}
}
}
Expand Down Expand Up @@ -523,7 +525,7 @@ func (s *AppResourceScaler) parseScaleResources(serviceSpecInterface interface{}

parsedScaleResource := scaler_types.ScaleResource{
MetricName: metricName,
WindowSize: windowSize,
WindowSize: scaler_types.Duration{Duration: windowSize},
Threshold: int(threshold),
}

Expand All @@ -532,3 +534,22 @@ func (s *AppResourceScaler) parseScaleResources(serviceSpecInterface interface{}

return parsedScaleResources, nil
}

func removeStringFromSlice(someString string, slice []string) []string {
var newSlice []string
for _, item := range slice {
if item != someString {
newSlice = append(newSlice, item)
}
}
return newSlice
}

func stringSliceContainsString(slice []string, str string) bool {
for _, stringInSlice := range slice {
if stringInSlice == str {
return true
}
}
return false
}

0 comments on commit 55f3ea9

Please sign in to comment.