diff --git a/cmd/kar-controllers/app/generic-server.go b/cmd/kar-controllers/app/generic-server.go
new file mode 100644
index 000000000..d04da8a3e
--- /dev/null
+++ b/cmd/kar-controllers/app/generic-server.go
@@ -0,0 +1,127 @@
+package app
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"net/http"
+	"strconv"
+	"time"
+
+	logger "k8s.io/klog/v2"
+)
+
+// defaultShutdownTimeout bounds a graceful shutdown unless WithTimeout overrides it.
+const defaultShutdownTimeout = 30 * time.Second
+
+// ServerOption customizes a Server while NewServer is building it.
+type ServerOption func(*Server)
+
+// WithTimeout sets the shutdown timeout for the server.
+func WithTimeout(timeout time.Duration) ServerOption {
+	return func(s *Server) {
+		s.shutdownTimeout = timeout
+	}
+}
+
+// Server serves a single HTTP endpoint on its own TCP listener. A Server
+// whose listener is nil is disabled: Start and Shutdown are then no-ops.
+type Server struct {
+	httpServer      http.Server
+	listener        net.Listener
+	endpoint        string
+	shutdownTimeout time.Duration
+}
+
+// NewServer opens a TCP listener on port and returns a Server that routes
+// endpoint to handler. A port of 0 disables serving: no listener is opened
+// and the returned Server does nothing when started. Because the listener is
+// opened here, a bind failure is reported before Start is ever called.
+func NewServer(port int, endpoint string, handler http.Handler, options ...ServerOption) (*Server, error) {
+	// "0" is the sentinel address newListener treats as "serving disabled".
+	addr := "0"
+	if port != 0 {
+		addr = ":" + strconv.Itoa(port)
+	}
+
+	listener, err := newListener(addr)
+	if err != nil {
+		return nil, err
+	}
+
+	mux := http.NewServeMux()
+	mux.Handle(endpoint, handler)
+
+	s := &Server{
+		endpoint:        endpoint,
+		listener:        listener,
+		httpServer:      http.Server{Handler: mux},
+		shutdownTimeout: defaultShutdownTimeout,
+	}
+
+	for _, opt := range options {
+		opt(s)
+	}
+
+	return s, nil
+}
+
+// Start serves requests on the listener and blocks until the server stops.
+// It returns nil after a regular Shutdown, or immediately when the server is
+// disabled, and an error when serving fails. A panic raised by Serve in the
+// calling goroutine is converted into an error.
+func (s *Server) Start() (err error) {
+	if s.listener == nil {
+		logger.Infof("Serving endpoint %s is disabled", s.endpoint)
+		return nil
+	}
+
+	// err is a named result so the recovered panic can be handed to the caller.
+	defer func() {
+		if r := recover(); r != nil {
+			err = fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, r)
+		}
+	}()
+
+	logger.Infof("Started serving endpoint %s at %s", s.endpoint, s.listener.Addr())
+	if e := s.httpServer.Serve(s.listener); !errors.Is(e, http.ErrServerClosed) {
+		return fmt.Errorf("serving endpoint %s failed: %w", s.endpoint, e)
+	}
+	return nil
+}
+
+// Shutdown gracefully stops the server, waiting at most the configured
+// shutdown timeout (see WithTimeout) for in-flight requests to complete.
+// It is a no-op for a disabled server.
+func (s *Server) Shutdown() error {
+	if s.listener == nil {
+		return nil
+	}
+
+	logger.Info("Stopping server")
+
+	// Honor the configured timeout instead of a hard-coded one.
+	shutdownCtx, cancel := context.WithTimeout(context.Background(), s.shutdownTimeout)
+	defer cancel()
+
+	if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
+		return fmt.Errorf("failed to shutdown server gracefully: %w", err)
+	}
+	return nil
+}
+
+// newListener creates a new TCP listener bound to the given address.
+// The sentinel address "0" disables serving: it yields a nil listener and a nil error.
+func newListener(addr string) (net.Listener, error) {
+	if addr == "0" {
+		return nil, nil
+	}
+
+	listener, err := net.Listen("tcp", addr)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create listener: %w", err)
+	}
+
+	return listener, nil
+}
diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go
index 0cd339e17..7e8aa7971 100644
--- a/cmd/kar-controllers/app/options/options.go
+++ b/cmd/kar-controllers/app/options/options.go
@@ -53,8 +53,9 @@ type ServerOption struct {
 	HeadOfLineHoldingTime int
 	QuotaEnabled bool // Controller is to evaluate quota per request
 	QuotaRestURL string
-	HealthProbeListenAddr string
+	HealthProbeListenPort int
 	DispatchResourceReservationTimeout int64
+	MetricsListenPort int
 }
 
 // NewServerOption creates a new CMServer with a default config.
@@ -80,7 +81,9 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
 	fs.BoolVar(&s.QuotaEnabled, "quotaEnabled", s.QuotaEnabled, "Enable quota policy evaluation. Default is false.")
 	fs.StringVar(&s.QuotaRestURL, "quotaURL", s.QuotaRestURL, "URL for ReST quota management. Default is none.")
 	fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.")
-	fs.StringVar(&s.HealthProbeListenAddr, "healthProbeListenAddr", ":8081", "Listen address for health probes. Defaults to ':8081'")
+	fs.IntVar(&s.HealthProbeListenPort, "healthProbeListenPort", 8081, "Listen port for health probes; 0 disables the health probe server. Defaults to 8081.")
+	// using port 8083 for metrics as 8082 is used by `custom-metrics-apiserver`
+	fs.IntVar(&s.MetricsListenPort, "metricsListenPort", 8083, "Listen port for metrics; 0 disables the metrics server. Defaults to 8083.")
 	fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. Defaults to '300000', 5 minutes")
 }
 
diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go
index dc72179dd..784058df9 100644
--- a/cmd/kar-controllers/app/server.go
+++ b/cmd/kar-controllers/app/server.go
@@ -31,13 +31,18 @@ limitations under the License.
 package app
 
 import (
+	"context"
+	"fmt"
+	"net/http"
+
+	"golang.org/x/sync/errgroup"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
-	"net/http"
 
 	"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
 	"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob"
 	"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health"
+	"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/metrics"
 
 	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
 )
@@ -49,41 +54,68 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) {
 	return rest.InClusterConfig()
 }
 
-func Run(opt *options.ServerOption) error {
+// Run starts the job controller together with the health and metrics servers
+// and blocks until ctx is cancelled or one of the servers fails.
+func Run(ctx context.Context, opt *options.ServerOption) error {
 	config, err := buildConfig(opt.Master, opt.Kubeconfig)
 	if err != nil {
 		return err
 	}
 
-	neverStop := make(chan struct{})
-
 	config.QPS = 100.0
 	config.Burst = 200.0
 
 	jobctrl := queuejob.NewJobController(config, opt)
 	if jobctrl == nil {
-		return nil
+		return fmt.Errorf("failed to create a job controller")
 	}
-	jobctrl.Run(neverStop)
 
-	// This call is blocking (unless an error occurs) which equates to <-neverStop
-	err = listenHealthProbe(opt)
+	go jobctrl.Run(ctx.Done())
+
+	// Blocks until ctx is cancelled or one of the servers fails.
+	err = startHealthAndMetricsServers(ctx, opt)
 	if err != nil {
 		return err
 	}
 
 	return nil
 }
 
+// healthHandler returns the handler that answers the health probes.
+func healthHandler() http.Handler {
+	mux := http.NewServeMux()
+	mux.Handle("/healthz", &health.Handler{})
+	return mux
+}
+
-// Starts the health probe listener
-func listenHealthProbe(opt *options.ServerOption) error {
-	handler := http.NewServeMux()
-	handler.Handle("/healthz", &health.Handler{})
-	err := http.ListenAndServe(opt.HealthProbeListenAddr, handler)
+// startHealthAndMetricsServers serves the metrics and health endpoints and
+// blocks until ctx is cancelled or one of the servers fails; both servers are
+// then shut down. It returns the first error reported by a server, or nil
+// after a clean shutdown.
+func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error {
+	metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.Handler())
 	if err != nil {
 		return err
 	}
 
+	healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler())
+	if err != nil {
+		return err
+	}
+
+	g, gctx := errgroup.WithContext(ctx)
+	g.Go(metricsServer.Start)
+	g.Go(healthServer.Start)
+	g.Go(func() error {
+		// gctx is done once ctx is cancelled or a server stopped with an error.
+		<-gctx.Done()
+		metricsErr := metricsServer.Shutdown()
+		healthErr := healthServer.Shutdown()
+		if metricsErr != nil {
+			return metricsErr
+		}
+		return healthErr
+	})
+
-	return nil
+	return g.Wait()
 }
-
diff --git a/cmd/kar-controllers/main.go b/cmd/kar-controllers/main.go
index d66c4607e..09bdca13d 100644
--- a/cmd/kar-controllers/main.go
+++ b/cmd/kar-controllers/main.go
@@ -39,6 +39,8 @@ import (
 
 	"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app"
 	"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
+
+	"k8s.io/apiserver/pkg/server"
 )
 
 func main() {
@@ -49,8 +51,15 @@ func main() {
 	s.AddFlags(flagSet)
 	flag.Parse()
 
-	if err := app.Run(s); err != nil {
+	ctx := server.SetupSignalContext()
+
+	// Run the server; Run blocks until ctx is cancelled or a server fails
+	if err := app.Run(ctx, s); err != nil {
 		fmt.Fprintf(os.Stderr, "%v\n", err)
 		os.Exit(1)
 	}
+
+	<-ctx.Done()
+	fmt.Println("Shutting down gracefully")
+
 }
diff --git a/deployment/mcad-controller/templates/deployment.yaml b/deployment/mcad-controller/templates/deployment.yaml
index cfaa0a887..2adea3096 100644
--- a/deployment/mcad-controller/templates/deployment.yaml
+++ b/deployment/mcad-controller/templates/deployment.yaml
@@ -11,9 +11,25 @@ spec:
   - name: http
     port: 80
     targetPort: 8080
+  - name: metrics
+    port: 8083
+    targetPort: 8083
   selector:
     app: custom-metrics-apiserver
 ---
+apiVersion: v1 +kind: Service +metadata: + name: metrics + namespace: kube-system +spec: + ports: + - name: metrics + port: 8083 + targetPort: 8083 + selector: + app: metrics +--- #{{ if .Values.configMap.quotaRestUrl }} apiVersion: v1 kind: Service @@ -352,6 +368,8 @@ spec: name: https - containerPort: 8080 name: http + - containerPort: 8083 + name: metrics volumeMounts: - mountPath: /tmp name: temp-vol diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 2146a4ac6..5eeb57f59 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -35,12 +35,13 @@ import ( "github.com/hashicorp/go-multierror" qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util" + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager" dto "github.com/prometheus/client_model/go" "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/metrics/adapter" - "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota" + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/metrics" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -124,6 +125,9 @@ type XController struct { // Metrics API Server metricsAdapter *adapter.MetricsAdapter + // Cluster Metrics Manager: registers the unallocated CPU/memory/GPU gauges and refreshes them from the cluster state cache in a background goroutine + clusterMetricsManager *metrics.ClusterMetricsManager + // EventQueueforAgent agentEventQueue *cache.FIFO @@ -171,6 +175,8 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * } cc.metricsAdapter = adapter.New(serverOption, config, cc.cache) + cc.clusterMetricsManager = metrics.NewClusterMetricsManager(cc.cache) + cc.genericresources = 
genericresource.NewAppWrapperGenericResource(config) cc.qjobResControls = map[arbv1.ResourceType]queuejobresources.Interface{} diff --git a/pkg/metrics/cluster_metrics.go b/pkg/metrics/cluster_metrics.go new file mode 100644 index 000000000..8adcb5fd1 --- /dev/null +++ b/pkg/metrics/cluster_metrics.go @@ -0,0 +1,78 @@ +// ------------------------------------------------------ {COPYRIGHT-TOP} --- +// Copyright 2023 The Multi-Cluster App Dispatcher Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ------------------------------------------------------ {COPYRIGHT-END} ---
+package metrics
+
+import (
+	"time"
+
+	"k8s.io/klog/v2"
+
+	clusterstatecache "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/cache"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	unallocatedCPUGauge = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "unallocated_cpu",
+		Help: "Unallocated CPU (in millicores)",
+	})
+	unallocatedMemoryGauge = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "unallocated_memory",
+		Help: "Unallocated Memory (in TBD)",
+	})
+	unallocatedGPUGauge = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "unallocated_gpu",
+		Help: "Unallocated GPU (in TBD)",
+	})
+)
+
+// registerClusterMetrics adds the cluster gauges to the package registry.
+// MustRegister panics when a collector is registered twice.
+func registerClusterMetrics() {
+	globalPromRegistry.MustRegister(unallocatedCPUGauge)
+	globalPromRegistry.MustRegister(unallocatedMemoryGauge)
+	globalPromRegistry.MustRegister(unallocatedGPUGauge)
+}
+
+// ClusterMetricsManager publishes the unallocated cluster resources as Prometheus gauges.
+type ClusterMetricsManager struct {
+	Message string
+}
+
+// NewClusterMetricsManager registers the cluster gauges and starts a background
+// goroutine that refreshes them from clusterStateCache once per second.
+func NewClusterMetricsManager(clusterStateCache clusterstatecache.Cache) *ClusterMetricsManager {
+	registerClusterMetrics()
+
+	// The refresher has no stop signal: it runs for the rest of the process lifetime.
+	go foreverUpdateClusterMetrics(clusterStateCache)
+
+	return &ClusterMetricsManager{}
+}
+
+// foreverUpdateClusterMetrics copies the unallocated resources reported by the
+// cache into the gauges once per second; it never returns.
+func foreverUpdateClusterMetrics(clusterStateCache clusterstatecache.Cache) {
+	for {
+		resources := clusterStateCache.GetUnallocatedResources()
+		klog.V(9).Infof("[GetExternalMetric] Cache resources: %v", resources)
+
+		unallocatedCPUGauge.Set(float64(resources.MilliCPU))
+		unallocatedMemoryGauge.Set(float64(resources.Memory))
+		unallocatedGPUGauge.Set(float64(resources.GPU))
+		time.Sleep(time.Second)
+	}
+}
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
new file mode 100644
index 000000000..24b158149
--- /dev/null
+++ b/pkg/metrics/metrics.go
@@ -0,0 +1,42 @@
+// ------------------------------------------------------ {COPYRIGHT-TOP} ---
+// Copyright 2023 The Multi-Cluster App Dispatcher Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------ {COPYRIGHT-END} ---
+package metrics
+
+import (
+	"net/http"
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/collectors"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+// Global Prometheus Registry
+var globalPromRegistry = prometheus.NewRegistry()
+
+// registerStandardOnce keeps repeated Handler calls from re-registering the
+// standard collectors, which would make MustRegister panic.
+var registerStandardOnce sync.Once
+
+// Handler returns an http.Handler that serves the metrics of the package registry.
+func Handler() http.Handler {
+	registerStandardOnce.Do(func() {
+		globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector())
+		globalPromRegistry.MustRegister(collectors.NewGoCollector())
+		globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
+	})
+	return promhttp.HandlerFor(globalPromRegistry, promhttp.HandlerOpts{ErrorHandling: promhttp.HTTPErrorOnError})
+}