From 086933464a00b06d24b54211370b34d630e7bdeb Mon Sep 17 00:00:00 2001 From: Fiona Waters Date: Thu, 23 Nov 2023 12:48:47 +0000 Subject: [PATCH] Added error handling and logging --- cmd/kar-controllers/app/options/options.go | 8 + cmd/kar-controllers/app/server.go | 8 +- pkg/apis/controller/utils/utils.go | 4 +- .../queuejob/queuejob_controller_ex.go | 189 +++++++++++------- pkg/controller/queuejob/scheduling_queue.go | 47 ++++- .../queuejobdispatch/queuejobagent.go | 20 +- .../genericresource/genericresource.go | 66 ++++-- .../quotasubtmgr/quota_subtree_manager.go | 1 + .../quota-manager/quota/consumerinfo.go | 4 + .../quota-manager/quota/core/allocation.go | 7 +- .../quota/core/forestconsumer.go | 4 +- .../quota-manager/quota/core/quotanode.go | 7 +- .../quota-manager/quota/core/treecache.go | 5 +- .../quota-manager/quota/quotamanager.go | 10 + 14 files changed, 268 insertions(+), 112 deletions(-) diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go index 7d42288cc..11c6b7cb7 100644 --- a/cmd/kar-controllers/app/options/options.go +++ b/cmd/kar-controllers/app/options/options.go @@ -21,6 +21,8 @@ import ( "os" "strconv" "strings" + + "k8s.io/klog/v2" ) // ServerOption is the main context object for the controller manager. @@ -96,6 +98,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() { backoffInt, err := strconv.Atoi(backoffString) if err == nil { s.BackoffTime = backoffInt + } else { + klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err) } } @@ -105,6 +109,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() { holInt, err := strconv.Atoi(holString) if err == nil { s.HeadOfLineHoldingTime = holInt + } else { + klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err) } } @@ -126,6 +132,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() { to, err := strconv.ParseInt(dispatchResourceReservationTimeoutString, 10, 64) if err == nil { s.DispatchResourceReservationTimeout = to + } else { + klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err) } } } diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index 4deb4ac14..a09e7b968 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -17,6 +17,7 @@ limitations under the License. package app import ( + "fmt" "net/http" "strings" @@ -42,7 +43,7 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) { func Run(opt *options.ServerOption) error { restConfig, err := buildConfig(opt.Master, opt.Kubeconfig) if err != nil { - return err + return fmt.Errorf("[Run] unable to build server config, - error: %#v", err) } neverStop := make(chan struct{}) @@ -71,7 +72,8 @@ func Run(opt *options.ServerOption) error { // This call is blocking (unless an error occurs) which equates to <-neverStop err = listenHealthProbe(opt) if err != nil { - return err + return fmt.Errorf("[Run] unable to start health probe listener, - error: %#v", err) + } return nil @@ -83,7 +85,7 @@ func listenHealthProbe(opt *options.ServerOption) error { handler.Handle("/healthz", &health.Handler{}) err := http.ListenAndServe(opt.HealthProbeListenAddr, handler) if err != nil { - return err + return fmt.Errorf("[listenHealthProbe] unable to listen and serve, - error: %#v", err) } return nil diff --git a/pkg/apis/controller/utils/utils.go b/pkg/apis/controller/utils/utils.go index ef0986c6b..6c7627bac 100644 --- a/pkg/apis/controller/utils/utils.go +++ b/pkg/apis/controller/utils/utils.go @@ -21,11 +21,13 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" ) func GetController(obj interface{}) types.UID { accessor, err := meta.Accessor(obj) if err != nil { + klog.Errorf("[GetController] unable to return object as minimum required fields are missing, - error: %#v", err) return "" } @@ -37,10 +39,10 @@ func GetController(obj interface{}) types.UID { return "" } - func GetJobID(pod *v1.Pod) types.UID { accessor, err := meta.Accessor(pod) if err != nil { + klog.Errorf("[GetJobID] unable to return object as minimum required fields are missing, - error: %#v", err) return "" } diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index da79ee341..37f5a5aef 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -135,7 +135,7 @@ type JobAndClusterAgent struct { func GetQueueJobKey(obj interface{}) (string, error) { qj, ok := obj.(*arbv1.AppWrapper) if !ok { - return "", fmt.Errorf("not a AppWrapper") + return "", fmt.Errorf("Error - not an AppWrapper") } return fmt.Sprintf("%s/%s", qj.Namespace, qj.Name), nil @@ -147,7 +147,7 @@ func (qjm *XController) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) error { labelSelector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", queuejob.Name) pods, errt := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) if errt != nil { - return errt + return fmt.Errorf("Unable to list pods, error: %#v", errt) } running := int32(FilterPods(pods.Items, v1.PodRunning)) @@ -206,7 +206,7 @@ func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource { // TODO: when no pods are listed, do we send entire node capacity as available // this will cause false positive dispatch. if err != nil { - klog.Errorf("[allocatableCapacity] Error listing pods %v", err) + klog.Errorf("[allocatableCapacity] Error listing pods %#v", err) } for _, pod := range podList.Items { if _, ok := pod.GetLabels()["appwrappers.mcad.ibm.com"]; !ok && pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded { @@ -246,7 +246,7 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat appWrapperClient, err := clientset.NewForConfig(restConfig) if err != nil { - klog.Fatalf("Could not instantiate k8s client, err=%v", err) + klog.Fatalf("Could not instantiate k8s client, error: %#v", err) } cc.appwrapperInformer = informerFactory.NewSharedInformerFactory(appWrapperClient, 0).Workload().V1beta1().AppWrappers() cc.appwrapperInformer.Informer().AddEventHandler( @@ -281,7 +281,7 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat cc.quotaManager, err = quotaforestmanager.NewQuotaManager(dispatchedAWDemands, dispatchedAWs, cc.appWrapperLister, restConfig, mcadConfig) if err != nil { - klog.Error("Failed to instantiate quota manager: %#v", err) + klog.Errorf("Failed to instantiate quota manager, error: %#v", err) return nil } } else { @@ -333,7 +333,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { var message string newjob, err := qjm.getAppWrapper(aw.Namespace, aw.Name, "[PreemptQueueJobs] get fresh app wrapper") if err != nil { - klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. Will try to preempt on the next run.", aw.Namespace, aw.Name, err) + klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', error: %#v. Will try to preempt on the next run.", aw.Namespace, aw.Name, err) return } // we need to update AW before analyzing it as a candidate for preemption @@ -436,7 +436,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { if generatedCondition { err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning") if err != nil { - klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err) + klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper error: %#v", newjob.Namespace, newjob.Name, err) return } @@ -466,7 +466,7 @@ func (qjm *XController) preemptAWJobs(ctx context.Context, preemptAWs []*arbv1.A klog.Warningf("[preemptAWJobs] App wrapper '%s/%s' was not found when getting a fresh copy. ", aw.Namespace, aw.Name) continue } - klog.Errorf("[preemptAWJobs] Failed to get AppWrapper to from API Cache %s/%s: err = %v", + klog.Errorf("[preemptAWJobs] Failed to get AppWrapper to from API Cache %s/%s: error: %#v", aw.Namespace, aw.Name, err) continue } @@ -477,7 +477,7 @@ func (qjm *XController) preemptAWJobs(ctx context.Context, preemptAWs []*arbv1.A klog.Warningf("[preemptAWJobs] App wrapper '%s/%s' was not found when updating status. ", aw.Namespace, aw.Name) continue } - klog.Warningf("[preemptAWJobs] status update for '%s/%s' failed, err=%v", aw.Namespace, aw.Name, err) + klog.Warningf("[preemptAWJobs] status update for '%s/%s' failed, error: %#v", aw.Namespace, aw.Name, err) } } } @@ -566,7 +566,10 @@ func (qjm *XController) GetAggregatedResourcesPerGenericItem(cqj *arbv1.AppWrapp // Get all pods and related resources for _, genericItem := range cqj.Spec.AggrResources.GenericItems { - itemsList, _ := genericresource.GetListOfPodResourcesFromOneGenericItem(&genericItem) + itemsList, err := genericresource.GetListOfPodResourcesFromOneGenericItem(&genericItem) + if err != nil { + klog.Errorf("[GetAggregatedResourcesPerGenericItem] Unable to get list of pod resources from one generic item - error: %#v", err) + } for i := 0; i < len(itemsList); i++ { retVal = append(retVal, itemsList[i]) } @@ -587,7 +590,7 @@ func (qjm *XController) getAppWrapperCompletionStatus(caw *arbv1.AppWrapper) arb unstruct.Object = make(map[string]interface{}) var blob interface{} if err := jsons.Unmarshal(objectName.Raw, &blob); err != nil { - klog.Errorf("[getAppWrapperCompletionStatus] Error unmarshalling, err=%#v", err) + klog.Errorf("[getAppWrapperCompletionStatus] Error unmarshalling, error: %#v", err) } unstruct.Object = blob.(map[string]interface{}) // set object to the content of the blob after Unmarshalling name := "" @@ -635,7 +638,7 @@ func (qjm *XController) GetAggregatedResources(cqj *arbv1.AppWrapper) *clusterst for _, genericItem := range cqj.Spec.AggrResources.GenericItems { qjv, err := genericresource.GetResources(&genericItem) if err != nil { - klog.V(8).Infof("[GetAggregatedResources] Failure aggregating resources for %s/%s, err=%#v, genericItem=%#v", + klog.Errorf("[GetAggregatedResources] Failure aggregating resources for %s/%s, error: %#v, genericItem: %#v", cqj.Namespace, cqj.Name, err, genericItem) } allocated = allocated.Add(qjv) @@ -697,7 +700,7 @@ func (qjm *XController) getProposedPreemptions(requestingJob *arbv1.AppWrapper, } if !foundEnoughResources { - klog.V(10).Infof("[getProposedPreemptions] Not enough preemptable jobs to dispatch %s/%s.", requestingJob.Namespace, requestingJob.Name) + klog.Infof("[getProposedPreemptions] Not enough preemptable jobs to dispatch %s/%s.", requestingJob.Namespace, requestingJob.Name) } return proposedPreemptions @@ -709,7 +712,7 @@ func (qjm *XController) getDispatchedAppWrappers(restConfig *rest.Config) (map[s // Setup and break down an informer to get a list of appwrappers bofore controllerinitialization completes appWrapperClient, err := clientset.NewForConfig(restConfig) if err != nil { - klog.Errorf("[getDispatchedAppWrappers] Failure creating client for initialization informer err=%#v", err) + klog.Errorf("[getDispatchedAppWrappers] Failure creating client for initialization informer - error: %#v", err) return awrRetVal, awsRetVal } queueJobInformer := informerFactory.NewSharedInformerFactory(appWrapperClient, 0).Workload().V1beta1().AppWrappers() @@ -742,9 +745,8 @@ func (qjm *XController) getDispatchedAppWrappers(restConfig *rest.Config) (map[s cache.WaitForCacheSync(stopCh, queueJobSynced) appwrappers, err := queueJobLister.AppWrappers("").List(labels.Everything()) - if err != nil { - klog.Errorf("[getDispatchedAppWrappers] List of AppWrappers err=%+v", err) + klog.Errorf("[getDispatchedAppWrappers] Error listing AppWrappers error: %+v", err) return awrRetVal, awsRetVal } @@ -809,7 +811,10 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust if qjm.isDispatcher { // Get the job key klog.V(10).Infof("[getAggAvaiResPri] %s: Getting job key for: %s/%s.", time.Now().String(), value.Namespace, value.Name) - queueJobKey, _ := GetQueueJobKey(value) + queueJobKey, err := GetQueueJobKey(value) + if err != nil { + klog.Errorf("[getAggAvaiResPri] Error getting queue job key - error: %#v", err) + } klog.V(10).Infof("[getAggAvaiResPri] %s: Getting dispatchid for: %s.", time.Now().String(), queueJobKey) dispatchedAgentId := qjm.dispatchMap[queueJobKey] @@ -824,14 +829,17 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust err := qjm.UpdateQueueJobStatus(value) if err != nil { - klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s/%s, err=%+v", value.Namespace, value.Name, err) + klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s/%s, - error: %+v", value.Namespace, value.Name, err) } totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) klog.V(10).Infof("[getAggAvaiResPri] total resources consumed by Appwrapper %s/%s when lower priority compared to target are %v", value.Namespace, value.Name, totalResource) preemptable = preemptable.Add(totalResource) klog.V(6).Infof("[getAggAvaiResPri] %s/%s priority %v is lower target priority %v reclaiming total preemptable resources %v", value.Namespace, value.Name, value.Status.SystemPriority, targetpr, totalResource) - queueJobKey, _ := GetQueueJobKey(value) + queueJobKey, err := GetQueueJobKey(value) + if err != nil { + klog.Errorf("[getAggAvaiResPri] Error getting queue job key - error: %#v", err) + } addPreemptableAWs(preemptableAWs, value, queueJobKey, preemptableAWsMap) continue } else if qjm.isDispatcher { @@ -844,14 +852,18 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust } else if value.Status.CanRun { qjv := clusterstateapi.EmptyResource() for _, genericItem := range value.Spec.AggrResources.GenericItems { - res, _ := genericresource.GetResources(&genericItem) + res, err := genericresource.GetResources(&genericItem) + if err != nil { + klog.Errorf("[getAggAvaiResPri] Failure getting resources for %s/%s, error: %#v, genericItem: %#v", + value.Namespace, value.Name, err, genericItem) + } qjv.Add(res) klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in genericItem=%T for job %s/%s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Namespace, value.Name, value.Status.CanRun) } err := qjm.UpdateQueueJobStatus(value) if err != nil { - klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s/%s, err=%+v", value.Namespace, value.Name, err) + klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s/%s,- error: %+v", value.Namespace, value.Name, err) } totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) @@ -859,7 +871,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust delta, err := qjv.NonNegSub(totalResource) pending = pending.Add(delta) if err != nil { - klog.Warningf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resources %v, %v", qjv, err) + klog.Warningf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resources %v, - error: %v", qjv, err) pending = pending.Add(qjv) } klog.V(6).Infof("[getAggAvaiResPri] The value of pending is %v", pending) @@ -871,7 +883,10 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust klog.V(6).Infof("[getAggAvaiResPri] Schedulable idle cluster resources: %+v, subtracting dispatched resources: %+v and adding preemptable cluster resources: %+v", r, pending, preemptable) r = r.Add(preemptable) - r, _ = r.NonNegSub(pending) + r, err = r.NonNegSub(pending) + if err != nil { + klog.Warningf("[getAggAvaiResPri] Subtraction of resources failed, - error: %v", err) + } klog.V(3).Infof("[getAggAvaiResPri] %+v available resources to schedule", r) return r, proposedPremptions @@ -910,7 +925,7 @@ func (qjm *XController) chooseAgent(ctx context.Context, qj *arbv1.AppWrapper) s klog.V(2).Infof("[chooseAgent] AppWrapper %s/%s does not have enough quota\n", qj.Namespace, qj.Name) } } else { - klog.Errorf("[chooseAgent] Quota evaluation is enable but not initialize. AppWrapper %s/%s does not have enough quota\n", qj.Namespace, qj.Name) + klog.Errorf("[chooseAgent] Quota evaluation is enabled but not initialized. AppWrapper %s/%s does not have enough quota\n", qj.Namespace, qj.Name) } } else { // Quota is not enabled to return selected agent @@ -993,7 +1008,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.Warningf("[ScheduleNext] app wrapper '%s/%s' not found skipping dispatch", qj.Namespace, qj.Name) return nil } - klog.Errorf("[ScheduleNext] Unable to get AW %s/%s from API cache &aw=%p Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, qj, qj.ResourceVersion, qj.Status, retryErr) + klog.Errorf("[ScheduleNext] Unable to get AW %s/%s from API cache &aw=%p Version=%s Status=%+v - error: %#v", qj.Namespace, qj.Name, qj, qj.ResourceVersion, qj.Status, retryErr) return retryErr } // make sure qj has the latest information @@ -1013,15 +1028,28 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { // Create newHeap to temporarily store qjqueue jobs for updating SystemPriority tempQ := newHeap(cache.MetaNamespaceKeyFunc, HigherSystemPriorityQJ) qj.Status.SystemPriority = float64(qj.Spec.Priority) + qj.Spec.PrioritySlope*(time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time)).Seconds() - tempQ.Add(qj) + err := tempQ.Add(qj) + if err != nil { + klog.Errorf("[ScheduleNext] unable to add item to the queue - error: %#v", err) + } for qjm.qjqueue.Length() > 0 { - qjtemp, _ := qjm.qjqueue.Pop() + qjtemp, err := qjm.qjqueue.Pop() + if err != nil { + klog.Errorf("[ScheduleNext] Cannot pop queuejob from queue - error: %#v", err) + } qjtemp.Status.SystemPriority = float64(qjtemp.Spec.Priority) + qjtemp.Spec.PrioritySlope*(time.Now().Sub(qjtemp.Status.ControllerFirstTimestamp.Time)).Seconds() - tempQ.Add(qjtemp) + err = tempQ.Add(qjtemp) + if err != nil { + klog.Errorf("[ScheduleNext] unable to add item to the queue - error: %#v", err) + } + } // move AppWrappers back to activeQ and sort based on SystemPriority for tempQ.data.Len() > 0 { - qjtemp, _ := tempQ.Pop() + qjtemp, err := tempQ.Pop() + if err != nil { + klog.Errorf("[ScheduleNext] Cannot pop queuejob from queue - error: %#v", err) + } qjm.qjqueue.AddIfNotPresent(qjtemp.(*arbv1.AppWrapper)) } // Print qjqueue.ativeQ for debugging @@ -1039,7 +1067,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { // Retrieve HeadOfLine after priority update qj, retryErr = qjm.qjqueue.Pop() if retryErr != nil { - klog.V(3).Infof("[ScheduleNext] Cannot pop QueueJob from qjqueue! err=%#v", retryErr) + klog.V(3).Infof("[ScheduleNext] Cannot pop QueueJob from queue - error: %#v", retryErr) return err } klog.V(3).Infof("[ScheduleNext] activeQ.Pop_afterPriorityUpdate %s/%s *Delay=%.6f seconds RemainingLength=%d &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj, qj.ResourceVersion, qj.Status) @@ -1048,7 +1076,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { if apierrors.IsNotFound(retryErr) { return nil } - klog.Errorf("[ScheduleNext] failed to get a fresh copy of the app wrapper '%s/%s', err=%#v", qj.Namespace, qj.Name, retryErr) + klog.Errorf("[ScheduleNext] failed to get a fresh copy of the app wrapper '%s/%s', - error: %#v", qj.Namespace, qj.Name, retryErr) return err } if apiCacheAWJob.Status.CanRun { @@ -1069,7 +1097,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.Warningf("[ScheduleNext] Conflict error detected when updating status in etcd for app wrapper '%s/%s, status = %+v this may be due to appwrapper deletion.", qj.Namespace, qj.Name, qj.Status) return nil } else { - klog.Errorf("[ScheduleNext] Failed to updated status in etcd for app wrapper '%s/%s', status = %+v, err=%v", qj.Namespace, qj.Name, qj.Status, retryErr) + klog.Errorf("[ScheduleNext] Failed to updated status in etcd for app wrapper '%s/%s', status = %+v, - error: %v", qj.Namespace, qj.Name, qj.Status, retryErr) } return retryErr } @@ -1095,7 +1123,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.Warningf("[ScheduleNext] app wrapper '%s/%s' not found skipping dispatch", qj.Namespace, qj.Name) return nil } - klog.Errorf("[ScheduleNext] [Dispatcher Mode] failed to retrieve the app wrapper '%s/%s', err=%#v", qj.Namespace, qj.Name, err) + klog.Errorf("[ScheduleNext] [Dispatcher Mode] failed to retrieve the app wrapper '%s/%s', - error: %#v", qj.Namespace, qj.Name, err) return err } // make sure qj has the latest information @@ -1105,7 +1133,10 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { apiCacheAWJob.DeepCopyInto(qj) } qj.Status.CanRun = true - queueJobKey, _ := GetQueueJobKey(qj) + queueJobKey, err := GetQueueJobKey(qj) + if err != nil { + klog.Errorf("[ScheduleNext] Error getting queue job key - error: %#v", err) + } qjm.dispatchMap[queueJobKey] = agentId klog.V(10).Infof("[ScheduleNext] [Dispatcher Mode] %s/%s, %s: ScheduleNextBeforeEtcd", qj.Namespace, qj.Name, time.Now().Sub(qj.CreationTimestamp.Time)) retryErr = qjm.updateStatusInEtcd(ctx, qj, "[ScheduleNext] [Dispatcher Mode] - setCanRun") @@ -1113,7 +1144,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { if apierrors.IsConflict(err) { klog.Warningf("[ScheduleNext] [Dispatcher Mode] Conflict error detected when updating status in etcd for app wrapper '%s/%s, status = %+v. Retrying update.", qj.Namespace, qj.Name, qj.Status) } else { - klog.Errorf("[ScheduleNext] [Dispatcher Mode] Failed to updated status in etcd for app wrapper '%s/%s', status = %+v, err=%v", qj.Namespace, qj.Name, qj.Status, err) + klog.Errorf("[ScheduleNext] [Dispatcher Mode] Failed to update status in etcd for app wrapper '%s/%s', status = %+v, - error: %v", qj.Namespace, qj.Name, qj.Status, err) } return retryErr } @@ -1191,7 +1222,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.Warningf("[ScheduleNext] [Agent Mode] app wrapper '%s/%s' not found while trying to update labels, skipping dispatch.", qj.Namespace, qj.Name) return nil } - return retryErr + return fmt.Errorf("[ScheduleNext] [Agent Mode] error getting app wrapper while trying to update labels - error: %#v", retryErr) } tempAW.SetLabels(newLabels) updatedAW, retryErr := qjm.updateEtcd(ctx, tempAW, "ScheduleNext [Agent Mode] - setDefaultQuota") @@ -1199,7 +1230,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { if apierrors.IsConflict(err) { klog.Warningf("[ScheduleNext] [Agent mode] Conflict error detected when updating labels in etcd for app wrapper '%s/%s, status = %+v. Retrying update.", qj.Namespace, qj.Name, qj.Status) } else { - klog.Errorf("[ScheduleNext] [Agent mode] Failed to update labels in etcd for app wrapper '%s/%s', status = %+v, err=%v", qj.Namespace, qj.Name, qj.Status, err) + klog.Errorf("[ScheduleNext] [Agent mode] Failed to update labels in etcd for app wrapper '%s/%s', status = %+v, - error: %v", qj.Namespace, qj.Name, qj.Status, err) } return retryErr } @@ -1279,7 +1310,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { } else if apierrors.IsConflict(retryErr) { klog.Warningf("[ScheduleNext] [Agent mode] Conflict error detected when updating status in etcd for app wrapper '%s/%s, status = %+v. Retrying update.", qj.Namespace, qj.Name, qj.Status) } else if retryErr != nil { - klog.Errorf("[ScheduleNext] [Agent mode] Failed to update status in etcd for app wrapper '%s/%s', status = %+v, err=%v", qj.Namespace, qj.Name, qj.Status, err) + klog.Errorf("[ScheduleNext] [Agent mode] Failed to update status in etcd for app wrapper '%s/%s', status = %+v, - error: %v", qj.Namespace, qj.Name, qj.Status, err) } return retryErr } @@ -1324,7 +1355,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { return } if err != nil { - klog.Warningf("[ScheduleNext] failed to dispatch the app wrapper '%s/%s', err= %v", qj.Namespace, qj.Name, err) + klog.Warningf("[ScheduleNext] failed to dispatch the app wrapper '%s/%s', - error: %v", qj.Namespace, qj.Name, err) klog.Warningf("[ScheduleNext] retrying dispatch") qjm.qjqueue.AddIfNotPresent(qj) } @@ -1338,7 +1369,7 @@ func (cc *XController) updateEtcd(ctx context.Context, currentAppwrapper *arbv1. currentAppwrapper.Status.Local = false // for Informer FilterFunc to pickup updatedAppwrapper, err := cc.arbclients.WorkloadV1beta1().AppWrappers(currentAppwrapper.Namespace).Update(ctx, currentAppwrapper, metav1.UpdateOptions{}) if err != nil { - return nil, err + return nil, fmt.Errorf("[updateEtcd] failure to update app wrapper, - error: %#v", err) } if larger(currentAppwrapper.ResourceVersion, updatedAppwrapper.ResourceVersion) { klog.Warningf("[updateEtcd] current app wrapper '%s/%s' called by '%s' has version %s", currentAppwrapper.Namespace, currentAppwrapper.Name, caller, currentAppwrapper.ResourceVersion) @@ -1354,7 +1385,7 @@ func (cc *XController) updateStatusInEtcd(ctx context.Context, currentAppwrapper currentAppwrapper.Status.Sender = "before " + caller // set Sender string to indicate code location updatedAppwrapper, err := cc.arbclients.WorkloadV1beta1().AppWrappers(currentAppwrapper.Namespace).UpdateStatus(ctx, currentAppwrapper, metav1.UpdateOptions{}) if err != nil { - return err + return fmt.Errorf("[updateStatusInEtcd] failure to update app wrapper status, - error: %#v", err) } if larger(currentAppwrapper.ResourceVersion, updatedAppwrapper.ResourceVersion) { klog.Warningf("[updateStatusInEtcd] current app wrapper '%s/%s' called by '%s' has version %s", currentAppwrapper.Namespace, currentAppwrapper.Name, caller, currentAppwrapper.ResourceVersion) @@ -1429,7 +1460,8 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason err := etcUpdateRetrier.Run(func() error { apiCacheAWJob, retryErr := qjm.getAppWrapper(q.Namespace, q.Name, "[backoff] - Rejoining") if retryErr != nil { - return retryErr + return fmt.Errorf("[backoff] unable to get app wrapper - error: %#v", retryErr) + } q.Status.DeepCopyInto(&apiCacheAWJob.Status) apiCacheAWJob.Status.QueueJobState = arbv1.AppWrapperCondBackoff @@ -1440,12 +1472,13 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason if apierrors.IsConflict(retryErr) { klog.Warningf("[backoff] Conflict when upating AW status in etcd '%s/%s'. Retrying.", apiCacheAWJob.Namespace, apiCacheAWJob.Name) } - return retryErr + return fmt.Errorf("[backoff] unable to update status in etcd - error: %#v", retryErr) + } return nil }) if err != nil { - klog.Errorf("[backoff] Failed to update status for %s/%s. Continuing with possible stale object without updating conditions. err=%s", q.Namespace, q.Name, err) + klog.Errorf("[backoff] Failed to update status for %s/%s. Continuing with possible stale object without updating conditions, - error: %s", q.Namespace, q.Name, err) } qjm.qjqueue.AddUnschedulableIfNotPresent(q) klog.V(3).Infof("[backoff] %s/%s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", q.Namespace, q.Name, @@ -1490,7 +1523,7 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) { if newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion { err := qjm.UpdateQueueJobStatus(newjob) if err != nil { - klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s/%s, err=%+v", newjob.Namespace, newjob.Name, err) + klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s/%s, - error: %+v", newjob.Namespace, newjob.Name, err) // TODO: should we really return? return } @@ -1521,7 +1554,7 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) { err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setRunningHoldCompletion") if err != nil { // TODO: implement retry - klog.Errorf("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) + klog.Errorf("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, - error: %+v.", newjob.Namespace, newjob.Name, newjob.Status, err) } } // Set appwrapper status to complete @@ -1547,7 +1580,7 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) { qjm.quotaManager.Release(updateQj) } // TODO: Implement retry - klog.Errorf("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) + klog.Errorf("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, - error: %+v.", newjob.Namespace, newjob.Name, newjob.Status, err) } if qjm.quotaManager != nil { qjm.quotaManager.Release(updateQj) @@ -1618,7 +1651,7 @@ func (cc *XController) addQueueJob(obj interface{}) { time.Sleep(requeueIntervalForCompletionStatus) latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) if err != nil || !exists { - klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache,stopping check for completion status", qj.Namespace, qj.Name) + klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache,stopping check for completion status - error: %#v", qj.Namespace, qj.Name, err) break } else { var latestAw *arbv1.AppWrapper @@ -1631,9 +1664,10 @@ func (cc *XController) addQueueJob(obj interface{}) { klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s/%s with status %s", latestAw.Namespace, latestAw.Name, latestAw.Status.State) AwinEtcd, err := cc.arbclients.WorkloadV1beta1().AppWrappers(latestAw.Namespace).Get(context.Background(), latestAw.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { - klog.V(2).Infof("[Informer-addQJ] Stopped requeueing of AW %s due to error %v\n", latestAw.Name, err) + klog.V(2).Infof("[Informer-addQJ] Stopped requeueing of AW %s due to error: %v\n", latestAw.Name, err) break } else if AwinEtcd.Status.State == latestAw.Status.State && err != nil { + klog.V(2).Infof("[Informer-addQJ] Unable to get App Wrapper - error: %#v", err) break // Exit the loop } } @@ -1661,7 +1695,7 @@ func (cc *XController) addQueueJob(obj interface{}) { time.Sleep(requeueInterval) latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) if err != nil || !exists { - klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache, stopping check for minScheduling", qj.Namespace, qj.Name) + klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache, stopping check for minScheduling - error: %#v", qj.Namespace, qj.Name, err) break } else { var latestAw *arbv1.AppWrapper @@ -1770,7 +1804,7 @@ func (cc *XController) enqueue(obj interface{}) error { err := cc.eventQueue.Add(qj) // add to FIFO queue if not in, update object & keep position if already in FIFO queue if err != nil { - klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status, err) + klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds Version=%s Status=%+v - error: %#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status, err) } else { klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status) } @@ -1784,7 +1818,7 @@ func (cc *XController) enqueueIfNotPresent(obj interface{}) error { } err := cc.eventQueue.AddIfNotPresent(aw) // add to FIFO queue if not in, update object & keep position if already in FIFO queue - return err + return fmt.Errorf("[enqueueIfNotPresent] error adding item to queue - error: %#v", err) } func (cc *XController) agentEventQueueWorker() { @@ -1832,7 +1866,8 @@ func (cc *XController) updateQueueJobStatus(ctx context.Context, queueJobFromAge } return nil } - return err + return fmt.Errorf("[updateQueueJobStatus] unable to get app wrapper %#v", err) + } if len(queueJobFromAgent.Status.State) == 0 || queueJobInEtcd.Status.State == queueJobFromAgent.Status.State { return nil @@ -1841,7 +1876,7 @@ func (cc *XController) updateQueueJobStatus(ctx context.Context, queueJobFromAge queueJobInEtcd.Status.State = new_flag _, err = cc.arbclients.WorkloadV1beta1().AppWrappers(queueJobInEtcd.Namespace).Update(ctx, queueJobInEtcd, metav1.UpdateOptions{}) if err != nil { - return err + return fmt.Errorf("[updateQueueJobStatus] unable to update app wrapper - error: %#v", err) } return nil } @@ -1889,7 +1924,7 @@ func (cc *XController) worker() { klog.V(2).Infof("[worker] Deleting resources for AppWrapper Job '%s/%s' because it was preempted, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State) err00 := cc.Cleanup(ctx, queuejob) if err00 != nil { - klog.Errorf("[worker] Failed to delete resources for AppWrapper Job '%s/%s', err=%v", queuejob.Namespace, queuejob.Name, err00) + klog.Errorf("[worker] Failed to delete resources for AppWrapper Job '%s/%s', - error: %v", queuejob.Namespace, queuejob.Name, err00) return err00 } klog.V(2).Infof("[worker] Delete resources for AppWrapper Job '%s/%s' due to preemption was sucessfull, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State) @@ -1910,7 +1945,7 @@ func (cc *XController) worker() { queuejob.Status.FilterIgnore = true // Update Queueing status, add to qjqueue for ScheduleNext err := cc.updateStatusInEtcdWithRetry(ctx, queuejob, "worker - setQueueing") if err != nil { - klog.Errorf("[worker] Error updating status 'setQueueing' AppWrapper: '%s/%s',Status=%+v, err=%+v.", queuejob.Namespace, queuejob.Name, queuejob.Status, err) + klog.Errorf("[worker] Error updating status 'setQueueing' AppWrapper: '%s/%s',Status=%+v, - error: %+v.", queuejob.Namespace, queuejob.Name, queuejob.Status, err) return err } @@ -1947,9 +1982,9 @@ func (cc *XController) worker() { return nil }) if err != nil && !CanIgnoreAPIError(err) && !IsJsonSyntaxError(err) { - klog.Warningf("[worker] Fail to process item from eventQueue, err %v. Attempting to re-enqueque...", err) + klog.Warningf("[worker] Fail to process item from eventQueue, - error: %#v. Attempting to re-enqueque...", err) if err00 := cc.enqueueIfNotPresent(item); err00 != nil { - klog.Errorf("[worker] Fatal error trying to re-enqueue item, err =%v", err00) + klog.Errorf("[worker] Fatal error trying to re-enqueue item, - error: %#v", err00) } else { klog.Warning("[worker] Item re-enqueued.") } @@ -1964,13 +1999,14 @@ func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) e klog.Warningf("[syncQueueJob] AppWrapper '%s/%s' not found in cache and will be deleted", qj.Namespace, qj.Name) // clean up app wrapper resources including quota if err := cc.Cleanup(ctx, qj); err != nil { - klog.Errorf("Failed to delete resources associated with app wrapper: '%s/%s', err %v", qj.Namespace, qj.Name, err) + klog.Errorf("[syncQueueJob] Failed to delete resources associated with app wrapper: '%s/%s', - error: %#v", qj.Namespace, qj.Name, err) // return error so operation can be retried return err } cc.qjqueue.Delete(qj) return nil } + klog.Errorf("[syncQueueJob] Error while getting app wrapper - error: %#v", err) return err } klog.V(10).Infof("[syncQueueJob] Cache AW '%s/%s' &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, qj, qj.ResourceVersion, qj.Status) @@ -1991,7 +2027,7 @@ func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) e if qj.Status.State == arbv1.AppWrapperStateActive { err := cc.UpdateQueueJobStatus(awNew) if err != nil { - klog.Errorf("[syncQueueJob] Error updating pod status counts for AppWrapper job: %s/%s, err=%+v", qj.Namespace, qj.Name, err) + klog.Errorf("[syncQueueJob] Error updating pod status counts for AppWrapper job: %s/%s, - error: %+v", qj.Namespace, qj.Name, err) return err } klog.Infof("[syncQueueJob] Pod counts updated for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", awNew.Namespace, awNew.Name, awNew.ResourceVersion, @@ -2005,7 +2041,7 @@ func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) e awNew.Status.FilterIgnore = true // Update AppWrapperCondRunning err := cc.updateStatusInEtcdWithRetry(ctx, awNew, "[syncQueueJob] Update pod counts") if err != nil { - klog.Error("[syncQueueJob] Error updating pod status counts for app wrapper job: '%s/%s', err=%+v.", qj.Namespace, qj.Name, err) + klog.Error("[syncQueueJob] Error updating pod status counts for app wrapper job: '%s/%s', - error: %+v.", qj.Namespace, qj.Name, err) return err } return nil @@ -2040,7 +2076,7 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, // updates of pod phase counts done in caller of this function. err := cc.updateStatusInEtcdWithRetry(ctx, qj, "manageQueueJob - podPhaseChanges") if err != nil { - klog.Errorf("[manageQueueJob] Error updating status for podPhaseChanges for AppWrapper: '%s/%s',Status=%+v, err=%+v.", qj.Namespace, qj.Name, qj.Status, err) + klog.Errorf("[manageQueueJob] Error updating status for podPhaseChanges for AppWrapper: '%s/%s',Status=%+v, - error: %+v.", qj.Namespace, qj.Name, qj.Status, err) return err } } @@ -2077,9 +2113,9 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, _, err00 := cc.genericresources.SyncQueueJob(qj, &ar) if err00 != nil { if apierrors.IsInvalid(err00) { - klog.Warningf("[manageQueueJob] Invalid generic item sent for dispatching by app wrapper='%s/%s' err=%v", qj.Namespace, qj.Name, err00) + klog.Warningf("[manageQueueJob] Invalid generic item sent for dispatching by app wrapper='%s/%s' - error: %v", qj.Namespace, qj.Name, err00) } else { - klog.Errorf("[manageQueueJob] Error dispatching generic item for app wrapper='%s/%s' type=%v err=%v", qj.Namespace, qj.Name, err00) + klog.Errorf("[manageQueueJob] Error dispatching generic item for app wrapper='%s/%s' type=%v - error: %v", qj.Namespace, qj.Name, err00) } dispatchFailureMessage = fmt.Sprintf("%s/%s creation failure: %+v", qj.Namespace, qj.Name, err00) dispatched = false @@ -2122,7 +2158,7 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, qj.Status.FilterIgnore = true // update State & QueueJobState after dispatch err := cc.updateStatusInEtcdWithRetry(ctx, qj, "manageQueueJob - afterEtcdDispatching") if err != nil { - klog.Errorf("[manageQueueJob] Error updating status 'afterEtcdDispatching' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", qj.Namespace, qj.Name, qj.Status, err) + klog.Errorf("[manageQueueJob] Error updating status 'afterEtcdDispatching' for AppWrapper: '%s/%s',Status=%+v, - error: %+v.", qj.Namespace, qj.Name, qj.Status, err) return err } return nil @@ -2135,7 +2171,7 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, // updates of pod phase counts done in caller of this function. err := cc.updateStatusInEtcdWithRetry(ctx, qj, "manageQueueJob - podPhaseChanges") if err != nil { - klog.Errorf("[manageQueueJob] Error updating status 'podPhaseChanges' AppWrapper: '%s/%s',Status=%+v, err=%+v.", qj.Namespace, qj.Name, qj.Status, err) + klog.Errorf("[manageQueueJob] Error updating status 'podPhaseChanges' AppWrapper: '%s/%s',Status=%+v, - error: %+v.", qj.Namespace, qj.Name, qj.Status, err) return err } } @@ -2160,7 +2196,7 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, qj.Status.FilterIgnore = true // Update Queueing status, add to qjqueue for ScheduleNext err := cc.updateStatusInEtcdWithRetry(ctx, qj, "manageQueueJob - setQueueing") if err != nil { - klog.Errorf("[manageQueueJob] Error updating status 'setQueueing' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", qj.Namespace, qj.Name, qj.Status, err) + klog.Errorf("[manageQueueJob] Error updating status 'setQueueing' for AppWrapper: '%s/%s',Status=%+v, - error: %+v.", qj.Namespace, qj.Name, qj.Status, err) return err } } @@ -2177,7 +2213,10 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, klog.V(10).Infof("[manageQueueJob] [Dispatcher] '%s/%s', %s: WorkerBeforeDispatch", qj.Namespace, qj.Name, time.Now().Sub(qj.CreationTimestamp.Time)) } - queuejobKey, _ := GetQueueJobKey(qj) + queuejobKey, err := GetQueueJobKey(qj) + if err != nil { + klog.Errorf("[manageQueueJob] Error getting queue job key - error: %#v", err) + } if agentId, ok := cc.dispatchMap[queuejobKey]; ok { klog.V(10).Infof("[manageQueueJob] [Dispatcher] Dispatched AppWrapper %s/%s to Agent ID: %s.", qj.Namespace, qj.Name, agentId) cc.agentMap[agentId].CreateJob(ctx, qj) @@ -2190,9 +2229,9 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, klog.V(10).Infof("[manageQueueJob] [Dispatcher] XQJ %s/%s has Overhead After Dispatching: %s", qj.Namespace, qj.Name, current_time.Sub(qj.CreationTimestamp.Time)) klog.V(10).Infof("[manageQueueJob] [Dispatcher] %s/%s, %s: WorkerAfterDispatch", qj.Namespace, qj.Name, time.Now().Sub(qj.CreationTimestamp.Time)) } - err := cc.updateStatusInEtcdWithRetry(ctx, qj, "[manageQueueJob] [Dispatcher] -- set dispatched true") + err = cc.updateStatusInEtcdWithRetry(ctx, qj, "[manageQueueJob] [Dispatcher] -- set dispatched true") if err != nil { - klog.Errorf("Failed to update status of AppWrapper %s/%s: err=%v", qj.Namespace, qj.Name, err) + klog.Errorf("Failed to update status of AppWrapper %s/%s: - error: %v", qj.Namespace, qj.Name, err) return err } } @@ -2209,7 +2248,7 @@ func (cc *XController) Cleanup(ctx context.Context, appwrapper *arbv1.AppWrapper for _, ar := range appwrapper.Spec.AggrResources.GenericItems { genericResourceName, gvk, err00 := cc.genericresources.Cleanup(appwrapper, &ar) if err00 != nil && !CanIgnoreAPIError(err00) && !IsJsonSyntaxError(err00) { - klog.Errorf("[Cleanup] Error deleting generic item %s, from app wrapper='%s/%s' err=%v.", + klog.Errorf("[Cleanup] Error deleting generic item %s, from app wrapper='%s/%s' - error: %v.", genericResourceName, appwrapper.Namespace, appwrapper.Name, err00) err = multierror.Append(err, err00) continue @@ -2225,7 +2264,10 @@ func (cc *XController) Cleanup(ctx context.Context, appwrapper *arbv1.AppWrapper } } else { if appwrapper.Status.IsDispatched { - queuejobKey, _ := GetQueueJobKey(appwrapper) + queuejobKey, err := GetQueueJobKey(appwrapper) + if err != nil { + klog.Errorf("[Cleanup] Error getting queue job key - error: %#v", err) + } if obj, ok := cc.dispatchMap[queuejobKey]; ok { cc.agentMap[obj].DeleteJob(ctx, appwrapper) } @@ -2251,8 +2293,9 @@ func (cc *XController) getAppWrapper(namespace string, name string, caller strin apiCacheAWJob, err := cc.appWrapperLister.AppWrappers(namespace).Get(name) if err != nil { if !apierrors.IsNotFound(err) { - klog.Errorf("[getAppWrapper] getting a copy of '%s/%s' failed, when called by '%s', err=%v", namespace, name, caller, err) + klog.Errorf("[getAppWrapper] getting a copy of '%s/%s' failed, when called by '%s', - error: %v", namespace, name, caller, err) } + klog.Errorf("[getAppWrapper] unable to get app wrapper - error: %#v", err) return nil, err } klog.V(5).Infof("[getAppWrapper] get a copy of '%s/%s' suceeded when called by '%s'", namespace, name, caller) diff --git a/pkg/controller/queuejob/scheduling_queue.go b/pkg/controller/queuejob/scheduling_queue.go index 92b226bc0..827c4db61 100644 --- a/pkg/controller/queuejob/scheduling_queue.go +++ b/pkg/controller/queuejob/scheduling_queue.go @@ -129,7 +129,10 @@ func (p *PriorityQueue) Length() int { func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool { p.lock.Lock() defer p.lock.Unlock() - _, exists, _ := p.activeQ.Get(qj) + _, exists, err := p.activeQ.Get(qj) + if err != nil { + klog.Errorf("[IfExist] unable to check if app wrapper exists, - error:%#v", err) + } if p.unschedulableQ.Get(qj) != nil || exists { return true } @@ -140,7 +143,10 @@ func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool { func (p *PriorityQueue) IfExistActiveQ(qj *qjobv1.AppWrapper) bool { p.lock.Lock() defer p.lock.Unlock() - _, exists, _ := p.activeQ.Get(qj) + _, exists, err := p.activeQ.Get(qj) + if err != nil { + klog.Errorf("[IfExistActiveQ] unable to check if app wrapper exists, - error:%#v", err) + } return exists } @@ -196,12 +202,15 @@ func (p *PriorityQueue) AddIfNotPresent(qj *qjobv1.AppWrapper) error { if p.unschedulableQ.Get(qj) != nil { return nil } - if _, exists, _ := p.activeQ.Get(qj); exists { + if _, exists, err := p.activeQ.Get(qj); exists { + if err != nil { + klog.Errorf("[AddIfNotPresent] unable to check if pod exists, - error:%#v", err) + } return nil } err := p.activeQ.Add(qj) if err != nil { - klog.Errorf("Error adding pod %s/%s to the scheduling queue: %v", qj.Namespace, qj.Name, err) + klog.Errorf("[AddIfNotPresent] Error adding pod %s/%s to the scheduling queue, - error:%#v", qj.Namespace, qj.Name, err) } else { p.cond.Broadcast() } @@ -218,7 +227,10 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) erro if p.unschedulableQ.Get(qj) != nil { return fmt.Errorf("pod is already present in unschedulableQ") } - if _, exists, _ := p.activeQ.Get(qj); exists { + if _, exists, err := p.activeQ.Get(qj); exists { + if err != nil { + klog.Errorf("[AddUnschedulableIfNotPresent] unable to check if pod exists, - error:%#v", err) + } return fmt.Errorf("pod is already present in the activeQ") } // if !p.receivedMoveRequest && isPodUnschedulable(qj) { @@ -227,7 +239,9 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) erro return nil } err := p.activeQ.Add(qj) - if err == nil { + if err != nil { + klog.Errorf("[AddUnschedulableIfNotPresent] Error adding QJ %s/%s to the scheduling queue: %v", qj.Namespace, qj.Name, err) + } else { p.cond.Broadcast() } return err @@ -271,8 +285,14 @@ func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error { p.lock.Lock() defer p.lock.Unlock() // If the pod is already in the active queue, just update it there. - if _, exists, _ := p.activeQ.Get(newQJ); exists { + if _, exists, errp := p.activeQ.Get(newQJ); exists { + if errp != nil { + klog.Errorf("[Update] unable to check if pod exists, - error:%#v", errp) + } err := p.activeQ.Update(newQJ) + if err != nil { + klog.Errorf("[Update] unable to update pod, - error: %#v", err) + } return err } // If the pod is in the unschedulable queue, updating it may make it schedulable. @@ -280,7 +300,9 @@ func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error { if p.isQJUpdated(oldQJ, newQJ) { p.unschedulableQ.Delete(usQJ) err := p.activeQ.Add(newQJ) - if err == nil { + if err != nil { + klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", newQJ.Namespace, newQJ.Name, err) + } else { p.cond.Broadcast() } return err @@ -290,7 +312,9 @@ func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error { } // If pod is not in any of the two queue, we put it in the active queue. err := p.activeQ.Add(newQJ) - if err == nil { + if err != nil { + klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", newQJ.Namespace, newQJ.Name, err) + } else { p.cond.Broadcast() } return err @@ -303,7 +327,10 @@ func (p *PriorityQueue) Delete(qj *qjobv1.AppWrapper) error { p.lock.Lock() defer p.lock.Unlock() p.unschedulableQ.Delete(qj) - if _, exists, _ := p.activeQ.Get(qj); exists { + if _, exists, err := p.activeQ.Get(qj); exists { + if err != nil { + klog.Errorf("[Delete] unable to check if pod exists - error: %#v", err) + } return p.activeQ.Delete(qj) } // p.unschedulableQ.Delete(qj) diff --git a/pkg/controller/queuejobdispatch/queuejobagent.go b/pkg/controller/queuejobdispatch/queuejobagent.go index 273908f18..e568175b5 100644 --- a/pkg/controller/queuejobdispatch/queuejobagent.go +++ b/pkg/controller/queuejobdispatch/queuejobagent.go @@ -118,7 +118,10 @@ func NewJobClusterAgent(config string, agentEventQueue *cache.FIFO) *JobClusterA qa.jobSynced = qa.jobInformer.Informer().HasSynced - qa.UpdateAggrResources(context.Background()) + err = qa.UpdateAggrResources(context.Background()) + if err != nil { + klog.Errorf("[NewJobClusterAgent] Unable to update aggr resources - error: %#v", err) + } return qa } @@ -161,7 +164,10 @@ func (qa *JobClusterAgent) Run(stopCh <-chan struct{}) { func (qa *JobClusterAgent) DeleteJob(ctx context.Context, cqj *arbv1.AppWrapper) { qj_temp := cqj.DeepCopy() klog.V(2).Infof("[Dispatcher: Agent] Request deletion of XQJ %s/%s to Agent %s\n", qj_temp.Namespace, qj_temp.Name, qa.AgentId) - qa.queuejobclients.WorkloadV1beta1().AppWrappers(qj_temp.Namespace).Delete(ctx, qj_temp.Name, metav1.DeleteOptions{}) + err := qa.queuejobclients.WorkloadV1beta1().AppWrappers(qj_temp.Namespace).Delete(ctx, qj_temp.Name, metav1.DeleteOptions{}) + if err != nil { + klog.Errorf("[DeleteJob] Unable to delete app wrapper, - error: %#v", err) + } } func (qa *JobClusterAgent) CreateJob(ctx context.Context, cqj *arbv1.AppWrapper) { @@ -183,7 +189,10 @@ func (qa *JobClusterAgent) CreateJob(ctx context.Context, cqj *arbv1.AppWrapper) agent_qj.Labels["IsDispatched"] = "true" klog.V(2).Infof("[Dispatcher: Agent] Create XQJ: %s/%s (Status: %+v) in Agent %s\n", agent_qj.Namespace, agent_qj.Name, agent_qj.Status, qa.AgentId) - qa.queuejobclients.WorkloadV1beta1().AppWrappers(agent_qj.Namespace).Create(ctx, agent_qj, metav1.CreateOptions{}) + _, err := qa.queuejobclients.WorkloadV1beta1().AppWrappers(agent_qj.Namespace).Create(ctx, agent_qj, metav1.CreateOptions{}) + if err != nil { + klog.Errorf("[CreateJob] Unable to create app wrapper, - error: %#v", err) + } } type ClusterMetricsList struct { @@ -228,7 +237,10 @@ func (qa *JobClusterAgent) UpdateAggrResources(ctx context.Context) error { clusterMetricType := res.Items[i].MetricLabels["cluster"] if strings.Compare(clusterMetricType, "cpu") == 0 || strings.Compare(clusterMetricType, "memory") == 0 { - val, units, _ := getFloatString(res.Items[i].Value) + val, units, err := getFloatString(res.Items[i].Value) + if err != nil { + klog.Errorf("[Dispatcher: UpdateAggrResources] Possible issue getting float string - error: %#v", err) + } num, err := strconv.ParseFloat(val, 64) if err != nil { klog.Warningf("[Dispatcher: UpdateAggrResources] Possible issue converting %s string value of %s due to error: %v\n", diff --git a/pkg/controller/queuejobresources/genericresource/genericresource.go b/pkg/controller/queuejobresources/genericresource/genericresource.go index a1057f763..2da8cd107 100644 --- a/pkg/controller/queuejobresources/genericresource/genericresource.go +++ b/pkg/controller/queuejobresources/genericresource/genericresource.go @@ -169,6 +169,7 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG labelSelector := fmt.Sprintf("%s=%s, %s=%s", appwrapperJobName, aw.Name, resourceName, unstruct.GetName()) inEtcd, err := dclient.Resource(rsrc).Namespace(aw.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector}) if err != nil { + klog.Errorf("[Cleanup] unable to list resources, - error: %#v", err) return name, gvk, err } @@ -182,8 +183,9 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG err = deleteObject(namespaced, namespace, newName, rsrc, dclient) if err != nil { if !errors.IsNotFound(err) { - klog.Errorf("[Cleanup] Error deleting the object `%v`, the error is `%v`.", newName, errors.ReasonForError(err)) + klog.Errorf("[Cleanup] Error deleting the object `%v`, - error: `%v`.", newName, errors.ReasonForError(err)) } + klog.Errorf("[Cleanup] Error deleting the object `%v`, - error: `%v`.", newName, err) return name, gvk, err } } else { @@ -311,6 +313,7 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra labelSelector := fmt.Sprintf("%s=%s, %s=%s", appwrapperJobName, aw.Name, resourceName, unstruct.GetName()) inEtcd, err := dclient.Resource(rsrc).List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector}) if err != nil { + klog.Errorf("[SyncQueueJob] Unable to list resources, - error: %#v", err) return []*v1.Pod{}, err } @@ -364,24 +367,36 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra // checks if object has pod template spec and add new labels func addLabelsToPodTemplateField(unstruct *unstructured.Unstructured, labels map[string]string) (hasFields bool) { - spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec") + spec, isFound, err := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec") + if err != nil { + klog.Errorf("[addLabelsToPodTemplateField] unable to return spec values, - error: %#v", err) + } if !isFound { klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec' field not found.") return false } - template, isFound, _ := unstructured.NestedMap(spec, "template") + template, isFound, err := unstructured.NestedMap(spec, "template") + if err != nil { + klog.Errorf("[addLabelsToPodTemplateField] unable to return template values, - error: %#v", err) + } if !isFound { klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec.template' field not found.") return false } - marshal, _ := json.Marshal(template) + marshal, err := json.Marshal(template) + if err != nil { + klog.Errorf("[addLabelsToPodTemplateField] unable to marshall json, - error: %#v", err) + } unmarshal := v1.PodTemplateSpec{} if err := json.Unmarshal(marshal, &unmarshal); err != nil { klog.Warning(err) return false } - existingLabels, isFound, _ := unstructured.NestedStringMap(template, "metadata", "labels") + existingLabels, isFound, err := unstructured.NestedStringMap(template, "metadata", "labels") + if err != nil { + klog.Errorf("[addLabelsToPodTemplateField] unable to return existing label values, - error: %#v", err) + } if !isFound { klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec.template.metadata.labels' field not found.") return false @@ -397,7 +412,7 @@ func addLabelsToPodTemplateField(unstruct *unstructured.Unstructured, labels map } if err := unstructured.SetNestedStringMap(unstruct.Object, m, "spec", "template", "metadata", "labels"); err != nil { - klog.Warning(err) + klog.Errorf("[addLabelsToPodTemplateField] unable to set nested string map, - error: %#v", err) return false } @@ -414,37 +429,58 @@ func hasFields(obj runtime.RawExtension) (hasFields bool, replica float64, conta return false, 0, nil } unstruct.Object = blob.(map[string]interface{}) - spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec") + spec, isFound, err := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec") + if err != nil { + klog.Errorf("[hasFields] unable to return spec values, - error: %#v", err) + } if !isFound { klog.Warningf("[hasFields] No spec field found in raw object: %#v", unstruct.UnstructuredContent()) } - replicas, isFound, _ := unstructured.NestedFloat64(spec, "replicas") + replicas, isFound, err := unstructured.NestedFloat64(spec, "replicas") + if err != nil { + klog.Errorf("[hasFields] unable to return replica values, - error: %#v", err) + } // Set default to 1 if no replicas field is found (handles the case of a single pod creation without replicaset. if !isFound { replicas = 1 } - template, isFound, _ := unstructured.NestedMap(spec, "template") + template, isFound, err := unstructured.NestedMap(spec, "template") + if err != nil { + klog.Errorf("[hasFields] unable to return template values, - error: %#v", err) + } // If spec does not contain a podtemplate, check for pod singletons var subspec map[string]interface{} if !isFound { subspec = spec klog.V(6).Infof("[hasFields] No template field found in raw object: %#v", spec) } else { - subspec, isFound, _ = unstructured.NestedMap(template, "spec") + subspec, isFound, err = unstructured.NestedMap(template, "spec") + if err != nil { + klog.Errorf("[hasFields] unable to return subspec values, - error: %#v", err) + } } - containerList, isFound, _ := unstructured.NestedSlice(subspec, "containers") + containerList, isFound, err := unstructured.NestedSlice(subspec, "containers") + if err != nil { + klog.Errorf("[hasFields] unable to return container list values, - error: %#v", err) + } if !isFound { klog.Warningf("[hasFields] No containers field found in raw object: %#v", subspec) return false, 0, nil } objContainers := make([]v1.Container, len(containerList)) for _, container := range containerList { - marshal, _ := json.Marshal(container) + marshal, err := json.Marshal(container) + if err != nil { + klog.Errorf("[hasFields] unable to marshal json, - error: %#v", err) + } unmarshal := v1.Container{} - _ = json.Unmarshal(marshal, &unmarshal) + err = json.Unmarshal(marshal, &unmarshal) + if err != nil { + klog.Errorf("[hasFields] unable to unmarshal json, - error: %#v", err) + } objContainers = append(objContainers, unmarshal) } return isFound, replicas, objContainers @@ -627,13 +663,13 @@ func (gr *GenericResources) IsItemCompleted(awgr *arbv1.AppWrapperGenericResourc dd := gr.clients.Discovery() apigroups, err := restmapper.GetAPIGroupResources(dd) if err != nil { - klog.Errorf("[IsItemCompleted] Error getting API resources, err=%#v", err) + klog.Errorf("[IsItemCompleted] Error getting API resources, - error: %#v", err) return false } restmapper := restmapper.NewDiscoveryRESTMapper(apigroups) _, gvk, err := unstructured.UnstructuredJSONScheme.Decode(awgr.GenericTemplate.Raw, nil, nil) if err != nil { - klog.Errorf("[IsItemCompleted] Decoding error, please check your CR! Aborting handling the resource creation, err: `%v`", err) + klog.Errorf("[IsItemCompleted] Decoding error, please check your CR! Aborting handling the resource creation, - error: `%v`", err) return false } diff --git a/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr/quotasubtmgr/quota_subtree_manager.go b/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr/quotasubtmgr/quota_subtree_manager.go index 0c0173192..aef75c92b 100644 --- a/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr/quotasubtmgr/quota_subtree_manager.go +++ b/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr/quotasubtmgr/quota_subtree_manager.go @@ -66,6 +66,7 @@ func newQuotaSubtreeManager(config *rest.Config, quotaManagerBackend *qmlib.Mana // QuotaSubtree informer setup qstClient, err := qst.NewForConfig(config) if err != nil { + klog.Errorf("[newQuotaSubtreeManager] error creating clientset for config, - error: %#v", err) return nil, err } diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/consumerinfo.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/consumerinfo.go index b5bfe08d7..42e151efe 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/consumerinfo.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/consumerinfo.go @@ -23,6 +23,7 @@ import ( "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota/core" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota/utils" + "k8s.io/klog/v2" ) // ConsumerInfo : A consumer model including specifications @@ -51,10 +52,12 @@ func NewConsumerInfo(consumerStruct utils.JConsumer) (*ConsumerInfo, error) { func NewConsumerInfoFromFile(consumerFileName string) (*ConsumerInfo, error) { byteValue, err := os.ReadFile(consumerFileName) if err != nil { + klog.Errorf("[NewConsumerInfoFromFile] unable to read file, - error: %#v", err) return nil, err } var consumerStruct utils.JConsumer if err := json.Unmarshal(byteValue, &consumerStruct); err != nil { + klog.Errorf("[NewConsumerInfoFromFile] unable to unmarshal json, - error: %#v", err) return nil, err } return NewConsumerInfo(consumerStruct) @@ -65,6 +68,7 @@ func NewConsumerInfoFromString(consumerString string) (*ConsumerInfo, error) { byteValue := []byte(consumerString) var consumerStruct utils.JConsumer if err := json.Unmarshal(byteValue, &consumerStruct); err != nil { + klog.Errorf("[NewConsumerInfoFromString] unable to unmarshal json, - error: %#v", err) return nil, err } return NewConsumerInfo(consumerStruct) diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/allocation.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/allocation.go index f39951588..ab49ab2b2 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/allocation.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/allocation.go @@ -19,6 +19,7 @@ package core import ( "bytes" "fmt" + "k8s.io/klog/v2" ) // Allocation : an allocation of an (ordered) array of resources @@ -42,6 +43,7 @@ func NewAllocation(size int) (*Allocation, error) { func NewAllocationCopy(value []int) (*Allocation, error) { a, err := NewAllocation(len(value)) if err != nil { + klog.Errorf("[NewAllocationCopy] unable to create allocation, - error: %#v", err) return nil, err } copy(a.x, value) @@ -66,7 +68,10 @@ func (a *Allocation) SetValue(value []int) { // Clone : create a copy func (a *Allocation) Clone() *Allocation { - alloc, _ := NewAllocationCopy(a.x) + alloc, err := NewAllocationCopy(a.x) + if err != nil { + klog.Errorf("[Clone] unable to create allocation, - error: %#v", err) + } return alloc } diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/forestconsumer.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/forestconsumer.go index 288112f72..b932e6d47 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/forestconsumer.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/forestconsumer.go @@ -44,7 +44,7 @@ func NewForestConsumer(id string, consumers map[string]*Consumer) *ForestConsume func NewForestConsumerFromFile(consumerFileName string, resourceNames map[string][]string) (*ForestConsumer, error) { byteValue, err := os.ReadFile(consumerFileName) if err != nil { - return nil, err + return nil, fmt.Errorf("[NewForestConsumerFromFile] unable to read file, - error: %#v", err) } return NewForestConsumerFromString(string(byteValue), resourceNames) } @@ -54,7 +54,7 @@ func NewForestConsumerFromString(consumerString string, resourceNames map[string byteValue := []byte(consumerString) var jConsumerMulti utils.JConsumer if err := json.Unmarshal(byteValue, &jConsumerMulti); err != nil { - return nil, err + return nil, fmt.Errorf("[NewForestConsumerFromString] unable to unmarshal json, - error: %#v", err) } return NewForestConsumerFromStruct(jConsumerMulti, resourceNames) } diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go index c01e787e5..9d9adcbdf 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go @@ -45,7 +45,11 @@ func NewQuotaNode(id string, quota *Allocation) (*QuotaNode, error) { if len(id) == 0 || quota == nil { return nil, fmt.Errorf("invalid parameters") } - alloc, _ := NewAllocation(quota.GetSize()) + alloc, err := NewAllocation(quota.GetSize()) + if err != nil { + klog.Errorf("[NewQuotaNode] unable to create allocation, - error: %#v", err) + return nil, err + } qn := &QuotaNode{ Node: *tree.NewNode(id), quota: quota, @@ -60,6 +64,7 @@ func NewQuotaNode(id string, quota *Allocation) (*QuotaNode, error) { func NewQuotaNodeHard(id string, quota *Allocation, isHard bool) (*QuotaNode, error) { qn, err := NewQuotaNode(id, quota) if err != nil { + klog.Errorf("[NewQuotaNodeHard] unable to create quota node, - error:%#v", err) return nil, err } qn.isHard = isHard diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treecache.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treecache.go index 3a3479525..62f3d7389 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treecache.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treecache.go @@ -121,7 +121,7 @@ func (tc *TreeCache) AddTreeInfoFromString(treeInfo string) error { var jTreeInfo utils.JTreeInfo err := json.Unmarshal([]byte(treeInfo), &jTreeInfo) if err != nil { - return err + return fmt.Errorf("[AddTreeInfoFromString] unable to unmarshal json, - error: %#v", err) } tc.SetTreeName(jTreeInfo.Name) tc.AddResourceNames(jTreeInfo.ResourceNames) @@ -164,7 +164,7 @@ func (tc *TreeCache) AddNodeSpecsFromString(nodesInfo string) error { jNodes := make(map[string]utils.JNodeSpec) err := json.Unmarshal([]byte(nodesInfo), &jNodes) if err != nil { - return err + return fmt.Errorf("[AddNodeSpecsFromString] unable to unmarshal json, - error: %#v", err) } return tc.AddNodeSpecs(jNodes) } @@ -367,6 +367,7 @@ func (tc *TreeCache) CreateTree() (*QuotaTree, *TreeCacheCreateResponse) { hard, err := strconv.ParseBool(nodeSpec.Hard) if err != nil { + klog.Errorf("[CreateTree] unable to parse bool, - error: %#v", err) hard = false } fmt.Fprintf(&b, "hard="+strconv.FormatBool(hard)+"; ") diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go index 78d78c709..25c5a1b39 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go @@ -135,6 +135,7 @@ func (m *Manager) AddTreeByName(treeName string) (string, error) { } agent, err := newAgentByName(treeName) if err != nil { + klog.Errorf("[AddTreeByName] unable to create new treeAgent, - error: %#v", err) return treeName, err } m.agents[treeName] = agent @@ -147,6 +148,7 @@ func (m *Manager) AddTreeFromString(treeSring string) (string, error) { defer m.mutex.Unlock() treeCache := core.NewTreeCache() if err := treeCache.FromString(treeSring); err != nil { + klog.Errorf("[AddTreeFromString] unable to fill cache from string, - error: %#v", err) return "", err } return m.addTree(treeCache) @@ -158,6 +160,7 @@ func (m *Manager) AddTreeFromStruct(jQuotaTree utils.JQuotaTree) (string, error) defer m.mutex.Unlock() treeCache := core.NewTreeCache() if err := treeCache.FromStruct(jQuotaTree); err != nil { + klog.Errorf("[AddTreeFromStruct] unable to fill cache from struct, - error: %#v", err) return "", err } return m.addTree(treeCache) @@ -171,6 +174,7 @@ func (m *Manager) addTree(treeCache *core.TreeCache) (string, error) { } agent, err := newAgentFromCache(treeCache) if err != nil { + klog.Errorf("[addTree] unable to create agent from cache, - error: %#v", err) return treeName, err } m.agents[treeName] = agent @@ -274,6 +278,7 @@ func (m *Manager) Allocate(treeName string, consumerID string) (response *core.A err = fmt.Errorf("consumer %s already allocated on tree %s", consumerID, treeName) } if err != nil { + klog.Errorf("[Allocate] unable to prepare to allocate a consumer on a tree, - error: %#v", err) return nil, err } if m.mode == Normal { @@ -300,6 +305,7 @@ func (m *Manager) TryAllocate(treeName string, consumerID string) (response *cor err = fmt.Errorf("consumer %s already allocated on tree %s", consumerID, treeName) } if err != nil { + klog.Errorf("[TryAllocate] unable to prepare to allocate a consumer on a tree, - error: %#v", err) return nil, err } if response = agent.controller.TryAllocate(consumer); !response.IsAllocated() { @@ -315,6 +321,7 @@ func (m *Manager) UndoAllocate(treeName string, consumerID string) (err error) { agent, consumer, err := m.preAllocate(treeName, consumerID) if err != nil { + klog.Errorf("[UndoAllocate] unable to prepare to allocate a consumer on a tree, - error: %#v", err) return err } if !agent.controller.UndoAllocate(consumer) { @@ -474,6 +481,7 @@ func (m *Manager) AllocateForest(forestName string, consumerID string) (response err = fmt.Errorf("consumer %s already allocated on forest %s", consumerID, forestName) } if err != nil { + klog.Errorf("[AllocateForest] unable to prepare to allocate forest, - error: %#v", err) return nil, err } @@ -507,6 +515,7 @@ func (m *Manager) TryAllocateForest(forestName string, consumerID string) (respo err = fmt.Errorf("consumer %s already allocated on forest %s", consumerID, forestName) } if err != nil { + klog.Errorf("[TryAllocateForest] unable to prepare to allocate forest, - error: %#v", err) return nil, err } @@ -524,6 +533,7 @@ func (m *Manager) UndoAllocateForest(forestName string, consumerID string) (err forestController, forestConsumer, err := m.preAllocateForest(forestName, consumerID) if err != nil { + klog.Errorf("[UndoAllocateForest] unable to prepare to allocate forest, - error: %#v", err) return err } if !forestController.UndoAllocate(forestConsumer) {