Skip to content

Commit

Permalink
Ensure given event broadcaster is used by manager
Browse files Browse the repository at this point in the history
Fixes an issue where the event broadcaster provided to
'manager.NewManager()' was not being used.
  • Loading branch information
alecmerdler committed Jan 17, 2025
1 parent 8224053 commit 96526e6
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 5 deletions.
74 changes: 73 additions & 1 deletion manager/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package manager
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
ctrlmanageropts "k8s.io/controller-manager/options"
Expand Down Expand Up @@ -52,7 +56,7 @@ func TestControllerQueueDone(t *testing.T) {
CtxQueue := queue.NewQueueOperationsCtx()
registry := typed.NewRegistry()
broadcaster := record.NewBroadcaster()
eventSink := &typedcorev1.EventSinkImpl{Interface: fake.NewSimpleClientset().CoreV1().Events("")}
eventSink := newFakeEventSink()

controller := NewOwnedResourceController(klogr.New(), "my-controller", gvr, CtxQueue, registry, broadcaster, func(_ context.Context, gvr schema.GroupVersionResource, namespace, name string) {
fmt.Println("processing", gvr, namespace, name)
Expand Down Expand Up @@ -81,3 +85,71 @@ func TestControllerQueueDone(t *testing.T) {
return controller.Queue.Len() == 0
}, 1*time.Second, 1*time.Millisecond)
}

func TestControllerEventsBroadcast(t *testing.T) {
gvr := schema.GroupVersionResource{
Group: "example.com",
Version: "v1",
Resource: "mytypes",
}
CtxQueue := queue.NewQueueOperationsCtx()
registry := typed.NewRegistry()
broadcaster := record.NewBroadcaster()
eventSink := newFakeEventSink()
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "my-controller"})

controller := NewOwnedResourceController(klogr.New(), "my-controller", gvr, CtxQueue, registry, broadcaster, func(_ context.Context, gvr schema.GroupVersionResource, namespace, name string) {
fmt.Println("processing", gvr, namespace, name)
})

mgr := NewManager(ctrlmanageropts.RecommendedDebuggingOptions().DebuggingConfiguration, ":8888", broadcaster, eventSink)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
_ = mgr.Start(ctx, controller)
}()
require.Eventually(t, healthCheckPassing(), 1*time.Second, 50*time.Millisecond)

recorder.Event(&v1.ObjectReference{Namespace: "test", Name: "a"}, v1.EventTypeNormal, "test", "test")

require.Eventually(t, func() bool {
return len(eventSink.Events) > 0
}, 5*time.Second, 1*time.Millisecond)
}

type fakeEventSink struct {
Events map[types.UID]*v1.Event
}

func newFakeEventSink() *fakeEventSink {
return &fakeEventSink{
Events: make(map[types.UID]*v1.Event),
}
}

func (f *fakeEventSink) Create(event *v1.Event) (*v1.Event, error) {
f.Events[event.UID] = event
return event, nil
}

func (f *fakeEventSink) Update(event *v1.Event) (*v1.Event, error) {
f.Events[event.UID] = event
return event, nil
}

func (f *fakeEventSink) Patch(oldEvent *v1.Event, _ []byte) (*v1.Event, error) {
f.Events[oldEvent.UID] = oldEvent
return oldEvent, nil
}

func healthCheckPassing() func() bool {
return func() bool {
resp, err := http.Get("http://localhost:8888/healthz")
if err != nil {
return false
}
defer resp.Body.Close()

return resp.StatusCode == http.StatusOK
}
}
6 changes: 2 additions & 4 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ func (m *Manager) Start(ctx context.Context, controllers ...Controller) error {
}
m.RUnlock()

broadcaster := record.NewBroadcaster()

m.once.Do(func() {
m.Lock()
m.errG, ctx = errgroup.WithContext(ctx)
Expand All @@ -88,8 +86,8 @@ func (m *Manager) Start(ctx context.Context, controllers ...Controller) error {

// start broadcaster
m.errG.Go(func() error {
broadcaster.StartStructuredLogging(2)
broadcaster.StartRecordingToSink(m.sink)
m.broadcaster.StartStructuredLogging(2)
m.broadcaster.StartRecordingToSink(m.sink)
return nil
})

Expand Down

0 comments on commit 96526e6

Please sign in to comment.