Skip to content

Commit

Permalink
Switch to builder focused cluster support
Browse files Browse the repository at this point in the history
Signed-off-by: Dr. Stefan Schimanski <[email protected]>
  • Loading branch information
sttts committed Apr 23, 2024
1 parent d023d3e commit 4d8e79e
Show file tree
Hide file tree
Showing 13 changed files with 452 additions and 373 deletions.
272 changes: 193 additions & 79 deletions pkg/builder/controller.go

Large diffs are not rendered by default.

15 changes: 11 additions & 4 deletions pkg/config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,15 @@ type Controller struct {
// Defaults to true, which means the controller will use leader election.
NeedLeaderElection *bool

// WatchProviderClusters indicates whether the controller should
// only watch clusters that are engaged by the cluster provider. Defaults to false
// if no provider is set, and to true if a provider is set.
WatchProviderClusters *bool
// EngageWithDefaultCluster indicates whether the controller should engage
// with the default cluster. This default to false if a cluster provider
// is configured, and to true otherwise.
//
// This is an experimental feature and is subject to change.
EngageWithDefaultCluster *bool

// EngageWithProvidedClusters indicates whether the controller should engage
// with the provided clusters of the manager. This defaults to true if a
// cluster provider is set, and to false otherwise.
EngageWithProviderClusters *bool
}
24 changes: 10 additions & 14 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -64,10 +62,16 @@ type Options struct {
// to each reconciliation via the context field.
LogConstructor func(request *reconcile.Request) logr.Logger

// WatchProviderClusters indicates whether the controller should
// only watch clusters that are engaged by the cluster provider. Defaults to false
// if no provider is set, and to true if a provider is set.
WatchProviderClusters *bool
// EngageWithDefaultCluster indicates whether the controller should engage
// with the default cluster of a manager. This defaults to false through the
// global controller options of the manager if a cluster provider is set,
// and to true otherwise. Here it can be overridden.
EngageWithDefaultCluster *bool
// EngageWithProvidedClusters indicates whether the controller should engage
// with the provided clusters of a manager. This defaults to true through the
// global controller options of the manager if a cluster provider is set,
// and to false otherwise. Here it can be overridden.
EngageWithProviderClusters *bool
}

// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
Expand Down Expand Up @@ -161,13 +165,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
options.NeedLeaderElection = mgr.GetControllerOptions().NeedLeaderElection
}

if options.WatchProviderClusters == nil {
options.WatchProviderClusters = mgr.GetControllerOptions().WatchProviderClusters
if options.WatchProviderClusters == nil { // should never happen
options.WatchProviderClusters = ptr.To(false)
}
}

// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
Expand All @@ -182,7 +179,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
LeaderElected: options.NeedLeaderElection,
WatchProviderClusters: *options.WatchProviderClusters,
}, nil
}

Expand Down
106 changes: 106 additions & 0 deletions pkg/controller/multicluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"sync"

"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// MultiClusterController is a Controller that is aware of the Cluster it is
// running in. It engage and disengage clusters dynamically, starting the
// watches and stopping them.
type MultiClusterController interface {
cluster.AwareRunnable
Controller
}

// ClusterWatcher starts watches for a given Cluster. The ctx should be
// used to cancel the watch when the Cluster is disengaged.
type ClusterWatcher interface {
Watch(ctx context.Context, cl cluster.Cluster) error
}

// NewMultiClusterController creates a new MultiClusterController for the given
// controller with the given ClusterWatcher.
func NewMultiClusterController(c Controller, watcher ClusterWatcher) MultiClusterController {
return &multiClusterController{
Controller: c,
watcher: watcher,
clusters: map[string]struct{}{},
}
}

type multiClusterController struct {
Controller
watcher ClusterWatcher

lock sync.Mutex
clusters map[string]struct{}
}

// Engage gets called when the runnable should start operations for the given Cluster.
func (c *multiClusterController) Engage(clusterCtx context.Context, cl cluster.Cluster) error {
c.lock.Lock()
defer c.lock.Unlock()

if _, ok := c.clusters[cl.Name()]; ok {
return nil
}

// pass through in case the controller itself is cluster aware
if ctrl, ok := c.Controller.(cluster.AwareRunnable); ok {
if err := ctrl.Engage(clusterCtx, cl); err != nil {
return err
}
}

// start watches on the cluster
if err := c.watcher.Watch(clusterCtx, cl); err != nil {
if ctrl, ok := c.Controller.(cluster.AwareRunnable); ok {
if err := ctrl.Disengage(clusterCtx, cl); err != nil {
return err
}
}
return err
}
c.clusters[cl.Name()] = struct{}{}

return nil
}

// Disengage gets called when the runnable should stop operations for the given Cluster.
func (c *multiClusterController) Disengage(ctx context.Context, cl cluster.Cluster) error {
c.lock.Lock()
defer c.lock.Unlock()

if _, ok := c.clusters[cl.Name()]; !ok {
return nil
}
delete(c.clusters, cl.Name())

// pass through in case the controller itself is cluster aware
if ctrl, ok := c.Controller.(cluster.AwareRunnable); ok {
if err := ctrl.Disengage(ctx, cl); err != nil {
return err
}
}

return nil
}
82 changes: 82 additions & 0 deletions pkg/handler/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"context"
"time"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// ForCluster wraps an EventHandler and adds the cluster name to the reconcile.Requests.
func ForCluster(clusterName string, h EventHandler) EventHandler {
return &clusterAwareHandler{
clusterName: clusterName,
handler: h,
}
}

type clusterAwareHandler struct {
handler EventHandler
clusterName string
}

var _ EventHandler = &clusterAwareHandler{}

func (c *clusterAwareHandler) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
c.handler.Create(ctx, evt, &clusterWorkqueue{RateLimitingInterface: q, clusterName: c.clusterName})
}

func (c *clusterAwareHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
c.handler.Update(ctx, evt, &clusterWorkqueue{RateLimitingInterface: q, clusterName: c.clusterName})
}

func (c *clusterAwareHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
c.handler.Delete(ctx, evt, &clusterWorkqueue{RateLimitingInterface: q, clusterName: c.clusterName})
}

func (c *clusterAwareHandler) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
c.handler.Generic(ctx, evt, &clusterWorkqueue{RateLimitingInterface: q, clusterName: c.clusterName})
}

// clusterWorkqueue is a wrapper around a RateLimitingInterface that adds the
// cluster name to the reconcile.Requests
type clusterWorkqueue struct {
workqueue.RateLimitingInterface
clusterName string
}

func (q *clusterWorkqueue) AddAfter(item interface{}, duration time.Duration) {
req := item.(reconcile.Request)
req.ClusterName = q.clusterName
q.RateLimitingInterface.AddAfter(req, duration)
}

func (q *clusterWorkqueue) Add(item interface{}) {
req := item.(reconcile.Request)
req.ClusterName = q.clusterName
q.RateLimitingInterface.Add(req)
}

func (q *clusterWorkqueue) AddRateLimited(item interface{}) {
req := item.(reconcile.Request)
req.ClusterName = q.clusterName
q.RateLimitingInterface.AddRateLimited(req)
}
82 changes: 21 additions & 61 deletions pkg/handler/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -36,53 +35,33 @@ var _ EventHandler = &EnqueueRequestForObject{}
// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
type EnqueueRequestForObject struct {
cluster cluster.Cluster
}
type EnqueueRequestForObject struct{}

// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
var clusterName string
if e.cluster != nil {
clusterName = e.cluster.Name()
}
q.Add(reconcile.Request{
ClusterName: clusterName,
NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
},
})
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}

// Update implements EventHandler.
func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
var clusterName string
if e.cluster != nil {
clusterName = e.cluster.Name()
}

switch {
case evt.ObjectNew != nil:
q.Add(reconcile.Request{
ClusterName: clusterName,
NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
},
})
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
}})
case evt.ObjectOld != nil:
q.Add(reconcile.Request{
ClusterName: clusterName,
NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
},
})
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
}})
default:
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
}
Expand All @@ -94,17 +73,10 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return
}
var clusterName string
if e.cluster != nil {
clusterName = e.cluster.Name()
}
q.Add(reconcile.Request{
ClusterName: clusterName,
NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
},
})
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}

// Generic implements EventHandler.
Expand All @@ -113,20 +85,8 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
return
}
var clusterName string
if e.cluster != nil {
clusterName = e.cluster.Name()
}
q.Add(reconcile.Request{
ClusterName: clusterName,
NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
},
})
}

// DeepCopyFor implements cluster.AwareDeepCopy[EventHandler].
func (e *EnqueueRequestForObject) DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler {
return &EnqueueRequestForObject{cluster: c}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}
Loading

0 comments on commit 4d8e79e

Please sign in to comment.