From 96526e6f1f38195441e3f0f57377ac0281d46cbf Mon Sep 17 00:00:00 2001 From: Alec Merdler Date: Thu, 16 Jan 2025 23:45:40 -0500 Subject: [PATCH] Ensure given event broadcaster is used by manager Fixes an issue where the event broadcaster provided to 'manager.NewManager()' was not being used. --- manager/controller_test.go | 74 +++++++++++++++++++++++++++++++++++++- manager/manager.go | 6 ++-- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/manager/controller_test.go b/manager/controller_test.go index 12a687f..35e2836 100644 --- a/manager/controller_test.go +++ b/manager/controller_test.go @@ -3,12 +3,16 @@ package manager import ( "context" "fmt" + "net/http" "testing" "time" "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" ctrlmanageropts "k8s.io/controller-manager/options" @@ -52,7 +56,7 @@ func TestControllerQueueDone(t *testing.T) { CtxQueue := queue.NewQueueOperationsCtx() registry := typed.NewRegistry() broadcaster := record.NewBroadcaster() - eventSink := &typedcorev1.EventSinkImpl{Interface: fake.NewSimpleClientset().CoreV1().Events("")} + eventSink := newFakeEventSink() controller := NewOwnedResourceController(klogr.New(), "my-controller", gvr, CtxQueue, registry, broadcaster, func(_ context.Context, gvr schema.GroupVersionResource, namespace, name string) { fmt.Println("processing", gvr, namespace, name) @@ -81,3 +85,71 @@ func TestControllerQueueDone(t *testing.T) { return controller.Queue.Len() == 0 }, 1*time.Second, 1*time.Millisecond) } + +func TestControllerEventsBroadcast(t *testing.T) { + gvr := schema.GroupVersionResource{ + Group: "example.com", + Version: "v1", + Resource: "mytypes", + } + CtxQueue := queue.NewQueueOperationsCtx() + registry := typed.NewRegistry() + broadcaster := record.NewBroadcaster() + eventSink := newFakeEventSink() + recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "my-controller"}) + + controller := NewOwnedResourceController(klogr.New(), "my-controller", gvr, CtxQueue, registry, broadcaster, func(_ context.Context, gvr schema.GroupVersionResource, namespace, name string) { + fmt.Println("processing", gvr, namespace, name) + }) + + mgr := NewManager(ctrlmanageropts.RecommendedDebuggingOptions().DebuggingConfiguration, ":8888", broadcaster, eventSink) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + _ = mgr.Start(ctx, controller) + }() + require.Eventually(t, healthCheckPassing(), 1*time.Second, 50*time.Millisecond) + + recorder.Event(&v1.ObjectReference{Namespace: "test", Name: "a"}, v1.EventTypeNormal, "test", "test") + + require.Eventually(t, func() bool { + return len(eventSink.Events) > 0 + }, 5*time.Second, 1*time.Millisecond) +} + +type fakeEventSink struct { + Events map[types.UID]*v1.Event +} + +func newFakeEventSink() *fakeEventSink { + return &fakeEventSink{ + Events: make(map[types.UID]*v1.Event), + } +} + +func (f *fakeEventSink) Create(event *v1.Event) (*v1.Event, error) { + f.Events[event.UID] = event + return event, nil +} + +func (f *fakeEventSink) Update(event *v1.Event) (*v1.Event, error) { + f.Events[event.UID] = event + return event, nil +} + +func (f *fakeEventSink) Patch(oldEvent *v1.Event, _ []byte) (*v1.Event, error) { + f.Events[oldEvent.UID] = oldEvent + return oldEvent, nil +} + +func healthCheckPassing() func() bool { + return func() bool { + resp, err := http.Get("http://localhost:8888/healthz") + if err != nil { + return false + } + defer resp.Body.Close() + + return resp.StatusCode == http.StatusOK + } +} diff --git a/manager/manager.go b/manager/manager.go index 8e786be..eeafefb 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -68,8 +68,6 @@ func (m *Manager) Start(ctx context.Context, controllers ...Controller) error { } m.RUnlock() - broadcaster := record.NewBroadcaster() - m.once.Do(func() { m.Lock() m.errG, ctx = errgroup.WithContext(ctx) @@ -88,8 +86,8 @@ func (m *Manager) Start(ctx context.Context, controllers ...Controller) error { // start broadcaster m.errG.Go(func() error { - broadcaster.StartStructuredLogging(2) - broadcaster.StartRecordingToSink(m.sink) + m.broadcaster.StartStructuredLogging(2) + m.broadcaster.StartRecordingToSink(m.sink) return nil })