Skip to content

Commit

Permalink
Enhancement: Refactor controller syncHandler() method #206
Browse files Browse the repository at this point in the history
  • Loading branch information
senthilrch committed Mar 11, 2023
1 parent 84222ce commit 8ce77f3
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 118 deletions.
231 changes: 114 additions & 117 deletions cmd/controller/app/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,6 @@ func (c *Controller) runRefreshWorker() {
// converge the two. It then updates the Status block of the ImageCache resource
// with the current status of the resource.
func (c *Controller) syncHandler(wqKey images.WorkQueueKey) error {
status := &v1alpha2.ImageCacheStatus{
Failures: map[string]v1alpha2.NodeReasonMessageList{},
}

// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(wqKey.ObjKey)
Expand All @@ -444,72 +441,28 @@ func (c *Controller) syncHandler(wqKey images.WorkQueueKey) error {
return err
}

// Get the ImageCache resource with this namespace/name
imageCache, err := c.imageCachesLister.ImageCaches(namespace).Get(name)
if err != nil {
// The ImageCache resource may no longer exist, in which case we stop
// processing.
glog.Errorf("Error getting imagecache(%s): %v", name, err)
return err
}

glog.Infof("Starting to sync image cache %s(%s)", name, wqKey.WorkType)

switch wqKey.WorkType {
case images.ImageCacheCreate, images.ImageCacheUpdate, images.ImageCacheRefresh, images.ImageCachePurge:

startTime := metav1.Now()
status.StartTime = &startTime
// Get the ImageCache resource with this namespace/name
imageCache, err := c.imageCachesLister.ImageCaches(namespace).Get(name)
if err != nil {
// The ImageCache resource may no longer exist, in which case we stop
// processing.
glog.Errorf("Error getting imagecache(%s): %v", name, err)
if err = c.updateImageCacheStatus(imageCache, c.createStatusProcessing(wqKey, imageCache)); err != nil {
glog.Errorf("Error updating imagecache status : %v", err)
return err
}

if wqKey.WorkType == images.ImageCacheUpdate && wqKey.OldImageCache == nil {
status.Status = v1alpha2.ImageCacheActionStatusFailed
status.Reason = v1alpha2.ImageCacheReasonOldImageCacheNotFound
status.Message = v1alpha2.ImageCacheMessageOldImageCacheNotFound

if err := c.updateImageCacheStatus(imageCache, status); err != nil {
glog.Errorf("Error updating imagecache status to %s: %v", status.Status, err)
return err
}
glog.Errorf("%s: %s", v1alpha2.ImageCacheReasonOldImageCacheNotFound, v1alpha2.ImageCacheMessageOldImageCacheNotFound)
return fmt.Errorf("%s: %s", v1alpha2.ImageCacheReasonOldImageCacheNotFound, v1alpha2.ImageCacheMessageOldImageCacheNotFound)
}

cacheSpec := imageCache.Spec.CacheSpec
glog.V(4).Infof("cacheSpec: %+v", cacheSpec)
var nodes []*corev1.Node

status.Status = v1alpha2.ImageCacheActionStatusProcessing

if wqKey.WorkType == images.ImageCacheCreate {
status.Reason = v1alpha2.ImageCacheReasonImageCacheCreate
status.Message = v1alpha2.ImageCacheMessagePullingImages
}

if wqKey.WorkType == images.ImageCacheUpdate {
status.Reason = v1alpha2.ImageCacheReasonImageCacheUpdate
status.Message = v1alpha2.ImageCacheMessageUpdatingCache
}

if wqKey.WorkType == images.ImageCacheRefresh {
status.Reason = v1alpha2.ImageCacheReasonImageCacheRefresh
status.Message = v1alpha2.ImageCacheMessageRefreshingCache
}

if wqKey.WorkType == images.ImageCachePurge {
status.Reason = v1alpha2.ImageCacheReasonImageCachePurge
status.Message = v1alpha2.ImageCacheMessagePurgeCache
}

imageCache, err = c.kubefledgedclientset.KubefledgedV1alpha2().ImageCaches(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
glog.Errorf("Error getting imagecache(%s) from api server: %v", name, err)
return err
}

if err = c.updateImageCacheStatus(imageCache, status); err != nil {
glog.Errorf("Error updating imagecache status to %s: %v", status.Status, err)
return err
}

for k, i := range cacheSpec {
if len(i.NodeSelector) > 0 {
if nodes, err = c.nodesLister.List(labels.Set(i.NodeSelector).AsSelector()); err != nil {
Expand Down Expand Up @@ -538,6 +491,11 @@ func (c *Controller) syncHandler(wqKey images.WorkQueueKey) error {
}
}
if wqKey.WorkType == images.ImageCacheUpdate {

if wqKey.OldImageCache == nil {
return fmt.Errorf("OldImageCacheNotFound")
}

for _, oldimage := range wqKey.OldImageCache.Spec.CacheSpec[k].Images {
matched := false
for _, newimage := range i.Images {
Expand Down Expand Up @@ -586,64 +544,14 @@ func (c *Controller) syncHandler(wqKey images.WorkQueueKey) error {

case images.ImageCacheStatusUpdate:
glog.V(4).Infof("wqKey.Status = %+v", wqKey.Status)
// Finally, we update the status block of the ImageCache resource to reflect the
// current state of the world
// Get the ImageCache resource with this namespace/name
imageCache, err := c.kubefledgedclientset.KubefledgedV1alpha2().ImageCaches(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
glog.Errorf("Error getting image cache %s: %v", name, err)
return err
}

if imageCache.Status.StartTime != nil {
status.StartTime = imageCache.Status.StartTime
}

status.Status = v1alpha2.ImageCacheActioneNoImagesPulledOrDeleted
status.Reason = imageCache.Status.Reason
status.Message = v1alpha2.ImageCacheMessageNoImagesPulledOrDeleted

failures := false
for _, v := range *wqKey.Status {
if (v.Status == images.ImageWorkResultStatusSucceeded || v.Status == images.ImageWorkResultStatusAlreadyPulled) && !failures {
status.Status = v1alpha2.ImageCacheActionStatusSucceeded
if v.ImageWorkRequest.WorkType == images.ImageCachePurge {
status.Message = v1alpha2.ImageCacheMessageImagesDeletedSuccessfully
} else {
status.Message = v1alpha2.ImageCacheMessageImagesPulledSuccessfully
}
}
if (v.Status == images.ImageWorkResultStatusFailed || v.Status == images.ImageWorkResultStatusUnknown) && !failures {
failures = true
status.Status = v1alpha2.ImageCacheActionStatusFailed
if v.ImageWorkRequest.WorkType == images.ImageCachePurge {
status.Message = v1alpha2.ImageCacheMessageImageDeleteFailedForSomeImages
} else {
status.Message = v1alpha2.ImageCacheMessageImagePullFailedForSomeImages
}
}
if v.Status == images.ImageWorkResultStatusFailed || v.Status == images.ImageWorkResultStatusUnknown {
status.Failures[v.ImageWorkRequest.Image] = append(
status.Failures[v.ImageWorkRequest.Image], v1alpha2.NodeReasonMessage{
Node: v.ImageWorkRequest.Node.Labels["kubernetes.io/hostname"],
Reason: v.Reason,
Message: v.Message,
})
}
}

err = c.updateImageCacheStatus(imageCache, status)
err = c.updateImageCacheStatus(imageCache, c.createStatusFinal(wqKey, imageCache))
if err != nil {
glog.Errorf("Error updating ImageCache status: %v", err)
return err
}

if imageCache.Status.Reason == v1alpha2.ImageCacheReasonImageCachePurge || imageCache.Status.Reason == v1alpha2.ImageCacheReasonImageCacheRefresh {
imageCache, err := c.kubefledgedclientset.KubefledgedV1alpha2().ImageCaches(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
glog.Errorf("Error getting image cache %s: %v", name, err)
return err
}
if imageCache.Status.Reason == v1alpha2.ImageCacheReasonImageCachePurge {
if err := c.removeAnnotation(imageCache, imageCachePurgeAnnotationKey); err != nil {
glog.Errorf("Error removing Annotation %s from imagecache(%s): %v", imageCachePurgeAnnotationKey, imageCache.Name, err)
Expand All @@ -660,13 +568,6 @@ func (c *Controller) syncHandler(wqKey images.WorkQueueKey) error {
}
}

if status.Status == v1alpha2.ImageCacheActionStatusSucceeded || status.Status == v1alpha2.ImageCacheActioneNoImagesPulledOrDeleted {
c.recorder.Event(imageCache, corev1.EventTypeNormal, status.Reason, status.Message)
}

if status.Status == v1alpha2.ImageCacheActionStatusFailed {
c.recorder.Event(imageCache, corev1.EventTypeWarning, status.Reason, status.Message)
}
}
glog.Infof("Completed sync actions for image cache %s(%s)", name, wqKey.WorkType)
return nil
Expand All @@ -691,15 +592,111 @@ func (c *Controller) updateImageCacheStatus(imageCache *v1alpha2.ImageCache, sta
// UpdateStatus will not allow changes to the Spec of the resource,
// which is ideal for ensuring nothing other than resource status has been updated.
_, err = c.kubefledgedclientset.KubefledgedV1alpha2().ImageCaches(imageCache.Namespace).Update(context.TODO(), imageCacheCopy, metav1.UpdateOptions{})

if status.Status == v1alpha2.ImageCacheActionStatusSucceeded || status.Status == v1alpha2.ImageCacheActioneNoImagesPulledOrDeleted {
c.recorder.Event(imageCache, corev1.EventTypeNormal, status.Reason, status.Message)
}

if status.Status == v1alpha2.ImageCacheActionStatusFailed {
c.recorder.Event(imageCache, corev1.EventTypeWarning, status.Reason, status.Message)
}
return err
}

func (c *Controller) removeAnnotation(imageCache *v1alpha2.ImageCache, annotationKey string) error {
imageCache, err := c.kubefledgedclientset.KubefledgedV1alpha2().ImageCaches(imageCache.Namespace).Get(context.TODO(), imageCache.Name, metav1.GetOptions{})
if err != nil {
glog.Errorf("Error getting image cache %s: %v", imageCache.Name, err)
return err
}

imageCacheCopy := imageCache.DeepCopy()
delete(imageCacheCopy.Annotations, annotationKey)
_, err := c.kubefledgedclientset.KubefledgedV1alpha2().ImageCaches(imageCache.Namespace).Update(context.TODO(), imageCacheCopy, metav1.UpdateOptions{})

_, err = c.kubefledgedclientset.KubefledgedV1alpha2().ImageCaches(imageCache.Namespace).Update(context.TODO(), imageCacheCopy, metav1.UpdateOptions{})
if err == nil {
glog.Infof("Annotation %s removed from imagecache(%s)", annotationKey, imageCache.Name)
}
return err
}

func (c *Controller) createStatusProcessing(wqKey images.WorkQueueKey, imageCache *v1alpha2.ImageCache) *v1alpha2.ImageCacheStatus {
status := &v1alpha2.ImageCacheStatus{
Failures: map[string]v1alpha2.NodeReasonMessageList{},
}

startTime := metav1.Now()
status.StartTime = &startTime
status.Status = v1alpha2.ImageCacheActionStatusProcessing

if wqKey.WorkType == images.ImageCacheCreate {
status.Reason = v1alpha2.ImageCacheReasonImageCacheCreate
status.Message = v1alpha2.ImageCacheMessagePullingImages
}

if wqKey.WorkType == images.ImageCacheRefresh {
status.Reason = v1alpha2.ImageCacheReasonImageCacheRefresh
status.Message = v1alpha2.ImageCacheMessageRefreshingCache
}

if wqKey.WorkType == images.ImageCachePurge {
status.Reason = v1alpha2.ImageCacheReasonImageCachePurge
status.Message = v1alpha2.ImageCacheMessagePurgeCache
}

if wqKey.WorkType == images.ImageCacheUpdate {
status.Reason = v1alpha2.ImageCacheReasonImageCacheUpdate
status.Message = v1alpha2.ImageCacheMessageUpdatingCache

if wqKey.OldImageCache == nil {
status.Status = v1alpha2.ImageCacheActionStatusFailed
status.Reason = v1alpha2.ImageCacheReasonOldImageCacheNotFound
status.Message = v1alpha2.ImageCacheMessageOldImageCacheNotFound
}
}
return status
}

func (c *Controller) createStatusFinal(wqKey images.WorkQueueKey, imageCache *v1alpha2.ImageCache) *v1alpha2.ImageCacheStatus {
status := &v1alpha2.ImageCacheStatus{
Failures: map[string]v1alpha2.NodeReasonMessageList{},
}

if imageCache.Status.StartTime != nil {
status.StartTime = imageCache.Status.StartTime
}

status.Status = v1alpha2.ImageCacheActioneNoImagesPulledOrDeleted
status.Reason = imageCache.Status.Reason
status.Message = v1alpha2.ImageCacheMessageNoImagesPulledOrDeleted

failures := false
for _, v := range *wqKey.Status {
if (v.Status == images.ImageWorkResultStatusSucceeded || v.Status == images.ImageWorkResultStatusAlreadyPulled) && !failures {
status.Status = v1alpha2.ImageCacheActionStatusSucceeded
if v.ImageWorkRequest.WorkType == images.ImageCachePurge {
status.Message = v1alpha2.ImageCacheMessageImagesDeletedSuccessfully
} else {
status.Message = v1alpha2.ImageCacheMessageImagesPulledSuccessfully
}
}
if (v.Status == images.ImageWorkResultStatusFailed || v.Status == images.ImageWorkResultStatusUnknown) && !failures {
failures = true
status.Status = v1alpha2.ImageCacheActionStatusFailed
if v.ImageWorkRequest.WorkType == images.ImageCachePurge {
status.Message = v1alpha2.ImageCacheMessageImageDeleteFailedForSomeImages
} else {
status.Message = v1alpha2.ImageCacheMessageImagePullFailedForSomeImages
}
}
if v.Status == images.ImageWorkResultStatusFailed || v.Status == images.ImageWorkResultStatusUnknown {
status.Failures[v.ImageWorkRequest.Image] = append(
status.Failures[v.ImageWorkRequest.Image], v1alpha2.NodeReasonMessage{
Node: v.ImageWorkRequest.Node.Labels["kubernetes.io/hostname"],
Reason: v.Reason,
Message: v.Message,
})
}
}
return status
}
2 changes: 1 addition & 1 deletion cmd/controller/app/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func TestSyncHandler(t *testing.T) {
ObjKey: "foo/bar/car",
},
expectErr: true,
expectedErrString: "unexpected key format",
expectedErrString: "unexpected key format:",
},
/*{
name: "#2: Create - Invalid imagecache spec (no images specified)",
Expand Down

0 comments on commit 8ce77f3

Please sign in to comment.