Skip to content

Commit

Permalink
refactor liklus scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Omer Aplatony <[email protected]>
  • Loading branch information
omerap12 committed Dec 20, 2024
1 parent 55bd5bf commit 56f8d4a
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 114 deletions.
80 changes: 23 additions & 57 deletions pkg/scalers/liiklus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"
"time"

"github.com/go-logr/logr"
Expand All @@ -27,12 +26,12 @@ type liiklusScaler struct {
}

type liiklusMetadata struct {
lagThreshold int64
activationLagThreshold int64
address string
topic string
group string
groupVersion uint32
LagThreshold int64 `keda:"name=lagThreshold,order=triggerMetadata,default=10"`
ActivationLagThreshold int64 `keda:"name=activationLagThreshold,order=triggerMetadata,default=0"`
Address string `keda:"name=address,order=triggerMetadata"`
Topic string `keda:"name=topic,order=triggerMetadata"`
Group string `keda:"name=group,order=triggerMetadata"`
GroupVersion uint32 `keda:"name=groupVersion,order=triggerMetadata,default=0"`
triggerIndex int
}

Expand Down Expand Up @@ -70,7 +69,7 @@ func NewLiiklusScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
return nil, err
}

conn, err := grpc.NewClient(lm.address,
conn, err := grpc.NewClient(lm.Address,
grpc.WithDefaultServiceConfig(grpcConfig),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
Expand All @@ -94,21 +93,21 @@ func (s *liiklusScaler) GetMetricsAndActivity(ctx context.Context, metricName st
return nil, false, err
}

if totalLag/uint64(s.metadata.lagThreshold) > uint64(len(lags)) {
totalLag = uint64(s.metadata.lagThreshold) * uint64(len(lags))
if totalLag/uint64(s.metadata.LagThreshold) > uint64(len(lags)) {
totalLag = uint64(s.metadata.LagThreshold) * uint64(len(lags))
}

metric := GenerateMetricInMili(metricName, float64(totalLag))

return []external_metrics.ExternalMetricValue{metric}, totalLag > uint64(s.metadata.activationLagThreshold), nil
return []external_metrics.ExternalMetricValue{metric}, totalLag > uint64(s.metadata.ActivationLagThreshold), nil
}

func (s *liiklusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("liiklus-%s", s.metadata.topic))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("liiklus-%s", s.metadata.Topic))),
},
Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold),
Target: GetMetricTarget(s.metricType, s.metadata.LagThreshold),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: liiklusMetricType}
return []v2.MetricSpec{metricSpec}
Expand All @@ -131,9 +130,9 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64,
ctx1, cancel1 := context.WithTimeout(ctx, 10*time.Second)
defer cancel1()
gor, err := s.client.GetOffsets(ctx1, &liiklus_service.GetOffsetsRequest{
Topic: s.metadata.topic,
Group: s.metadata.group,
GroupVersion: s.metadata.groupVersion,
Topic: s.metadata.Topic,
Group: s.metadata.Group,
GroupVersion: s.metadata.GroupVersion,
})
if err != nil {
return 0, nil, err
Expand All @@ -142,7 +141,7 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64,
ctx2, cancel2 := context.WithTimeout(ctx, 10*time.Second)
defer cancel2()
geor, err := s.client.GetEndOffsets(ctx2, &liiklus_service.GetEndOffsetsRequest{
Topic: s.metadata.topic,
Topic: s.metadata.Topic,
})
if err != nil {
return 0, nil, err
Expand All @@ -159,50 +158,17 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64,
}

func parseLiiklusMetadata(config *scalersconfig.ScalerConfig) (*liiklusMetadata, error) {
lagThreshold := defaultLiiklusLagThreshold
activationLagThreshold := defaultLiiklusActivationLagThreshold

if val, ok := config.TriggerMetadata[liiklusLagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", liiklusLagThresholdMetricName, err)
}
lagThreshold = t
meta := &liiklusMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing liiklus metadata: %w", err)
}

if val, ok := config.TriggerMetadata[liiklusActivationLagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", liiklusActivationLagThresholdMetricName, err)
}
activationLagThreshold = t
}

groupVersion := uint32(0)
if val, ok := config.TriggerMetadata["groupVersion"]; ok {
t, err := strconv.ParseUint(val, 10, 32)
if err != nil {
return nil, fmt.Errorf("error parsing groupVersion: %w", err)
}
groupVersion = uint32(t)
}

switch {
case config.TriggerMetadata["topic"] == "":
case meta.Topic == "":
return nil, ErrLiiklusNoTopic
case config.TriggerMetadata["address"] == "":
case meta.Address == "":
return nil, ErrLiiklusNoAddress
case config.TriggerMetadata["group"] == "":
case meta.Group == "":
return nil, ErrLiiklusNoGroup
}

return &liiklusMetadata{
topic: config.TriggerMetadata["topic"],
address: config.TriggerMetadata["address"],
group: config.TriggerMetadata["group"],
groupVersion: groupVersion,
lagThreshold: lagThreshold,
activationLagThreshold: activationLagThreshold,
triggerIndex: config.TriggerIndex,
}, nil
return meta, nil
}
170 changes: 113 additions & 57 deletions pkg/scalers/liiklus_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package scalers

import (
"context"
"errors"
"strconv"
"fmt"
"testing"

"github.com/go-logr/logr"
Expand All @@ -15,12 +14,10 @@ import (
)

type parseLiiklusMetadataTestData struct {
metadata map[string]string
err error
liiklusAddress string
group string
topic string
threshold int64
name string
metadata map[string]string
ExpectedErr error
ExpectedMetatada *liiklusMetadata
}

type liiklusMetricIdentifier struct {
Expand All @@ -30,12 +27,64 @@ type liiklusMetricIdentifier struct {
}

var parseLiiklusMetadataTestDataset = []parseLiiklusMetadataTestData{
{map[string]string{}, ErrLiiklusNoTopic, "", "", "", 0},
{map[string]string{"topic": "foo"}, ErrLiiklusNoAddress, "", "", "", 0},
{map[string]string{"topic": "foo", "address": "bar:6565"}, ErrLiiklusNoGroup, "", "", "", 0},
{map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup"}, nil, "bar:6565", "mygroup", "foo", 10},
{map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup", "activationLagThreshold": "aa"}, strconv.ErrSyntax, "bar:6565", "mygroup", "foo", 10},
{map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup", "lagThreshold": "15"}, nil, "bar:6565", "mygroup", "foo", 15},
{
name: "Empty metadata",
metadata: map[string]string{},
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " +
"missing required parameter \"address\" in [triggerMetadata]\n" +
"missing required parameter \"topic\" in [triggerMetadata]\n" +
"missing required parameter \"group\" in [triggerMetadata]"),
ExpectedMetatada: nil,
},
{
name: "Empty address",
metadata: map[string]string{"topic": "foo"},
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " +
"missing required parameter \"address\" in [triggerMetadata]\n" +
"missing required parameter \"group\" in [triggerMetadata]"),
ExpectedMetatada: nil,
},
{
name: "Empty group",
metadata: map[string]string{"topic": "foo", "address": "using-mock"},
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " +
"missing required parameter \"group\" in [triggerMetadata]"),
ExpectedMetatada: nil,
},
{
name: "Valid",
metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup"},
ExpectedErr: nil,
ExpectedMetatada: &liiklusMetadata{
LagThreshold: defaultLiiklusLagThreshold,
ActivationLagThreshold: defaultLiiklusActivationLagThreshold,
Address: "using-mock",
Topic: "foo",
Group: "mygroup",
GroupVersion: 0,
triggerIndex: 0,
},
},
{
name: "Invalid activationLagThreshold",
metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup", "activationLagThreshold": "invalid"},
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: unable to set param \"activationLagThreshold\" value \"invalid\": unable to unmarshal to field type int64: invalid character 'i' looking for beginning of value"),
ExpectedMetatada: nil,
},
{
name: "Custom lagThreshold",
metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup", "lagThreshold": "20"},
ExpectedErr: nil,
ExpectedMetatada: &liiklusMetadata{
LagThreshold: 20,
ActivationLagThreshold: defaultLiiklusActivationLagThreshold,
Address: "using-mock",
Topic: "foo",
Group: "mygroup",
GroupVersion: 0,
triggerIndex: 0,
},
},
}

var liiklusMetricIdentifiers = []liiklusMetricIdentifier{
Expand All @@ -45,38 +94,44 @@ var liiklusMetricIdentifiers = []liiklusMetricIdentifier{

func TestLiiklusParseMetadata(t *testing.T) {
for _, testData := range parseLiiklusMetadataTestDataset {
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata})
if err != nil && testData.err == nil {
t.Error("Expected success but got error", err)
continue
}
if testData.err != nil && err == nil {
t.Error("Expected error but got success")
continue
}
if testData.err != nil && err != nil && !errors.Is(err, testData.err) {
t.Errorf("Expected error %v but got %v", testData.err, err)
continue
}
if err != nil {
continue
}
if testData.liiklusAddress != meta.address {
t.Errorf("Expected address %q but got %q\n", testData.liiklusAddress, meta.address)
continue
}
if meta.group != testData.group {
t.Errorf("Expected group %q but got %q\n", testData.group, meta.group)
continue
}
if meta.topic != testData.topic {
t.Errorf("Expected topic %q but got %q\n", testData.topic, meta.topic)
continue
}
if meta.lagThreshold != testData.threshold {
t.Errorf("Expected threshold %d but got %d\n", testData.threshold, meta.lagThreshold)
continue
}
t.Run(testData.name, func(t *testing.T) {
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata})

// error cases
if testData.ExpectedErr != nil {
if err == nil {
t.Errorf("Expected error %v but got success", testData.ExpectedErr)
} else if err.Error() != testData.ExpectedErr.Error() {
t.Errorf("Expected error %v but got %v", testData.ExpectedErr, err)
}
return // Skip the rest of the checks for error cases
}

// success cases
if err != nil {
t.Errorf("Expected success but got error %v", err)
}
if testData.ExpectedMetatada != nil {
if testData.ExpectedMetatada.Address != meta.Address {
t.Errorf("Expected address %q but got %q", testData.ExpectedMetatada.Address, meta.Address)
}
if meta.Group != testData.ExpectedMetatada.Group {
t.Errorf("Expected group %q but got %q", testData.ExpectedMetatada.Group, meta.Group)
}
if meta.Topic != testData.ExpectedMetatada.Topic {
t.Errorf("Expected topic %q but got %q", testData.ExpectedMetatada.Topic, meta.Topic)
}
if meta.LagThreshold != testData.ExpectedMetatada.LagThreshold {
t.Errorf("Expected threshold %d but got %d", testData.ExpectedMetatada.LagThreshold, meta.LagThreshold)
}
if meta.ActivationLagThreshold != testData.ExpectedMetatada.ActivationLagThreshold {
t.Errorf("Expected activation threshold %d but got %d", testData.ExpectedMetatada.ActivationLagThreshold, meta.ActivationLagThreshold)
}
if meta.GroupVersion != testData.ExpectedMetatada.GroupVersion {
t.Errorf("Expected group version %d but got %d", testData.ExpectedMetatada.GroupVersion, meta.GroupVersion)
}
}
})
}
}

Expand Down Expand Up @@ -172,16 +227,17 @@ func TestLiiklusScalerGetMetricsBehavior(t *testing.T) {

func TestLiiklusGetMetricSpecForScaling(t *testing.T) {
for _, testData := range liiklusMetricIdentifiers {
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, TriggerIndex: testData.triggerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockLiiklusScaler := liiklusScaler{"", meta, nil, nil, logr.Discard()}

metricSpec := mockLiiklusScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
t.Run(testData.name, func(t *testing.T) {
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
meta.triggerIndex = testData.triggerIndex
mockLiiklusScaler := liiklusScaler{"", meta, nil, nil, logr.Discard()}
metricSpec := mockLiiklusScaler.GetMetricSpecForScaling(context.Background())
if metricSpec[0].External.Metric.Name != testData.name {
t.Errorf("Wrong External metric source name: %s", metricSpec[0].External.Metric.Name)
}
})
}
}

0 comments on commit 56f8d4a

Please sign in to comment.