diff --git a/go/cmd/doorman/doorman_server.go b/go/cmd/doorman/doorman_server.go index 3da939f..ff34533 100644 --- a/go/cmd/doorman/doorman_server.go +++ b/go/cmd/doorman/doorman_server.go @@ -155,7 +155,7 @@ func main() { masterElection = election.Trivial() } - dm, err := doorman.NewIntermediate(context.Background(), getServerID(*port), *parent, masterElection, + dm, err := doorman.New(context.Background(), getServerID(*port), *parent, masterElection, connection.MinimumRefreshInterval(*minimumRefreshInterval), connection.DialOpts( rpc.WithTimeout(*rpcDialTimeout))) @@ -227,10 +227,6 @@ func main() { http.Handle("/metrics", prometheus.Handler()) - if err := prometheus.Register(doorman.NewCollector(dm)); err != nil { - log.Exitf("prometheus.Register: %v", err) - } - go http.ListenAndServe(fmt.Sprintf(":%v", *debugPort), nil) // Waits for the server to get its initial configuration. This guarantees that diff --git a/go/server/doorman/collector.go b/go/server/doorman/collector.go deleted file mode 100644 index 87d8660..0000000 --- a/go/server/doorman/collector.go +++ /dev/null @@ -1,71 +0,0 @@ -package doorman - -import ( - "sync" - - "github.com/prometheus/client_golang/prometheus" -) - -type collector struct { - server *Server - mu sync.Mutex - has *prometheus.GaugeVec - wants *prometheus.GaugeVec - count *prometheus.GaugeVec -} - -// NewCollector returns a custom Prometheus collector that creates -// metrics for how much capacity has been assigned -// (doorman_server_sum_has), requested (doorman_server_sum_wants), and -// the total number of clients (doorman_server_client_count), with the -// resource id as the label. It has to be registered using -// prometheus.Register. 
-func NewCollector(server *Server) prometheus.Collector { - labels := []string{"resource"} - return &collector{ - server: server, - has: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "doorman", - Subsystem: "server", - Name: "sum_has", - Help: "All capacity assigned to clients for a resource.", - }, labels), - wants: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "doorman", - Subsystem: "server", - Name: "sum_wants", - Help: "All capacity requested by clients for a resource.", - }, labels), - count: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "doorman", - Subsystem: "server", - Name: "client_count", - Help: "Number of clients requesting this resource.", - }, labels), - } -} - -func (c *collector) Describe(ch chan<- *prometheus.Desc) { - c.has.Describe(ch) - c.wants.Describe(ch) - c.count.Describe(ch) -} - -func (c *collector) Collect(ch chan<- prometheus.Metric) { - status := c.server.Status() - c.mu.Lock() - defer c.mu.Unlock() - - for id, res := range status.Resources { - c.has.WithLabelValues(id).Set(res.SumHas) - c.wants.WithLabelValues(id).Set(res.SumWants) - c.count.WithLabelValues(id).Set(float64(res.Count)) - } - c.has.Collect(ch) - c.wants.Collect(ch) - c.count.Collect(ch) - - c.has.Reset() - c.wants.Reset() - c.count.Reset() -} diff --git a/go/server/doorman/collector_test.go b/go/server/doorman/collector_test.go deleted file mode 100644 index 050ce09..0000000 --- a/go/server/doorman/collector_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package doorman - -import ( - "testing" - - "github.com/prometheus/client_golang/prometheus" - - pb "github.com/youtube/doorman/proto/doorman" -) - -func TestCollector(t *testing.T) { - s, err := MakeTestServer(makeResourceTemplate("*", pb.Algorithm_FAIR_SHARE)) - if err != nil { - t.Fatal(err) - } - c := NewCollector(s) - prometheus.EnableCollectChecks(true) - if err := prometheus.Register(c); err != nil { - t.Fatal(err) - } -} diff --git a/go/server/doorman/server.go 
b/go/server/doorman/server.go index f1f9a3b..08c17b7 100644 --- a/go/server/doorman/server.go +++ b/go/server/doorman/server.go @@ -148,6 +148,14 @@ type Server struct { // quit is used to notify that the server is to be closed. quit chan bool + + // descs are metrics descriptions for use when the server's state + // is collected by Prometheus. + descs struct { + has *prometheus.Desc + wants *prometheus.Desc + subclients *prometheus.Desc + } } type updater func(server *Server, retryNumber int) (time.Duration, int) @@ -473,6 +481,36 @@ func (server *Server) triggerElection(ctx context.Context) error { return nil } +// New returns a new unconfigured server. parentAddr is the address of +// a parent; pass the empty string to create a root server. This +// function should be called only once, as it registers metrics. +func New(ctx context.Context, id string, parentAddr string, leader election.Election, opts ...connection.Option) (*Server, error) { + s, err := NewIntermediate(ctx, id, parentAddr, leader, opts...) + if err != nil { + return nil, err + } + + return s, prometheus.Register(s) +} + +// Describe implements prometheus.Collector. +func (server *Server) Describe(ch chan<- *prometheus.Desc) { + ch <- server.descs.has + ch <- server.descs.wants + ch <- server.descs.subclients +} + +// Collect implements prometheus.Collector. +func (server *Server) Collect(ch chan<- prometheus.Metric) { + status := server.Status() + + for id, res := range status.Resources { + ch <- prometheus.MustNewConstMetric(server.descs.has, prometheus.GaugeValue, res.SumHas, id) + ch <- prometheus.MustNewConstMetric(server.descs.wants, prometheus.GaugeValue, res.SumWants, id) + ch <- prometheus.MustNewConstMetric(server.descs.subclients, prometheus.GaugeValue, float64(res.Count), id) + } +} + // NewIntermediate creates a server connected to the lower level server. 
func NewIntermediate(ctx context.Context, id string, addr string, leader election.Election, opts ...connection.Option) (*Server, error) { var ( @@ -506,6 +544,28 @@ func NewIntermediate(ctx context.Context, id string, addr string, leader electio quit: make(chan bool), } + const ( + namespace = "doorman" + subsystem = "server" + ) + + labelNames := []string{"resource"} + server.descs.has = prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "has"), + "All capacity assigned to clients for a resource.", + labelNames, nil, + ) + server.descs.wants = prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "wants"), + "All capacity requested by clients for a resource.", + labelNames, nil, + ) + server.descs.subclients = prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "subclients"), + "Number of clients requesting this resource.", + labelNames, nil, + ) + // For an intermediate server load the default config for "*" // resource. As for root server, this config will be loaded // from some external source.. @@ -524,11 +584,6 @@ func NewIntermediate(ctx context.Context, id string, addr string, leader electio return server, nil } -// New returns a new unconfigured server. -func New(ctx context.Context, id string, leader election.Election, opts ...connection.Option) (*Server, error) { - return NewIntermediate(ctx, id, "", leader, opts...) -} - // run is the server's main loop. It takes care of requesting new resources, // and managing ones already claimed. This is the only method that should be // performing RPC.