Skip to content

Commit

Permalink
refactor load-tests
Browse files Browse the repository at this point in the history
Signed-off-by: jhu02 <[email protected]>
  • Loading branch information
jhu02 authored and jhu02 committed Feb 12, 2022
1 parent 61c1bea commit 07a82b0
Show file tree
Hide file tree
Showing 21 changed files with 1,097 additions and 30 deletions.
30 changes: 6 additions & 24 deletions controllers/addon_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package controllers
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"
Expand All @@ -29,7 +28,6 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
Expand All @@ -42,24 +40,22 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
addonmgrv1alpha1 "github.com/keikoproj/addon-manager/api/v1alpha1"
"github.com/keikoproj/addon-manager/pkg/addon"
"github.com/keikoproj/addon-manager/pkg/common"
"github.com/keikoproj/addon-manager/pkg/workflows"

"k8s.io/client-go/dynamic/dynamicinformer"
)

const (
controllerName = "addon_manager_controller"
// addon ttl time
TTL = time.Duration(1) * time.Hour // 1 hour

workflowDeployedNS = "addon-manager-system"
managedNameSpace = "addon-manager-system"
)

// Watched resources
Expand Down Expand Up @@ -197,17 +193,15 @@ func New(mgr manager.Manager, stopChan <-chan struct{}) (controller.Controller,
return nil, err
}

// Watch workflows created by addon only in addon-manager-system namespace
nsInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(r.dynClient, time.Minute*30, workflowDeployedNS, nil)
wfInf := nsInformers.ForResource(common.WorkflowGVR())
if err := c.Watch(&source.Informer{Informer: wfInf.Informer()}, &handler.EnqueueRequestForOwner{
if err := c.Watch(&source.Kind{Type: &wfv1.Workflow{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &addonmgrv1alpha1.Addon{},
}, predicate.NewPredicateFuncs(r.workflowHasMatchingNamespace)); err != nil {
}); err != nil {
return nil, err
}

wfInforms := NewWfInformers(nsInformers, stopChan)
// Watch workflows only in managed namespace
wfInforms := NewWfInformers(NewWorkflowInformer(r.dynClient, managedNameSpace, workflowResyncPeriod, nil, nil), stopChan)
err = mgr.Add(wfInforms)
if err != nil {
return nil, fmt.Errorf("failed to start workflowinformers")
Expand Down Expand Up @@ -244,18 +238,6 @@ func New(mgr manager.Manager, stopChan <-chan struct{}) (controller.Controller,
return c, nil
}

func (r *AddonReconciler) workflowHasMatchingNamespace(obj client.Object) bool {
u, _ := obj.(*unstructured.Unstructured)
if u.GetObjectKind().GroupVersionKind() != common.WorkflowType().GroupVersionKind() {
r.Log.Error(fmt.Errorf("unexpected object type in workflow watch predicates"), "expected", "*wfv1.Workflow", "found", reflect.TypeOf(obj))
return false
}
if obj.GetNamespace() != workflowDeployedNS {
return false
}
return true
}

func (r *AddonReconciler) enqueueRequestWithAddonLabel() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request {
var reqs = make([]reconcile.Request, 0)
Expand Down
67 changes: 61 additions & 6 deletions controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,82 @@ package controllers

import (
"context"
"fmt"
"time"

"k8s.io/client-go/dynamic/dynamicinformer"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers/internalinterfaces"
"k8s.io/client-go/tools/cache"
)

const (
workflowResyncPeriod = 20 * time.Minute
)

type WfInformers struct {
nsInformers dynamicinformer.DynamicSharedInformerFactory
//nsInformers dynamicinformer.DynamicSharedInformerFactory
nsInformers cache.SharedIndexInformer

stopCh <-chan struct{}
}

func NewWfInformers(nsInfo dynamicinformer.DynamicSharedInformerFactory, stopCh <-chan struct{}) *WfInformers {
func NewWfInformers(nsInfo cache.SharedIndexInformer, stopCh <-chan struct{}) *WfInformers {
return &WfInformers{
nsInformers: nsInfo,
stopCh: stopCh,
}
}

func (wfinfo *WfInformers) Start(ctx context.Context) error {
s := wfinfo.stopCh
wfinfo.nsInformers.Start(s)
wfinfo.nsInformers.WaitForCacheSync(s)
go wfinfo.nsInformers.Run(wfinfo.stopCh)
if ok := cache.WaitForCacheSync(wfinfo.stopCh, wfinfo.nsInformers.HasSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
<-wfinfo.stopCh
return nil
}

func NewWorkflowInformer(dclient dynamic.Interface, ns string, resyncPeriod time.Duration, tweakListOptions internalinterfaces.TweakListOptionsFunc, indexers cache.Indexers) cache.SharedIndexInformer {
resource := schema.GroupVersionResource{
Group: "argoproj.io",
Version: "v1alpha1",
Resource: "workflows",
}
informer := NewFilteredUnstructuredInformer(
resource,
dclient,
ns,
resyncPeriod,
indexers,
tweakListOptions,
)
return informer
}

func NewFilteredUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
ctx := context.Background()
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.Resource(resource).Namespace(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.Resource(resource).Namespace(namespace).Watch(ctx, options)
},
},
&unstructured.Unstructured{},
resyncPeriod,
indexers,
)
}
36 changes: 36 additions & 0 deletions test-load/dolphin/app/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package app

import (
"os"

"github.com/keikoproj/addon-manager/test-load/pkg/cmd"
"github.com/keikoproj/addon-manager/test-load/pkg/cmd/dolphin"
"github.com/keikoproj/addon-manager/test-load/pkg/exec"
"github.com/keikoproj/addon-manager/test-load/pkg/log"
)

// Main is the dolphin main(), it will invoke Run(), if an error is returned it exit
func Main() {
if err := Run(cmd.NewLogger(), cmd.StandardIOStreams(), os.Args[1:]); err != nil {
os.Exit(1)
}
}

// Run invokes the dolphin command, returning the error.
func Run(logger log.Logger, streams cmd.IOStreams, args []string) error {
c := dolphin.NewCommand(logger, streams)
c.SetArgs(args)
if err := c.Execute(); err != nil {
logError(logger, err)
return err
}
return nil
}

// logError logs runtime error messages
func logError(logger log.Logger, err error) {
logger.Errorf("ERROR: %v", err)
if err := exec.RunErrorForError(err); err != nil {
logger.Errorf("\nCommand Output: %s", err.Output)
}
}
9 changes: 9 additions & 0 deletions test-load/dolphin/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package main

import (
"github.com/keikoproj/addon-manager/test-load/dolphin/app"
)

func main() {
app.Main()
}
1 change: 1 addition & 0 deletions test-load/pkg/cmd/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package cmd
105 changes: 105 additions & 0 deletions test-load/pkg/cmd/dolphin/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package dolphin

import (
"io"
"io/ioutil"

"github.com/keikoproj/addon-manager/test-load/pkg/cmd"
"github.com/keikoproj/addon-manager/test-load/pkg/cmd/dolphin/tests"
"github.com/keikoproj/addon-manager/test-load/pkg/log"
"github.com/spf13/cobra"
)

type flagpole struct {
LogLevel string
Verbosity int32
Quiet bool
}

// NewCommand returns a new cobra.Command implementing the root command for dolphin
func NewCommand(logger log.Logger, streams cmd.IOStreams) *cobra.Command {
flags := &flagpole{}
cmd := &cobra.Command{
Args: cobra.NoArgs,
Use: "dolphin",
Short: "dolphin is a tool for managing local Kubernetes clusters pods",
Long: "dolphin creates and manages local Kubernetes clusters using Docker container 'nodes'",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
return runE(logger, flags, cmd)
},
SilenceUsage: true,
SilenceErrors: true,
// Version: version.Version(),
}
cmd.SetOut(streams.Out)
cmd.SetErr(streams.ErrOut)
cmd.PersistentFlags().StringVar(
&flags.LogLevel,
"loglevel",
"",
"DEPRECATED: see -v instead",
)
cmd.PersistentFlags().Int32VarP(
&flags.Verbosity,
"verbosity",
"v",
0,
"info log verbosity, higher value produces more output",
)
cmd.PersistentFlags().BoolVarP(
&flags.Quiet,
"quiet",
"q",
false,
"silence all stderr output",
)
// add all top level subcommands
cmd.AddCommand(tests.NewCommand(logger, streams))
return cmd
}

func runE(logger log.Logger, flags *flagpole, command *cobra.Command) error {
// handle limited migration for --loglevel
setLogLevel := command.Flag("loglevel").Changed
setVerbosity := command.Flag("verbosity").Changed
if setLogLevel && !setVerbosity {
switch flags.LogLevel {
case "debug":
flags.Verbosity = 3
case "trace":
flags.Verbosity = 2147483647
}
}
// normal logger setup
if flags.Quiet {
maybeSetWriter(logger, ioutil.Discard)
}
maybeSetVerbosity(logger, log.Level(flags.Verbosity))
if setLogLevel {
logger.Warn("WARNING: --loglevel is deprecated, please switch to -v and -q!")
}
return nil
}

// maybeSetWriter will call logger.SetWriter(w) if logger has a SetWriter method
func maybeSetWriter(logger log.Logger, w io.Writer) {
type writerSetter interface {
SetWriter(io.Writer)
}
v, ok := logger.(writerSetter)
if ok {
v.SetWriter(w)
}
}

// maybeSetVerbosity will call logger.SetVerbosity(verbosity) if logger
// has a SetVerbosity method
func maybeSetVerbosity(logger log.Logger, verbosity log.Level) {
type verboser interface {
SetVerbosity(log.Level)
}
v, ok := logger.(verboser)
if ok {
v.SetVerbosity(verbosity)
}
}
Loading

0 comments on commit 07a82b0

Please sign in to comment.