diff --git a/controllers/leveltriggered/caching.go b/controllers/leveltriggered/caching.go index 4a2dd66..207eb6d 100644 --- a/controllers/leveltriggered/caching.go +++ b/controllers/leveltriggered/caching.go @@ -44,7 +44,8 @@ type caches struct { runner *runner } -func newCaches(events chan event.GenericEvent, targetScheme *runtime.Scheme) *caches { +func newCaches(targetScheme *runtime.Scheme) *caches { + events := make(chan event.GenericEvent) return &caches{ targetScheme: targetScheme, events: events, @@ -53,12 +54,16 @@ func newCaches(events chan event.GenericEvent, targetScheme *runtime.Scheme) *ca } } +func (c *caches) appEvents() <-chan event.GenericEvent { + return c.events +} + func (c *caches) setupWithManager(mgr ctrl.Manager) error { c.localClusterConfig = mgr.GetConfig() c.baseLogger = mgr.GetLogger().WithValues("component", "target-cache") c.reader = mgr.GetClient() // this specifically gets the client that has the indexing installed below; i.e., these are coupled. - c.runner = newRunner() + c.runner = newRunner(mgr.GetLogger().WithValues("component", "cache-runner")) if err := mgr.Add(c.runner); err != nil { return err } @@ -188,7 +193,7 @@ func (c *caches) watchTargetAndGetReader(ctx context.Context, clusterObject *clu return nil, false, err } - cancel := c.runner.run(func(ctx context.Context) { + cancel := c.runner.run("cache-"+cacheKey.String(), func(ctx context.Context) { if err := ca.Start(ctx); err != nil { logger.Error(err, "cache exited with error") } @@ -368,6 +373,7 @@ func (gc *gc) loop() { for { item, shutdown := gc.queue.Get() if shutdown { + gc.log.Info("exiting cache GC loop") return } key, ok := item.(clusterAndGVK) diff --git a/controllers/leveltriggered/controller.go b/controllers/leveltriggered/controller.go index f4051ce..0aa48b2 100644 --- a/controllers/leveltriggered/controller.go +++ b/controllers/leveltriggered/controller.go @@ -16,7 +16,6 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/source" @@ -34,13 +33,9 @@ type PipelineReconciler struct { caches *caches recorder record.EventRecorder stratReg strategy.StrategyRegistry - - appEvents chan event.GenericEvent } func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName string, eventRecorder record.EventRecorder, stratReg strategy.StrategyRegistry) *PipelineReconciler { - appEvents := make(chan event.GenericEvent) - // this is empty because we're going to use unstructured.Unstructured objects to support arbitrary types. // If something changed and we wanted typed objects, this scheme would need to have those registered. targetScheme := runtime.NewScheme() @@ -51,8 +46,7 @@ func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName st recorder: eventRecorder, ControllerName: controllerName, stratReg: stratReg, - caches: newCaches(appEvents, targetScheme), - appEvents: appEvents, + caches: newCaches(targetScheme), } return pc } @@ -444,7 +438,7 @@ func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error { &clusterctrlv1alpha1.GitopsCluster{}, handler.EnqueueRequestsFromMapFunc(r.requestsForCluster(gitopsClusterIndexKey)), ). - WatchesRawSource(&source.Channel{Source: r.appEvents}, &handler.EnqueueRequestForObject{}). + WatchesRawSource(&source.Channel{Source: r.caches.appEvents()}, &handler.EnqueueRequestForObject{}). Complete(r) } diff --git a/controllers/leveltriggered/controller_remote_test.go b/controllers/leveltriggered/controller_remote_test.go index 26b939f..d98e099 100644 --- a/controllers/leveltriggered/controller_remote_test.go +++ b/controllers/leveltriggered/controller_remote_test.go @@ -33,6 +33,7 @@ func TestRemoteTargets(t *testing.T) { if err != nil { t.Error("starting leaf test env failed", err) } + envsToStop = append(envsToStop, leafEnv) user, err := leafEnv.ControlPlane.AddUser(envtest.User{ Name: "leaf-admin", diff --git a/controllers/leveltriggered/runner.go b/controllers/leveltriggered/runner.go index 4d61fd6..5973b01 100644 --- a/controllers/leveltriggered/runner.go +++ b/controllers/leveltriggered/runner.go @@ -1,47 +1,60 @@ package leveltriggered -import "context" +import ( + "context" + "github.com/go-logr/logr" + "sync" +) -// This is a dead simple way to run things using a manager's context as a base, so that they will -// get shut down when the manager does. It must be constructed with `newRunner`, and added to a manager: +// ctrl.Manager makes sure everything that is `Add`ed is started with a context, and cancels the context +// to signal shutdown. But: all the runnables added share the same context, so they all get shut down at +// the same time. // -// r := newRunner() +// `runner` is a dead simple way to run things with their own context, using a manager's context as a +// base, so that they will get shut down when the manager does _and_ you can shut them down individually. +// It must be constructed with `newRunner`, and added to a manager: +// +// r := newRunner(logger) // mgr.Add(r) // // then you can use it to run funcs: // -// cancel := r.run(func(context.Context)) +// cancel := r.run(string, func(context.Context)) // -// The func will be run with its own context derived from the root context supplied by the manager, +// The func will be run with its own context, derived from the root context supplied by the manager, // with the cancel func returned to the caller as shown. This way you can cancel the context yourself, // or let it be canceled when the manager shuts down. // // It'll deadlock if you call `run` before adding it to a manager (or otherwise calling `Start`). type runWithContext struct { - ctx context.Context - run func(context.Context) + name string + ctx context.Context + do func(context.Context) } type runner struct { + log logr.Logger rootContext context.Context tostart chan runWithContext ready chan struct{} } -func newRunner() *runner { +func newRunner(log logr.Logger) *runner { return &runner{ + log: log, tostart: make(chan runWithContext), ready: make(chan struct{}), } } -func (r *runner) run(fn func(ctx context.Context)) context.CancelFunc { +func (r *runner) run(name string, fn func(ctx context.Context)) context.CancelFunc { <-r.ready // wait until there's a root context ctx, cancel := context.WithCancel(r.rootContext) r.tostart <- runWithContext{ - run: fn, - ctx: ctx, + name: name, + do: fn, + ctx: ctx, } return cancel } @@ -51,12 +64,24 @@ func (r *runner) run(fn func(ctx context.Context)) context.CancelFunc { func (r *runner) Start(ctx context.Context) error { r.rootContext = ctx close(r.ready) // broadcast that things can be run + var wg sync.WaitGroup +loop: for { select { case randc := <-r.tostart: - go randc.run(randc.ctx) + r.log.Info("starting child", "name", randc.name) + wg.Add(1) + go func(rc runWithContext) { + defer wg.Done() + rc.do(rc.ctx) + r.log.Info("child exited", "name", rc.name) + }(randc) case <-r.rootContext.Done(): - return nil + break loop } } + r.log.Info("Stopping and waiting for children") + wg.Wait() + r.log.Info("All children stopped; runner exit") + return nil } diff --git a/controllers/leveltriggered/suite_test.go b/controllers/leveltriggered/suite_test.go index d6bea58..2dcb801 100644 --- a/controllers/leveltriggered/suite_test.go +++ b/controllers/leveltriggered/suite_test.go @@ -34,6 +34,8 @@ var kubeConfig []byte var eventRecorder *testEventRecorder var pipelineReconciler *PipelineReconciler +var envsToStop []*envtest.Environment + type testEvent struct { object runtime.Object eventType string @@ -100,6 +102,7 @@ func TestMain(m *testing.M) { if err != nil { log.Fatalf("starting test env failed: %s", err) } + envsToStop = append(envsToStop, testEnv) user, err := testEnv.ControlPlane.AddUser(envtest.User{ Name: "envtest-admin", @@ -176,11 +179,20 @@ func TestMain(m *testing.M) { cancel() wg.Wait() + log.Println("manager exited") - err = testEnv.Stop() - if err != nil { - log.Fatalf("stoping test env failed: %s", err) + var failedToStopEnvs bool + for _, env := range envsToStop { + err = env.Stop() + if err != nil { + failedToStopEnvs = true + log.Printf("stopping test env failed: %s\n", err) + } + } + if failedToStopEnvs { + log.Fatalf("failed to stop all test envs") } + log.Println("test envs stopped") os.Exit(retCode) } diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 253523a..05bdf5f 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -163,7 +163,7 @@ func TestMain(m *testing.M) { err = testEnv.Stop() if err != nil { - log.Fatalf("stoping test env failed: %s", err) + log.Fatalf("stopping test env failed: %s", err) } os.Exit(retCode)